class ThreadSafe::AtomicReferenceCacheBackend
A Ruby port of Doug Lea's jsr166e.ConcurrentHashMapV8 class, version 1.59, available in the public domain. Original source code is available at: gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jsr166e/ConcurrentHashMapV8.java?revision=1.59
The Ruby port omits the TreeBin optimization (red-black trees for use in
bins whose size exceeds a threshold).
A hash table supporting full concurrency of retrievals and high expected concurrency for updates. However, even though all operations are thread-safe, retrieval operations do not entail locking, and there is not any support for locking the entire table in a way that prevents all access.
Retrieval operations generally do not block, so may overlap with update
operations. Retrievals reflect the results of the most recently completed
update operations holding upon their onset. (More formally, an update
operation for a given key bears a happens-before relation with any
(non-nil) retrieval for that key reporting the updated value.) For
aggregate operations such as +clear()+, concurrent retrievals may reflect
insertion or removal of only some entries. Similarly, the +each_pair+
iterator yields elements reflecting the state of the hash table at some
point at or since the start of the +each_pair+ call. Bear in mind that the
results of aggregate status methods including +size()+ and +empty?+ are
typically useful only when a map is not undergoing concurrent updates in
other threads. Otherwise the results of these methods reflect transient
states that may be adequate for monitoring or estimation purposes, but not
for program control.
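To make these semantics concrete, here is a minimal usage sketch. It goes
through ThreadSafe::Cache, the gem's public front end, which on some Ruby
runtimes is backed by this class; the thread counts and keys are arbitrary.

  require 'thread_safe'

  cache = ThreadSafe::Cache.new

  writers = 4.times.map do |t|
    Thread.new do
      1_000.times { |i| cache["#{t}-#{i}"] = i } # updates never block readers
    end
  end

  # Readers may run while writers are still inserting; each_pair sees the
  # table at some point at or since the start of the iteration.
  reader = Thread.new { cache.each_pair { |key, value| } }

  (writers + [reader]).each(&:join)
  cache.size # => 4000; exact only once no other thread is updating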
The table is dynamically expanded when there are too many collisions (i.e.,
keys that have distinct hash codes but fall into the same slot modulo the
table size), with the expected average effect of maintaining roughly two
bins per mapping (corresponding to a 0.75 load factor threshold for
resizing). There may be much variance around this average as mappings are
added and removed, but overall, this maintains a commonly accepted
time/space tradeoff for hash tables. However, resizing this or any other
kind of hash table may be a relatively slow operation. When possible, it is
a good idea to provide a size estimate as an optional :initial_capacity
initializer argument. An additional optional :load_factor constructor
argument provides a further means of customizing initial table capacity by
specifying the table density to be used in calculating the amount of space
to allocate for the given number of elements. Note that using many keys
with exactly the same +hash+ is a sure way to slow down performance of any
hash table.
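For example, a sizing hint can be passed when the expected number of
entries is known in advance. This is only a sketch: it constructs the
backend directly for illustration, whereas applications normally reach it
through the gem's ThreadSafe::Cache front end.

  require 'thread_safe'

  # Expecting ~50_000 entries: pre-size the table so it does not have to be
  # rebuilt repeatedly while the map fills up.
  backend = ThreadSafe::AtomicReferenceCacheBackend.new(initial_capacity: 50_000)
  backend[:answer] = 42
  backend[:answer] # => 42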
Design overview
The primary design goal of this hash table is to maintain concurrent
readability (typically method +[]+, but also iteration and related methods)
while minimizing update contention. Secondary goals are to keep space
consumption about the same or better than plain +Hash+, and to support high
initial insertion rates on an empty table by many threads.
Each key-value mapping is held in a +Node+. The validation-based approach
explained below leads to a lot of code sprawl because retry-control
precludes factoring into smaller methods.
The table is lazily initialized to a power-of-two size upon the first
insertion. Each bin in the table normally contains a list of +Node+s (most
often, the list has only zero or one +Node+). Table accesses require
volatile/atomic reads, writes, and CASes. The lists of nodes within bins
are always accurately traversable under volatile reads, so long as lookups
check hash code and non-nullness of value before checking key equality.
We use the top two bits of +Node+ hash fields for control purposes -- they
are available anyway because of addressing constraints. As explained
further below, these top bits are used as follows:

  00 - Normal
  01 - Locked
  11 - Locked and may have a thread waiting for lock
  10 - +Node+ is a forwarding node
The lower 28 bits of each +Node+'s hash field contain the key's hash code,
except for forwarding nodes, for which the lower bits are zero (and so
always have hash field == MOVED).
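The sketch below illustrates that encoding with assumed, illustrative bit
positions (a hash field whose top two of 30 bits are reserved for control);
the real values are defined by the MOVED, LOCKED, WAITING and HASH_BITS
constants listed further down and may differ.

  # Illustrative values only -- not the constants used by the backend.
  HASH_BITS = (1 << 28) - 1 # lower 28 bits carry the key's hash code
  LOCKED    = 0b01 << 28    # 01 - locked
  WAITING   = 0b11 << 28    # 11 - locked, and a thread may be waiting
  MOVED     = 0b10 << 28    # 10 - forwarding node

  key_hash  = 0xABCDEF
  node_hash = key_hash | LOCKED

  (node_hash & HASH_BITS) == key_hash  # => true: locking leaves the hash bits intact
  (node_hash & ~HASH_BITS) == LOCKED   # => true: the control bits say "locked"
  MOVED & HASH_BITS                    # => 0:    forwarding nodes carry no hash bits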
Insertion (via +[]=+ or its variants) of the first node in an empty bin is
performed by just CASing it to the bin. This is by far the most common
case for put operations under most key/hash distributions. Other update
operations (insert, delete, and replace) require locks. We do not want to
waste the space required to associate a distinct lock object with each bin,
so instead use the first node of a bin list itself as a lock. Blocking
support for these locks relies on +Util::CheapLockable+. However, we also
need a +try_lock+ construction, so we overlay these by using bits of the
+Node+ hash field for lock control (see above), and so normally use builtin
monitors only for blocking and signalling using
+cheap_wait+/+cheap_broadcast+ constructions. See +Node#try_await_lock+.
Using the first node of a list as a lock does not by itself suffice though: When a node is locked, any update must first validate that it is still the first node after locking it, and retry if not. Because new nodes are always appended to lists, once a node is first in a bin, it remains first until deleted or the bin becomes invalidated (upon resizing). However, operations that only conditionally update may inspect nodes until the point of update. This is a converse of sorts to the lazy locking technique described by Herlihy & Shavit.
The main disadvantage of per-bin locks is that other update operations on
other nodes in a bin list protected by the same lock can stall, for example
when user +eql?+ or mapping functions take a long time.
However, statistically, under random hash codes, this is not a common
problem. Ideally, the frequency of nodes in bins follows a Poisson
distribution (en.wikipedia.org/wiki/Poisson_distribution)
with a parameter of about 0.5 on average, given the resizing threshold of
0.75, although with a large variance because of resizing granularity.
Ignoring variance, the expected occurrences of list size k are (exp(-0.5) *
pow(0.5, k) / factorial(k)). The first values are:
  0:    0.60653066
  1:    0.30326533
  2:    0.07581633
  3:    0.01263606
  4:    0.00157952
  5:    0.00015795
  6:    0.00001316
  7:    0.00000094
  8:    0.00000006
  more: less than 1 in ten million
Lock contention probability for two threads accessing distinct elements is roughly 1 / (8 * elements) under random hashes.
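The listed frequencies can be reproduced directly from that formula; the
short script below assumes nothing beyond Ruby's standard Math module.

  # Expected fraction of bins holding exactly k nodes, Poisson(lambda = 0.5).
  def poisson(k, lambda_ = 0.5)
    factorial = (1..k).reduce(1, :*)
    Math.exp(-lambda_) * lambda_**k / factorial
  end

  (0..8).each { |k| printf("%d: %.8f\n", k, poisson(k)) }
  # 0: 0.60653066
  # 1: 0.30326533
  # 2: 0.07581633
  # ...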
The table is resized when occupancy exceeds a percentage threshold
(nominally, 0.75, but see below). Only a single thread performs the resize
(using field +size_control+, to arrange exclusion), but the table otherwise
remains usable for reads and updates. Resizing proceeds by transferring
bins, one by one, from the table to the next table. Because we are using
power-of-two expansion, the elements from each bin must either stay at the
same index, or move with a power of two offset. We eliminate unnecessary
node creation by catching cases where old nodes can be reused because their
next fields won't change. On average, only about one-sixth of them need
cloning when a table doubles. The nodes they replace will be garbage
collectable as soon as they are no longer referenced by any reader thread
that may be in the midst of concurrently traversing the table.
Upon transfer, the old table bin contains only a special forwarding node
(with hash field MOVED) that contains the next table as its key. On
encountering a forwarding node, access and update operations restart, using
the new table.
Each bin transfer requires its bin lock. However, unlike other cases, a
transfer can skip a bin if it fails to acquire its lock, and revisit it
later. Method +rebuild+ maintains a buffer of TRANSFER_BUFFER_SIZE bins
that have been skipped because of failure to acquire a lock, and
blocks only if none are available (i.e., only very rarely). The transfer
operation must also ensure that all accessible bins in both the old and new
table are usable by any traversal. When there are no lock acquisition
failures, this is arranged simply by proceeding from the last bin
(+table.size - 1+) up towards the first. Upon seeing a forwarding node,
traversals arrange to move to the new table without revisiting nodes.
However, when any node is skipped during a transfer, all earlier table bins
may have become visible, so are initialized with a reverse-forwarding node
back to the old table until the new ones are established. (This sometimes
requires transiently locking a forwarding node, which is possible under the
above encoding.) These more expensive mechanics trigger only when
necessary.
The traversal scheme also applies to partial traversals of ranges of bins (via an alternate Traverser constructor) to support partitioned aggregate operations. Also, read-only operations give up if ever forwarded to a null table, which provides support for shutdown-style clearing (also not currently implemented).
Lazy table initialization minimizes footprint until first use.
The element count is maintained using a +ThreadSafe::Util::Adder+, which
avoids contention on updates but can encounter cache thrashing if read too
frequently during concurrent access. To avoid reading so often, resizing is
attempted either when a bin lock is contended, or upon adding to a bin
already holding two or more nodes (checked before adding in the
+x_if_absent+ methods, after
adding in others). Under uniform hash distributions, the probability of
this occurring at threshold is around 13%, meaning that only about 1 in 8
puts check threshold (and after resizing, many fewer do so). But this
approximation has high variance for small table sizes, so we check on any
collision for sizes <= 64. The bulk putAll operation further reduces
contention by only committing count updates upon these size checks.
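The idea behind +ThreadSafe::Util::Adder+ can be sketched with a
much-simplified striped counter; the class below is illustrative only (the
real Adder is considerably more sophisticated), but it shows why increments
rarely contend while +sum+ must visit every cell.

  # Simplified illustration of a striped counter: writers update one of
  # several cells chosen per thread, readers fold all cells together.
  class StripedCounter
    def initialize(stripes = 8)
      @cells = Array.new(stripes) { { value: 0, lock: Mutex.new } }
    end

    def add(x = 1)
      cell = @cells[Thread.current.object_id % @cells.size]
      cell[:lock].synchronize { cell[:value] += x }
    end

    def sum
      @cells.sum { |cell| cell[:value] } # cheap to write, costly to read often
    end
  end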
Constants
- DEFAULT_CAPACITY
- HASH_BITS
- LOCKED
- MAX_CAPACITY
- MOVED
shorthands
- NOW_RESIZING
- TRANSFER_BUFFER_SIZE
The buffer size for skipped bins during transfers. The value is arbitrary but should be large enough to avoid most locking stalls during resizes.
- WAITING
Public Class Methods
# File lib/thread_safe/atomic_reference_cache_backend.rb, line 370
def initialize(options = nil)
  super()
  @counter = Util::Adder.new
  initial_capacity  = options && options[:initial_capacity] || DEFAULT_CAPACITY
  self.size_control = (capacity = table_size_for(initial_capacity)) > MAX_CAPACITY ? MAX_CAPACITY : capacity
end
Public Instance Methods
# File lib/thread_safe/atomic_reference_cache_backend.rb, line 395
def [](key)
  get_or_default(key)
end
# File lib/thread_safe/atomic_reference_cache_backend.rb, line 403
def []=(key, value)
  get_and_set(key, value)
  value
end
Implementation for clear. Steps through each bin, removing all nodes.
# File lib/thread_safe/atomic_reference_cache_backend.rb, line 542
def clear
  return self unless current_table = table
  current_table_size = current_table.size
  deleted_count = i = 0
  while i < current_table_size
    if !(node = current_table.volatile_get(i))
      i += 1
    elsif (node_hash = node.hash) == MOVED
      current_table      = node.key
      current_table_size = current_table.size
    elsif Node.locked_hash?(node_hash)
      decrement_size(deleted_count) # opportunistically update count
      deleted_count = 0
      node.try_await_lock(current_table, i)
    else
      current_table.try_lock_via_hash(i, node, node_hash) do
        begin
          deleted_count += 1 if NULL != node.value # recheck under lock
          node.value = nil
        end while node = node.next
        current_table.volatile_set(i, nil)
        i += 1
      end
    end
  end
  decrement_size(deleted_count)
  self
end
# File lib/thread_safe/atomic_reference_cache_backend.rb, line 443
def compute(key)
  internal_compute(key) do |old_value|
    if (new_value = yield(NULL == old_value ? nil : old_value)).nil?
      NULL
    else
      new_value
    end
  end
end
# File lib/thread_safe/atomic_reference_cache_backend.rb, line 408
def compute_if_absent(key)
  hash          = key_hash(key)
  current_table = table || initialize_table
  while true
    if !(node = current_table.volatile_get(i = current_table.hash_to_index(hash)))
      succeeded, new_value = current_table.try_to_cas_in_computed(i, hash, key) { yield }
      if succeeded
        increment_size
        return new_value
      end
    elsif (node_hash = node.hash) == MOVED
      current_table = node.key
    elsif NULL != (current_value = find_value_in_node_list(node, key, hash, node_hash & HASH_BITS))
      return current_value
    elsif Node.locked_hash?(node_hash)
      try_await_lock(current_table, i, node)
    else
      succeeded, value = attempt_internal_compute_if_absent(key, hash, current_table, i, node, node_hash) { yield }
      return value if succeeded
    end
  end
end
# File lib/thread_safe/atomic_reference_cache_backend.rb, line 431
def compute_if_present(key)
  new_value = nil
  internal_replace(key) do |old_value|
    if (new_value = yield(NULL == old_value ? nil : old_value)).nil?
      NULL
    else
      new_value
    end
  end
  new_value
end
# File lib/thread_safe/atomic_reference_cache_backend.rb, line 493
def delete(key)
  replace_if_exists(key, NULL)
end
# File lib/thread_safe/atomic_reference_cache_backend.rb, line 497
def delete_pair(key, value)
  result = internal_replace(key, value) { NULL }
  if result && NULL != result
    !!result
  else
    false
  end
end
# File lib/thread_safe/atomic_reference_cache_backend.rb, line 506
def each_pair
  return self unless current_table = table
  current_table_size = base_size = current_table.size
  i = base_index = 0
  while base_index < base_size
    if node = current_table.volatile_get(i)
      if node.hash == MOVED
        current_table      = node.key
        current_table_size = current_table.size
      else
        begin
          if NULL != (value = node.value) # skip deleted or special nodes
            yield node.key, value
          end
        end while node = node.next
      end
    end
    if (i_with_base = i + base_size) < current_table_size
      i = i_with_base # visit upper slots if present
    else
      i = base_index += 1
    end
  end
  self
end
# File lib/thread_safe/atomic_reference_cache_backend.rb, line 537
def empty?
  size == 0
end
# File lib/thread_safe/atomic_reference_cache_backend.rb, line 473
def get_and_set(key, value) # internalPut in the original CHMV8
  hash          = key_hash(key)
  current_table = table || initialize_table
  while true
    if !(node = current_table.volatile_get(i = current_table.hash_to_index(hash)))
      if current_table.cas_new_node(i, hash, key, value)
        increment_size
        break
      end
    elsif (node_hash = node.hash) == MOVED
      current_table = node.key
    elsif Node.locked_hash?(node_hash)
      try_await_lock(current_table, i, node)
    else
      succeeded, old_value = attempt_get_and_set(key, value, hash, current_table, i, node, node_hash)
      break old_value if succeeded
    end
  end
end
# File lib/thread_safe/atomic_reference_cache_backend.rb, line 377
def get_or_default(key, else_value = nil)
  hash          = key_hash(key)
  current_table = table
  while current_table
    node = current_table.volatile_get_by_hash(hash)
    current_table =
      while node
        if (node_hash = node.hash) == MOVED
          break node.key
        elsif (node_hash & HASH_BITS) == hash && node.key?(key) && NULL != (value = node.value)
          return value
        end
        node = node.next
      end
  end
  else_value
end
# File lib/thread_safe/atomic_reference_cache_backend.rb, line 399
def key?(key)
  get_or_default(key, NULL) != NULL
end
# File lib/thread_safe/atomic_reference_cache_backend.rb, line 453
def merge_pair(key, value)
  internal_compute(key) do |old_value|
    if NULL == old_value || !(value = yield(old_value)).nil?
      value
    else
      NULL
    end
  end
end
# File lib/thread_safe/atomic_reference_cache_backend.rb, line 467
def replace_if_exists(key, new_value)
  if (result = internal_replace(key) { new_value }) && NULL != result
    result
  end
end
# File lib/thread_safe/atomic_reference_cache_backend.rb, line 463
def replace_pair(key, old_value, new_value)
  NULL != internal_replace(key, old_value) { new_value }
end
# File lib/thread_safe/atomic_reference_cache_backend.rb, line 533
def size
  (sum = @counter.sum) < 0 ? 0 : sum # ignore transient negative values
end
Private Instance Methods
# File lib/thread_safe/atomic_reference_cache_backend.rb, line 691
def attempt_compute(key, hash, current_table, i, node, node_hash)
  added = false
  current_table.try_lock_via_hash(i, node, node_hash) do
    predecessor_node = nil
    while true
      if node.matches?(key, hash) && NULL != (value = node.value)
        if NULL == (node.value = value = yield(value))
          current_table.delete_node_at(i, node, predecessor_node)
          decrement_size
          value = nil
        end
        return true, value
      end
      predecessor_node = node
      unless node = node.next
        if NULL == (value = yield(NULL))
          value = nil
        else
          predecessor_node.next = Node.new(hash, key, value)
          added = true
          increment_size
        end
        return true, value
      end
    end
  end
ensure
  check_for_resize if added
end
# File lib/thread_safe/atomic_reference_cache_backend.rb, line 721
def attempt_get_and_set(key, value, hash, current_table, i, node, node_hash)
  node_nesting = nil
  current_table.try_lock_via_hash(i, node, node_hash) do
    node_nesting    = 1
    old_value       = nil
    found_old_value = false
    while node
      if node.matches?(key, hash) && NULL != (old_value = node.value)
        found_old_value = true
        node.value = value
        break
      end
      last = node
      unless node = node.next
        last.next = Node.new(hash, key, value)
        break
      end
      node_nesting += 1
    end
    return true, old_value if found_old_value
    increment_size
    true
  end
ensure
  check_for_resize if node_nesting && (node_nesting > 1 || current_table.size <= 64)
end
# File lib/thread_safe/atomic_reference_cache_backend.rb, line 671
def attempt_internal_compute_if_absent(key, hash, current_table, i, node, node_hash)
  added = false
  current_table.try_lock_via_hash(i, node, node_hash) do
    while true
      if node.matches?(key, hash) && NULL != (value = node.value)
        return true, value
      end
      last = node
      unless node = node.next
        last.next = Node.new(hash, key, value = yield)
        added = true
        increment_size
        return true, value
      end
    end
  end
ensure
  check_for_resize if added
end
# File lib/thread_safe/atomic_reference_cache_backend.rb, line 607
def attempt_internal_replace(key, expected_old_value, hash, current_table, i, node, node_hash)
  current_table.try_lock_via_hash(i, node, node_hash) do
    predecessor_node = nil
    old_value        = NULL
    begin
      if node.matches?(key, hash) && NULL != (current_value = node.value)
        if NULL == expected_old_value || expected_old_value == current_value # NULL == expected_old_value means whatever value
          old_value = current_value
          if NULL == (node.value = yield(old_value))
            current_table.delete_node_at(i, node, predecessor_node)
            decrement_size
          end
        end
        break
      end
      predecessor_node = node
    end while node = node.next
    return true, old_value
  end
end
If table is too small and not already resizing, creates next table and transfers bins. Rechecks occupancy after a transfer to see if another resize is already needed because resizings are lagging additions.
# File lib/thread_safe/atomic_reference_cache_backend.rb, line 793
def check_for_resize
  while (current_table = table) &&
        MAX_CAPACITY > (table_size = current_table.size) &&
        NOW_RESIZING != (size_ctrl = size_control) &&
        size_ctrl < @counter.sum
    try_in_resize_lock(current_table, size_ctrl) do
      self.table = rebuild(current_table)
      (table_size << 1) - (table_size >> 1) # 75% load factor
    end
  end
end
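As a worked example of the size-control arithmetic above: for a table of 64
bins, +(table_size << 1) - (table_size >> 1)+ is 128 - 32 = 96, i.e. 1.5
times the old size, which is 75% of the doubled (128-bin) table -- the next
occupancy threshold at a 0.75 load factor.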
# File lib/thread_safe/atomic_reference_cache_backend.rb, line 630
def find_value_in_node_list(node, key, hash, pure_hash)
  do_check_for_resize = false
  while true
    if pure_hash == hash && node.key?(key) && NULL != (value = node.value)
      return value
    elsif node = node.next
      do_check_for_resize = true # at least 2 nodes -> check for resize
      pure_hash = node.pure_hash
    else
      return NULL
    end
  end
ensure
  check_for_resize if do_check_for_resize
end
# File lib/thread_safe/atomic_reference_cache_backend.rb, line 749
def initialize_copy(other)
  super
  @counter = Util::Adder.new
  self.table = nil
  self.size_control = (other_table = other.table) ? other_table.size : DEFAULT_CAPACITY
  self
end
Initializes table, using the size recorded in +size_control+.
# File lib/thread_safe/atomic_reference_cache_backend.rb, line 774
def initialize_table
  until current_table ||= table
    if (size_ctrl = size_control) == NOW_RESIZING
      Thread.pass # lost initialization race; just spin
    else
      try_in_resize_lock(current_table, size_ctrl) do
        initial_size  = size_ctrl > 0 ? size_ctrl : DEFAULT_CAPACITY
        current_table = self.table = Table.new(initial_size)
        initial_size - (initial_size >> 2) # 75% load factor
      end
    end
  end
  current_table
end
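Similarly, the +initial_size - (initial_size >> 2)+ expression above sets
the first resize threshold to 75% of the freshly created table: for a table
of 16 bins it yields 16 - 4 = 12.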
# File lib/thread_safe/atomic_reference_cache_backend.rb, line 646
def internal_compute(key, &block)
  hash          = key_hash(key)
  current_table = table || initialize_table
  while true
    if !(node = current_table.volatile_get(i = current_table.hash_to_index(hash)))
      succeeded, new_value = current_table.try_to_cas_in_computed(i, hash, key, &block)
      if succeeded
        if NULL == new_value
          break nil
        else
          increment_size
          break new_value
        end
      end
    elsif (node_hash = node.hash) == MOVED
      current_table = node.key
    elsif Node.locked_hash?(node_hash)
      try_await_lock(current_table, i, node)
    else
      succeeded, new_value = attempt_compute(key, hash, current_table, i, node, node_hash, &block)
      break new_value if succeeded
    end
  end
end
Internal versions of the insertion methods, each a little more complicated than the last. All have the same basic structure:
1. If table uninitialized, create
2. If bin empty, try to CAS new node
3. If bin stale, use new table
4. Lock and validate; if valid, scan and add or update
The others interweave other checks and/or alternative actions:
* Plain +get_and_set+ checks for and performs resize after insertion.
* +compute_if_absent+ prescans for mapping without lock (and fails to add if
  present), which also makes pre-emptive resize checks worthwhile.
Someday when details settle down a bit more, it might be worth some factoring to reduce sprawl.
# File lib/thread_safe/atomic_reference_cache_backend.rb, line 587
def internal_replace(key, expected_old_value = NULL, &block)
  hash          = key_hash(key)
  current_table = table
  while current_table
    if !(node = current_table.volatile_get(i = current_table.hash_to_index(hash)))
      break
    elsif (node_hash = node.hash) == MOVED
      current_table = node.key
    elsif (node_hash & HASH_BITS) != hash && !node.next # precheck
      break # rules out possible existence
    elsif Node.locked_hash?(node_hash)
      try_await_lock(current_table, i, node)
    else
      succeeded, old_value = attempt_internal_replace(key, expected_old_value, hash, current_table, i, node, node_hash, &block)
      return old_value if succeeded
    end
  end
  NULL
end
# File lib/thread_safe/atomic_reference_cache_backend.rb, line 762
def key_hash(key)
  key.hash & HASH_BITS
end
Returns a power of two table size for the given desired capacity.
# File lib/thread_safe/atomic_reference_cache_backend.rb, line 767
def table_size_for(entry_count)
  size = 2
  size <<= 1 while size < entry_count
  size
end
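For example, given the loop above, +table_size_for(10)+ and
+table_size_for(16)+ both return 16, while +table_size_for(17)+ returns 32.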
# File lib/thread_safe/atomic_reference_cache_backend.rb, line 757
def try_await_lock(current_table, i, node)
  check_for_resize # try resizing if can't get lock
  node.try_await_lock(current_table, i)
end