{ martowen.com }

Leader Election in Kubernetes with the Official C# Client

Jul 22, 2022
5 minutes

Leader Election is useful in distributed systems because it gives a single node sole responsibility for work that can't safely be performed concurrently. This is particularly relevant in Kubernetes, where we typically run multiple instances of the same app spread across pods. An example would be reserving resources on behalf of the entire deployment.

Official Kubernetes C# LeaderElector

The official Kubernetes C# client (the KubernetesClient NuGet package) has a namespace devoted to this, k8s.LeaderElection, which provides a LeaderElector class exposing a number of events you can register handlers for:

/// <summary>
/// OnStartedLeading is called when a LeaderElector client starts leading
/// </summary>
public event Action OnStartedLeading;

/// <summary>
/// OnStoppedLeading is called when a LeaderElector client stops leading
/// </summary>
public event Action OnStoppedLeading;

/// <summary>
/// OnNewLeader is called when the client observes a leader that is
/// not the previously observed leader. This includes the first observed
/// leader when the client starts.
/// </summary>
public event Action<string> OnNewLeader;

In this post the LeaderElector is given a ConfigMapLock, so the Leader is elected by a mutually exclusive lock held on a ConfigMap.

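The code below, from the LeaderElector source, is the renew loop that runs once this instance has become leader. Each iteration calls TryAcquireOrRenew(), retrying every RetryPeriod until it succeeds; if no renewal succeeds within RenewDeadline, the loop exits and OnStoppedLeading is raised: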

try
{
    OnStartedLeading?.Invoke();

    // renew loop
    for (; ; )
    {
        cancellationToken.ThrowIfCancellationRequested();
        var acq = Task.Run(async () =>
        {
            try
            {
                while (!await TryAcquireOrRenew(cancellationToken).ConfigureAwait(false))
                {
                    await Task.Delay(config.RetryPeriod, cancellationToken).ConfigureAwait(false);
                    MaybeReportTransition();
                }
            }
            catch
            {
                // ignore
                return false;
            }

            return true;
        });

        if (await Task.WhenAny(acq, Task.Delay(config.RenewDeadline, cancellationToken))
                .ConfigureAwait(false) == acq)
        {
            var succ = await acq.ConfigureAwait(false);

            if (succ)
            {
                await Task.Delay(config.RetryPeriod, cancellationToken).ConfigureAwait(false);
                // retry
                continue;
            }

            // renew failed
        }

        // timeout
        break;
    }
}
finally
{
    OnStoppedLeading?.Invoke();
}
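To make the timing in that loop concrete, here is a minimal, self-contained sketch of a single "retry until renewed or the deadline passes" round. It is not the library's code: tryAcquireOrRenew is a stand-in delegate for the private TryAcquireOrRenew(), and RenewWithinDeadlineAsync is a hypothetical name. Instead of racing the retry task against Task.Delay with Task.WhenAny, it cancels the retries with CancellationTokenSource.CancelAfter, which also stops them running on in the background once the deadline has passed.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: keeps retrying a renewal until it succeeds or renewDeadline elapses.
// Returns true if the lease was renewed in time (keep leading), false otherwise (step down).
static async Task<bool> RenewWithinDeadlineAsync(
    Func<CancellationToken, Task<bool>> tryAcquireOrRenew, // stand-in for LeaderElector's TryAcquireOrRenew()
    TimeSpan retryPeriod,
    TimeSpan renewDeadline,
    CancellationToken cancellationToken)
{
    // Linked source: cancelled by the caller's token, or automatically once the deadline passes
    using var deadline = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
    deadline.CancelAfter(renewDeadline);

    try
    {
        while (!await tryAcquireOrRenew(deadline.Token))
        {
            await Task.Delay(retryPeriod, deadline.Token);
        }

        return true;
    }
    catch (Exception) when (!cancellationToken.IsCancellationRequested)
    {
        // Deadline passed, or the renewal threw: like the library's loop, treat both as "renew failed"
        return false;
    }
}

// Example: a renewal that succeeds on the first attempt keeps this instance leading
var renewed = await RenewWithinDeadlineAsync(
    _ => Task.FromResult(true), TimeSpan.FromSeconds(2), TimeSpan.FromSeconds(10), CancellationToken.None);
Console.WriteLine(renewed ? "still leading" : "stepped down");
```

In the library, a successful round is followed by a RetryPeriod pause and another round; a round that returns false ends the renew loop.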

Leasing ConfigMaps

The TryAcquireOrRenew() method attempts to obtain (or renew) a short-term lease on the ConfigMap, whose name you specify when creating the ConfigMapLock that is passed to the LeaderElector.

As long as the current leader keeps renewing its lease, no other instance can take over. If it fails to renew before the lease expires, another instance acquires the lock and is promoted to Leader, and the relevant event handlers are called.
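The "renew or be replaced" rule can be illustrated without a cluster. The following is a hedged, in-memory sketch, not the client's implementation: the real ConfigMapLock stores a leader election record on the ConfigMap through the Kubernetes API, where concurrent writers are a concern this sketch ignores. Here holder, renewTime and TryAcquireOrRenew are hypothetical stand-ins, and the current time is passed in explicitly so the behaviour is deterministic.

```csharp
using System;

// Hypothetical in-memory lock record
string? holder = null;                              // identity of the current lease holder, if any
DateTimeOffset renewTime = DateTimeOffset.MinValue; // when the holder last acquired or renewed

// Returns true if `candidate` holds the lease after this call.
bool TryAcquireOrRenew(string candidate, DateTimeOffset now, TimeSpan leaseDuration)
{
    // A lease is still valid if someone holds it and it was renewed less than leaseDuration ago
    var leaseStillValid = holder is not null && now < renewTime + leaseDuration;

    if (leaseStillValid && holder != candidate)
    {
        // Another instance holds an unexpired lease
        return false;
    }

    // The lease is free, expired, or already ours: acquire or renew it
    holder = candidate;
    renewTime = now;
    return true;
}
```

Walking it through with a 15-second lease: pod-a acquires at t=0 and renews at t=10; pod-b is refused at t=20 because the renewed lease runs until t=25, and succeeds at t=26 once it has expired.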

Kubernetes role

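Because your deployment will be creating and updating a ConfigMap, its pods need to run under a service account with permission to do that. A Role like the one below grants it (the Role also needs a RoleBinding to that service account before it takes effect):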

kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: {{ include "app.serviceAccountName" . }}
rules:
  - apiGroups: [""] # "" indicates the core API group
    resources: ["configmaps"] # This rule is required for creating a Leader Election ConfigMap lock
    verbs: ["get", "create", "update", "patch"]

My ILeaderElectionService and its implementation

I wanted a very simple ILeaderElectionService which would be started by a BackgroundService and provide a boolean IsLeader property. The IsLeader property could then be read by classes whose processes should only run on the Leader instance.

The class diagram below illustrates this:

---
title: Leader Election Service
---
classDiagram
direction RL
class ILeaderElectionService{
  <<interface>>
  +bool IsLeader

  +ExecuteAsync(CancellationToken stoppingToken) Task
}

class LeaderElectionService{
  +bool IsLeader

  +ExecuteAsync(CancellationToken stoppingToken) Task
  -HandleStartedLeadingEvent()
  -HandleStoppedLeadingEvent()
  -HandleNewLeaderEvent(string newLeader)
}

class LeaderElectionWorker{
  #ExecuteAsync(CancellationToken stoppingToken) Task
}

class ResourceReservationWorker{
  #ExecuteAsync(CancellationToken stoppingToken) Task
  -ReserveResourcesAsync(CancellationToken stoppingToken) Task
}

ILeaderElectionService <|.. LeaderElectionService
LeaderElectionWorker ..> ILeaderElectionService
ResourceReservationWorker ..> ILeaderElectionService

The LeaderElectionService

using k8s;
using k8s.LeaderElection;
using k8s.LeaderElection.ResourceLock;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

public class LeaderElectionService : ILeaderElectionService, IDisposable
{
    private readonly IKubernetes _kubernetesClient;
    private readonly IOptionsMonitor<KubernetesSettings> _settingsMonitor;
    private readonly IEnvironment _environment;
    private readonly ILogger<LeaderElectionService> _logger;

    private string? _hostname;
    private LeaderElector? _leaderElector;

    public bool IsLeader => _leaderElector?.IsLeader() ?? false;

    public LeaderElectionService(IKubernetes kubernetesClient, IOptionsMonitor<KubernetesSettings> settingsMonitor, IEnvironment environment, ILogger<LeaderElectionService> logger)
    {
        _kubernetesClient = kubernetesClient;
        _settingsMonitor = settingsMonitor;
        _environment = environment;
        _logger = logger;
    }

    public async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Executing");

        var kubernetesSettings = _settingsMonitor.CurrentValue;

        // Assumption: the ConfigMap name and timings live in a nested LeaderElection section
        // of KubernetesSettings; adjust this line to match your own settings class
        var leaderElectionSettings = kubernetesSettings.LeaderElection;

        // The pod's hostname is used as this instance's identity on the lock
        _hostname = _environment.HostName;

        var configLock = new ConfigMapLock(_kubernetesClient, kubernetesSettings.Namespace, leaderElectionSettings.ConfigMapName, _hostname);

        var leaderElectionConfig = new LeaderElectionConfig(configLock)
        {
            LeaseDuration = leaderElectionSettings.LeaseDuration,
            RetryPeriod = leaderElectionSettings.RetryPeriod,
            RenewDeadline = leaderElectionSettings.RenewDeadline
        };

        _leaderElector = new LeaderElector(leaderElectionConfig);

        _leaderElector.OnStartedLeading += HandleStartedLeadingEvent;
        _leaderElector.OnStoppedLeading += HandleStoppedLeadingEvent;
        _leaderElector.OnNewLeader += HandleNewLeaderEvent;

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                _logger.LogTrace("Running LeaderElector");
                // RunAsync returns once this instance stops leading, so loop to stand for election again
                await _leaderElector.RunAsync(stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Expected during shutdown, so not logged as an error
                _logger.LogInformation("LeaderElector.RunAsync cancelled");
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "LeaderElector.RunAsync failed");
            }
            _logger.LogTrace("Finished loop of LeaderElector");
        }

        _logger.LogInformation("Stopped");
    }
}

The class also has some private event handlers that log the state changes:

    private void HandleStartedLeadingEvent()
    {
        _logger.LogInformation("Started leading");
    }

    private void HandleStoppedLeadingEvent()
    {
        _logger.LogInformation("Stopped leading");
    }

    private void HandleNewLeaderEvent(string newLeader)
    {
        _logger.LogInformation("New leader: {NewLeader}", newLeader);
    }

And a Dispose() method:

    public void Dispose()
    {
        _kubernetesClient.Dispose();

        if (_leaderElector == null) return;

        _leaderElector.OnStartedLeading -= HandleStartedLeadingEvent;
        _leaderElector.OnStoppedLeading -= HandleStoppedLeadingEvent;
        _leaderElector.OnNewLeader -= HandleNewLeaderEvent;
        _leaderElector.Dispose();
    }
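Because LeaseDuration, RenewDeadline and RetryPeriod in LeaderElectionService come from configuration, a bad combination is easy to deploy. If RenewDeadline is not shorter than LeaseDuration, for example, a leader could keep acting after other pods consider its lease expired. Go's client-go leader elector refuses to start unless LeaseDuration > RenewDeadline > RetryPeriod × 1.2 (its JitterFactor). The sketch below applies the same rule to the settings before they reach LeaderElectionConfig; ValidateLeaderElectionTimings is a hypothetical startup check you would add yourself, not something taken from the C# client.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical startup check mirroring client-go's rule:
// LeaseDuration > RenewDeadline > RetryPeriod * JitterFactor (1.2 in client-go).
static List<string> ValidateLeaderElectionTimings(TimeSpan leaseDuration, TimeSpan renewDeadline, TimeSpan retryPeriod)
{
    const double jitterFactor = 1.2;
    var errors = new List<string>();

    // With a positive RetryPeriod, the ordering below also forces the other two to be positive
    if (retryPeriod <= TimeSpan.Zero)
        errors.Add("RetryPeriod must be positive");
    if (leaseDuration <= renewDeadline)
        errors.Add("LeaseDuration must be greater than RenewDeadline");
    if (renewDeadline <= retryPeriod * jitterFactor)
        errors.Add("RenewDeadline must be greater than RetryPeriod * 1.2");

    return errors;
}

// e.g. a 15 s lease, 10 s renew deadline and 2 s retry period pass the check, so nothing is printed
foreach (var error in ValidateLeaderElectionTimings(TimeSpan.FromSeconds(15), TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(2)))
{
    Console.WriteLine(error);
}
```

In ExecuteAsync, this could be called with the values from leaderElectionSettings, throwing if the returned list is not empty.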

Using the ILeaderElectionService

Classes whose work must only run on the Leader can then inject the ILeaderElectionService and check its IsLeader property on each iteration to decide whether it is safe to proceed:

using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

public class ResourceReservationWorker : BackgroundService
{
    private readonly ILeaderElectionService _leaderElectionService;
    private readonly ILogger<ResourceReservationWorker> _logger;

    public ResourceReservationWorker(ILeaderElectionService leaderElectionService, ILogger<ResourceReservationWorker> logger)
    {
        _leaderElectionService = leaderElectionService;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("Start...");

        try
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                // Only the elected Leader does the work; other instances just keep polling
                if (_leaderElectionService.IsLeader)
                {
                    await ReserveResourcesAsync(cancellationToken);
                }

                await Task.Delay(TimeSpan.FromMilliseconds(500), cancellationToken); // Only hardcoded for this simple example
            }
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // Task.Delay throws when the host is shutting down; fall through to the log below
        }

        if (cancellationToken.IsCancellationRequested)
        {
            _logger.LogInformation("CancellationRequested");
        }

        _logger.LogInformation("End");
    }

    private Task ReserveResourcesAsync(CancellationToken ct)
    {
        // ... reserve resources on behalf of the whole deployment ...
        return Task.CompletedTask;
    }
}
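One consequence of polling IsLeader between iterations is that a long-running ReserveResourcesAsync() carries on even if leadership is lost part-way through, because it only sees the host's stopping token. If that matters, one option is to hand the work a token that is also cancelled when IsLeader turns false. The sketch below is a hypothetical helper, not part of the code above: isLeader stands in for ILeaderElectionService.IsLeader, and checkInterval is an assumed polling period. It only narrows the window; an old and a new leader can still overlap briefly, so the work itself should tolerate that.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: runs one unit of leader-only work and cancels it if leadership is lost.
// Returns true if the work ran to completion, false if it never started or was cancelled.
static async Task<bool> RunWhileLeaderAsync(
    Func<bool> isLeader,                // stand-in for ILeaderElectionService.IsLeader
    Func<CancellationToken, Task> work, // e.g. ReserveResourcesAsync
    TimeSpan checkInterval,
    CancellationToken stoppingToken)
{
    if (!isLeader()) return false;

    // Cancelled when the host stops (via the link) or when we notice leadership has gone
    using var leadership = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
    var workTask = work(leadership.Token);

    while (!workTask.IsCompleted)
    {
        // Wake up when the work finishes or the next leadership check is due
        await Task.WhenAny(workTask, Task.Delay(checkInterval));

        if (!workTask.IsCompleted && !isLeader())
        {
            leadership.Cancel();
            break;
        }
    }

    try
    {
        await workTask;
        return true;
    }
    catch (OperationCanceledException) when (!stoppingToken.IsCancellationRequested)
    {
        return false; // leadership was lost mid-work
    }
}
```

Inside ResourceReservationWorker, the leader check and call could then become something like `await RunWhileLeaderAsync(() => _leaderElectionService.IsLeader, ReserveResourcesAsync, TimeSpan.FromMilliseconds(500), cancellationToken);`, with the helper as a private method.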

The LeaderElectionWorker

Finally, to call the _leaderElectionService.ExecuteAsync() method at startup, we need a LeaderElectionWorker:

using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

public class LeaderElectionWorker : BackgroundService
{
    private readonly ILeaderElectionService _leaderElectionService;
    private readonly ILogger<LeaderElectionWorker> _logger;

    public LeaderElectionWorker(ILeaderElectionService leaderElectionService, ILogger<LeaderElectionWorker> logger)
    {
        _leaderElectionService = leaderElectionService;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Starting LeaderElectionService");

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await _leaderElectionService.ExecuteAsync(stoppingToken);
            }
            catch (Exception e)
            {
                _logger.LogError(e, "Exception thrown by ExecuteAsync");

                // Back off briefly so a persistent failure doesn't become a tight retry loop
                // (the 5-second delay is an arbitrary choice for this example)
                await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
            }
        }

        _logger.LogInformation("Stopped LeaderElectionService");
    }
}

The ChatGPT implementation

As an interesting aside, I asked ChatGPT how to implement this, and it suggested a Microsoft.Extensions.Hosting.Kubernetes library that looks great but doesn't actually exist:

using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Hosting.Kubernetes;

namespace MyApp
{
    public class Program
    {
        public static void Main(string[] args)
        {
            CreateHostBuilder(args).Build().Run();
        }

        public static IHostBuilder CreateHostBuilder(string[] args) =>
            Host.CreateDefaultBuilder(args)
                .UseKubernetes()
                .ConfigureLogging(logging =>
                {
                    logging.AddConsole();
                    logging.AddDebug();
                })
                .ConfigureServices((context, services) =>
                {
                    services.AddLeaderElection(context.Configuration);
                    services.AddHostedService<Worker>();
                });
    }

    public class Worker : BackgroundService
    {
        private readonly ILogger<Worker> _logger;
        private readonly LeaderElection _leaderElection;

        public Worker(ILogger<Worker> logger, LeaderElection leaderElection)
        {
            _logger = logger;
            _leaderElection = leaderElection;
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);

            // Check if this instance is the leader.
            if (_leaderElection.IsLeader)
            {
                // Perform leader tasks.
            }
            else
            {
                // Perform follower tasks.
            }

            await Task.Delay(TimeSpan.FromSeconds(30), stoppingToken);
        }
    }
}

I initially thought I had been pretty dim not to use a standard Microsoft K8s library. When I explained to ChatGPT that the library didn't exist, it accepted the mistake:

[Screenshot: Correcting ChatGPT's Leader Election implementation]