Building reliable distributed software is not an easy task. We should expect both software and hardware failures at any given time, so we usually run multiple copies of our services at once. However, some services are not designed to process the same data on several instances simultaneously. Service leader election gives us the best of both worlds: one instance is elected leader and processes the data, while the rest remain passive followers. If the leader fails, one of the followers takes its place.
Apache ZooKeeper is an open-source centralized service that enables highly reliable distributed service coordination. Leader election is one of the standard recipes built on top of it, and it handles the job very well.
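To make the idea concrete, here is a minimal in-memory sketch of the rule the general ZooKeeper election recipe relies on: every candidate creates an ephemeral sequential znode, and the candidate holding the lowest sequence number is the leader. No ZooKeeper server is involved; ElectionSimulation, Join, Leave, and Leader are hypothetical names used only for illustration.

```csharp
#nullable enable
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory model of the election znodes; not part of any ZooKeeper library.
public sealed class ElectionSimulation
{
    // Sequence number -> host name, kept sorted by sequence number.
    private readonly SortedDictionary<int, string> _offers = new();
    private int _nextSequence;

    // Models creating an ephemeral sequential znode:
    // each candidate receives the next sequence number.
    public int Join(string hostName)
    {
        var sequence = _nextSequence++;
        _offers.Add(sequence, hostName);
        return sequence;
    }

    // Models the znode disappearing when the candidate's session ends.
    public void Leave(int sequence) => _offers.Remove(sequence);

    // The candidate with the lowest sequence number is the leader;
    // null means nobody has joined the election.
    public string? Leader => _offers.Count == 0 ? null : _offers.First().Value;
}
```

If service1 joins first and service2 second, Leader returns service1. Once Leave is called with the sequence number of service1, Leader returns service2, which is the failover behavior we want from the real cluster.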
Code
It only takes a few minutes to set up and test ZooKeeper.
Building the ZooKeeper client
To build our ZooKeeper client, we will use the ZooKeeperNetEx and ZooKeeperNetEx.Recipes NuGet packages. These libraries closely mirror the official Java ZooKeeper client, which is why the method names in the code use Java-style casing. They provide a LeaderElectionSupport recipe, which is exactly what we need. The ConnectionWatcher class tracks the connection state, but it only waits for SyncConnected. Other ZooKeeper connection states, such as Disconnected or Expired, are not handled in the code example. You will want to handle them when running the ZooKeeper client in a production environment.
// Namespaces as exposed by the ZooKeeperNetEx and ZooKeeperNetEx.Recipes packages.
using org.apache.zookeeper;
using org.apache.zookeeper.recipes.leader;
using static org.apache.zookeeper.Watcher.Event;
using static org.apache.zookeeper.ZooDefs;

public sealed class ZooKeeperClient : IDisposable
{
    private const string RootNode = "/leader-election";

    private readonly string _connectionString;
    private readonly int _sessionTimeout;
    private ZooKeeper? _zookeeper;
    private LeaderElectionSupport? _leaderElection;

    public ZooKeeperClient(string connectionString, int sessionTimeout)
    {
        _connectionString = connectionString;
        _sessionTimeout = sessionTimeout;
    }

    public async Task<bool> CheckLeaderAsync(string hostName)
    {
        // Connect and join the election on the first call only.
        if (_leaderElection is null)
        {
            var watcher = new ConnectionWatcher();
            _zookeeper = new ZooKeeper(_connectionString, _sessionTimeout, watcher);
            await watcher.WaitForConnectionAsync();

            if (await _zookeeper.existsAsync(RootNode) is null)
            {
                try
                {
                    await _zookeeper.createAsync(RootNode, Array.Empty<byte>(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                }
                catch (KeeperException.NodeExistsException)
                {
                    // Another instance created the root node between our check and our create.
                }
            }

            _leaderElection = new LeaderElectionSupport(_zookeeper, RootNode, hostName);
            await _leaderElection.start();
        }

        var leaderHostName = await _leaderElection.getLeaderHostName();
        return leaderHostName == hostName;
    }

    public void Dispose()
    {
        // Leave the election first, then close the session (blocking, since Dispose is synchronous).
        if (_leaderElection is not null)
            _leaderElection.stop().Wait();
        if (_zookeeper is not null)
            _zookeeper.closeAsync().Wait();
    }

    private sealed class ConnectionWatcher : Watcher
    {
        private readonly TaskCompletionSource _tcs = new();

        public Task WaitForConnectionAsync() => _tcs.Task;

        public override Task process(WatchedEvent @event)
        {
            // Only the initial connection is tracked; Disconnected and Expired are ignored here.
            if (@event.getState() is KeeperState.SyncConnected)
                _tcs.TrySetResult();
            return Task.CompletedTask;
        }
    }
}
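The ConnectionWatcher above only reacts to SyncConnected. As a rough sketch of what handling the other states could look like, the policy below maps each connection state to a reaction. It assumes the usual ZooKeeper semantics: while Disconnected, the session may still be alive but leadership cannot be confirmed, so the safe choice is to pause work; once Expired, the session and its ephemeral nodes are gone, so a new client has to be created and the election rejoined. ConnectionState, LeaderAction, and ConnectionPolicy are hypothetical names; ConnectionState merely mirrors the three KeeperState values discussed here so that the example runs without the NuGet packages.

```csharp
using System;

// Hypothetical mirror of the KeeperState values discussed above.
public enum ConnectionState { SyncConnected, Disconnected, Expired }

// Hypothetical reactions a worker could take.
public enum LeaderAction { Continue, PauseWork, RecreateClient }

public static class ConnectionPolicy
{
    // Maps a connection state to the reaction assumed in the text above.
    public static LeaderAction Decide(ConnectionState state) => state switch
    {
        // Connected: the election result can be trusted.
        ConnectionState.SyncConnected => LeaderAction.Continue,
        // Connection lost: leadership cannot be confirmed, so stop processing for now.
        ConnectionState.Disconnected => LeaderAction.PauseWork,
        // Session gone: ephemeral nodes were removed, start over with a new client.
        ConnectionState.Expired => LeaderAction.RecreateClient,
        _ => throw new ArgumentOutOfRangeException(nameof(state)),
    };
}
```

In a real watcher, the value returned by @event.getState() would be translated into this decision inside process, and the worker would check it before each processing step.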
Building worker services
To test our ZooKeeper client, we will create two background worker services.
// Worker.cs of service 1.
public sealed class Worker : BackgroundService
{
    private readonly ILogger<Worker> _logger;
    private readonly ZooKeeperClient _zooKeeperClient;

    public Worker(ILogger<Worker> logger, ZooKeeperClient zooKeeperClient)
    {
        _logger = logger;
        _zooKeeperClient = zooKeeperClient;
    }

    protected override async Task ExecuteAsync(CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            // Only the current leader does the work; followers just keep polling.
            if (await _zooKeeperClient.CheckLeaderAsync("service1"))
                _logger.LogInformation("Processing... {Time}", DateTime.Now);
            await Task.Delay(1000, token);
        }
    }
}
// Worker.cs of service 2.
public sealed class Worker : BackgroundService
{
    private readonly ILogger<Worker> _logger;
    private readonly ZooKeeperClient _zooKeeperClient;

    public Worker(ILogger<Worker> logger, ZooKeeperClient zooKeeperClient)
    {
        _logger = logger;
        _zooKeeperClient = zooKeeperClient;
    }

    protected override async Task ExecuteAsync(CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            // Only the current leader does the work; followers just keep polling.
            if (await _zooKeeperClient.CheckLeaderAsync("service2"))
                _logger.LogInformation("Processing... {Time}", DateTime.Now);
            await Task.Delay(1000, token);
        }
    }
}
// Program.cs of both workers.
public sealed class Program
{
    public static void Main(string[] args)
    {
        CreateHostBuilder(args).Build().Run();
    }

    public static IHostBuilder CreateHostBuilder(string[] args) =>
        Host.CreateDefaultBuilder(args)
            .ConfigureServices((hostContext, services) =>
            {
                // Register through a factory so that the container creates the instance
                // and disposes it on shutdown; instances passed in directly are not disposed.
                services.AddSingleton(_ => new ZooKeeperClient(
                    connectionString: "zoo1:2181,zoo2:2181,zoo3:2181",
                    sessionTimeout: 10000));
                services.AddHostedService<Worker>();
            });
}
Setting up Docker Compose
We are going to use Docker Compose to start ZooKeeper in replicated mode.
version: '3.4'

services:
  service1:
    image: ${DOCKER_REGISTRY-}service1
    build:
      context: .
      dockerfile: Service1/Dockerfile
    depends_on:
      - zoo1
      - zoo2
      - zoo3

  service2:
    image: ${DOCKER_REGISTRY-}service2
    build:
      context: .
      dockerfile: Service2/Dockerfile
    depends_on:
      - zoo1
      - zoo2
      - zoo3

  zoo1:
    image: zookeeper
    restart: always
    hostname: zoo1
    ports:
      - 2181:2181
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181

  zoo2:
    image: zookeeper
    restart: always
    hostname: zoo2
    ports:
      - 2182:2181
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181

  zoo3:
    image: zookeeper
    restart: always
    hostname: zoo3
    ports:
      - 2183:2181
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181
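The compose file starts three ZooKeeper servers rather than one because a replicated ensemble stays available only while a majority of its servers are up. The small sketch below works through that arithmetic; Ensemble, QuorumSize, and ToleratedFailures are hypothetical helper names, not part of any ZooKeeper API.

```csharp
using System;

// Hypothetical helper illustrating majority-quorum arithmetic.
public static class Ensemble
{
    // Smallest number of servers that forms a strict majority.
    public static int QuorumSize(int servers)
    {
        if (servers < 1)
            throw new ArgumentOutOfRangeException(nameof(servers));
        return servers / 2 + 1;
    }

    // Number of servers that can fail while a majority still remains.
    public static int ToleratedFailures(int servers) => servers - QuorumSize(servers);
}
```

For the three servers defined here, the quorum is two, so the ensemble survives the loss of one server. A fourth server would raise the quorum to three without tolerating any additional failure, which is why ensembles are usually sized with an odd number of servers.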
Running the code
To start our services, we are going to use the following console command:
docker-compose up
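The command will start a three-node ZooKeeper ensemble and the two background worker services on your local Docker platform. Only one of the worker services should be processing (logging text). If we stop it, for example with docker-compose stop service1 when service1 is the leader, the other service takes over and starts processing instead. The takeover can take up to the 10-second session timeout if the leader's ZooKeeper session was not closed cleanly.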
Thank you for reading. What service leader election tools are you using? I'd love to hear about them in the comments.