Service Leader Election With .NET and Apache ZooKeeper

Building reliable distributed software is not an easy task. We should expect both software and hardware failures at any given time, so we usually run multiple copies of our services at the same time. However, some services are not designed to process the same data on several instances simultaneously. Service leader election gives us the best of both worlds: one instance is elected leader and processes the data, while the rest remain passive followers. If the leader fails, one of the followers takes its place.

Apache ZooKeeper is an open-source centralized service that enables highly reliable distributed service coordination. It does a wonderful job handling distributed service leader elections.

Code

It only takes a few minutes to set up and test ZooKeeper.

Building the ZooKeeper Client

To build our ZooKeeper client, we will use the ZooKeeperNetEx and ZooKeeperNetEx.Recipes NuGet packages. These libraries follow the official Java ZooKeeper client to the letter, which is why their method names start with a lowercase letter. They provide a LeaderElectionSupport recipe, which is exactly what we need. The nested ConnectionWatcher class tracks the connection state, but it only waits for SyncConnected. Other ZooKeeper connection states, such as Disconnected or Expired, are not handled in the code example. You should handle them when running the ZooKeeper client in a production environment.

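using System;
using System.Threading.Tasks;
using org.apache.zookeeper;
using org.apache.zookeeper.recipes.leader;
using static org.apache.zookeeper.Watcher.Event;
using static org.apache.zookeeper.ZooDefs;

public sealed class ZooKeeperClient : IDisposable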
{
    private const string RootNode = "/leader-election";

    private readonly string _connectionString;
    private readonly int _sessionTimeout;

    private ZooKeeper? _zookeeper;
    private LeaderElectionSupport? _leaderElection;

    public ZooKeeperClient(string connectionString, int sessionTimeout)
    {
        _connectionString = connectionString;
        _sessionTimeout = sessionTimeout;
    }

    public async Task<bool> CheckLeaderAsync(string hostName)
    {
        if (_leaderElection is null)
        {
            var watcher = new ConnectionWatcher();
            _zookeeper = new ZooKeeper(_connectionString, _sessionTimeout, watcher);

            await watcher.WaitForConnectionAsync();

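            // Another instance may create the root node between the check and the create call.
            try
            {
                if (await _zookeeper.existsAsync(RootNode) is null)
                    await _zookeeper.createAsync(RootNode, Array.Empty<byte>(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
            catch (KeeperException.NodeExistsException)
            {
                // The root node already exists, which is all we need.
            }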

            _leaderElection = new LeaderElectionSupport(_zookeeper, RootNode, hostName);
            await _leaderElection.start();
        }

        var leaderHostName = await _leaderElection.getLeaderHostName();
        return leaderHostName == hostName;
    }

    public void Dispose()
    {
        if (_leaderElection is not null)
            _leaderElection.stop().Wait();

        if (_zookeeper is not null)
            _zookeeper.closeAsync().Wait();
    }

    private sealed class ConnectionWatcher : Watcher
    {
        private readonly TaskCompletionSource _tcs = new();
        public Task WaitForConnectionAsync() => _tcs.Task;

        public override Task process(WatchedEvent @event)
        {
            if (@event.getState() is KeeperState.SyncConnected)
                _tcs.TrySetResult();

            return Task.CompletedTask;
        }
    }
}
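
The paragraph above notes that Disconnected and Expired are not handled. Below is a minimal sketch of a watcher that reacts to them. SessionWatcher, IsConnected, and SessionExpired are hypothetical names that are not part of ZooKeeperNetEx, and what to do on expiry (here, raising an event) is an assumption left to the caller.

```csharp
using System;
using System.Threading.Tasks;
using org.apache.zookeeper;

// Hypothetical watcher for this sketch: tracks whether the session is usable
// and reports when it has expired.
public sealed class SessionWatcher : Watcher
{
    private readonly TaskCompletionSource<bool> _connected =
        new(TaskCreationOptions.RunContinuationsAsynchronously);

    private volatile bool _isConnected;

    // True only while the client is connected to the ensemble.
    public bool IsConnected => _isConnected;

    // Raised when the session has expired and the ZooKeeper handle is no longer usable.
    public event Action? SessionExpired;

    // Completes after the first SyncConnected event.
    public Task WaitForConnectionAsync() => _connected.Task;

    public override Task process(WatchedEvent @event)
    {
        switch (@event.getState())
        {
            case Watcher.Event.KeeperState.SyncConnected:
                _isConnected = true;
                _connected.TrySetResult(true);
                break;

            case Watcher.Event.KeeperState.Disconnected:
                // The client reconnects on its own; until then leadership is uncertain.
                _isConnected = false;
                break;

            case Watcher.Event.KeeperState.Expired:
                // Ephemeral nodes of this session are gone; a new ZooKeeper
                // instance and a new election are required.
                _isConnected = false;
                SessionExpired?.Invoke();
                break;
        }

        return Task.CompletedTask;
    }
}
```

One way to use it, under the same assumptions: treat the instance as a follower whenever IsConnected is false, and in a SessionExpired handler discard the ZooKeeper and LeaderElectionSupport instances so that the next CheckLeaderAsync call recreates them.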

Building Worker Services

To test our ZooKeeper client, we will create two background worker services. Each one checks once a second whether it is the current leader and only logs a message when it is.

// Worker.cs of service 1.
public sealed class Worker : BackgroundService
{
    private readonly ILogger<Worker> _logger;
    private readonly ZooKeeperClient _zooKeeperClient;

    public Worker(ILogger<Worker> logger, ZooKeeperClient zooKeeperClient)
    {
        _logger = logger;
        _zooKeeperClient = zooKeeperClient;
    }

    protected override async Task ExecuteAsync(CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            if (await _zooKeeperClient.CheckLeaderAsync("service1"))
                _logger.LogInformation("Processing... {Time}", DateTime.Now);

            await Task.Delay(1000, token);
        }
    }
}

// Worker.cs of service 2.
public sealed class Worker : BackgroundService
{
    private readonly ILogger<Worker> _logger;
    private readonly ZooKeeperClient _zooKeeperClient;

    public Worker(ILogger<Worker> logger, ZooKeeperClient zooKeeperClient)
    {
        _logger = logger;
        _zooKeeperClient = zooKeeperClient;
    }

    protected override async Task ExecuteAsync(CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            if (await _zooKeeperClient.CheckLeaderAsync("service2"))
                _logger.LogInformation("Processing... {Time}", DateTime.Now);

            await Task.Delay(1000, token);
        }
    }
}
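
The two Worker classes differ only in the host name literal. As an optional variation, the candidate name could be resolved at runtime so both services share one Worker. This is a sketch only: CandidateName and the ELECTION_HOST_NAME variable are hypothetical names invented for this example, and falling back to the machine name assumes that each container has a unique host name.

```csharp
using System;

// Hypothetical helper for this sketch; not part of the article's services.
public static class CandidateName
{
    // Assumed environment variable name, chosen for illustration only.
    public const string VariableName = "ELECTION_HOST_NAME";

    // Returns the configured name, or the machine name when none is set.
    // The variable reader is passed in so the logic can be exercised without
    // touching the real environment.
    public static string Resolve(Func<string, string?> readVariable, string machineName)
    {
        var configured = readVariable(VariableName);
        return string.IsNullOrWhiteSpace(configured) ? machineName : configured.Trim();
    }

    // Convenience overload that reads the real process environment.
    public static string Resolve() =>
        Resolve(Environment.GetEnvironmentVariable, Environment.MachineName);
}
```

A worker would then call CheckLeaderAsync(CandidateName.Resolve()) instead of passing "service1" or "service2".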

// Program.cs of both workers.
public sealed class Program
{
    public static void Main(string[] args)
    {
        CreateHostBuilder(args).Build().Run();
    }

    public static IHostBuilder CreateHostBuilder(string[] args) =>
        Host.CreateDefaultBuilder(args)
            .ConfigureServices((hostContext, services) =>
            {
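                // Register through a factory so the container owns the instance
                // and calls Dispose on shutdown; pre-built instances are not disposed.
                services.AddSingleton(_ => new ZooKeeperClient(
                    connectionString: "zoo1:2181,zoo2:2181,zoo3:2181",
                    sessionTimeout: 10000));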

                services.AddHostedService<Worker>();
            });
}

Setting Up Docker Compose

We are going to use Docker Compose to start both worker services together with a three-node ZooKeeper ensemble running in replicated mode.

version: '3.4'

services:
  service1:
    image: ${DOCKER_REGISTRY-}service1
    build:
      context: .
      dockerfile: Service1/Dockerfile
    depends_on:
      - zoo1
      - zoo2
      - zoo3

  service2:
    image: ${DOCKER_REGISTRY-}service2
    build:
      context: .
      dockerfile: Service2/Dockerfile
    depends_on:
      - zoo1
      - zoo2
      - zoo3

  zoo1:
    image: zookeeper
    restart: always
    hostname: zoo1
    ports:
      - 2181:2181
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181

  zoo2:
    image: zookeeper
    restart: always
    hostname: zoo2
    ports:
      - 2182:2181
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181

  zoo3:
    image: zookeeper
    restart: always
    hostname: zoo3
    ports:
      - 2183:2181
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181

Running The Code

To start our services, we are going to use the following console command:

docker-compose up

The command builds both worker images, pulls the ZooKeeper image, and starts the three ZooKeeper nodes and the two background worker services on your local Docker platform. Only one of the background services should be processing (logging text). If we stop it, the other service takes over and starts processing instead, at the latest once the stopped leader's ZooKeeper session times out.
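
The takeover described above can be modeled without a running ensemble. The sketch below assumes the usual ZooKeeper election recipe, in which every candidate creates an ephemeral sequential znode under the root node and the candidate with the lowest sequence number leads. LeaderOffer and ElectionRule are hypothetical types written for this illustration, and the "n_" prefix is only an example; this is not the library's actual implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical model of a candidate: the znode name carries a sequence suffix
// and the host name stands in for the znode's data.
public sealed record LeaderOffer(string NodeName, string HostName)
{
    // ZooKeeper appends a zero-padded counter to sequential znodes,
    // for example "n_0000000003".
    public int Sequence => int.Parse(NodeName[(NodeName.LastIndexOf('_') + 1)..]);
}

public static class ElectionRule
{
    // The offer with the lowest sequence number wins.
    // Returns null when there are no candidates at all.
    public static string? PickLeader(IEnumerable<LeaderOffer> offers) =>
        offers.OrderBy(offer => offer.Sequence).FirstOrDefault()?.HostName;
}
```

With offers for "service1" (sequence 0) and "service2" (sequence 1), PickLeader returns "service1". When service1 stops, its session ends and its ephemeral znode disappears, so the same rule applied to the remaining offer returns "service2".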


Thank you for reading. What service leader election tools are you using? I'd love to hear about them in the comments.
