Leader election in Go using the Consul KV: Part 1

In this post we are going to look at how to implement leader election in Go using the Consul KV. If you are unfamiliar with leader election or distributed consensus, I recommend checking out the following video from Jeffrey Richter: What is leader election?

This post is for educational purposes only. It is not a guideline or reference implementation for a leader election algorithm suitable for use in the real world; please use github.com/hashicorp/consul/api#Lock instead.

Disclaimer

None of the code in the examples below has been tested or is safe for use outside this website. Use other than for educational purposes is forbidden, and use of any of the code below is at your own risk. Please use github.com/hashicorp/consul/api#Lock instead.

What is Consul?

Consul is a distributed, highly available, data-center aware control plane used to configure and coordinate services across infrastructure. It has a number of features including service discovery, health checking and a key/value store (KV). It is the KV that we will be using to implement leader election in Go.

What is the Consul KV?

The Consul KV is the key/value store built into Consul. It allows clients to store data such as configuration and metadata, and to acquire and release distributed locks. To make this possible the Consul KV provides a number of features on top of the expected GET, PUT and DELETE operations, including blocking queries, CAS (check-and-set) operations and transactions.
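
To make check-and-set a little more concrete, here is a minimal sketch of what a CAS write could look like with the official Go client. The key and value are placeholders and are not used elsewhere in this post:

client, err := api.NewClient(api.DefaultConfig())
if err != nil {
    log.Fatal(err)
}
kv := client.KV()

// read the current value to learn its ModifyIndex
pair, _, err := kv.Get("leader-election/example", nil)
if err != nil {
    log.Fatal(err)
}

// a ModifyIndex of 0 means the write only succeeds if the key does not exist yet
var modifyIndex uint64
if pair != nil {
    modifyIndex = pair.ModifyIndex
}

// the write is only applied if the key has not changed since we read it
ok, _, err := kv.CAS(&api.KVPair{
    Key:         "leader-election/example",
    Value:       []byte("some value"),
    ModifyIndex: modifyIndex,
}, nil)
if err != nil {
    log.Fatal(err)
}
log.Printf("write applied: %t", ok)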

How can we use the Consul KV to implement leader election?

The Consul KV supports all the building blocks we need to implement lease-based leader election. If lease-based leader election sounds unfamiliar, please revisit the Jeffrey Richter video on leader election from earlier.

To implement lease-based leader election we need two lower-level primitives: sessions and CAS (check-and-set).

Sessions

In Consul, sessions are contracts between a client and Consul. Each session has a name, a list of health checks, a behavior should the session be invalidated, a TTL (time-to-live) and a lock-delay. A session can be invalidated for the following reasons:

- one or more of its health checks enters the critical state
- one of its health checks, or its node, is deregistered
- its TTL expires before the session is renewed
- the session is explicitly destroyed

When a session is invalidated before the process has released the lock (e.g. due to one or more critical health checks), the lock cannot be acquired by other processes for the duration of the lock-delay. The purpose of this delay is to give the current holder of the lock time to detect that its session has been invalidated and stop all work before the lock is acquired by another process.
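
As a rough sketch, a session with the TTL, lock-delay and behavior described above could be registered with the Go client like this; the name and values are illustrative, and client is assumed to be an *api.Client:

// an illustrative session entry; the name and values are placeholders
entry := &api.SessionEntry{
    Name:      "leader-election",
    TTL:       "30s",                      // invalidated if not renewed within the TTL
    LockDelay: 15 * time.Second,           // how long locks stay blocked after invalidation
    Behavior:  api.SessionBehaviorRelease, // release (rather than delete) held locks on invalidation
}

sessionID, _, err := client.Session().Create(entry, nil)
if err != nil {
    log.Fatal(err)
}
log.Printf("created session %s", sessionID)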

It's not hard to see that managing the lifetime of sessions is nuanced and requires careful consideration of a lot of edge cases. We need some kind of mechanism to create sessions and keep them alive, renewing them at least once per TTL. We also need to detect when a session has been invalidated and create a new session so the same process can attempt to re-acquire the lock in the future.

Session Keeper

Let's encapsulate this behavior in a session keeper. The session keeper will be responsible for creating a session (or returning an existing one), signalling when the session has been closed or invalidated, and closing the session on request. It must also be safe for concurrent use.

We will need the following packages so let's import those first:

import (
    "fmt"
    "log"
    "sync"
    "time"

    "github.com/hashicorp/consul/api"
)

Next let's create our SessionKeeper struct. This struct has the following fields:

// SessionKeeper creates, renews and invalidates sessions in Consul.
// Each instance of a session keeper can create, renew and invalidate
// at most one session at a time. It is safe for concurrent use.
type SessionKeeper struct {
    client    *api.Session
    done      chan struct{}
    mu        sync.Mutex
    sessionID string
    stop      chan struct{}
}

func NewSessionKeeper(client *api.Session) *SessionKeeper {
    return &SessionKeeper{
        client: client,
    }
}
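
The constructor takes an *api.Session, which you can get from a Consul API client. As a rough sketch of the wiring, assuming the default configuration (which points at the local agent unless overridden by the standard CONSUL_* environment variables):

// wire up the session keeper against the agent addressed by api.DefaultConfig
client, err := api.NewClient(api.DefaultConfig())
if err != nil {
    log.Fatalf("failed to create Consul client: %s", err)
}
keeper := NewSessionKeeper(client.Session())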

Let's think about how we can create a session. There are two cases we need to consider:

- there is no active session, so a new session must be created
- there is already an active session, which can be re-used

We know that there is no active session when the Session ID is "" so let's write an if condition that checks this. However, we need to first acquire a mutex to ensure that concurrent calls to Create do not result in duplicate sessions or an inconsistent state.

// Create creates a new session or re-uses an existing session. It returns
// the session ID and a chan that is closed when the session is invalidated.
func (s *SessionKeeper) Create() (string, <-chan struct{}, error) {
    s.mu.Lock()
    defer s.mu.Unlock()

    // there is no session when s.sessionID is ""
    if s.sessionID == "" {
    }
    return s.sessionID, s.done, nil
}

In the case where there is no active session we must create a new one. Creating a session can fail for a number of reasons (e.g. a network failure). Should this happen, we return the error to the caller, who can decide whether to make another attempt to create a session or do something else.

In this example we are using the values of 15 seconds for the lock-delay and 30 seconds for the TTL.

// Create creates a new session or re-uses an existing session. It returns
// the session ID and a chan that is closed when the session is invalidated.
func (s *SessionKeeper) Create() (string, <-chan struct{}, error) {
    s.mu.Lock()
    defer s.mu.Unlock()

    // there is no session when s.sessionID is ""
    if s.sessionID == "" {
        sessionID, _, err := s.client.Create(
            &api.SessionEntry{
                LockDelay: 15 * time.Second,
                TTL:       "30s",
            }, nil)
        if err != nil {
            return "", nil, fmt.Errorf("failed to create session: %w", err)
        }
    }
    return s.sessionID, s.done, nil
}

Once a session has been created we need to keep track of it. To do this we create a new blocking channel called done, a new blocking channel called stop, and set the Session ID in the struct. The done channel and Session ID are then returned to the caller, and to all future callers of GetSessionID, until the session is closed or invalidated.

if s.sessionID == "" {
    sessionID, _, err := s.client.Create(
        &api.SessionEntry{
            LockDelay: 15 * time.Second,
            TTL:       "30s",
        }, nil)
    if err != nil {
        return "", nil, fmt.Errorf("failed to create session: %w", err)
    }
    s.done = make(chan struct{})
    s.stop = make(chan struct{})
    s.sessionID = sessionID
}

Last but not least we need to make sure that we renew the session at least once per TTL to prevent it from being invalidated. To do this we can spawn a goroutine that calls RenewPeriodic with the initial TTL, the Session ID, and our blocking channel called stop. The stop channel is important because when someone calls Close we want RenewPeriodic to destroy the session and stop attempting to renew it.

The RenewPeriodic function renews the session approximately once every TTL / 2. It stops and returns an error when any one of the following conditions is satisfied:

- the session has expired or been invalidated
- renewal attempts have kept failing (e.g. due to a network failure) for longer than the TTL

It stops and returns nil when the stop channel is closed, in which case it also destroys the session before returning.

In this example we are using a stop channel (RenewPeriodic's doneCh parameter), but you could also use a context, or both.

go func(sessionID string, stop <-chan struct{}) {
    if err := s.client.RenewPeriodic(
        "30s",
        sessionID,
        nil,
        stop); err != nil {
        // do something with the error
    }
}(s.sessionID, s.stop)

Let's take a second to think about what we want to do in each of these cases. In every case the session is no longer usable, so we want to reset the keeper's state so that a new session can be created in the future.

To achieve this we add a deferred call to a reset function that is invoked as soon as the goroutine exits.

go func(sessionID string, stop <-chan struct{}) {
    defer s.reset()
    if err := s.client.RenewPeriodic(
        "30s",
        sessionID,
        nil,
        stop); err != nil {
        log.Printf("failed to renew session: %s", err)
    }
}(s.sessionID, s.stop)

The reset function is quite simple: it acquires the mutex to prevent race conditions with other functions such as Create or GetSessionID, and then resets the state so that a new session can be created. That means closing the done channel, resetting the Session ID and setting both channels back to nil.

func (s *SessionKeeper) reset() {
    s.mu.Lock()
    defer s.mu.Unlock()
    close(s.done)
    s.done = nil
    s.stop = nil
    s.sessionID = ""
}

Here is the complete code for creating a session.

// Create creates a new session or re-uses an existing session. It returns
// the session ID and a chan that is closed when the session is invalidated.
func (s *SessionKeeper) Create() (string, <-chan struct{}, error) {
    s.mu.Lock()
    defer s.mu.Unlock()

    // there is no session when s.sessionID is ""
    if s.sessionID == "" {
        sessionID, _, err := s.client.Create(
            &api.SessionEntry{
                LockDelay: 15 * time.Second,
                TTL:       "30s",
            }, nil)
        if err != nil {
            return "", nil, fmt.Errorf("failed to create session: %w", err)
        }
        s.done = make(chan struct{})
        s.stop = make(chan struct{})
        s.sessionID = sessionID
        // keep the session alive until it is closed or the session has
        // been invalidated despite our best efforts
        go func(sessionID string, stop <-chan struct{}) {
            defer s.reset()
            if err := s.client.RenewPeriodic(
                "30s",
                sessionID,
                nil,
                stop); err != nil {
                log.Printf("failed to renew session: %s", err)
            }
        }(s.sessionID, s.stop)
    }
    return s.sessionID, s.done, nil
}

Let's look at how we can close a session. To close a session all we have to do is close the blocking channel called stop.

// Close closes the session.
func (s *SessionKeeper) Close() error {
    s.mu.Lock()
    defer s.mu.Unlock()
    if s.stop != nil {
        close(s.stop)
    }
    return nil
}

This function is not re-entrant because it is an error to close a channel more than once. However, it can be made re-entrant by checking a boolean that is set to true on close, as shown below (note that this requires adding an isStopped bool field to the SessionKeeper struct). The rest of the examples do not do this in an attempt to keep them as simple as possible.

// Close closes the session.
func (s *SessionKeeper) Close() {
    s.mu.Lock()
    defer s.mu.Unlock()
    if !s.isStopped {
        close(s.stop)
        s.isStopped = true
    }
}

You might have noticed that in Close we close the blocking channel called stop but we leave the Session ID and done channel. How come?

The issue here is that we also need to think about the goroutine that keeps the existing session alive via RenewPeriodic. If we were to reset the Session ID and close the done channel in Close, it would become possible to create a new session before that goroutine has shut down and reset the state. This could create a race condition in which the goroutine renewing the previous session detects that it has been invalidated, then resets the Session ID and closes the done channel thinking they still belong to the previous session, when in fact they now belong to a new session. By closing the done channel in reset rather than in Close, the session is not considered closed until RenewPeriodic has returned, and a new session cannot be created until the state has been reset.

Lastly, it would also be convenient if a client of the session keeper could discover whether a session is active and, if so, re-use it. Let's create a function called GetSessionID that returns the Session ID and the blocking channel that is closed when the session is invalidated. If no session is active it returns "" and a nil channel.

// GetSessionID returns the session ID and a chan that is closed when the
// session is invalidated.
func (s *SessionKeeper) GetSessionID() (string, <-chan struct{}) {
    s.mu.Lock()
    defer s.mu.Unlock()
    return s.sessionID, s.done
}
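
To see how the pieces fit together, here is a rough sketch of how a caller might use the session keeper: create (or re-use) a session, wait until the done channel is closed, and then try again. The client is an *api.Client as in the earlier sketch, and the shutdown channel is a stand-in for however your program signals shutdown:

// an illustrative caller of the session keeper
keeper := NewSessionKeeper(client.Session())
shutdown := make(chan struct{}) // closed elsewhere when the program shuts down

for {
    sessionID, done, err := keeper.Create()
    if err != nil {
        log.Printf("failed to create session: %s", err)
        time.Sleep(5 * time.Second)
        continue
    }
    log.Printf("using session %s", sessionID)

    select {
    case <-done:
        // the session was closed or invalidated; loop around and
        // attempt to create a new session
    case <-shutdown:
        keeper.Close()
        return
    }
}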

In Part 2 we will look at how we can acquire and release distributed locks to elect a leader in a round of leader election.