Yusong's Blog Don't Panic

Dataflow Synchronization and Pipelining

2017-08-25

Coursera - Parallel Programming in Java Week 4

Split-phase Barriers with Java Phasers

// Original loop: the goal is to print every HELLO before any BYE
forall (i : [0:n-1]) { 
  print HELLO, i;
  myId = lookup(i); // convert int to a string 
  print BYE, myId;
}
// initialize phaser ph for use by n tasks ("parties") 
Phaser ph = new Phaser(n);
// Create forall loop with n iterations that operate on ph 
forall (i : [0:n-1]) {
  print HELLO, i;
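  int phase = ph.arrive();       // signal arrival without blocking
  
  myId = lookup(i); // convert int to a string; runs in parallel with the barrier

  ph.awaitAdvance(phase);        // block until all n parties have arrived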
  print BYE, myId;
}
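A runnable version of the split-phase barrier using java.util.concurrent.Phaser directly might look like the sketch below. It assumes one plain Thread per iteration and a hypothetical lookup(i) that just builds a string; lines are collected in a synchronized list so the ordering can be checked. Because each HELLO is logged before arrive() and each BYE after awaitAdvance(phase), every HELLO lands before any BYE, while lookup(i) overlaps with the barrier.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Phaser;

class SplitPhaseBarrier {
    // Hypothetical stand-in for lookup(i): converts an int to a string.
    static String lookup(int i) {
        return "id" + i;
    }

    // Runs n worker threads and returns the HELLO/BYE log in the order the lines were added.
    static List<String> run(int n) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        Phaser ph = new Phaser(n); // n registered parties, one per worker
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < n; t++) {
            final int i = t;
            Thread w = new Thread(() -> {
                log.add("HELLO " + i);
                int phase = ph.arrive();   // non-blocking: signal arrival, remember the phase
                String myId = lookup(i);   // local work that overlaps with the barrier
                ph.awaitAdvance(phase);    // block until all n parties have arrived
                log.add("BYE " + myId);
            });
            workers.add(w);
            w.start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        run(4).forEach(System.out::println);
    }
}
```

The HELLO lines (and the BYE lines) can appear in any order among themselves; only the HELLO-before-BYE ordering is guaranteed.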

Java Phaser Documentation

Point-to-Point Synchronization with Phasers

Task 0                        Task 1                        Task 2
1a: X = A(); // cost = 1      1b: Y = B(); // cost = 2      1c: Z = C(); // cost = 3
2a: ph0.arrive();             2b: ph1.arrive();             2c: ph2.arrive();
3a: ph1.awaitAdvance(0);      3b: ph0.awaitAdvance(0);      3c: ph1.awaitAdvance(0);
4a: D(X, Y); // cost = 3      4b: ph2.awaitAdvance(0);      4c: F(Y, Z); // cost = 1
                              5b: E(X, Y, Z); // cost = 2

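The span (critical path length) would be 6 units of time if a single barrier replaced the arrive/awaitAdvance calls, since the slowest step before the barrier (C, cost 3) and the slowest step after it (D, cost 3) add up to 6. With individual phasers it drops to 5 units, because each task waits only for the values it actually reads: D starts at time 2 (when Y is ready) and finishes at 5, E starts at time 3 (when Z is ready) and finishes at 5, and F runs from 3 to 4.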
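A minimal Java sketch of the three tasks in the table, assuming placeholder functions A() through F() (hypothetical; the lecture only gives their costs) and one single-party Phaser per task. Each task arrives on its own phaser once its value is written and then waits only on the phasers of the values it reads. Phaser's documented memory-consistency effects (actions before an arrive happen-before actions that follow the corresponding phase advance) are what make the plain static fields safe to read across threads.

```java
import java.util.concurrent.Phaser;

class PointToPoint {
    // Hypothetical placeholders for A()..F(); the lecture only specifies their costs.
    static int A() { return 1; }
    static int B() { return 2; }
    static int C() { return 3; }
    static int D(int x, int y) { return x + y; }
    static int E(int x, int y, int z) { return x * y * z; }
    static int F(int y, int z) { return y - z; }

    // Shared values: each is written before its owner's arrive() and read after awaitAdvance().
    static int x, y, z, d, e, f;

    static int[] run() {
        // One single-party phaser per task: phase 0 -> 1 means "this task's value is ready".
        Phaser ph0 = new Phaser(1), ph1 = new Phaser(1), ph2 = new Phaser(1);
        Thread task0 = new Thread(() -> {
            x = A();
            ph0.arrive();
            ph1.awaitAdvance(0);   // D needs only Y from the other tasks
            d = D(x, y);
        });
        Thread task1 = new Thread(() -> {
            y = B();
            ph1.arrive();
            ph0.awaitAdvance(0);   // E needs X ...
            ph2.awaitAdvance(0);   // ... and Z
            e = E(x, y, z);
        });
        Thread task2 = new Thread(() -> {
            z = C();
            ph2.arrive();
            ph1.awaitAdvance(0);   // F needs only Y
            f = F(y, z);
        });
        Thread[] tasks = {task0, task1, task2};
        for (Thread t : tasks) t.start();
        for (Thread t : tasks) {
            try {
                t.join();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(ex);
            }
        }
        return new int[] {d, e, f};
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("D=" + r[0] + " E=" + r[1] + " F=" + r[2]); // D=3 E=6 F=-1
    }
}
```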

One-Dimensional Iterative Averaging with Phasers

// Allocate array of phasers
Phaser[] ph = new Phaser[n+2]; //array of phasers
for (int i = 0; i < ph.length; i++) ph[i] = new Phaser(1);

// Main computation 
forall ( i: [1:n-1]) {
  for (iter: [0:nsteps-1]) {
    newX[i] = (oldX[i-1] + oldX[i+1]) / 2;
    ph[i].arrive();
    
    if (i > 1) ph[i - 1].awaitAdvance(iter);
    if (i < n-1) ph[i + 1].awaitAdvance(iter); 
    swap pointers newX and oldX;
  }
}
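The same algorithm can be sketched in runnable Java with one thread per interior element. Assumptions (not from the lecture): x[0] and x[n] are fixed boundary values, both arrays start as copies of the initial data so they share those boundaries, and each thread keeps its own local oldX/newX references so that "swap pointers" is a thread-local operation. A sequential version is included only to check the result.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Phaser;

class IterativeAveraging {
    // Sequential reference: x[0] and x[n] are fixed boundaries, x[1..n-1] are averaged.
    static double[] sequential(double[] init, int nsteps) {
        int n = init.length - 1;
        double[] oldX = init.clone(), newX = init.clone();
        for (int iter = 0; iter < nsteps; iter++) {
            for (int i = 1; i <= n - 1; i++) newX[i] = (oldX[i - 1] + oldX[i + 1]) / 2.0;
            double[] tmp = oldX; oldX = newX; newX = tmp;
        }
        return oldX;
    }

    // One thread per interior element i, synchronizing only with its two neighbours.
    static double[] parallel(double[] init, int nsteps) {
        int n = init.length - 1;
        double[] a = init.clone(), b = init.clone();  // both copies keep the boundary values
        Phaser[] ph = new Phaser[n + 2];
        for (int k = 0; k < ph.length; k++) ph[k] = new Phaser(1);
        List<Thread> workers = new ArrayList<>();
        for (int t = 1; t <= n - 1; t++) {
            final int i = t;
            Thread w = new Thread(() -> {
                double[] oldX = a, newX = b;  // thread-local references, swapped each iteration
                for (int iter = 0; iter < nsteps; iter++) {
                    newX[i] = (oldX[i - 1] + oldX[i + 1]) / 2.0;
                    ph[i].arrive();                               // newX[i] is ready
                    if (i > 1) ph[i - 1].awaitAdvance(iter);      // left neighbour finished iter
                    if (i < n - 1) ph[i + 1].awaitAdvance(iter);  // right neighbour finished iter
                    double[] tmp = oldX; oldX = newX; newX = tmp; // swap pointers newX and oldX
                }
            });
            workers.add(w);
            w.start();
        }
        for (Thread w : workers) {
            try { w.join(); } catch (InterruptedException e) { throw new RuntimeException(e); }
        }
        return nsteps % 2 == 0 ? a : b;  // the array written in the last iteration
    }

    public static void main(String[] args) {
        double[] init = new double[11];
        init[10] = 1.0;  // right boundary fixed at 1.0, everything else starts at 0.0
        System.out.println(Arrays.toString(parallel(init, 100)));
    }
}
```

Since each thread waits on its neighbours every iteration, adjacent threads are never more than one iteration apart, which is what keeps the two-array scheme race-free. One thread per element is only practical for small n.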

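Using grouping (chunking), each task handles a contiguous block of n / tasks elements and arrives on its phaser as soon as its two boundary elements are ready, so the interior elements are computed while the neighbouring tasks catch up: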

// Allocate array of phasers proportional to number of chunked tasks 
Phaser[] ph = new Phaser[tasks+2]; //array of phasers
for (int i = 0; i < ph.length; i++) ph[i] = new Phaser(1);

// Main computation 
forall ( i: [0:tasks-1]) {
  for (iter: [0:nsteps-1]) {
    // Compute leftmost boundary element for group
    int left = i * (n / tasks) + 1;
    myNew[left] = (myVal[left - 1] + myVal[left + 1]) / 2.0;
    
    // Compute rightmost boundary element for group 
    int right = (i + 1) * (n / tasks);
    myNew[right] = (myVal[right - 1] + myVal[right + 1]) / 2.0;
    
    // Signal arrival on phaser ph[index]: left and right boundary elements are available
    int index = i + 1;
    ph[index].arrive();
    
    // Compute interior elements in parallel with barrier 
    for (int j = left + 1; j <= right - 1; j++)
      myNew[j] = (myVal[j - 1] + myVal[j + 1]) / 2.0;
    // Wait for previous phase to complete before advancing 
    if (index > 1) ph[index - 1].awaitAdvance(iter);
    if (index < tasks) ph[index + 1].awaitAdvance(iter);
    swap pointers myNew and myVal;
  }
}
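A runnable Java sketch of the grouped version follows. It assumes, as the index arithmetic above implies, that the interior elements are 1..n with fixed boundaries at 0 and n+1, and that n is divisible by tasks; as before, each thread swaps its own local array references, and a sequential version is included only for checking.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Phaser;

class GroupedAveraging {
    // Sequential reference: x[0] and x[n+1] are fixed boundaries, x[1..n] are averaged.
    static double[] sequential(double[] init, int nsteps) {
        int n = init.length - 2;
        double[] myVal = init.clone(), myNew = init.clone();
        for (int iter = 0; iter < nsteps; iter++) {
            for (int j = 1; j <= n; j++) myNew[j] = (myVal[j - 1] + myVal[j + 1]) / 2.0;
            double[] tmp = myVal; myVal = myNew; myNew = tmp;
        }
        return myVal;
    }

    // Assumes n = init.length - 2 is divisible by tasks, so every chunk has n / tasks elements.
    static double[] parallel(double[] init, int nsteps, int tasks) {
        int n = init.length - 2;
        double[] a = init.clone(), b = init.clone();
        Phaser[] ph = new Phaser[tasks + 2];
        for (int k = 0; k < ph.length; k++) ph[k] = new Phaser(1);
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < tasks; t++) {
            final int i = t;
            Thread w = new Thread(() -> {
                double[] myVal = a, myNew = b;
                int left = i * (n / tasks) + 1;    // leftmost element of this chunk
                int right = (i + 1) * (n / tasks); // rightmost element of this chunk
                int index = i + 1;                 // this chunk's phaser
                for (int iter = 0; iter < nsteps; iter++) {
                    // Boundary elements first: neighbouring chunks read them next iteration.
                    myNew[left] = (myVal[left - 1] + myVal[left + 1]) / 2.0;
                    myNew[right] = (myVal[right - 1] + myVal[right + 1]) / 2.0;
                    ph[index].arrive();
                    // Interior elements only touch this chunk, so they overlap with the wait.
                    for (int j = left + 1; j <= right - 1; j++)
                        myNew[j] = (myVal[j - 1] + myVal[j + 1]) / 2.0;
                    if (index > 1) ph[index - 1].awaitAdvance(iter);
                    if (index < tasks) ph[index + 1].awaitAdvance(iter);
                    double[] tmp = myVal; myVal = myNew; myNew = tmp; // swap pointers
                }
            });
            workers.add(w);
            w.start();
        }
        for (Thread w : workers) {
            try { w.join(); } catch (InterruptedException e) { throw new RuntimeException(e); }
        }
        return nsteps % 2 == 0 ? a : b;  // the array written in the last iteration
    }

    public static void main(String[] args) {
        double[] init = new double[18];  // n = 16 interior elements
        init[17] = 1.0;
        System.out.println(Arrays.toString(parallel(init, 100, 4)));
    }
}
```

With tasks = n each chunk has a single element (left == right) and the interior loop is empty, which reduces to the ungrouped version.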

Pipeline Parallelism

The synchronization required for pipeline parallelism can be implemented using phasers by allocating an array of phasers, such that phaser ph[i] is “signalled” by stage i, through a call to ph[i].arrive(), each time that stage finishes processing an input. Stage i + 1 waits on ph[i] before processing the same input:

// Code for pipeline stage i
int k = 0; // number of inputs this stage has processed so far
while ( there is an input to be processed ) {
  // wait for previous stage, if any, to finish input k
  if (i > 0) ph[i - 1].awaitAdvance(k); 
  
  process input k;
  
  // signal next stage
  ph[i].arrive();
  k++;
}
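A minimal runnable sketch of this pipeline in Java, with one thread per stage. The buffering scheme is an assumption: out[s][k] holds stage s's result for input k, so there is no bounded buffer and no back-pressure (earlier stages may run ahead freely). The stage functions are hypothetical, passed in as IntUnaryOperator values, and at least one stage is assumed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Phaser;
import java.util.function.IntUnaryOperator;

class PhaserPipeline {
    // Runs every input through all stages; stage s is one thread applying stages[s].
    static int[] run(int[] inputs, IntUnaryOperator[] stages) {
        int p = stages.length, m = inputs.length;
        int[][] out = new int[p][m];      // out[s][k]: result of stage s on input k (unbounded buffer)
        Phaser[] ph = new Phaser[p];
        for (int s = 0; s < p; s++) ph[s] = new Phaser(1);
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < p; t++) {
            final int i = t;
            Thread w = new Thread(() -> {
                for (int k = 0; k < m; k++) {
                    if (i > 0) ph[i - 1].awaitAdvance(k);  // wait until stage i-1 has finished input k
                    int in = (i == 0) ? inputs[k] : out[i - 1][k];
                    out[i][k] = stages[i].applyAsInt(in);  // process input k
                    ph[i].arrive();                        // signal next stage: input k is ready
                }
            });
            workers.add(w);
            w.start();
        }
        for (Thread w : workers) {
            try { w.join(); } catch (InterruptedException e) { throw new RuntimeException(e); }
        }
        return out[p - 1].clone();
    }

    public static void main(String[] args) {
        // Hypothetical three-stage pipeline: add 1, double, subtract 3.
        IntUnaryOperator[] stages = { x -> x + 1, x -> x * 2, x -> x - 3 };
        int[] result = run(new int[] {1, 2, 3, 4, 5}, stages);
        System.out.println(java.util.Arrays.toString(result)); // [1, 3, 5, 7, 9]
    }
}
```

awaitAdvance(k) only blocks while ph[i - 1] is still at phase k, i.e. while stage i - 1 has finished fewer than k + 1 inputs; once that stage is ahead, the call returns immediately.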

Data Flow Parallelism

The simple data flow graph studied in the lecture consisted of five nodes and four edges: A → C, A → D, B → D, B → E.

We introduced the asyncAwait notation to specify a task along with an explicit set of preconditions (events that the task must wait for before it can start execution):

async( () -> {/* Task A */; A.put(); } ); // Complete task and trigger event A
async( () -> {/* Task B */; B.put(); } ); // Complete task and trigger event B
asyncAwait(A, () -> {/* Task C */} );       // Only execute task after event A is triggered 
asyncAwait(A, B, () -> {/* Task D */} );   // Only execute task after events A, B are triggered 
asyncAwait(B, () -> {/* Task E */} );       // Only execute task after event B is triggered

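The order of the above statements is not significant: just as a graph can be defined by listing its edges in any order, each task starts only once its preconditions have been triggered, regardless of where its statement appears in the program.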
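asyncAwait comes from the course's HJlib library rather than the JDK, so the sketch below swaps in java.util.concurrent.CompletableFuture to express the same five-node graph. Events A and B are modelled as uncompleted CompletableFuture&lt;Void&gt; objects created up front, and complete(null) plays the role of put(). Because the events exist before any task refers to them, the five task statements can be reordered freely, mirroring the point above.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

class DataflowGraph {
    // Returns the order in which tasks A..E ran.
    // Note: CompletableFuture is a substitute here; asyncAwait itself is not a JDK API.
    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        // Events are created up front, like A and B in the asyncAwait version.
        CompletableFuture<Void> eventA = new CompletableFuture<>();
        CompletableFuture<Void> eventB = new CompletableFuture<>();

        // The five task statements below can appear in any order.
        CompletableFuture<Void> e = eventB.thenRunAsync(() -> log.add("E"));          // after B
        CompletableFuture<Void> d =
            CompletableFuture.allOf(eventA, eventB).thenRunAsync(() -> log.add("D")); // after A and B
        CompletableFuture<Void> c = eventA.thenRunAsync(() -> log.add("C"));          // after A
        CompletableFuture.runAsync(() -> { log.add("A"); eventA.complete(null); });   // task A, then trigger A
        CompletableFuture.runAsync(() -> { log.add("B"); eventB.complete(null); });   // task B, then trigger B

        CompletableFuture.allOf(c, d, e).join(); // wait for the three sink tasks
        return log;
    }

    public static void main(String[] args) {
        // The exact order varies between runs; C, D and E always follow the events they need.
        System.out.println(run());
    }
}
```

If task A never completed eventA, tasks C and D would never run and the final join() would block forever, so in this sketch every task body must reach its complete(null) call.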

