Leader Election in Kubernetes with the Official C# Client
Leader Election is useful in distributed systems because it gives a single node sole responsibility for tasks that can't safely be performed concurrently. This is particularly useful in Kubernetes, where we typically run multiple instances of the same app spread across pods. An example would be reserving resources on behalf of the entire deployment.
Official Kubernetes C# LeaderElector
The official Kubernetes C# client has a namespace devoted to this, k8s.LeaderElection, which provides a LeaderElector class that exposes a number of events you can register handlers for:
/// <summary>
/// OnStartedLeading is called when a LeaderElector client starts leading
/// </summary>
public event Action OnStartedLeading;
/// <summary>
/// OnStoppedLeading is called when a LeaderElector client stops leading
/// </summary>
public event Action OnStoppedLeading;
/// <summary>
/// OnNewLeader is called when the client observes a leader that is
/// not the previously observed leader. This includes the first observed
/// leader when the client starts.
/// </summary>
public event Action<string> OnNewLeader;
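These events carry very little: two plain notifications and the identity of the newly observed leader. As a rough sketch of consuming them, here is a hypothetical LeadershipTracker class (not part of the k8s client). Its methods match the Action and Action<string> signatures, so they could be attached with elector.OnStartedLeading += tracker.StartedLeading and so on:

```csharp
// Hypothetical helper, not part of the k8s client: records what the three
// LeaderElector events report so other code can read it at any time.
public sealed class LeadershipTracker
{
    // volatile: the events are raised from the elector's run loop, while
    // IsLeader/CurrentLeader may be read from other threads.
    private volatile bool _isLeader;
    private volatile string? _currentLeader;

    public bool IsLeader => _isLeader;
    public string? CurrentLeader => _currentLeader;

    // Signatures match Action / Action<string>, e.g.
    // elector.OnStartedLeading += tracker.StartedLeading;
    public void StartedLeading() => _isLeader = true;
    public void StoppedLeading() => _isLeader = false;
    public void NewLeader(string identity) => _currentLeader = identity;
}
```

The LeaderElectionService later in this post asks the elector directly via IsLeader() instead, which avoids keeping a second copy of the state.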
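In this post the LeaderElector uses a mutually exclusive lock on a ConfigMap (a ConfigMapLock) to elect the Leader. The excerpt below shows the loop it runs once it has become the Leader. It repeatedly calls TryAcquireOrRenew() to renew its lease on the ConfigMap. If a renewal fails, or doesn't succeed within RenewDeadline, it breaks out of the loop and raises OnStoppedLeading: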
try
{
    OnStartedLeading?.Invoke();
    // renew loop
    for (; ; )
    {
        cancellationToken.ThrowIfCancellationRequested();
        var acq = Task.Run(async () =>
        {
            try
            {
                while (!await TryAcquireOrRenew(cancellationToken).ConfigureAwait(false))
                {
                    await Task.Delay(config.RetryPeriod, cancellationToken).ConfigureAwait(false);
                    MaybeReportTransition();
                }
            }
            catch
            {
                // ignore
                return false;
            }
            return true;
        });
        if (await Task.WhenAny(acq, Task.Delay(config.RenewDeadline, cancellationToken))
            .ConfigureAwait(false) == acq)
        {
            var succ = await acq.ConfigureAwait(false);
            if (succ)
            {
                await Task.Delay(config.RetryPeriod, cancellationToken).ConfigureAwait(false);
                // retry
                continue;
            }
            // renew failed
        }
        // timeout
        break;
    }
}
finally
{
    OnStoppedLeading?.Invoke();
}
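To show the timing behaviour of that loop in isolation, here is a simplified, self-contained model of it. It is a sketch, not the library's code. The tryRenew delegate is a hypothetical stand-in for TryAcquireOrRenew(). Each renewal is retried every retryPeriod, and if it hasn't succeeded within renewDeadline the instance gives up leadership. Unlike the excerpt, it also cancels the abandoned retry task through a linked CancellationTokenSource:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Simplified model of the LeaderElector renew loop (a sketch, not library code).
public static class RenewLoopSketch
{
    // Returns how many renewals succeeded before leadership was given up.
    public static async Task<int> RunAsync(
        Func<CancellationToken, Task<bool>> tryRenew, // stand-in for TryAcquireOrRenew()
        TimeSpan retryPeriod,
        TimeSpan renewDeadline,
        CancellationToken cancellationToken)
    {
        var renewals = 0;
        while (true)
        {
            cancellationToken.ThrowIfCancellationRequested();

            // Linked source so whichever task loses the race below can be stopped.
            using var attemptCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            var attempt = RetryUntilRenewedAsync(tryRenew, retryPeriod, attemptCts.Token);
            var deadline = Task.Delay(renewDeadline, attemptCts.Token);

            var winner = await Task.WhenAny(attempt, deadline);
            attemptCts.Cancel();

            // Deadline reached first, or the renewal threw: stop leading.
            if (winner != attempt || !await attempt)
            {
                return renewals;
            }

            renewals++;
            await Task.Delay(retryPeriod, cancellationToken);
        }
    }

    private static async Task<bool> RetryUntilRenewedAsync(
        Func<CancellationToken, Task<bool>> tryRenew,
        TimeSpan retryPeriod,
        CancellationToken ct)
    {
        try
        {
            while (!await tryRenew(ct))
            {
                await Task.Delay(retryPeriod, ct);
            }
            return true;
        }
        catch
        {
            // As in the excerpt, any exception (including cancellation) counts as a failed renewal.
            return false;
        }
    }
}
```

For example, with a tryRenew that succeeds three times and then keeps returning false, RunAsync returns 3 once the fourth renewal has been retried for longer than renewDeadline.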
Leasing ConfigMaps
The TryAcquireOrRenew() method attempts to obtain a short-term lease on a ConfigMap, whose name you specify when configuring the LeaderElector.
If the current Leader renews its lease in time, no other instance can take over. If it fails to renew, another instance acquires the lease once it expires and is promoted to Leader, and the relevant event handlers are called.
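The rules in the previous paragraph can be modelled in a few lines. This is a simplified sketch with hypothetical LeaseRecord and LeaseRules types, not the library's implementation. The real TryAcquireOrRenew() stores its record in the ConfigMap and is more careful, for example about clock differences between pods:

```csharp
using System;

// Simplified model of the decision TryAcquireOrRenew() makes. The record
// and enum names are hypothetical, not the k8s client's own types.
public sealed record LeaseRecord(string HolderIdentity, DateTimeOffset RenewTime, TimeSpan LeaseDuration);

public enum LeaseDecision { Acquire, Renew, Wait }

public static class LeaseRules
{
    public static LeaseDecision Decide(LeaseRecord? current, string candidate, DateTimeOffset now)
    {
        // Nobody has written a lease yet: try to create it.
        if (current is null) return LeaseDecision.Acquire;

        // We already hold the lease: refresh it.
        if (current.HolderIdentity == candidate) return LeaseDecision.Renew;

        // Someone else holds it: only take over once their lease has expired.
        return now > current.RenewTime + current.LeaseDuration
            ? LeaseDecision.Acquire
            : LeaseDecision.Wait;
    }
}
```

The old Leader gives up once RenewDeadline passes without a successful renewal, while other instances must wait for the whole LeaseDuration. So RenewDeadline should be shorter than LeaseDuration, and that gap is what is meant to stop two instances leading at once.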
Kubernetes role
As your deployment will be modifying a ConfigMap, its pods need to run under a serviceAccount that has permission to do so. A Role like the one below grants those permissions, and a RoleBinding attaches it to the service account:
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: {{ include "app.serviceAccountName" . }}
rules:
  - apiGroups: [""] # "" indicates the core API group
    resources: ["configmaps"] # This rule is required for creating a Leader Election ConfigMap lock
    verbs: ["get", "create", "update", "patch"]
My ILeaderElectionService and its implementation
I wanted a very simple ILeaderElectionService that would be started by a BackgroundService and expose a boolean IsLeader property. Classes with processes that should only run on the Leader instance could then check IsLeader before doing that work.
The class diagram below illustrates this:
---
title: Leader Election Service
---
classDiagram
    direction RL
    class ILeaderElectionService{
        <<interface>>
        +bool IsLeader
        +ExecuteAsync(CancellationToken stoppingToken) Task
    }
    class LeaderElectionService{
        +bool IsLeader
        +ExecuteAsync(CancellationToken stoppingToken) Task
        -HandleStartedLeadingEvent()
        -HandleStoppedLeadingEvent()
        -HandleNewLeaderEvent(string newLeader)
    }
    class LeaderElectionWorker{
        #ExecuteAsync(CancellationToken stoppingToken) Task
    }
    class ResourceReservationWorker{
        #ExecuteAsync(CancellationToken stoppingToken) Task
        -ReserveResourcesAsync(CancellationToken stoppingToken) Task
    }
    ILeaderElectionService <|.. LeaderElectionService
    LeaderElectionWorker ..> ILeaderElectionService
    ResourceReservationWorker ..> ILeaderElectionService
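The diagram doesn't spell out the interface itself, so here is a minimal sketch of ILeaderElectionService as drawn. It comes with a hypothetical FakeLeaderElectionService test double that unit tests of leader-only workers could use to switch leadership on and off without a cluster:

```csharp
using System.Threading;
using System.Threading.Tasks;

// ILeaderElectionService as drawn in the class diagram.
public interface ILeaderElectionService
{
    bool IsLeader { get; }
    Task ExecuteAsync(CancellationToken stoppingToken);
}

// Hypothetical test double: lets unit tests of leader-only workers switch
// leadership on and off without a Kubernetes cluster.
public sealed class FakeLeaderElectionService : ILeaderElectionService
{
    public bool IsLeader { get; set; }

    // Nothing to run: leadership is controlled by setting IsLeader directly.
    public Task ExecuteAsync(CancellationToken stoppingToken) => Task.CompletedTask;
}
```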
The LeaderElectionService
public class LeaderElectionService : ILeaderElectionService, IDisposable
{
    private readonly IKubernetes _kubernetesClient;
    private readonly IOptionsMonitor<KubernetesSettings> _settingsMonitor;
    private readonly IEnvironment _environment;
    private readonly ILogger<LeaderElectionService> _logger;
    private string? _hostname;
    private LeaderElector? _leaderElector;
    // False until the elector has been created and holds the lease.
    public bool IsLeader => _leaderElector?.IsLeader() ?? false;
    public LeaderElectionService(IKubernetes kubernetesClient, IOptionsMonitor<KubernetesSettings> settingsMonitor, IEnvironment environment, ILogger<LeaderElectionService> logger)
    {
        _kubernetesClient = kubernetesClient;
        _settingsMonitor = settingsMonitor;
        _environment = environment;
        _logger = logger;
    }
    public async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Executing");
        var kubernetesSettings = _settingsMonitor.CurrentValue;
        // Assumption: the leader-election values live in a nested LeaderElection property of KubernetesSettings.
        var leaderElectionSettings = kubernetesSettings.LeaderElection;
        // Each replica identifies itself to the lock by its hostname.
        _hostname = _environment.HostName;
        var configLock = new ConfigMapLock(_kubernetesClient, kubernetesSettings.Namespace, leaderElectionSettings.ConfigMapName, _hostname);
        var leaderElectionConfig = new LeaderElectionConfig(configLock)
        {
            LeaseDuration = leaderElectionSettings.LeaseDuration,
            RetryPeriod = leaderElectionSettings.RetryPeriod,
            RenewDeadline = leaderElectionSettings.RenewDeadline
        };
        _leaderElector = new LeaderElector(leaderElectionConfig);
        _leaderElector.OnStartedLeading += HandleStartedLeadingEvent;
        _leaderElector.OnStoppedLeading += HandleStoppedLeadingEvent;
        _leaderElector.OnNewLeader += HandleNewLeaderEvent;
        // RunAsync completes once this instance stops leading, so loop to stand for election again.
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                _logger.LogTrace("Running LeaderElector");
                await _leaderElector.RunAsync(stoppingToken);
            }
            catch (OperationCanceledException)
            {
                // Expected on shutdown, so not logged as an error.
                _logger.LogInformation("LeaderElector.RunAsync cancelled");
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "LeaderElector.RunAsync failed");
            }
            _logger.LogTrace("Finished loop of LeaderElector");
        }
        _logger.LogInformation("Stopped");
    }
}
Some extra methods to log the state changes:
private void HandleStartedLeadingEvent()
{
    _logger.LogInformation("Started leading");
}
private void HandleStoppedLeadingEvent()
{
    _logger.LogInformation("Stopped leading");
}
private void HandleNewLeaderEvent(string newLeader)
{
    _logger.LogInformation("New leader: {NewLeader}", newLeader);
}
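The newLeader string logged by HandleNewLeaderEvent is the identity each replica passed to ConfigMapLock, which in this service is _environment.HostName. IEnvironment is the author's own abstraction and isn't shown. Here is a minimal sketch, assuming it only needs to expose the hostname (SystemEnvironment is a hypothetical name):

```csharp
using System;

// Hypothetical abstraction matching the _environment.HostName call above.
public interface IEnvironment
{
    string HostName { get; }
}

// Inside a pod the hostname defaults to the pod name, which gives each
// replica a distinct identity for the ConfigMapLock.
public sealed class SystemEnvironment : IEnvironment
{
    public string HostName => Environment.MachineName;
}
```

Hiding the hostname behind an interface also lets tests give fake replicas fixed identities such as "pod-a" and "pod-b".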
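LeaderElectionService also has a Dispose() method, which unsubscribes the event handlers and disposes the LeaderElector:
public void Dispose()
{
    if (_leaderElector == null) return;
    _leaderElector.OnStartedLeading -= HandleStartedLeadingEvent;
    _leaderElector.OnStoppedLeading -= HandleStoppedLeadingEvent;
    _leaderElector.OnNewLeader -= HandleNewLeaderEvent;
    _leaderElector.Dispose();
    // _kubernetesClient is injected, so it is left for the DI container that created it to dispose.
}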
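The post doesn't show KubernetesSettings either. Here is a minimal sketch of the shape LeaderElectionService reads, assuming the values it refers to as leaderElectionSettings sit in a nested LeaderElection object. The class name LeaderElectionSettings, the default values, and the IsValid() check are all assumptions. The timing defaults mirror kube-controller-manager's leader-election flags (15s lease, 10s renew deadline, 2s retry):

```csharp
using System;

// Hypothetical settings shape; only the property names used by
// LeaderElectionService above come from the post.
public class KubernetesSettings
{
    public string Namespace { get; set; } = "default";
    public LeaderElectionSettings LeaderElection { get; set; } = new();
}

public class LeaderElectionSettings
{
    public string ConfigMapName { get; set; } = "my-app-leader"; // hypothetical name
    public TimeSpan LeaseDuration { get; set; } = TimeSpan.FromSeconds(15);
    public TimeSpan RenewDeadline { get; set; } = TimeSpan.FromSeconds(10);
    public TimeSpan RetryPeriod { get; set; } = TimeSpan.FromSeconds(2);

    // Sanity check (our own): the Leader must give up (RenewDeadline) before
    // its lease can expire for others (LeaseDuration), and retry more often than that.
    public bool IsValid() => LeaseDuration > RenewDeadline && RenewDeadline > RetryPeriod;
}
```

With the configuration binder, the TimeSpan values can be written in appsettings.json as strings such as "00:00:15".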
Using the ILeaderElectionService
Classes whose work must only run on the Leader can then inject the ILeaderElectionService and regularly check its IsLeader property to know whether it is safe to proceed:
public class ResourceReservationWorker : BackgroundService
{
    private readonly ILeaderElectionService _leaderElectionService;
    private readonly ILogger<ResourceReservationWorker> _logger;
    public ResourceReservationWorker(ILeaderElectionService leaderElectionService, ILogger<ResourceReservationWorker> logger)
    {
        _leaderElectionService = leaderElectionService;
        _logger = logger;
    }
    protected override async Task ExecuteAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("Start...");
        try
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                // Only the current Leader reserves resources; other replicas just wait.
                if (_leaderElectionService.IsLeader)
                {
                    await ReserveResourcesAsync(cancellationToken);
                }
                await Task.Delay(TimeSpan.FromMilliseconds(500), cancellationToken); // Only hardcoded for this simple example
            }
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // Task.Delay throws when the host shuts down.
            _logger.LogInformation("CancellationRequested");
        }
        _logger.LogInformation("End");
    }
    private Task ReserveResourcesAsync(CancellationToken ct)
    {
        // ... reserve resources on behalf of the entire deployment
        return Task.CompletedTask;
    }
}
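IsLeader only reports leadership at the moment it is read, so an instance can lose the lease partway through a long piece of work. One way to narrow that window is to re-check leadership between individual units of work. The sketch below does this with a hypothetical LeaderOnlyBatch helper. It narrows the window but doesn't close it, because the lease can still lapse during a single item:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: process items one at a time, re-checking leadership
// before each, so a replica that loses the lease stops soon after.
public static class LeaderOnlyBatch
{
    // Returns how many items were processed before stopping.
    public static async Task<int> ProcessAsync<T>(
        IEnumerable<T> items,
        Func<bool> isLeader,
        Func<T, CancellationToken, Task> process,
        CancellationToken ct)
    {
        var processed = 0;
        foreach (var item in items)
        {
            if (!isLeader()) break; // leadership lost: leave the rest to the new Leader
            await process(item, ct);
            processed++;
        }
        return processed;
    }
}
```

Inside ResourceReservationWorker this might be called as LeaderOnlyBatch.ProcessAsync(pendingReservations, () => _leaderElectionService.IsLeader, ReserveAsync, cancellationToken), where pendingReservations and ReserveAsync are hypothetical.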
The LeaderElectionWorker
Finally, to call the _leaderElectionService.ExecuteAsync() method at startup, we need a LeaderElectionWorker:
public class LeaderElectionWorker : BackgroundService
{
    private readonly ILeaderElectionService _leaderElectionService;
    private readonly ILogger<LeaderElectionWorker> _logger;
    public LeaderElectionWorker(ILeaderElectionService leaderElectionService, ILogger<LeaderElectionWorker> logger)
    {
        _leaderElectionService = leaderElectionService;
        _logger = logger;
    }
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Starting LeaderElectionService");
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await _leaderElectionService.ExecuteAsync(stoppingToken);
            }
            catch (Exception e)
            {
                _logger.LogError(e, "Exception thrown by ExecuteAsync");
            }
        }
        _logger.LogInformation("Stopped LeaderElectionService");
    }
}
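The post doesn't show how these classes are registered. Here is a minimal sketch of a Program.cs using the generic host. It assumes .NET 7 or later (for Host.CreateApplicationBuilder), the KubernetesClient package, and a "Kubernetes" configuration section, whose name is an assumption. The key point is that ILeaderElectionService is registered as a singleton, so both workers share one LeaderElector and see the same IsLeader value:

```csharp
using k8s;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

var builder = Host.CreateApplicationBuilder(args);

// Bind the settings read through IOptionsMonitor<KubernetesSettings> (section name assumed).
builder.Services.Configure<KubernetesSettings>(builder.Configuration.GetSection("Kubernetes"));

// In-cluster config authenticates as the pod's service account, the one the Role above applies to.
builder.Services.AddSingleton<IKubernetes>(_ => new Kubernetes(KubernetesClientConfiguration.InClusterConfig()));

// An IEnvironment implementation also needs registering here (not shown in the post).

// Singleton: both workers must observe the same LeaderElector.
builder.Services.AddSingleton<ILeaderElectionService, LeaderElectionService>();
builder.Services.AddHostedService<LeaderElectionWorker>();
builder.Services.AddHostedService<ResourceReservationWorker>();

builder.Build().Run();
```

If ILeaderElectionService were registered as transient instead, each worker would get its own instance, and ResourceReservationWorker would be checking an elector that never runs.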
The ChatGPT implementation
As an interesting aside, I asked ChatGPT how to implement this, and it suggested using a Microsoft.Extensions.Hosting.Kubernetes client library that looks great but doesn't actually exist:
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Hosting.Kubernetes;
namespace MyApp
{
    public class Program
    {
        public static void Main(string[] args)
        {
            CreateHostBuilder(args).Build().Run();
        }
        public static IHostBuilder CreateHostBuilder(string[] args) =>
            Host.CreateDefaultBuilder(args)
                .UseKubernetes()
                .ConfigureLogging(logging =>
                {
                    logging.AddConsole();
                    logging.AddDebug();
                })
                .ConfigureServices((context, services) =>
                {
                    services.AddLeaderElection(context.Configuration);
                    services.AddHostedService<Worker>();
                });
    }
    public class Worker : BackgroundService
    {
        private readonly ILogger<Worker> _logger;
        private readonly LeaderElection _leaderElection;
        public Worker(ILogger<Worker> logger, LeaderElection leaderElection)
        {
            _logger = logger;
            _leaderElection = leaderElection;
        }
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
            // Check if this instance is the leader.
            if (_leaderElection.IsLeader)
            {
                // Perform leader tasks.
            }
            else
            {
                // Perform follower tasks.
            }
            await Task.Delay(TimeSpan.FromSeconds(30), stoppingToken);
        }
    }
}
At first I thought I had been pretty dim not to use a standard Microsoft K8s library. When I explained to ChatGPT that the library didn't exist, it accepted the mistake: