Combinations

Combinations, or k-combinations, are the unordered sets of k elements chosen from a set of size n.

For example, there are 10 3-combinations of the 5-set {0, 1, 2, 3, 4}:

{0, 1, 2}
{0, 1, 3}
{0, 1, 4}
{0, 2, 3}
{0, 2, 4}
{0, 3, 4}
{1, 2, 3}
{1, 2, 4}
{1, 3, 4}
{2, 3, 4}

The set of all combinations 0 ≤ k ≤ n is the power set.

Combinations that can contain duplicates are called multicombinations.

Combinations of a set containing duplicates are combinations of a multiset.

Combinations where order is significant are called k-permutations.

This algorithm produces the combinations in lexicographic order, with the elements in each combination strictly increasing in magnitude.

Begin with the combination containing the the numbers from 0 to k – 1, and at each step:

  1. Find the rightmost element ar[i] that is less than the maximum value it can have (which is (n – 1) – (k – 1) – i)
  2. Increment it
  3. Turn the elements after it into a linear sequence continuing from ar[i]
unsigned int next_combination(unsigned int *ar, size_t n, unsigned int k)
{
	unsigned int finished = 0;
	unsigned int changed = 0;
	unsigned int i;

	if (k > 0) {
		for (i = k - 1; !finished && !changed; i--) {
			if (ar[i] < (n - 1) - (k - 1) + i) {
				/* Increment this element */
				ar[i]++;
				if (i < k - 1) {
					/* Turn the elements after it into a linear sequence */
					unsigned int j;
					for (j = i + 1; j < k; j++) {
						ar[j] = ar[j - 1] + 1;
					}
				}
				changed = 1;
			}
			finished = i == 0;
		}
		if (!changed) {
			/* Reset to first combination */
			for (i = 0; i < k; i++) {
				ar[i] = i;
			}
		}
	}
	return changed;
}

Fibonacci numbers in C

The recurrence formula for the \(n\)th Fibonacci number is:

\(
F_{n} = \begin{cases} F_{n – 1} + F_{n – 2} & \textrm{if}\;n > 1\\ 1 &\textrm{if}\;n = 1\\ 0 &\textrm{if}\;n = 0.\end{cases}
\)

A naive recursive implementation of this would be:

unsigned int fib(unsigned int n)
{
	if (n == 0) return 0;
	if (n == 1) return 1;
	return fib(n - 1) + fib(n - 2);
}

But this runs in exponential time because it keeps recalculating the same results in the recursion.

If we add some instrumentation, we can see what it’s doing:

struct calltree {
    unsigned int value;
    struct calltree *left;
    struct calltree *right;
};
typedef struct calltree calltree;

typedef void (*calltreefn)(unsigned int, unsigned int, void *);

calltree *calltree_create(unsigned int value)
{
    calltree *tree = malloc(sizeof(calltree));
    if (tree) {
        tree->value = value;
        tree->left = NULL;
        tree->right = NULL;
    }
    return tree;
}

void calltree_delete(calltree *tree)
{
    if (tree != NULL) {
        calltree_delete(tree->left);
        calltree_delete(tree->right);
        free(tree);
    }
}

void calltree_for_each(const calltree *tree, unsigned int level, calltreefn fun, void *data)
{
    if (tree != NULL) {
        fun(level, tree->value, data);
        calltree_for_each(tree->left, level + 1, fun, data);
        calltree_for_each(tree->right, level + 1, fun, data);
    }
}

unsigned int fib(unsigned int n, calltree **tree)
{
    *tree = calltree_create(n);
	if (n == 0) return 0;
	if (n == 1) return 1;
	return fib(n - 1, &((*tree)->left)) + fib(n - 2, &((*tree)->right));
}

Here is an example program to calculate \(F_{7}\):

void print_call_text(unsigned int level, unsigned int value, void *data)
{
    unsigned int i;
    for (i = 0; i < level; i++) {
        putchar(' ');
    }
    printf("fib(%d)\n", value);
}

int main(void)
{
    calltree *tree;
    fib(7, &tree);
    calltree_for_each(tree, 0, print_call_text, NULL);
    calltree_delete(tree);
    printf("%u\n", result);
	return 0;
}

The call tree produced:

