Yusong's Blog Don't Panic

Concurrent Data Structure

2017-09-17

This note covers optimistic concurrency, concurrent queues, the notion of linearizability, and concurrent hash maps. Finally, this week’s homework is a concurrent implementation of Boruvka's algorithm, which finds a minimum spanning tree in an undirected graph.
– Concurrent Programming in Java, Week 4, Coursera

Concurrent Data Structure

Optimistic Concurrency

As an example, we considered how the getAndAdd() method is typically implemented for a shared AtomicInteger object. The basic idea is to allow multiple threads to read the existing value of the shared object (curVal) without any synchronization, and also to compute its new value after the addition (newVal) without synchronization. These computations are performed optimistically, under the assumption that no other thread will interfere between reading curVal and computing newVal. Each thread must then confirm this assumption by calling compareAndSet(), which atomically stores newVal only if the current value still equals curVal. If the call returns false, the thread repeats the whole read, compute, and compareAndSet() sequence in a loop until the operation succeeds.

In general, so long as the contention on a single shared object is not high, the number of calls to compareAndSet() that return false will be very small, and the optimistic concurrency approach can perform much better in practice than using locks, isolation, or actors, at the cost of more complex code logic.
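The retry loop described above can be sketched as follows. This is a minimal illustration, not the JDK's actual implementation of AtomicInteger.getAndAdd(); the class name OptimisticCounter is hypothetical, and only AtomicInteger.get() and AtomicInteger.compareAndSet() are used from the library.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative re-implementation of getAndAdd() on top of compareAndSet(). */
final class OptimisticCounter {
    private final AtomicInteger value = new AtomicInteger(0);

    /** Adds delta and returns the value observed just before the addition. */
    int getAndAdd(int delta) {
        while (true) {
            int curVal = value.get();     // read without any lock
            int newVal = curVal + delta;  // computed optimistically
            // Succeeds only if no other thread changed the value since the read.
            if (value.compareAndSet(curVal, newVal)) {
                return curVal;
            }
            // Another thread interfered: retry with a fresh read.
        }
    }

    int get() {
        return value.get();
    }
}
```

Under low contention the loop body usually runs once per call; under heavy contention the number of retries grows, which is the case where this approach loses its advantage.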

https://en.wikipedia.org/wiki/Optimistic_concurrency_control

Concurrent Queues

A common approach to implementing a concurrent queue with optimistic concurrency is to replace an object reference like tail with an AtomicReference. Since the compareAndSet() method can also be invoked on AtomicReference objects, we can use it to support (for example) concurrent calls to enq() by identifying which calls to compareAndSet() succeeded, and repeating the calls that failed.
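As a rough sketch of that idea, the queue below keeps head and tail in AtomicReference fields and retries enq() whenever compareAndSet() fails. It follows the general shape of the well-known Michael and Scott non-blocking queue, which is an assumption on my part rather than the code used in the course; the names OptimisticQueue and Node are hypothetical.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Illustrative lock-free queue; a sketch, not the course's or the JDK's code. */
final class OptimisticQueue<T> {
    private static final class Node<T> {
        final T item;
        final AtomicReference<Node<T>> next = new AtomicReference<>();
        Node(T item) { this.item = item; }
    }

    // A dummy node keeps head and tail non-null even when the queue is empty.
    private final AtomicReference<Node<T>> head;
    private final AtomicReference<Node<T>> tail;

    OptimisticQueue() {
        Node<T> dummy = new Node<>(null);
        head = new AtomicReference<>(dummy);
        tail = new AtomicReference<>(dummy);
    }

    void enq(T item) {
        Node<T> node = new Node<>(item);
        while (true) {
            Node<T> last = tail.get();
            Node<T> next = last.next.get();
            if (next != null) {
                // tail is lagging behind; help move it forward, then retry.
                tail.compareAndSet(last, next);
            } else if (last.next.compareAndSet(null, node)) {
                // Node is linked; swing tail (another thread may already have).
                tail.compareAndSet(last, node);
                return;
            }
            // A failed compareAndSet() means another enq() won; repeat the call.
        }
    }

    /** Removes and returns the oldest item, or null if the queue is empty. */
    T deq() {
        while (true) {
            Node<T> first = head.get();
            Node<T> last = tail.get();
            Node<T> next = first.next.get();
            if (next == null) {
                return null;                      // only the dummy node is left
            }
            if (first == last) {
                tail.compareAndSet(last, next);   // help a lagging tail
                continue;
            }
            if (head.compareAndSet(first, next)) {
                return next.item;                 // next becomes the new dummy
            }
        }
    }
}
```

In real code, java.util.concurrent.ConcurrentLinkedQueue (linked below) is the better choice; the sketch only shows where the compareAndSet() retries sit.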

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/atomic/AtomicReference.html

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ConcurrentLinkedQueue.html

Linearizability

The key question answered by linearizability is what return values are permissible when multiple threads perform these operations in parallel, taking into account what we know about the expected return values from those operations when they are performed sequentially. As an example, we considered two threads, T1 and T2, performing enq(x) and enq(y) respectively, in parallel, on a shared concurrent queue (assumed here to start empty), and considered what values can be returned by a deq() operation performed by T2 after its call to enq(y). Because the two enq() calls overlap, either one may take effect first, so from the viewpoint of linearizability it is possible for the deq() operation to return item x or item y.
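The scenario can be tried out with a small experiment. The sketch below uses java.util.concurrent.ConcurrentLinkedQueue, with add() and poll() standing in for enq() and deq(); the class name LinearizabilityDemo and the method runOnce() are hypothetical. Which of the two values comes back depends on thread scheduling, so the code makes no claim about a particular run.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

final class LinearizabilityDemo {
    /**
     * Runs T1: enq("x") in parallel with T2: enq("y") followed by deq(),
     * on an initially empty queue, and returns what T2's deq() saw.
     */
    static String runOnce() {
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
        String[] result = new String[1];

        Thread t1 = new Thread(() -> queue.add("x"));
        Thread t2 = new Thread(() -> {
            queue.add("y");
            result[0] = queue.poll();   // T2's deq() after its own enq("y")
        });

        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();                  // join() makes result[0] visible here
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return result[0];
    }
}
```

Both "x" and "y" are legal results. A null result would not be, because T2's own enq("y") has completed before its deq() starts and no other thread removes items.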

https://en.wikipedia.org/wiki/Linearizability

Concurrent HashMap

The architecture of ConcurrentHashMap is shown below.

[Figure: ConcurrentHashMap]
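For a feel of how the map is used from several threads, here is a small sketch that is not taken from the course material. It relies on ConcurrentHashMap.merge(), which the JDK documents as atomic for this class; the class name ConcurrentCounts, the method countConcurrently(), and the key "hits" are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;

final class ConcurrentCounts {
    /**
     * Starts the given number of threads, each incrementing the same key
     * incrementsPerThread times, and returns the final count for that key.
     */
    static int countConcurrently(int threads, int incrementsPerThread) {
        ConcurrentHashMap<String, Integer> counts = new ConcurrentHashMap<>();
        Thread[] workers = new Thread[threads];

        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    // Atomic read-modify-write; no external lock is needed.
                    counts.merge("hits", 1, Integer::sum);
                }
            });
            workers[i].start();
        }

        try {
            for (Thread w : workers) {
                w.join();
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return counts.getOrDefault("hits", 0);
    }
}
```

A separate get() followed by put() would not be safe here, since another thread could update the entry between the two calls; merge() performs the update as one atomic step.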

http://javabypatel.blogspot.in/2016/09/concurrenthashmap-interview-questions.html

https://en.wikipedia.org/wiki/Java_ConcurrentMap

Boruvka’s algorithm

A concurrent algorithm that finds a minimum-cost spanning tree (MST) of an undirected graph.

(Spoiler! Proceed with caution)

/**
 * Sequential implementation of Boruvka's minimum spanning tree algorithm.
 */
public final class SeqBoruvka extends AbstractBoruvka<SeqComponent> {

    /**
     * Constructor.
     */
    public SeqBoruvka() {
        super();
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void computeBoruvka(final Queue<SeqComponent> nodesLoaded,
            final SolutionToBoruvka<SeqComponent> solution) {
        SeqComponent loopNode = null;

        // START OF EDGE CONTRACTION ALGORITHM
        while (!nodesLoaded.isEmpty()) {

            /*
             * poll() removes first element (node loopNode) from the nodesLoaded
             * work-list.
             */
            loopNode = nodesLoaded.poll();

            if (loopNode.isDead) {
                continue; // node loopNode has already been merged
            }

            // retrieve loopNode's edge with minimum cost
            final Edge<SeqComponent> e = loopNode.getMinEdge();
            if (e == null) {
                break; // done - we've contracted the graph to a single node
            }

            final SeqComponent other = e.getOther(loopNode);
            other.isDead = true;
            // merge node other into node loopNode
            loopNode.merge(other, e.weight());
            // add newly merged loopNode back in the work-list
            nodesLoaded.add(loopNode);

        }
        // END OF EDGE CONTRACTION ALGORITHM

        solution.setSolution(loopNode);
    }
}
/**
 * A parallel implementation of Boruvka's algorithm to compute a Minimum
 * Spanning Tree.
 */
    public void computeBoruvka(final Queue<ParComponent> nodesLoaded,
            final SolutionToBoruvka<ParComponent> solution) {
        ParComponent loopNode = null;

        // START OF EDGE CONTRACTION ALGORITHM
        while (!nodesLoaded.isEmpty()) {

            /*
             * poll() removes first element (node loopNode) from the nodesLoaded
             * work-list.
             */
            loopNode = nodesLoaded.poll();

            if (loopNode == null || !loopNode.lock.tryLock()) {
                continue;
            }

            if (loopNode.isDead) {
                loopNode.lock.unlock();
                continue; // node loopNode has already been merged
            }

            // retrieve loopNode's edge with minimum cost
            final Edge<ParComponent> e = loopNode.getMinEdge();
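            if (e == null) {
                // done - the graph has been contracted to a single node.
                // Take the solution lock so that two threads never call
                // setSolution() at the same time.
                if (solution.lock.tryLock()) {
                    try {
                        solution.setSolution(loopNode);
                    } finally {
                        solution.lock.unlock();
                    }
                }
                // release loopNode's lock before leaving the loop
                loopNode.lock.unlock();
                break;
            }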

            final ParComponent other = e.getOther(loopNode);

            if (!other.lock.tryLock()) {
                loopNode.lock.unlock();
                nodesLoaded.add(loopNode);
                continue;
            }

            if (other.isDead) {
                other.lock.unlock();
                loopNode.lock.unlock();
                nodesLoaded.add(loopNode);
                continue;
            }

            other.isDead = true;
            // merge node other into node loopNode
            loopNode.merge(other, e.weight());

            loopNode.lock.unlock();
            other.lock.unlock();

            // add newly merged loopNode back in the work-list
            nodesLoaded.add(loopNode);

        }

    }
