OCCAM-style java threads
I've had a go at implementing occam-style constructs and point-to-point
channels in Java. My test program looks like this:
--------------
|   FEEDER   |\
-------------- \
                \
--------------   |--------|
|   FEEDER   |---| READER |
--------------   |--------|
                /
-------------- /
|   FEEDER   |/
--------------
Three FEEDER processes continually send messages to a READER process which
communicates by ALTernation on its three input channels:
public class myprog {
    PAR NETWORK;
    public myprog() {
        channel[] A = { new channel("0"), new channel("1"), new channel("2") };
        Thread[] t = { new FEEDER(A[0], "nought"),
                       new FEEDER(A[1], "one"),
                       new FEEDER(A[2], "two") };
        PAR FEEDERS = new PAR(t);
        Thread[] u = { new READER(A), FEEDERS };
        NETWORK = new PAR(u);
    }
    static public void main (String[] args) {
        myprog m = new myprog();
        m.NETWORK.start();
        try { m.NETWORK.join(); } catch (InterruptedException e) {}
    }
}
class FEEDER extends Thread {
    channel C;
    String N;
    public FEEDER(channel C2, String N2) { C = C2; N = N2; }
    public void run () {
        while (true) {
            System.out.println("*** :-? Feeding: "+N);
            C.write(N);
        }
    }
}
class READER extends Thread {
    channel[] A;
    public READER(channel[] A1) { A = A1; }
    public void run () {
        while (true) {
            // A Thread can be started only once, so build fresh branches
            // and a fresh ALT on every pass.
            READERBODY[] r = { new READERBODY(A[0]),
                               new READERBODY(A[1]),
                               new READERBODY(A[2]) };
            ALT k = new ALT(r);
            k.start();
            try { k.join(); } catch (InterruptedException e) {}
        }
    }
}
class READERBODY extends guardedthread {
    public READERBODY(channel C) { super(C); }
    // readobject is filled in by the ALT before this branch is started.
    public void run() { System.out.println("*** :-) Read: "+readobject); }
}
The SEQ and PAR constructs are fairly easily defined:
// Implementation of OCCAM SEQ process.
public class SEQ extends Thread {
    private Thread[] t;
    public SEQ (Thread[] t2) {
        t = t2;
    }
    public void run () {
        // Start each component and wait for it to finish before the next.
        for (int i = 0; i < t.length; i++) {
            t[i].start();
            try { t[i].join(); } catch (InterruptedException e) {}
        }
    }
}
// Implementation of OCCAM PAR process.
public class PAR extends Thread {
    private Thread[] t;
    public PAR (Thread[] t2) {
        t = t2;
    }
    public void run () {
        // Start every component, then wait for all of them to finish.
        for (int i = 0; i < t.length; i++) {
            t[i].start();
        }
        for (int i = 0; i < t.length; i++) {
            try { t[i].join(); } catch (InterruptedException e) {}
        }
    }
}
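One consequence of building SEQ and PAR on java.lang.Thread is that each
instance, and each thread handed to it, can run only once: calling start() a
second time on a Thread throws IllegalThreadStateException. This is presumably
why the READER loop above builds new READERBODY and ALT objects on every pass.
A small sketch that demonstrates the restriction (the class and method names
are illustrative only, not part of the code above):

```java
// Illustrative only: shows that a Thread object cannot be restarted.
class StartOnce {
    // Returns true if a second start() on a finished thread is rejected.
    static boolean secondStartFails() {
        Thread t = new Thread(() -> {});   // a process that does nothing
        t.start();
        try {
            t.join();                      // wait for it to terminate
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        try {
            t.start();                     // illegal: already started once
            return false;
        } catch (IllegalThreadStateException e) {
            return true;
        }
    }
}
```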
ALT is more complicated and requires a "guarded thread" class to be
defined.
// Implementation of OCCAM ALT process
public class ALT extends Thread {
    private guardedthread[] g;
    public ALT (guardedthread[] g2) {
        g = g2;
    }
    public void run () {
        while (true) {
            synchronized (this) {
                // Try each branch in turn. If none is ready, wait to be
                // notified.
                for (int i = 0; i < g.length; i++) {
                    if (g[i].guard) {
                        if (g[i].chan == null) {
                            g[i].start();
                            try { g[i].join(); }
                            catch (InterruptedException e) {}
                            return;
                        }
                        else if (g[i].chan.ready(this)) {
                            g[i].readobject = g[i].chan.read();
                            g[i].start();
                            try { g[i].join(); }
                            catch (InterruptedException e) {}
                            return;
                        }
                    }
                }
                System.out.println("ALT process waiting");
                try { wait (); } catch (InterruptedException e) {}
            }
        }
    }
}
// Implementation of OCCAM guarded process (BOOL)& a?x -> P.
// Used only as a branch of an ALT process.
public class guardedthread extends Thread {
    public boolean guard;
    public channel chan;
    public Object readobject;
    public guardedthread(boolean g, channel c) {
        guard = g; chan = c;
    }
    public guardedthread(channel c) {
        guard = true; chan = c;
    }
    public guardedthread() {
        guard = true; chan = null;
    }
}
The channel definition is a simplified version of those discussed at the
workshop:
// Implementation of OCCAM channel
public class channel {
    boolean donewriting, waitingALTprocess;
    ALT ALTprocess;
    String name;
    private Object o;
    public channel() {
        name = "DEFAULT";
        donewriting = false;
        waitingALTprocess = false;
    }
    public channel(String name2) {
        name = name2;
        donewriting = false;
        waitingALTprocess = false;
    }
    public synchronized Object read() {    // Read from a channel
        if (!donewriting) {
            System.out.println("Waiting to read on channel "+name);
        }
        while (!donewriting) {             // Loop guards against spurious wake-ups
            try {
                wait ();                   // Wait in channel queue until woken up
            }                              // by method write
            catch (InterruptedException e) {}
        }
        System.out.println("Reading on channel "+name);
        donewriting = false;
        notify ();                         // Release the writer blocked in write
        return o;
    }
    public synchronized void write (Object wo) {    // Write to a channel
        System.out.println("Writing on channel "+name);
        o = wo;
        donewriting = true;
        notify ();                         // Wake a reader blocked in read
        if (waitingALTprocess) {           // If an ALT construct is waiting
            synchronized (ALTprocess) {    // to input from the channel
                System.out.println("Waiting ALT process notified by "+name);
                ALTprocess.notify();       // then notify it.
                waitingALTprocess = false;
                ALTprocess = null;
            }
        }
        System.out.println("Waiting to exit write method on channel "+name);
        while (donewriting) {              // Now wait until the message has
            try {                          // been read before returning.
                wait ();
            }
            catch (InterruptedException e) {}
        }
        System.out.println("Exiting write method on channel "+name);
    }
    public synchronized boolean ready (ALT a) {
        // An ALT process checks to see whether the channel is ready to output
        System.out.println("Seeing if ready channel "+name);
        if (donewriting) {
            return true;
        }
        else {
            // Remember the ALT so that the next write can wake it up.
            waitingALTprocess = true;
            ALTprocess = a;
            return false;
        }
    }
}
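A stripped-down version of the same handshake, with the ALT hook and the
tracing removed, may make the protocol easier to see. It re-checks its flag in
a loop around every wait(), which is the usual Java idiom because a waiting
thread can occasionally wake without having been notified. This is only a
sketch under the same one-writer, one-reader assumption; Cell and roundTrip
are hypothetical names, not part of the code above.

```java
// Hypothetical one-slot rendezvous cell: one writer thread, one reader thread.
class Cell {
    private Object item;
    private boolean full = false;

    // Deposit a message and block until the reader has taken it.
    synchronized void write(Object o) throws InterruptedException {
        item = o;
        full = true;
        notifyAll();                 // wake a reader blocked in read()
        while (full) {               // re-check after every wake-up
            wait();
        }
    }

    // Block until a message is present, then take it and release the writer.
    synchronized Object read() throws InterruptedException {
        while (!full) {
            wait();
        }
        full = false;
        notifyAll();                 // wake the writer blocked in write()
        return item;
    }

    // Small driver: a writer thread sends msg, the calling thread reads it.
    static Object roundTrip(Object msg) {
        Cell c = new Cell();
        Thread writer = new Thread(() -> {
            try {
                c.write(msg);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        writer.start();
        try {
            Object got = c.read();
            writer.join();           // write() returns only after the read
            return got;
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
}
```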
Here I am assuming that a channel is used by at most two concurrent threads
at any time: one for input, the other for output. I am also banning
output guards from ALT statements. (This means that when a process is
ready to output on a channel there can be no other option for its
behaviour.)
When the outputting thread wishes to write to the channel it invokes the
write method. It is not allowed to return until the message has been read
by the inputting process. Similarly, a reading process must wait until a
message becomes available. Alternation is more complicated. An ALT
process checks whether any of its input channels is ready. If none is,
each channel it checked records that the ALT is waiting, and the ALT goes
to sleep until one of them wakes it up. It then rescans its channels and
performs a read on the first one that is ready.
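For comparison, on a JDK that has java.util.concurrent (Java 5 or later), the
plain write/read rendezvous described here behaves much like
SynchronousQueue, whose put() blocks until another thread calls take(). The
sketch below shows only that point-to-point handshake; it does not attempt the
ALT registration, and RendezvousSketch and exchange are hypothetical names.

```java
// Illustrative only: uses the JDK's SynchronousQueue as a stand-in channel.
class RendezvousSketch {
    static String exchange(String message) {
        java.util.concurrent.SynchronousQueue<String> chan =
            new java.util.concurrent.SynchronousQueue<>();
        Thread feeder = new Thread(() -> {
            try {
                chan.put(message);           // blocks until the reader takes it
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        feeder.start();
        try {
            String received = chan.take();   // blocks until a writer arrives
            feeder.join();
            return received;
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
}
```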
Here is some sample output from the program, with all the debugging messages
included; messages from the various channels and the ALT process are interleaved:
Seeing if ready channel 0
Seeing if ready channel 1
Seeing if ready channel 2
ALT process waiting
*** :-? Feeding: nought
Writing on channel 0
Waiting ALT process notified by 0
Waiting to exit write method on channel 0
*** :-? Feeding: one
Writing on channel 1
*** :-? Feeding: two
Writing on channel 2
Waiting ALT process notified by 1
Waiting to exit write method on channel 1
Waiting ALT process notified by 2
Waiting to exit write method on channel 2
Seeing if ready channel 0
Reading on channel 0
Exiting write method on channel 0
*** :-? Feeding: nought
Writing on channel 0
Waiting to exit write method on channel 0
*** :-) Read: nought
Seeing if ready channel 0
Reading on channel 0
Exiting write method on channel 0
*** :-? Feeding: nought
Writing on channel 0
Waiting to exit write method on channel 0
*** :-) Read: nought
Seeing if ready channel 0
Reading on channel 0
Exiting write method on channel 0
Note that my ALT is currently unfair, so channel 0 is always favoured.
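One common way to remove this bias is to start each scan just after the
branch that was chosen last time, so that every ready channel is eventually
served. A minimal sketch of that selection rule alone, with channel readiness
abstracted to a boolean array; FairSelect and select are hypothetical names,
not part of the code above:

```java
// Hypothetical helper: round-robin choice among ready branches.
class FairSelect {
    private int next = 0;   // index at which the next scan starts

    // Returns the index of the chosen ready branch, or -1 if none is ready.
    int select(boolean[] ready) {
        int n = ready.length;
        for (int k = 0; k < n; k++) {
            int i = (next + k) % n;      // scan in rotated order
            if (ready[i]) {
                next = (i + 1) % n;      // next scan starts after the winner
                return i;
            }
        }
        return -1;                       // caller would wait and rescan
    }
}
```

Since READER creates a new ALT on every pass, the rotating index would have to
live outside the ALT, for example in READER, and be handed to each new ALT.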
I'd be interested to know what people think about the potential for this
slightly different approach.
Jeremy