fib(7)
 fib(6)
  fib(5)
   fib(4)
    fib(3)
     fib(2)
      fib(1)
      fib(0)
     fib(1)
    fib(2)
     fib(1)
     fib(0)
   fib(3)
    fib(2)
     fib(1)
     fib(0)
    fib(1)
  fib(4)
   fib(3)
    fib(2)
     fib(1)
     fib(0)
    fib(1)
   fib(2)
    fib(1)
    fib(0)
 fib(5)
  fib(4)
   fib(3)
    fib(2)
     fib(1)
     fib(0)
    fib(1)
   fib(2)
    fib(1)
    fib(0)
  fib(3)
   fib(2)
    fib(1)
    fib(0)
   fib(1)

We can get rid of all of these recursive calls by caching the results instead:

unsigned int fib(unsigned int n)
{
    unsigned int *numbers;
    unsigned int i;
    unsigned int result;
    if (n == 0) return 0;
    numbers = malloc((n + 1) * sizeof(int));
    if (numbers == NULL) {
        return 0;
    }
    numbers[0] = 0;
    numbers[1] = 1;
    for (i = 2; i <= n; i++) {
        numbers[i] = numbers[i - 2] + numbers[i - 1];
    }
    result = numbers[n];
    free(numbers); 
    return result;
}

If we were going to calculate a lot of Fibonacci numbers we could store the cache between calls rather than creating it anew each time.

Permutations

The permutations of a set are the ways of arranging its elements.

For example, there are 24 permutations of a set of 4 elements:

[0, 1, 2, 3]
[0, 1, 3, 2]
[0, 2, 1, 3]
[0, 2, 3, 1]
[0, 3, 1, 2]
[0, 3, 2, 1]
[1, 0, 2, 3]
[1, 0, 3, 2]
[1, 2, 0, 3]
[1, 2, 3, 0]
[1, 3, 0, 2]
[1, 3, 2, 0]
[2, 0, 1, 3]
[2, 0, 3, 1]
[2, 1, 0, 3]
[2, 1, 3, 0]
[2, 3, 0, 1]
[2, 3, 1, 0]
[3, 0, 1, 2]
[3, 0, 2, 1]
[3, 1, 0, 2]
[3, 1, 2, 0]
[3, 2, 0, 1]
[3, 2, 1, 0]

To find the next permutation of an array ar:

  1. Find the highest index, i1 such that ar[i1] is the first of a pair of elements in ascending order.
    If there isn’t one, the sequence is the highest permutation, so reverse the whole thing to begin again.

  2. Find the highest index i2, such that i2 > i1 and ar[i2] > ar[i1].
  3. Swap ar[i1] and ar[i2].
  4. The elements from ar[i1 + 1] to the end are now in descending order (a later permutation), so reverse them.
static void swap(unsigned int *ar, unsigned int first, unsigned int second)
{
	unsigned int temp = ar[first];
	ar[first] = ar[second];
	ar[second] = temp;
}

static void reverse(unsigned int *ar, size_t len)
{
	unsigned int i, j;

	for (i = 0, j = len - 1; i < j; i++, j--) {
		swap(ar, i, j);
	}
}

unsigned int next_permutation(unsigned int *ar, size_t len)
{
	unsigned int i1, i2;
	unsigned int result = 0;
	
	/* Find the rightmost element that is the first in a pair in ascending order */
	for (i1 = len - 2, i2 = len - 1; ar[i2] <= ar[i1] && i1 != 0; i1--, i2--);
	if (ar[i2] <= ar[i1]) {
		/* If not found, array is highest permutation */
		reverse(ar, len);
	}
	else {
		/* Find the rightmost element to the right of i1 that is greater than ar[i1] */
		for (i2 = len - 1; i2 > i1 && ar[i2] <= ar[i1]; i2--);
		/* Swap it with the first one */
		swap(ar, i1, i2);
		/* Reverse the remainder */
		reverse(ar + i1 + 1, len - i1 - 1);
		result = 1;
	}
	return result;
}

Cartesian Product

The cartesian product of n sets is the set of ordered n-tuples containing one element from each set.

For example, the cartesian product of the sets {a, b}, {p, q, r} and {w, x, y, z} is:

