Distributed lock with Couchbase

A while back I have started to brake down a monolithic process into microservices (I promise to share this adventure in the near future).

As part of the breakdown i had to deal with multiple micro-services, that can be hosted in different server nodes requiring I/O to a shared resource, in this case it was an Elsaticsearch Index.
I found myself looking at at different solutions describing how to use a lock object placed in a distributed cache DB like REDIS or Couchbase but all these were not written elegant enough for me.

So I decided I want a distributed lock that is similar in syntax to the familiar thread lock

using System;
using System.Threading;
using Couchbase.Core;

namespace DisterbutedLock
{
	public class CouchbaseLocker : IDisposable
	{
		private readonly string _LockKey;
		private readonly IBucket _Bucket;

		/// <summary&gt;
		/// Create a disterbuted lock using the provided couchbase bucket
		/// </summary&gt;
		/// <param name="bucket"&gt;Bucket to use for creating lock document</param&gt;
		/// <param name="lockKey"&gt;Lock document Key</param&gt;
		/// <param name="numOfRetries"&gt;Default set to 60 retries</param&gt;
		/// <param name="retryIntervalMillis"&gt;default set to every second</param&gt;
		/// <param name="ttl"&gt;default set to 5 minutes</param&gt;
		public CouchbaseLocker(IBucket bucket, string lockKey , int numOfRetries = 60, int retryIntervalMillis = 1000, uint ttl = 300)
		{
			var lockAttempt = 0;
			_LockKey = lockKey;
			_Bucket = bucket;

			Console.WriteLine($"Acquiring lock on key:[{_LockKey}], in bucket:[{_Bucket.Name}]...");
			var addResult = _Bucket.Insert(_LockKey, DateTime.UtcNow, expiration: ttl);
			while (!addResult.Success &amp;&amp; lockAttempt < numOfRetries)
			{
				addResult = _Bucket.Insert(_LockKey, DateTime.UtcNow, expiration: ttl);
				Console.WriteLine($"Lock attempt [{++lockAttempt}/{numOfRetries}]: Could not acquire lock on key:[{_LockKey}], in bucket:[{bucket.Name}], trying again in:[{retryIntervalMillis}]");
				Thread.Sleep(retryIntervalMillis);
			}

			if (lockAttempt &gt;= numOfRetries)
			{
				throw new Exception($"Could not acquired lock on process with lock key:[{_LockKey}], another process is currently running and wait time has been exceeded.");
			}

			Console.WriteLine($"Acquired lock on key:[{_LockKey}]...");

		}

		public void Dispose()
		{
			var removeResult = _Bucket.Remove(_LockKey);
			if (removeResult.Success)
			{
				Console.WriteLine($"Lock on key was removed:[{_LockKey}]");

			}
			else
			{
				Console.WriteLine($"Could not remove lock on key:[{_LockKey}]");
				// This should not happen but if it does for some odd network reason you might want
				// to add some retries here maybe with your own configuration
			}

		}
	}
}

Let me explain a little what we saw here:
This class implements IDisposable interface which will allow us to use the wonderful syntax of Using (...) { }
The constructor is trying to add a document to a given Couchbase bucket for the duration provided by numOfRetries and retryIntervalMillis. while the lock is not acquired (a document with the given lock key is not added successfully) the constructor will not Finnish which will cause the calling code to wait until such lock can be acquired.
On disposal it will remove this document which will cause any other process requesting to add it to the bucket to succeed and thus being granted into the mutually excluded section of the code.
here is a usage example:

using Couchbase;
using Couchbase.Authentication;
using Couchbase.Configuration.Client;
using System;
using System.Collections.Generic;
using System.Threading;

namespace DisterbutedLock
{
	public class Program
	{
		public static void Main(string[] args)
		{
			var cluster = new Cluster(new ClientConfiguration
			{
				Servers = new List<Uri&gt; { new Uri("http://couchbase1:8091/pools") }
			});

			var bucket = cluster.OpenBucket("statistics");

			using (var locker = new CouchbaseLocker(bucket, "MyExclusiveLock"))
			{
				Console.WriteLine("Here I can do I/O without interruption");
				Thread.Sleep(5000);
				Console.WriteLine("Im done now, you can release the lock");
			}

			Console.WriteLine("At this point the lock should be release and other process can acquire it");
		}
	}
}
Advertisements

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

This site uses Akismet to reduce spam. Learn how your comment data is processed.