- Split-phase Barriers with Java Phasers
- Point-to-Point Synchronization with Phasers
- One-Dimensional Iterative Averaging with Phasers
- Pipeline Parallelism
- Data Flow Parallelism
Coursera - Parallel Programming in Java, Week 4
Split-phase Barriers with Java Phasers
Consider the following example, in which we would like all of the HELLO's to be printed before any of the BYE's:

```
forall (i : [0:n-1]) {
  print HELLO, i;
  myId = lookup(i); // convert int to a string
  print BYE, myId;
}
```
A phaser lets us split the barrier into two operations, `ph.arrive()` (signal) and `ph.awaitAdvance()` (wait), so that the string lookup can execute between the two and overlap with the barrier:

```
// initialize phaser ph for use by n tasks ("parties")
Phaser ph = new Phaser(n);
// Create forall loop with n iterations that operate on ph
forall (i : [0:n-1]) {
  print HELLO, i;
  int phase = ph.arrive();
  myId = lookup(i); // convert int to a string
  ph.awaitAdvance(phase);
  print BYE, myId;
}
```
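To make this concrete, here is a minimal runnable sketch of the same split-phase pattern using plain Java threads and `java.util.concurrent.Phaser`; the `lookup` call is replaced by a hypothetical `Integer.toString` conversion:

```java
import java.util.concurrent.Phaser;

public class SplitPhaseDemo {
    public static void main(String[] args) throws InterruptedException {
        final int n = 4;
        final Phaser ph = new Phaser(n); // n registered parties, one per task

        Thread[] tasks = new Thread[n];
        for (int id = 0; id < n; id++) {
            final int i = id;
            tasks[i] = new Thread(() -> {
                System.out.println("HELLO " + i);
                int phase = ph.arrive();           // signal arrival without blocking
                String myId = Integer.toString(i); // local work overlaps with the barrier
                ph.awaitAdvance(phase);            // block until all n tasks have arrived
                System.out.println("BYE " + myId);
            });
            tasks[i].start();
        }
        for (Thread t : tasks) t.join();
    }
}
```

All HELLO lines print before any BYE line, yet each task's string conversion is not serialized behind the barrier.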
Point-to-Point Synchronization with Phasers
In this example, three tasks with different computation costs synchronize point-to-point through individual phasers ph0, ph1, and ph2 (one per task):

| Task 0 | Task 1 | Task 2 |
| --- | --- | --- |
| 1a: X = A(); // cost=1 | 1b: Y = B(); // cost=2 | 1c: Z = C(); // cost=3 |
| 2a: ph0.arrive(); | 2b: ph1.arrive(); | 2c: ph2.arrive(); |
| 3a: ph1.awaitAdvance(0); | 3b: ph0.awaitAdvance(0); | 3c: ph1.awaitAdvance(0); |
| 4a: D(X,Y); // cost=3 | 4b: ph2.awaitAdvance(0); | 4c: F(Y,Z); // cost=1 |
|  | 5b: E(X,Y,Z); // cost=2 |  |
With a single barrier between the two steps, every task would wait until time 3 (the cost of C) before continuing, so the span (critical path length) would be 3 + 3 = 6 units of time, dominated by C followed by D. With individual phasers, D(X,Y) can start as soon as A and B have completed at time 2, so the span is reduced to 5 units of time.
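Below is a minimal runnable sketch of this schedule using one `java.util.concurrent.Phaser` per task; the task bodies A() through F() are hypothetical stand-ins (simple assignments and prints):

```java
import java.util.concurrent.Phaser;

public class PointToPointDemo {
    static int x, y, z; // results of A(), B(), C()

    public static void main(String[] args) throws InterruptedException {
        // One phaser per task; ph_i advancing past phase 0 means task i's result is ready
        Phaser ph0 = new Phaser(1), ph1 = new Phaser(1), ph2 = new Phaser(1);

        Thread t0 = new Thread(() -> {
            x = 1;                // 1a: X = A()
            ph0.arrive();         // 2a: publish X
            ph1.awaitAdvance(0);  // 3a: wait for Y
            System.out.println("D(X,Y) = " + (x + y));       // 4a
        });
        Thread t1 = new Thread(() -> {
            y = 2;                // 1b: Y = B()
            ph1.arrive();         // 2b: publish Y
            ph0.awaitAdvance(0);  // 3b: wait for X
            ph2.awaitAdvance(0);  // 4b: wait for Z
            System.out.println("E(X,Y,Z) = " + (x + y + z)); // 5b
        });
        Thread t2 = new Thread(() -> {
            z = 3;                // 1c: Z = C()
            ph2.arrive();         // 2c: publish Z
            ph1.awaitAdvance(0);  // 3c: wait for Y
            System.out.println("F(Y,Z) = " + (y + z));       // 4c
        });
        t0.start(); t1.start(); t2.start();
        t0.join(); t1.join(); t2.join();
    }
}
```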
One-Dimensional Iterative Averaging with Phasers
```
// Allocate array of phasers
Phaser[] ph = new Phaser[n + 2]; // array of phasers
for (int i = 0; i < ph.length; i++) ph[i] = new Phaser(1);

// Main computation
forall (i : [1:n-1]) {
  for (iter : [0:nsteps-1]) {
    newX[i] = (oldX[i-1] + oldX[i+1]) / 2;
    ph[i].arrive(); // signal that newX[i] is available
    if (i > 1) ph[i-1].awaitAdvance(iter);   // wait for the left neighbor
    if (i < n-1) ph[i+1].awaitAdvance(iter); // wait for the right neighbor
    swap pointers newX and oldX;
  }
}
```
Using grouping (chunking), with one phaser per task that owns a contiguous group of elements:
```
// Allocate array of phasers proportional to the number of chunked tasks
Phaser[] ph = new Phaser[tasks + 2]; // array of phasers
for (int i = 0; i < ph.length; i++) ph[i] = new Phaser(1);

// Main computation
forall (i : [0:tasks-1]) {
  for (iter : [0:nsteps-1]) {
    // Compute the leftmost boundary element for this group
    int left = i * (n / tasks) + 1;
    myNew[left] = (myVal[left - 1] + myVal[left + 1]) / 2.0;
    // Compute the rightmost boundary element for this group
    int right = (i + 1) * (n / tasks);
    myNew[right] = (myVal[right - 1] + myVal[right + 1]) / 2.0;
    // Signal arrival on phaser ph[index]: the left and right boundary
    // elements are now available to the neighboring tasks
    int index = i + 1;
    ph[index].arrive();
    // Compute interior elements while the split-phase barrier is in progress
    for (int j = left + 1; j <= right - 1; j++)
      myNew[j] = (myVal[j - 1] + myVal[j + 1]) / 2.0;
    // Wait for the left and right neighbors to finish the current iteration
    if (index > 1) ph[index - 1].awaitAdvance(iter);
    if (index < tasks) ph[index + 1].awaitAdvance(iter);
    swap pointers myNew and myVal;
  }
}
```
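The pseudocode above maps fairly directly onto standard Java. Below is a minimal runnable sketch, assuming n is divisible by tasks; the array sizes, boundary values, and final printout are illustrative choices rather than part of the original algorithm:

```java
import java.util.Arrays;
import java.util.concurrent.Phaser;

public class ChunkedAveragingDemo {
    public static void main(String[] args) throws InterruptedException {
        final int n = 8, tasks = 2, nsteps = 10; // assumes n % tasks == 0
        final double[] a = new double[n + 2], b = new double[n + 2];
        a[n + 1] = 1.0; b[n + 1] = 1.0; // fixed boundary condition

        final Phaser[] ph = new Phaser[tasks + 2];
        for (int i = 0; i < ph.length; i++) ph[i] = new Phaser(1);

        Thread[] workers = new Thread[tasks];
        for (int t = 0; t < tasks; t++) {
            final int index = t + 1; // phaser index for this task
            final int left = t * (n / tasks) + 1, right = (t + 1) * (n / tasks);
            workers[t] = new Thread(() -> {
                double[] myVal = a, myNew = b;
                for (int iter = 0; iter < nsteps; iter++) {
                    // boundary elements first, then signal both neighbors
                    myNew[left] = (myVal[left - 1] + myVal[left + 1]) / 2.0;
                    myNew[right] = (myVal[right - 1] + myVal[right + 1]) / 2.0;
                    ph[index].arrive();
                    // interior elements overlap with the neighbors' waits (split-phase)
                    for (int j = left + 1; j <= right - 1; j++)
                        myNew[j] = (myVal[j - 1] + myVal[j + 1]) / 2.0;
                    if (index > 1) ph[index - 1].awaitAdvance(iter);
                    if (index < tasks) ph[index + 1].awaitAdvance(iter);
                    double[] tmp = myVal; myVal = myNew; myNew = tmp; // swap pointers
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) w.join();
        System.out.println(Arrays.toString(a)); // with an even nsteps, the result is in a
    }
}
```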
Pipeline Parallelism
The synchronization required for pipeline parallelism can be implemented by allocating an array of phasers, such that phaser `ph[i]` is "signalled" in iteration i by a call to `ph[i].arrive()`, as follows:
```
// Code for pipeline stage i
p = 0; // local phase counter: number of inputs processed so far
while (there is an input to be processed) {
  // wait for the previous stage, if any, to finish iteration p
  if (i > 0) ph[i - 1].awaitAdvance(p);
  process input;
  // signal the next stage that iteration p is complete
  ph[i].arrive();
  p = p + 1;
}
```
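For instance, here is a minimal runnable sketch of a three-stage pipeline over a fixed stream of items, where "process input" is just a hypothetical print:

```java
import java.util.concurrent.Phaser;

public class PipelineDemo {
    public static void main(String[] args) throws InterruptedException {
        final int STAGES = 3, ITEMS = 5;
        // One phaser per stage; stage i is the only party that arrives on ph[i]
        final Phaser[] ph = new Phaser[STAGES];
        for (int i = 0; i < STAGES; i++) ph[i] = new Phaser(1);

        Thread[] stages = new Thread[STAGES];
        for (int s = 0; s < STAGES; s++) {
            final int i = s;
            stages[i] = new Thread(() -> {
                for (int item = 0; item < ITEMS; item++) {
                    // wait until the previous stage has finished this item
                    if (i > 0) ph[i - 1].awaitAdvance(item);
                    System.out.println("stage " + i + " processed item " + item);
                    // signal the next stage that this item is done
                    ph[i].arrive();
                }
            });
            stages[i].start();
        }
        for (Thread t : stages) t.join();
    }
}
```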
Data Flow Parallelism
The simple data flow graph studied in the lecture consisted of five nodes and four edges: A → C, A → D, B → D, B → E.
We introduced the asyncAwait notation to specify a task along with an explicit set of preconditions (events that the task must wait for before it can start execution):
```
async( () -> {/* Task A */ A.put(); } );  // Complete task and trigger event A
async( () -> {/* Task B */ B.put(); } );  // Complete task and trigger event B
asyncAwait(A,    () -> {/* Task C */} );  // Only execute task C after event A is triggered
asyncAwait(A, B, () -> {/* Task D */} );  // Only execute task D after events A and B are triggered
asyncAwait(B,    () -> {/* Task E */} );  // Only execute task E after event B is triggered
```
Note that the order of the above statements is not significant: since each task's execution is guarded only by its preconditions, the same data flow graph is executed regardless of the order in which the tasks are created.
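asyncAwait is PCDP-style notation rather than standard Java. As an illustrative sketch, the same five-node graph can be expressed with `java.util.concurrent.CompletableFuture`, where each event corresponds to the completion of a future:

```java
import java.util.concurrent.CompletableFuture;

public class DataFlowDemo {
    public static void main(String[] args) {
        // Events A and B are modeled as futures that complete when their task finishes
        CompletableFuture<Void> a = CompletableFuture.runAsync(() -> System.out.println("Task A"));
        CompletableFuture<Void> b = CompletableFuture.runAsync(() -> System.out.println("Task B"));

        CompletableFuture<Void> c = a.thenRunAsync(() -> System.out.println("Task C"));         // awaits A
        CompletableFuture<Void> d = a.runAfterBothAsync(b, () -> System.out.println("Task D")); // awaits A and B
        CompletableFuture<Void> e = b.thenRunAsync(() -> System.out.println("Task E"));         // awaits B

        CompletableFuture.allOf(c, d, e).join(); // wait for the entire graph to complete
    }
}
```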