[a, p, w]
[a, p, x]
[a, p, y]
[a, p, z]
[a, q, w]
[a, q, x]
[a, q, y]
[a, q, z]
[a, r, w]
[a, r, x]
[a, r, y]
[a, r, z]
[b, p, w]
[b, p, x]
[b, p, y]
[b, p, z]
[b, q, w]
[b, q, x]
[b, q, y]
[b, q, z]
[b, r, w]
[b, r, x]
[b, r, y]
[b, r, z]

The algorithm uses an array of n integers. Each number in the array represents the index number within the corresponding set of that element.

The algorithm then simply needs to count upwards from n zeroes, with the cardinality of each set determining when a digit rolls over.

For example, these are the correspondences between the array and the n-tuple for the sets in the example:

[0, 0, 0] = [a, p, w]
[0, 0, 1] = [a, p, x]
[0, 0, 2] = [a, p, y]
[0, 0, 3] = [a, p, z]
[0, 1, 0] = [a, q, w]
[0, 1, 1] = [a, q, x]
[0, 1, 2] = [a, q, y]
[0, 1, 3] = [a, q, z]
[0, 2, 0] = [a, r, w]
[0, 2, 1] = [a, r, x]
[0, 2, 2] = [a, r, y]
[0, 2, 3] = [a, r, z]
[1, 0, 0] = [b, p, w]
[1, 0, 1] = [b, p, x]
[1, 0, 2] = [b, p, y]
[1, 0, 3] = [b, p, z]
[1, 1, 0] = [b, q, w]
[1, 1, 1] = [b, q, x]
[1, 1, 2] = [b, q, y]
[1, 1, 3] = [b, q, z]
[1, 2, 0] = [b, r, w]
[1, 2, 1] = [b, r, x]
[1, 2, 2] = [b, r, y]
[1, 2, 3] = [b, r, z]
unsigned int next_n_tuple(unsigned int *ar, size_t len, const size_t *sizes)
{
	unsigned int changed = 0;
	unsigned int finished = 0;
	unsigned int i;

	for (i = len - 1; !changed && !finished; i--) {
		if (ar[i] < sizes[i] - 1) {
			/* Increment */
			ar[i]++;
			changed = 1;
		}
		else {
			/* Roll over */
			ar[i] = 0;
		}
		finished = i == 0;
	}

	return changed;
}	

Partitions

A partition of a set is a division of it into disjoint subsets whose union is the set itself.

For example, the partitions of the set {a, b, c, d} are:

{{a, b, c, d}}
{{a, b, c}, {d}}
{{a, b, d}, {c}}
{{a, b}, {c, d}}
{{a, b}, {c}, {d}}
{{a, c, d}, {b}}
{{a, c}, {b, d}}
{{a, c}, {b}, {d}}
{{a, d}, {b, c}}
{{a}, {b, c, d}}
{{a}, {b, c}, {d}}
{{a, d}, {b}, {c}}
{{a}, {b, d}, {c}}
{{a}, {b}, {c, d}}
{{a}, {b}, {c}, {d}}

The scheme I am using is as follows: a partition is represented by an array of the same size as the set, and each element in the array is an integer specifying the subset in the partition in which the corresponding element occurs. For example, the array [0, 0, 0, 0] indicates that every element occurs in set 0, the first set, i.e., the partition {{a, b, c, d}}, while the array [0, 1, 2, 3] puts every element in its own set, i.e., the partition {{a}, {b}, {c}, {d}}.

The correspondences for the partitions of a 4-set are as follows:

[0, 0, 0, 0] = {{a, b, c, d}}
[0, 0, 0, 1] = {{a, b, c}, {d}}
[0, 0, 1, 0] = {{a, b, d}, {c}}
[0, 0, 1, 1] = {{a, b}, {c, d}}
[0, 0, 1, 2] = {{a, b}, {c}, {d}}
[0, 1, 0, 0] = {{a, c, d}, {b}}
[0, 1, 0, 1] = {{a, c}, {b, d}}
[0, 1, 0, 2] = {{a, c}, {b}, {d}}
[0, 1, 1, 0] = {{a, d}, {b, c}}
[0, 1, 1, 1] = {{a}, {b, c, d}}
[0, 1, 1, 2] = {{a}, {b, c}, {d}}
[0, 1, 2, 0] = {{a, d}, {b}, {c}}
[0, 1, 2, 1] = {{a}, {b, d}, {c}}
[0, 1, 2, 2] = {{a}, {b}, {c, d}}
[0, 1, 2, 3] = {{a}, {b}, {c}, {d}}

