This note covers optimistic concurrency, the concurrent queue data structure, the notion of linearizability, and the concurrent hash map. Finally, this week's homework is a concurrent implementation of Boruvka's algorithm, which finds a minimum spanning tree in an undirected graph.
– Concurrent Programming in Java, Week4, Coursera
Concurrent Data Structure
Optimistic Concurrency
As an example, we considered how the getAndAdd() method is typically implemented for a shared AtomicInteger object. The basic idea is to allow multiple threads to read the existing value of the shared object (curVal) without any synchronization, and also to compute its new value after the addition (newVal) without synchronization. These computations are performed optimistically, under the assumption that no other thread will interfere between reading curVal and computing newVal. However, each thread must confirm this assumption by calling compareAndSet(), which atomically installs newVal only if the shared value still equals curVal. If the call returns false, the thread repeats the read, compute, and compareAndSet() steps in a loop until the operation succeeds.
In general, so long as the contention on a single shared object is not high, the number of calls to compareAndSet() that return false will be very small, and the optimistic concurrency approach can perform much better in practice (at the cost of more complex code logic) than using locks, isolation, or actors.
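The retry loop described above can be sketched as follows. This is a minimal illustration, not the actual JDK source: the class name OptimisticCounter is hypothetical, and only AtomicInteger.get() and AtomicInteger.compareAndSet() are used from the standard library.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical wrapper class, used only to illustrate the retry loop.
final class OptimisticCounter {
    private final AtomicInteger value = new AtomicInteger(0);

    /** Adds delta and returns the previous value, retrying until the CAS succeeds. */
    int getAndAdd(int delta) {
        while (true) {
            int curVal = value.get();      // read without any lock
            int newVal = curVal + delta;   // compute optimistically
            if (value.compareAndSet(curVal, newVal)) {
                return curVal;             // nobody interfered, update is published
            }
            // Another thread changed the value in between: loop and try again.
        }
    }

    int get() {
        return value.get();
    }
}
```

In real code you would simply call AtomicInteger.getAndAdd(); the sketch only makes the optimistic read, compute, and confirm steps visible.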
https://en.wikipedia.org/wiki/Optimistic_concurrency_control
Concurrent Queues
A common approach for such an implementation (using optimistic concurrency) is to replace an object reference like tail with an AtomicReference. Since the compareAndSet() method can also be invoked on AtomicReference objects, we can use it to support (for example) concurrent calls to enq() by identifying which calls to compareAndSet() succeeded, and repeating the calls that failed.
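As a rough sketch of this idea, the queue below keeps head and tail in AtomicReference fields and retries any compareAndSet() that fails. It follows the well-known Michael-Scott non-blocking queue design, which is my assumption about the kind of structure meant here; the class name SketchQueue and its internals are hypothetical and are not the code of ConcurrentLinkedQueue.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical lock-free FIFO queue in the Michael-Scott style.
final class SketchQueue<T> {
    private static final class Node<T> {
        final T item;
        final AtomicReference<Node<T>> next = new AtomicReference<>(null);
        Node(T item) { this.item = item; }
    }

    private final AtomicReference<Node<T>> head;
    private final AtomicReference<Node<T>> tail;

    SketchQueue() {
        Node<T> dummy = new Node<>(null);   // sentinel node, never returned
        head = new AtomicReference<>(dummy);
        tail = new AtomicReference<>(dummy);
    }

    void enq(T item) {
        Node<T> node = new Node<>(item);
        while (true) {
            Node<T> last = tail.get();
            Node<T> next = last.next.get();
            if (next == null) {
                // Try to link the new node after the current last node.
                if (last.next.compareAndSet(null, node)) {
                    // Swing tail forward; failure means another thread already did.
                    tail.compareAndSet(last, node);
                    return;
                }
            } else {
                // Tail is lagging behind: help advance it, then retry.
                tail.compareAndSet(last, next);
            }
        }
    }

    /** Returns the oldest item, or null if the queue is empty. */
    T deq() {
        while (true) {
            Node<T> first = head.get();
            Node<T> last = tail.get();
            Node<T> next = first.next.get();
            if (next == null) {
                return null;                     // only the sentinel is left
            }
            if (first == last) {
                tail.compareAndSet(last, next);  // help a lagging tail, then retry
                continue;
            }
            if (head.compareAndSet(first, next)) {
                return next.item;                // next becomes the new sentinel
            }
        }
    }
}
```

For production use, java.util.concurrent.ConcurrentLinkedQueue (linked below) is the better choice.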
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/atomic/AtomicReference.html
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ConcurrentLinkedQueue.html
Linearizability
The key question answered by linearizability is what return values are permissible when multiple threads perform these operations in parallel, taking into account what we know about the expected return values from those operations when they are performed sequentially. As an example, we considered two threads, T1 and T2, performing enq(x) and enq(y) operations in parallel on a shared concurrent queue data structure, and considered what values can be returned by a deq() operation performed by T2 after its call to enq(y). Because the two enq() calls overlap in time, either one may take effect first, so from the viewpoint of linearizability it is possible for the deq() operation to return item x or item y.
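The T1/T2 scenario can be tried out with a small experiment. The sketch below uses java.util.concurrent.ConcurrentLinkedQueue as the shared queue (its add() and poll() play the roles of enq() and deq()); the class name LinearizabilityDemo and the method runOnce() are hypothetical names chosen for this illustration.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical demo of the two-thread scenario described above.
final class LinearizabilityDemo {
    /** Runs the scenario once and returns what T2's deq() observed. */
    static String runOnce() {
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
        String[] observed = new String[1];

        Thread t1 = new Thread(() -> queue.add("x"));   // T1: enq(x)
        Thread t2 = new Thread(() -> {
            queue.add("y");                             // T2: enq(y)
            observed[0] = queue.poll();                 // T2: deq()
        });

        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();   // join() makes observed[0] visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return observed[0];
    }

    public static void main(String[] args) {
        // Either "x" or "y" may be printed, depending on which enq took effect first.
        System.out.println(runOnce());
    }
}
```

Both outcomes are legal. What linearizability rules out is a result that no sequential ordering could explain, such as deq() returning null here, since T2 has already enqueued y before it dequeues.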
https://en.wikipedia.org/wiki/Linearizability
Concurrent HashMap
[Figure: architecture of ConcurrentHashMap]
http://javabypatel.blogspot.in/2016/09/concurrenthashmap-interview-questions.html
https://en.wikipedia.org/wiki/Java_ConcurrentMap
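The notes above do not walk through the ConcurrentHashMap API, so as a small illustration here is a sketch of one of its atomic compound operations, merge(), used to count events from several threads without an explicit lock. The class name HitCounter and the demo() helper are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical counter built on ConcurrentHashMap.merge().
final class HitCounter {
    private final ConcurrentHashMap<String, Integer> hits = new ConcurrentHashMap<>();

    void record(String key) {
        // merge() performs the read-modify-write for this key atomically.
        hits.merge(key, 1, Integer::sum);
    }

    int count(String key) {
        return hits.getOrDefault(key, 0);
    }

    /** Starts the given number of threads, each recording the same key perThread times. */
    static int demo(int threads, int perThread) {
        HitCounter counter = new HitCounter();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    counter.record("page");
                }
            });
            workers[i].start();
        }
        try {
            for (Thread w : workers) {
                w.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter.count("page");
    }
}
```

With a plain HashMap and a separate get() followed by put(), concurrent updates could be lost; merge() on a ConcurrentHashMap avoids that.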
Boruvka's algorithm
Concurrent algorithm to find a minimum-cost spanning tree (MST) for an undirected graph.
(Spoiler! Proceed with caution)
/**
 * Sequential implementation of Boruvka's minimum spanning tree algorithm.
 */
public final class SeqBoruvka extends AbstractBoruvka<SeqComponent> {

    /**
     * Constructor.
     */
    public SeqBoruvka() {
        super();
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void computeBoruvka(final Queue<SeqComponent> nodesLoaded,
            final SolutionToBoruvka<SeqComponent> solution) {
        SeqComponent loopNode = null;

        // START OF EDGE CONTRACTION ALGORITHM
        while (!nodesLoaded.isEmpty()) {
            /*
             * poll() removes first element (node loopNode) from the nodesLoaded
             * work-list.
             */
            loopNode = nodesLoaded.poll();

            if (loopNode.isDead) {
                continue; // node loopNode has already been merged
            }

            // retrieve loopNode's edge with minimum cost
            final Edge<SeqComponent> e = loopNode.getMinEdge();
            if (e == null) {
                break; // done - we've contracted the graph to a single node
            }

            final SeqComponent other = e.getOther(loopNode);
            other.isDead = true;
            // merge node other into node loopNode
            loopNode.merge(other, e.weight());
            // add newly merged loopNode back in the work-list
            nodesLoaded.add(loopNode);
        }
        // END OF EDGE CONTRACTION ALGORITHM

        solution.setSolution(loopNode);
    }
}
/**
 * A parallel implementation of Boruvka's algorithm to compute a Minimum
 * Spanning Tree.
 */
public void computeBoruvka(final Queue<ParComponent> nodesLoaded,
        final SolutionToBoruvka<ParComponent> solution) {
    ParComponent loopNode = null;

    // START OF EDGE CONTRACTION ALGORITHM
    while (!nodesLoaded.isEmpty()) {
        /*
         * poll() removes first element (node loopNode) from the nodesLoaded
         * work-list. It returns null if another thread emptied the list first.
         */
        loopNode = nodesLoaded.poll();
        if (loopNode == null || !loopNode.lock.tryLock()) {
            continue; // nothing to do, or another thread is working on this node
        }

        if (loopNode.isDead) {
            loopNode.lock.unlock();
            continue; // node loopNode has already been merged
        }

        // retrieve loopNode's edge with minimum cost
        final Edge<ParComponent> e = loopNode.getMinEdge();
        if (e == null) {
            // done: guard with a lock so that setSolution is called only once
            if (solution.lock.tryLock()) {
                solution.setSolution(loopNode);
                solution.lock.unlock();
            }
            loopNode.lock.unlock(); // release the node lock before leaving the loop
            break;
        }

        final ParComponent other = e.getOther(loopNode);
        if (!other.lock.tryLock()) {
            // other is busy: release loopNode and retry it later
            loopNode.lock.unlock();
            nodesLoaded.add(loopNode);
            continue;
        }

        if (other.isDead) {
            // other was merged by another thread in the meantime
            other.lock.unlock();
            loopNode.lock.unlock();
            nodesLoaded.add(loopNode);
            continue;
        }

        other.isDead = true;
        // merge node other into node loopNode
        loopNode.merge(other, e.weight());
        loopNode.lock.unlock();
        other.lock.unlock();
        // add newly merged loopNode back in the work-list
        nodesLoaded.add(loopNode);
    }
}
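The parallel version above relies on one pattern: acquire both components with tryLock(), and if the second lock is not available, release the first and retry later instead of blocking (which avoids deadlock). A self-contained sketch of just that pattern is shown below, using try/finally so that no lock is leaked on any path. The class LockedNode, its fields, and tryMerge() are hypothetical stand-ins for this illustration, not the course's ParComponent API.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for a graph component guarded by its own lock.
final class LockedNode {
    final ReentrantLock lock = new ReentrantLock();
    boolean isDead = false;
    double totalWeight = 0;

    /**
     * Tries to merge b into a. Returns false without blocking if either
     * lock is unavailable or either node is already dead, so the caller
     * can put a back on the work-list and retry. Assumes a != b.
     */
    static boolean tryMerge(LockedNode a, LockedNode b, double edgeWeight) {
        if (!a.lock.tryLock()) {
            return false;
        }
        try {
            if (a.isDead || !b.lock.tryLock()) {
                return false;
            }
            try {
                if (b.isDead) {
                    return false;
                }
                b.isDead = true;
                // Fold b's accumulated weight plus the connecting edge into a.
                a.totalWeight += b.totalWeight + edgeWeight;
                return true;
            } finally {
                b.lock.unlock();
            }
        } finally {
            a.lock.unlock();
        }
    }
}
```

The try/finally form is a design choice: it is slightly more verbose than the manual unlock() calls in the homework code, but every return path releases exactly the locks it acquired.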