Vaughan

ConcurrentDictionary internals

Internals

The nice thing about .net being opensource is that at any time you can look at the source of interesting classes that you rely on. In this post I look at the ConcurrentDictionary class to get a better idea of how Microsoft writes threadsafe classes. As a recap: generally a normal dictionary is implemented internally as an array of buckets. When you add an element, the hash of the key will get you to the right bucket and then each item in the bucket will be compared for equality to see if it exists. It is for this reason that you need to be careful of the hashing algorithm that you implement because ideally for quicker lookups you want more buckets with less items than a small amount of buckets with many elements. A worst case example would be a dictionary resolving to a single bucket for all its items which will then have a o(n) lookup instead of a o(1) lookup. ConcurrentDictionary is quite a large and complex class and so I will try cover the core parts of it. Central to how it works is in this internal class within it.

private class Tables
{
      internal readonly Node[] m_buckets; // A singly-linked list for each bucket.
      internal readonly object[] m_locks; // A set of locks, each guarding a section of the table.
      internal volatile int[] m_countPerLock; // The number of elements guarded by each lock.
      internal readonly IEqualityComparer<TKey> m_comparer; // Key equality comparer

      internal Tables(Node[] buckets, object[] locks, int[] countPerLock, IEqualityComparer<TKey> comparer)    
      {
            m_buckets = buckets;
            m_locks = locks;
            m_countPerLock = countPerLock;
            m_comparer = comparer;
      }
}

So reading that, you can see that it contains an array of buckets, array of locks and an array of the number of elements guarded by each lock. When an item is added, a lock related to that items bucket is taken. So in effect the lock should only cover a small section of the datastructure if the hashing is working well. What is interesting is that it is implemented in a way that it wants a couple of items in each bucket and essentially compresses the buckets together. I think the reason for this is that having a single lock per key is also inefficient. The way it handles how many items is under each lock is the m_countPerLock array. Each time an item is added to a bucket, this number is compared to a configured max size. If the number is exceeded then the whole table is resized.

public bool TryGetValue(TKey key, out TValue value)
{
    if (key == null) throw new ArgumentNullException(key);

    int bucketNo, lockNoUnused;

    // We must capture the m_buckets field in a local variable. It is set to a new table on each table resize.
    Tables tables = m_tables;
    IEqualityComparer<TKey> comparer = tables.m_comparer;
    GetBucketAndLockNo(comparer.GetHashCode(key), out bucketNo, out lockNoUnused, tables.m_buckets.Length, tables.m_locks.Length);

    // We can get away w/out a lock here.
    // The Volatile.Read ensures that the load of the fields of 'n' doesn't move before the load from buckets[i].
    Node n = Volatile.Read<Node>(ref tables.m_buckets[bucketNo]);

    while (n != null)
    {
        if (comparer.Equals(n.m_key, key))
        {
            value = n.m_value;
            return true;
        }
        n = n.m_next;
    }

    value = default(TValue);
    return false;
}

private void GetBucketAndLockNo(int hashcode, out int bucketNo, out int lockNo, int bucketCount, int lockCount)
{
    bucketNo = (hashcode & 0x7fffffff) % bucketCount;
    lockNo = bucketNo % lockCount;

    Assert(bucketNo >= 0 && bucketNo < bucketCount);
    Assert(lockNo >= 0 && lockNo < lockCount);
}

A few things I found interesting: bucketNo uses a bitmask to remove the sign bit (so always positive) and then uses the mod operator to get a number between 0 and the bucket count. Very clever manipulation even if it lacks a bit of readability. The read of the nodes is done without a lock. Volatile.Read will create a memory barrier that will ensure that the node retrieved will be the latest version in a mutlithreaded context. So with processors with a weak memory model the read won't be reordered and you can assume that threads on other cpus shouldn't use a cached value. TryAddInternal is also really interesting in its implementation.