Having established how to represent partitions, the algorithm simply needs to produce the arrays shown above. At each step:

  1. Find the rightmost element that is no more than any element to its left
  2. Increment it
  3. Set all of the elements after it to 0
unsigned int next_partition(unsigned int *ar, size_t len)
{	
	unsigned int result = 0;
	if (ar[len - 1] < len - 1) {
		unsigned int i;
		unsigned int finished = 0;
		unsigned int changed = 0;
		/* Find the rightmost element no more than the other elements */
		for (i = len - 1; !finished && !changed; i--) {
			unsigned int j, max = 0;
			/* Find the highest element to the left of this one */
			for (j = 0; j < i; j++) {
				if (ar[j] > max) {
					max = ar[j];
				}
			}
			if (ar[i] <= max) {
				/* Increment */
				ar[i]++;
				changed = 1;
				/* Set the following elements to 0 */
				for (j = i + 1; j < len; j++) {
					ar[j] = 0;
				}
			}
			finished = i == 1;
		}
		result = 1;
	}

	return result;
}

Subsets using Gray codes

When calculations are being performed on each element of the power set, it can be advantageous if each set differs from its predecessor by as few elements as possible. One can produce the subsets in order such that each subset differs from its predecessor by only one element. The characteristic vectors of sets in this order are called Gray codes.

These are the Gray codes in order for a set of 3 elements:

[0, 0, 0]
[1, 0, 0]
[1, 1, 0]
[0, 1, 0]
[0, 1, 1]
[1, 1, 1]
[1, 0, 1]
[0, 0, 1]

They correspond to the following subsets. Notice how each subset differs from its predecessor by only 1 element:

{}
{a}
{a, b}
{b}
{b, c}
{a, b, c}
{a, c}
{c}

In order to go from one Gray code to its successor, it is necessary to find which single value in the array to change from 1 to 0, or vice-versa.

The rules for finding the index of this element j are based on the number of 1s in the current code, k, as follows:

  • If k is even, then j = 0
  • If k is odd, then j is the index of the first cell that follows the first 1
unsigned int next_subset_gray(unsigned int *ar, size_t n)
{
    /* Find the number of 1s, and the first 1 */
	unsigned int i;
    unsigned int k = 0;
    int first_1 = -1;
    unsigned int finished = 0;
    for (i = 0; i < n; i++) {
        if (ar[i] == 1) {
            k++;
            if (first_1 == -1) {
                first_1 = i;
            }
        }
    }
    /* Flip the relevant digit */
    if (k % 2 == 0) {
        /* First digit */
        ar[0] ^= 1;
    }
    else if (first_1 < n - 1) {
        /* Digit after the first 1 */
        ar[first_1 + 1] ^= 1;
    }
    else {
        /* Last digit */
        ar[n - 1] = 0;
        finished = 1;
    }
    return finished == 0;
}

Subsets

The set of all subsets of a set is called the power set.

For example, the power set of the set {a, b, c} consists of the sets:

{}
{a}
{b}
{c}
{a, b}
{a, c}
{b, c}
{a, b, c}

Note that:

  • The empty set {} is in the power set
  • The set itself is in the power set

The set of all subsets of a particular size, or k-subsets, are combinations.

A subset can be represented as an array of boolean values of the same size as the set, called a characteristic vector. Each boolean value indicates whether the corresponding element in the set is present or absent in the subset.

This gives following correspondences for the set {a, b, c}:

[0, 0, 0] = {}
[1, 0, 0] = {a}
[0, 1, 0] = {b}
[0, 0, 1] = {c}
[1, 1, 0] = {a, b}
[1, 0, 1] = {a, c}
[0, 1, 1] = {b, c}
[1, 1, 1] = {a, b, c}

The algorithm then simply needs to produce the arrays shown above. Observing how they change, one can derive the following algorithm:

  • Find the rightmost 1 with a 0 on its right. If it exists:
    1. Move it one cell to the right
    2. Move all of the 1s on its right, which are packed on the right hand side, to the cells immediately behind it
  • Otherwise, If all of the 1s are on the right hand side and ar[0] is 0, add another 1 and move the other 1s immediately behind it
  • Otherwise, all subsets have been produced, so return the array to its starting arrangement of all 0s
