Leader election in Go using the Consul KV: Part 2

In Part 1 we looked at Consul, the Consul KV, and how to create, renew, and track sessions in Consul using an abstraction we created called the session keeper. In Part 2 we are going to use sessions to acquire and release exclusive locks on keys in the KV in order to elect a leader.

This topic is for educational purposes only. It is not a guideline or reference implementation of a leader election algorithm suitable for use in the real world; for that, please use github.com/hashicorp/consul/api#Lock instead.

Disclaimer

None of the code in the examples below has been tested or is safe for use outside this website. Use other than for educational purposes is forbidden, and any use of the code below is at your own risk. Please use github.com/hashicorp/consul/api#Lock instead.

Exclusive locks

Locks are a synchronization primitive that serializes access to some shared resource. There are two kinds of lock: shared locks and exclusive locks. A shared lock can be held by multiple processes at the same time and is often used when multiple processes want to read a shared resource without mutating it. To mutate a shared resource we need an exclusive lock.

An exclusive lock is a lock in which at most one process at a time can mutate a shared resource. Without an exclusive lock the operations of multiple concurrent processes can interleave such that we end up with an inconsistent state or undefined behavior.
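Go's sync.RWMutex provides both kinds of lock within a single process: RLock and RUnlock take the shared lock, while Lock and Unlock take the exclusive lock. The sketch below is only a local analogy (it does not involve Consul), and the Counter type and IncrementConcurrently function are hypothetical names chosen for illustration.

```go
package main

import "sync"

// Counter is a hypothetical shared resource guarded by a sync.RWMutex.
type Counter struct {
	mu    sync.RWMutex
	value int
}

// Value takes the shared (read) lock: many readers may hold it at once.
func (c *Counter) Value() int {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.value
}

// Increment takes the exclusive (write) lock: at most one goroutine may hold
// it, and no reader may hold the shared lock at the same time.
func (c *Counter) Increment() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.value++
}

// IncrementConcurrently starts n goroutines that each increment c once and
// waits for all of them to finish.
func IncrementConcurrently(c *Counter, n int) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.Increment()
		}()
	}
	wg.Wait()
}
```

Calling IncrementConcurrently(c, 1000) on a new Counter leaves c.Value() at 1000. Without the exclusive lock, concurrent increments could interleave and lose updates, which is exactly the kind of inconsistent state described above.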

We can use the acquire and release operations in Consul to acquire and release an exclusive lock on a key in the KV. When multiple processes attempt to acquire the lock on the same key, at most one of them can hold it at any one time, and we call the process that holds the lock the leader. The leader can give up its position by releasing the lock, or it can be expelled by having its session invalidated.
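To make these rules concrete, here is a small in-memory model of them. It is a sketch, not Consul's API: the fakeKV type and its methods are hypothetical, sessions are plain strings, and details such as LockIndex, key contents, and the lock-delay Consul applies after a session is invalidated are left out. Acquire succeeds if the session is valid and the key is unlocked or already locked by that same session, Release succeeds only for the holder, and invalidating a session releases its locks (as Consul does for sessions with the default release behavior).

```go
package main

import "sync"

// fakeKV is a hypothetical in-memory model of the locking rules of Consul's
// KV acquire and release operations. It is not the Consul API.
type fakeKV struct {
	mu       sync.Mutex
	holders  map[string]string // key -> ID of the session holding its lock
	sessions map[string]bool   // IDs of sessions that are still valid
}

// newFakeKV returns an empty store with no sessions and no held locks.
func newFakeKV() *fakeKV {
	return &fakeKV{holders: map[string]string{}, sessions: map[string]bool{}}
}

// CreateSession registers id as a valid session.
func (f *fakeKV) CreateSession(id string) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.sessions[id] = true
}

// Acquire locks key for session. It succeeds if the session is valid and the
// key is either unlocked or already locked by the same session. (This model
// returns false for an invalid session; the real API may report an error.)
func (f *fakeKV) Acquire(key, session string) bool {
	f.mu.Lock()
	defer f.mu.Unlock()
	if !f.sessions[session] {
		return false
	}
	if holder, held := f.holders[key]; held && holder != session {
		return false // another session holds the lock
	}
	f.holders[key] = session
	return true
}

// Release unlocks key, but only if it is currently locked by session.
func (f *fakeKV) Release(key, session string) bool {
	f.mu.Lock()
	defer f.mu.Unlock()
	if holder, held := f.holders[key]; !held || holder != session {
		return false
	}
	delete(f.holders, key)
	return true
}

// Invalidate destroys session and releases every lock it holds, as Consul
// does for sessions with the default "release" behavior.
func (f *fakeKV) Invalidate(session string) {
	f.mu.Lock()
	defer f.mu.Unlock()
	delete(f.sessions, session)
	for key, holder := range f.holders {
		if holder == session {
			delete(f.holders, key)
		}
	}
}
```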

Let's now start to write some code that can acquire and release a lock in the Consul KV. We will create a struct called Lock that is responsible for acquiring and releasing a distributed lock on a key called "lock" in the Consul KV.

// Lock is responsible for acquiring and releasing a single lock.
type Lock struct {
    kv      *api.KV
    session *SessionKeeper
}

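// NewLock returns a Lock that uses kv to acquire and release the lock and
// session to obtain the session that holds it.
func NewLock(kv *api.KV, session *SessionKeeper) *Lock {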
    return &Lock{
        kv:      kv,
        session: session,
    }
}

The first function we will write is Acquire, which attempts to acquire the lock. It should return true and a channel if the lock was acquired, false and a nil channel if the lock is already held by another process, or an error if something unexpected happened. Receiving from the channel blocks until the channel is closed, which happens when the lock is released, either because the process has given up its position as leader or because its session has been invalidated.

// Acquire attempts to acquire the lock. It returns true and a chan that is
// closed when the lock is either lost or released, false if the lock could
// not be acquired, or an error.
func (l *Lock) Acquire() (bool, <-chan struct{}, error) {
    // Placeholder: the body is filled in step by step below.
    return false, nil, nil
}

Recall that we cannot acquire a lock without a session, so first we ask the session keeper to create a session, or to return the existing one if it already has one.

// Acquire attempts to acquire the lock. It returns true and a chan that is
// closed when the lock is either lost or released, false if the lock could
// not be acquired, or an error.
func (l *Lock) Acquire() (bool, <-chan struct{}, error) {
    sessionID, _, err := l.session.Create()
    if err != nil {
        return false, nil, err
    }
}

Next we can attempt to acquire the lock using the session ID. The request can fail if the session is invalidated between the time it is retrieved from the session keeper and the time it is used to acquire the lock. It can also fail because of an unexpected error such as a network failure. In either case the caller should try to acquire the lock again after a short delay.

sessionID, _, err := l.session.Create()
if err != nil {
    return false, nil, err
}
acquired, _, err := l.kv.Acquire(&api.KVPair{
    Key:     "lock",
    Session: sessionID,
}, nil)
if err != nil {
    return false, nil, fmt.Errorf("failed to acquire lock: %w", err)
}

If the lock is acquired we need to create a blocking channel that is closed when the process forfeits its position as leader or its session is invalidated. We use a goroutine to call a function closeOnRelease that will close the channel when it detects the lock has been released or lost.

acquired, _, err := l.kv.Acquire(&api.KVPair{
    Key:     "lock",
    Session: sessionID,
}, nil)
if err != nil {
    return false, nil, fmt.Errorf("failed to acquire lock: %w", err)
}
if !acquired {
    return false, nil, nil
}
done := make(chan struct{})
go l.closeOnRelease(sessionID, done)
return true, done, nil
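Closing the channel, rather than sending a value on it, is what makes it work as a broadcast: a receive from a closed channel returns immediately, so every goroutine waiting on done is woken by a single close. The sketch below demonstrates this with two hypothetical helpers, waitAll and isClosed; isClosed uses a select with a default case to check, without blocking, whether the lock has been lost.

```go
package main

import "sync"

// waitAll starts n goroutines that each block until done is closed, and
// returns once all of them have been woken. A single close(done) wakes
// every receiver; a single send would wake only one.
func waitAll(n int, done <-chan struct{}) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-done // returns immediately once done is closed
		}()
	}
	wg.Wait()
}

// isClosed reports, without blocking, whether done has been closed. This
// assumes nothing is ever sent on done, which holds here because the
// channel is only ever closed.
func isClosed(done <-chan struct{}) bool {
	select {
	case <-done:
		return true
	default:
		return false
	}
}
```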

Here is the complete Acquire function:

// Acquire attempts to acquire the lock. It returns true and a chan that is
// closed when the lock is either lost or released, false if the lock could
// not be acquired, or an error.
func (l *Lock) Acquire() (bool, <-chan struct{}, error) {
    sessionID, _, err := l.session.Create()
    if err != nil {
        return false, nil, err
    }
    acquired, _, err := l.kv.Acquire(&api.KVPair{
        Key:     "lock",
        Session: sessionID,
    }, nil)
    if err != nil {
        return false, nil, fmt.Errorf("failed to acquire lock: %w", err)
    }
    if !acquired {
        return false, nil, nil
    }
    done := make(chan struct{})
    go l.closeOnRelease(sessionID, done)
    return true, done, nil
}

The closeOnRelease function takes the session ID and the channel to close. It runs a loop of blocking queries against the key; each query returns when the key changes (or when the query's wait time elapses). If the key no longer exists, or the session on the key no longer matches the session ID that was used to acquire the lock, we know that the lock has been either released or lost, and the done channel can be closed.

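func (l *Lock) closeOnRelease(sessionID string, done chan struct{}) {
    var waitIndex uint64
    for {
        // Blocking query: returns once the key's index moves past waitIndex
        // or the wait time elapses.
        kv, meta, err := l.kv.Get("lock", &api.QueryOptions{
            WaitIndex: waitIndex,
        })
        if err != nil {
            log.Printf("failed to check if lock is still held: %s, retrying in 1 second", err)
            time.Sleep(time.Second)
            continue
        }
        // kv is nil if the key was deleted (for example, by a session with
        // the "delete" behavior); otherwise compare the session on the key.
        if kv == nil || kv.Session != sessionID {
            close(done)
            return
        }
        // Wait for the next change after the index of this response.
        waitIndex = meta.LastIndex
    }
}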
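To see why this loop ends exactly when the lock changes hands, here is an in-memory model of a blocking query. It is a sketch under simplifying assumptions: watchedKey and closeWhenNotHeld are hypothetical names, not part of the Consul API; the key stores only the session holding the lock and an index that increases on every write; and unlike a real blocking query, Get never returns just because a wait time elapsed.

```go
package main

import "sync"

// watchedKey is a hypothetical stand-in for a single key that supports
// blocking reads: Get returns once the key's index exceeds waitIndex.
type watchedKey struct {
	mu      sync.Mutex
	cond    *sync.Cond
	index   uint64
	session string // session holding the lock, "" if unlocked
}

// newWatchedKey returns an unlocked key with index 1, so a first Get with
// waitIndex 0 returns immediately.
func newWatchedKey() *watchedKey {
	k := &watchedKey{index: 1}
	k.cond = sync.NewCond(&k.mu)
	return k
}

// Set records the session holding the lock, bumps the index, and wakes any
// blocked readers.
func (k *watchedKey) Set(session string) {
	k.mu.Lock()
	k.session = session
	k.index++
	k.mu.Unlock()
	k.cond.Broadcast()
}

// Get blocks until the index exceeds waitIndex, then returns the current
// holder and index.
func (k *watchedKey) Get(waitIndex uint64) (string, uint64) {
	k.mu.Lock()
	defer k.mu.Unlock()
	for k.index <= waitIndex {
		k.cond.Wait()
	}
	return k.session, k.index
}

// closeWhenNotHeld mirrors the loop in closeOnRelease: it closes done as
// soon as a read shows that sessionID no longer holds the lock.
func closeWhenNotHeld(k *watchedKey, sessionID string, done chan struct{}) {
	var waitIndex uint64
	for {
		holder, index := k.Get(waitIndex)
		if holder != sessionID {
			close(done)
			return
		}
		waitIndex = index
	}
}
```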

Last we need to implement the Release function. This function attempts to release a held lock. It returns true if the lock was released, false if the lock was not released, or an error.

// Release attempts to release the lock. It returns true if the lock was
// released, false if the lock was not released, or an error.
func (l *Lock) Release() (bool, error) {
    // Placeholder: the body is filled in step by step below.
    return false, nil
}

First we need to get the session ID of the current session. If there is no active session, then either no lock was acquired, or the lock was acquired but the session has since been lost. In the second case Consul releases the lock once it invalidates the session (with the default release behavior), which for a session with a TTL happens after the TTL expires without renewal.

// Release attempts to release the lock. It returns true if the lock was
// released, false if the lock was not released, or an error.
func (l *Lock) Release() (bool, error) {
    sessionID, _ := l.session.GetSessionID()
    if sessionID == "" {
        return false, nil
    }
}

If a session is active, we can attempt to release the lock using the session ID. As with acquiring the lock, the request can fail if the session is invalidated between the time it is retrieved from the session keeper and the time it is used to release the lock. It can also fail because of an unexpected error such as a network failure. In either case the caller should try to release the lock again after a short delay.

sessionID, _ := l.session.GetSessionID()
if sessionID == "" {
    return false, nil
}
released, _, err := l.kv.Release(&api.KVPair{
    Key:     "lock",
    Session: sessionID,
}, nil)
if err != nil {
    return false, fmt.Errorf("failed to release lock: %w", err)
}
return released, nil

Here is the complete Release function:

// Release attempts to release the lock. It returns true if the lock was
// released, false if the lock was not released, or an error.
func (l *Lock) Release() (bool, error) {
    sessionID, _ := l.session.GetSessionID()
    if sessionID == "" {
        return false, nil
    }
    released, _, err := l.kv.Release(&api.KVPair{
        Key:     "lock",
        Session: sessionID,
    }, nil)
    if err != nil {
        return false, fmt.Errorf("failed to release lock: %w", err)
    }
    return released, nil
}

A simple leader election example

package main

import (
    "log"
    "time"

    "github.com/hashicorp/consul/api"
)

func main() {
    client, err := api.NewClient(api.DefaultConfig())
    if err != nil {
        log.Fatal(err)
    }
    lock := NewLock(client.KV(), NewSessionKeeper(client.Session()))
    for {
        acquired, done, err := lock.Acquire()
        if err != nil {
            log.Printf("failed to attempt lock acquisition: %v", err)
        } else if acquired {
            // We are the leader. doLeaderWork (not shown) should return when
            // its work is finished or as soon as done is closed.
            doLeaderWork(done)
            released, err := lock.Release()
            if err != nil {
                log.Printf("failed to release lock: %v", err)
            } else if !released {
                log.Println("lock was not released")
            }
        }
        // Wait before taking part in the next round of leader election.
        time.Sleep(5 * time.Second)
    }
}