private bool TryAddInternal(TKey key, TValue value, bool updateIfExists, bool acquireLock, out TValue resultingValue)
        {
            while (true)
            {
                int bucketNo, lockNo;
                int hashcode;

                Tables tables = m_tables;
                IEqualityComparer<TKey> comparer = tables.m_comparer;
                hashcode = comparer.GetHashCode(key);
                GetBucketAndLockNo(hashcode, out bucketNo, out lockNo, tables.m_buckets.Length, tables.m_locks.Length);

                bool resizeDesired = false;
                bool lockTaken = false;
#if FEATURE_RANDOMIZED_STRING_HASHING
#if !FEATURE_CORECLR                
                bool resizeDueToCollisions = false;
#endif // !FEATURE_CORECLR
#endif

                try
                {
                    if (acquireLock)
                        Monitor.Enter(tables.m_locks[lockNo], ref lockTaken);

                    // If the table just got resized, we may not be holding the right lock, and must retry.
                    // This should be a rare occurence.
                    if (tables != m_tables)
                    {
                        continue;
                    }

#if FEATURE_RANDOMIZED_STRING_HASHING
#if !FEATURE_CORECLR
                    int collisionCount = 0;
#endif // !FEATURE_CORECLR
#endif

                    // Try to find this key in the bucket
                    Node prev = null;
                    for (Node node = tables.m_buckets[bucketNo]; node != null; node = node.m_next)
                    {
                        Assert((prev == null && node == tables.m_buckets[bucketNo]) || prev.m_next == node);
                        if (comparer.Equals(node.m_key, key))
                        {
                            // The key was found in the dictionary. If updates are allowed, update the value for that key.
                            // We need to create a new node for the update, in order to support TValue types that cannot
                            // be written atomically, since lock-free reads may be happening concurrently.
                            if (updateIfExists)
                            {
                                if (s_isValueWriteAtomic)
                                {
                                    node.m_value = value;
                                }
                                else
                                {
                                    Node newNode = new Node(node.m_key, value, hashcode, node.m_next);
                                    if (prev == null)
                                    {
                                        tables.m_buckets[bucketNo] = newNode;
                                    }
                                    else
                                    {
                                        prev.m_next = newNode;
                                    }
                                }
                                resultingValue = value;
                            }
                            else
                            {
                                resultingValue = node.m_value;
                            }
                            return false;
                        }
                        prev = node;

#if FEATURE_RANDOMIZED_STRING_HASHING
#if !FEATURE_CORECLR
                        collisionCount++;
#endif // !FEATURE_CORECLR
#endif
                    }

#if FEATURE_RANDOMIZED_STRING_HASHING
#if !FEATURE_CORECLR
                    if(collisionCount > HashHelpers.HashCollisionThreshold && HashHelpers.IsWellKnownEqualityComparer(comparer)) 
                    {
                        resizeDesired = true;
                        resizeDueToCollisions = true;
                    }
#endif // !FEATURE_CORECLR
#endif

                    // The key was not found in the bucket. Insert the key-value pair.
                    Volatile.Write<Node>(ref tables.m_buckets[bucketNo], new Node(key, value, hashcode, tables.m_buckets[bucketNo]));
                    checked
                    {
                        tables.m_countPerLock[lockNo]++;
                    }

                    //
                    // If the number of elements guarded by this lock has exceeded the budget, resize the bucket table.
                    // It is also possible that GrowTable will increase the budget but won't resize the bucket table.
                    // That happens if the bucket table is found to be poorly utilized due to a bad hash function.
                    //
                    if (tables.m_countPerLock[lockNo] > m_budget)
                    {
                        resizeDesired = true;
                    }
                }
                finally
                {
                    if (lockTaken)
                        Monitor.Exit(tables.m_locks[lockNo]);
                }

                //
                // The fact that we got here means that we just performed an insertion. If necessary, we will grow the table.
                //
                // Concurrency notes:
                // - Notice that we are not holding any locks at when calling GrowTable. This is necessary to prevent deadlocks.
                // - As a result, it is possible that GrowTable will be called unnecessarily. But, GrowTable will obtain lock 0
                //   and then verify that the table we passed to it as the argument is still the current table.
                //
                if (resizeDesired)
                {
#if FEATURE_RANDOMIZED_STRING_HASHING
#if !FEATURE_CORECLR
                    if (resizeDueToCollisions)
                    {
                        GrowTable(tables, (IEqualityComparer<TKey>)HashHelpers.GetRandomizedEqualityComparer(comparer), true, m_keyRehashCount);
                    }
                    else
#endif // !FEATURE_CORECLR
                    {
                        GrowTable(tables, tables.m_comparer, false, m_keyRehashCount);
                    }
#else
                    GrowTable(tables, tables.m_comparer, false, m_keyRehashCount);
#endif
                }

                resultingValue = value;
                return true;
            }
        }

It runs in a loop with checks to see if other threads have mutated the table its working on. If it detects that then it just starts again. Otherwise in the same way as above, it finds a bucket and adds the value to a linked list. Also in the same way as the get method, it uses a memory barrier to write a new node instead of a lock. At the end of adding an item, it checks if the table needs to be resized again. This class does quite a bit more than these few methods but these do give a taste of what is involved in writing something this complex!

BMC logoBuy me a coffee