unsigned int next_subset(unsigned int *ar, size_t n)
{
	int i, ones = 0;
	unsigned int found = 0;
	unsigned int result = 1;

	/* Find the rightmost 1 with a 0 on its right, or the number of 1s */
	for (i = n - 2; !found && i >= 0; i--) {
		found = ar[i] == 1 && ar[i + 1] == 0;
		ones += ar[i + 1] == 1;
	}
	if (found) {
		/* Move the 1 right */
		ar[++i] = 0;
		ar[++i] = 1;
        /* Pack the 1s to its right immediately behind it */
        i++;
        for (; i < n; i++, ones--) {
            if (ones > 0) {
                ar[i] = 1;
            }
            else {
                ar[i] = 0;
            }
        }
	}
	else  if (ar[0] == 0) {
		/* Add a new 1 and place them all at the beginning */
		for (i = 0; i < n; i++) {
			if (i < ones + 1) {
				ar[i] = 1;
			}
			else {
				ar[i] = 0;
			}
		}
	}
	else {
		/* Finished, return to the starting arrangement */
		for (i = 0; i < n; i++) {
			ar[i] = 0;
		}
		result = 0;
	}
	return result;
}

Circular linked list in C

A circular linked list is a linked list in which the head node’s previous pointer points to the tail node, and the tail node’s next pointer points to the head node. This means that the list only needs to keep track of one pointer for both head and tail.

A circular linked list containing elements labelled A to F

Looping over a circular linked list is slightly different from looping over an ordinary linked list because there is no NULL node to detect, and instead you are trying to get back to where you started. There are 2 methods of doing this.

Method 1: Use a do loop

void iterate1(const circularlist *list)
{
    cnode *node = list->head;
    if (node != NULL) {
        do {
            printf("%s\n", (const char*)node->data);
            node = node->next;
        } while (node != list->head);
    }
}

Method 2: Use a counter

void iterate2(const circularlist *list)
{
    cnode *node;
    unsigned int i;
    for (i = 0, node = list->head; i < list->count; i++, node = node->next) {
        printf("%s\n", (const char*)node->data);
    }
}

Here is the header file for the circular linked list

#ifndef CIRCULARLIST_H
#define CIRCULARLIST_H

struct cnode {
	struct cnode * next;
	struct cnode * previous;
	void * data;
};
typedef struct cnode cnode;

struct circularlist {
	cnode * head;
	unsigned int count;
};
typedef struct circularlist circularlist;

typedef void (*circularlist_forfn)(void *);

circularlist * circularlist_create(void);
void circularlist_delete(circularlist * list);

void circularlist_add_head(circularlist * list, void * data);
void circularlist_add_tail(circularlist * list, void * data);
void * circularlist_remove_head(circularlist * list);
void * circularlist_remove_tail(circularlist * list);
void * circularlist_remove_node(circularlist * list, cnode * node);
void circularlist_empty(circularlist * list);

void circularlist_insert_before(circularlist * list, cnode * node, void * data);
void circularlist_insert_after(circularlist * list, cnode * node, void * data);

void circularlist_for_each(const circularlist * list, circularlist_forfn fun);
unsigned int circularlist_get_count(const circularlist * list);

#endif /* CIRCULARLIST_H */

Here is the implementation

#include <stdlib.h>

#include <circularlist.h>

static cnode * cnode_create(void * data)
{
	cnode * node = malloc(sizeof(cnode));
	if(node) {
		node->next = node;
		node->previous = node;
		node->data = data;
	}

	return node;
}

static void cnode_delete(cnode * node)
{
	free(node);
}

circularlist * circularlist_create(void)
{
	circularlist * list = malloc(sizeof(circularlist));
	if (list) {
		list->head = NULL;
		list->count = 0;
	}

	return list;
}


void circularlist_delete(circularlist * list)
{
	if (list) {
		circularlist_empty(list);
		free(list);
	}
}

void circularlist_add_head(circularlist * list, void * data)
{
	circularlist_insert_before(list, list->head, data);
	list->head = list->head->previous;
}

void circularlist_add_tail(circularlist * list, void * data)
{
	circularlist_insert_before(list, list->head, data);
}

void * circularlist_remove_head(circularlist * list)
{
	return circularlist_remove_node(list, list->head);
}

void * circularlist_remove_tail(circularlist * list)
{
	void * data = NULL;
	if (list->head) {
		data = circularlist_remove_node(list, list->head->previous);
	}
	return data;
}

void * circularlist_remove_node(circularlist * list, cnode * node)
{
	void * data = NULL;
	if (list->count > 0) {
		if (node != node->next) { /* Or, list->count > 1 */
			node->next->previous = node->previous;
			node->previous->next = node->next;
			if (node == list->head)
				list->head = list->head->next;
		}
		else {
			/* Removing the last element */
			list->head = NULL;
		}
		data = node->data;
		cnode_delete(node);
		list->count--;
	}
	return data;
}

void circularlist_empty(circularlist * list)
{
	while (circularlist_remove_tail(list) != NULL);
}

void circularlist_insert_before(circularlist * list, cnode * node, void * data)
{
	cnode * newnode = cnode_create(data);

	if (newnode) {
		if (list->count > 0) {
			newnode->next = node;
			newnode->previous = node->previous;
			newnode->previous->next = newnode;
			node->previous = newnode;
		}
		else {
			/* Adding the first element */
			list->head = newnode;
		}
		list->count++;
	}
}

void circularlist_insert_after(circularlist * list, cnode * node, void * data)
{
	circularlist_insert_before(list, node->next, data);
}

void circularlist_for_each(const circularlist * list, circularlist_forfn fun)
{
	cnode * node = list->head;

	if (node != NULL) {
		do {
			fun(node->data);
			node = node->next;
		} while (node != list->head);
	}
}

unsigned int circularlist_get_count(const circularlist *list)
{
	return list->count;
}

Here is a test program

#include <stdio.h>
#include <string.h>

#include <circularlist.h>

int main(void)
{
    char * elements[] = {"A", "B", "C", "D", "E", "F"};
    const unsigned int n = sizeof(elements) / sizeof(const char*);
    circularlist * list;
    unsigned int i;
    cnode * node;
    unsigned int removed = 0, added = 0;

    list = circularlist_create();
    if (list) {
        /* Populate the list */
        for (i = 0; i < n; i++) {
            circularlist_add_tail(list, elements[i]);
        }

        /* Remove "C" */
        node = list->head;
        do {
            if (strcmp((const char*)node->data, "C") == 0) {
                circularlist_remove_node(list, node);
                removed = 1;
            }
            else {
                node = node->next;
            }
        } while (node != list->head && !removed);

        /* Add "X" and "Y" either side of "D" */
        node = list->head;
        do {
            if (strcmp((const char*)node->data, "D") == 0) {
                circularlist_insert_before(list, node, "X");
                circularlist_insert_after(list, node, "Y");
                added = 1;
            }
            else {
                node = node->next;
            }
        } while (node != list->head && !added);

        /* Print */
        printf("Count is %d\n", circularlist_get_count(list));
        circularlist_for_each(list, (circularlist_forfn)puts);

        circularlist_delete(list);
    }
    return 0;
}

Finding the height of a binary tree recursively

The height of a node in a binary tree is simply the maximum of the height of its left and right subtrees, plus one.

This lends itself to a simple recursive algorithm for finding the height of a binary tree.

static int max(int a, int b)
{
    if (a >= b) {
        return a;
    }
    return b;
}

unsigned int binarytree_height_recursive(const btnode *node)
{
    unsigned int height = 0;
    if (node->left || node->right) {
        height = max(node->left ? binarytree_height_recursive(node->left) : 0,
                node->right ? binarytree_height_recursive(node->right) : 0) + 1;
    }
    return height;
}

unsigned int binarytree_height(const binarytree *tree)
{
	unsigned int height = 0;
	if (tree->root) {
		height = binarytree_height_recursive(tree->root);
	}
	return height;
}

Related

Consistent hash ring

Consistent hashing was first described in a paper, Consistent hashing and random trees: Distributed caching protocols for relieving hot spots on the World Wide Web (1997) by David Karger et al. It is used in distributed storage systems like Amazon Dynamo and memcached.

Consistent hashing is a very simple solution to a common problem: how can you find a server in a distributed system to store or retrieve a value identified by a key, while at the same time being able to cope with server failures and network partitions?

Simply finding a server for value is easy; just number your set of s servers from 0 to s – 1. When you want to store or retrieve a value, hash the value’s key modulo s, and that gives you the server.

The problem comes when servers fail or become unreachable through a network partition. At that point, the servers no longer fill the hash space, so the only option is to invalidate the caches on all servers, renumber them, and start again. Given that, in a system with hundreds or thousands of servers, failures are commonplace, this solution is not feasible.
The solution

In consistent hashing, the servers, as well as the keys, are hashed, and it is by this hash that they are looked up. The hash space is large, and is treated as if it wraps around to form a circle – hence hash ring. The process of creating a hash for each server is equivalent to placing it at a point on the circumference of this circle. When a key needs to be looked up, it is hashed, which again corresponds to a point on the circle. In order to find its server, one then simply moves round the circle clockwise from this point until the next server is found. If no server is found from that point to end of the hash space, the first server is used – this is the “wrapping round” that makes the hash space circular.

The only remaining problem is that in practice hashing algorithms are likely to result in clusters of servers on the ring (or, to be more precise, some servers with a disproportionately large space before them), and this will result in greater load on the first server in the cluster and less on the remainder. This can be ameliorated by adding each server to the ring a number of times in different places. This is achieved by having a replica count, which applies to all servers in the ring, and when adding a server, looping from 0 to the count – 1, and hashing a string made from both the server and the loop variable to produce the position. This has the effect of distributing the servers more evenly over the ring. Note that this has nothing to do with server replication; each of the replicas represents the same physical server, and replication of data between servers is an entirely unrelated issue.

I’ve written an example implementation of consistent hashing in C++. As you can imagine from the description above, it isn’t terribly complicated. Here is the main class:

template <class Node, class Data, class Hash = HASH_NAMESPACE::hash<const char*> >
class HashRing
{
public:
    typedef std::map<size_t, Node> NodeMap;

    HashRing(unsigned int replicas)
        : replicas_(replicas), hash_(HASH_NAMESPACE::hash<const char*>())
    {
    }

    HashRing(unsigned int replicas, const Hash& hash)
        : replicas_(replicas), hash_(hash)
    {
    }

    size_t AddNode(const Node& node);
    void RemoveNode(const Node& node);
    const Node& GetNode(const Data& data) const;

private:
    NodeMap ring_;
    const unsigned int replicas_;
    Hash hash_;
};

template <class Node, class Data, class Hash>
size_t HashRing<Node, Data, Hash>::AddNode(const Node& node)
{
    size_t hash;
    std::string nodestr = Stringify(node);
    for (unsigned int r = 0; r < replicas_; r++) {
        hash = hash_((nodestr + Stringify(r)).c_str());
        ring_[hash] = node;
    }
    return hash;
}

template <class Node, class Data, class Hash>
void HashRing<Node, Data, Hash>::RemoveNode(const Node& node)
{
    std::string nodestr = Stringify(node);
    for (unsigned int r = 0; r < replicas_; r++) {
        size_t hash = hash_((nodestr + Stringify(r)).c_str());
        ring_.erase(hash);
    }
}

template <class Node, class Data, class Hash>
const Node& HashRing<Node, Data, Hash>::GetNode(const Data& data) const
{
    if (ring_.empty()) {
        throw EmptyRingException();
    }
    size_t hash = hash_(Stringify(data).c_str());
    typename NodeMap::const_iterator it;
    // Look for the first node >= hash
    it = ring_.lower_bound(hash);
    if (it == ring_.end()) {
        // Wrapped around; get the first node
        it = ring_.begin();
    }
    return it->second;
}

A few points to note:

  • The default hash function is hash from <map>.
    In practice you probably don’t want to use this. Something like MD5 would probably be best.
  • I had to define HASH_NAMESPACE because g++ puts the non-standard hash in a different namespace than that which other compilers do.
    Hopefully this will all be resolved with the widespread availablity of std::unordered_map.
  • The Node and Data types need to have operator << defined for a std::ostream.
    This is because I write them to an ostringstream in order to "stringify" them before getting the hash.