
Java threads and occam processes



This email is folded ... best viewed with Origami/F ...

Cheers,

Peter.

(cut here)
------------------------------------------------------------------------------
//{{{}}}

//{{{  SUMMARY

I finally read Sun's implementation of its Piped I/O Stream classes properly
and think I now understand it.  There is a simple explanation of why Oyvind's
ComsTime program cycles every 1.5 to 2.5 seconds and a simple way to reduce
that to around 30 milliseconds (for a UNIX Java Platform).

The Piped I/O Streams implement blocking FIFO communication (rather clumsily)
and not an occam channel.  Enclosed are *better* classes both for a proper
occam channel and for a blocking FIFO.  Both of these reduce the ComsTime
cycle time to around 5.5 milliseconds.  That's roughly a 1000-fold speedup
on using Piped I/O Streams ... it's only now 1000-times slower than (KRoC)
occam running on the same UNIX workstation ...

//}}}
//{{{  the trouble with Piped I/O Stream

Well, it's too complicated and inefficient:

//{{{  PipedOutputStream is only a front for PipedInputStream

The write method in PipedOutputStream is a front for the receive method in
PipedInputStream.  PipedOutputStream can therefore be discarded in favour
of giving public access to that method.  Then, we need only the concept
of `PipedStream' and life will be simpler.

The only reason I can think of for distinguishing PipedInputStream from
PipedOutputStream is so that we can give the `PipedStream' to a writer
so that it can't see the read method and vice-versa.  That has some merit.

But a simpler way to do that would be to derive a `PipedOutputStream' class
from `PipedStream' that simply hides its read method (and vice-versa).
I need to check whether Java lets us do that kind of thing ... ???
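
As far as I can tell, Java does not let a subclass reduce the visibility of
an inherited public method, so "derive and hide" will not compile.  A sketch
of one alternative: let a single object implement two small interfaces and
hand each side only the interface it needs.  The names IntSink, IntSource,
PipedInts and ViewsDemo are made up for illustration (they are not Sun's
classes), and the buffer here throws instead of blocking to keep it short.

```java
// Hypothetical names throughout; this is not Sun's API.
interface IntSink   { void write(int n); }      // all that a writer gets to see
interface IntSource { int read(); }             // all that a reader gets to see

// One object holds the state; each side is given a narrower view of it.
final class PipedInts implements IntSink, IntSource {
    private final int[] buffer = new int[16];
    private int lo = 0, hi = 0, size = 0;       // a size count keeps full/empty unambiguous

    public synchronized void write(int n) {
        if (size == buffer.length) throw new IllegalStateException("full");
        buffer[hi] = n;
        hi = (hi + 1) % buffer.length;
        size++;
    }

    public synchronized int read() {
        if (size == 0) throw new IllegalStateException("empty");
        int n = buffer[lo];
        lo = (lo + 1) % buffer.length;
        size--;
        return n;
    }
}

final class ViewsDemo {
    // Writes through the writer's view and reads back through the reader's view.
    static int roundTrip(int n) {
        PipedInts pipe = new PipedInts();
        IntSink writerView = pipe;      // writerView.read() would not compile
        IntSource readerView = pipe;    // readerView.write(n) would not compile
        writerView.write(n);
        return readerView.read();
    }
}
```

The restriction is only as strong as the declared type: a caller that casts
the view back to PipedInts sees everything again.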

//}}}
//{{{  the code for the circular buffer is weird

PipedInputStream contains a circular buffer.  It maintains pointers to
the next free slot (`in') and the oldest slot (`out') and tries to use
these to determine whether the buffer is full, empty or neither.  This
results in some pretty obscure code, since `(in == out)' could mean
either empty or full.  The code could be made crystal clear by also
maintaining a size count.

//}}}
//{{{  the Thread variables don't seem to be properly maintained

PipedInputStream declares two Thread variables (`readSide' and `writeSide'),
but don't panic ... no new threads are set up with them ... they are only
used to get handles on the (latest) threads that are calling the read/write
methods.  With these handles, a blocked reader can ask if the last writer is
still alive etc ... but these checks look a bit hit-or-miss to me ... so I
wouldn't put too much faith in them ... e.g. the last writer may have died,
but there could still be other writers who are alive?

//}}}
//{{{  blocking is implemented by `busy' waiting loops

This is the killer so far as overheads and latency are concerned!

When the buffer is empty, a reader can't read and must be blocked.  When
the buffer is full, a writer can't write and must be blocked.

This blocking is implemented by a `busy' waiting loop on the emptiness
(respectively fullness) of the buffer.  The body of this loop contains
a "wait (1000)" call that puts it to sleep for (at least) 1 second ...
the parameter to "wait" representing milliseconds.  The waiting loop,
therefore, checks its exit condition at most once per second!  I wonder
what Sun had in mind when they wrote this?
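
To make the cost concrete, here is a small sketch of that polling style.  It
is not Sun's code: PollingSlot and PollingDemo are made-up names, and I am
ASSUMING the writer deposits without notifying the reader (which is what the
measured cycle times suggest).  The demo uses a lambda, so it needs a later
Java than the one discussed here.

```java
// Sketch of the polling style described above; not Sun's implementation.
final class PollingSlot {
    private final long pollMillis;
    private boolean full = false;
    private boolean readerWaiting = false;
    private int value;

    PollingSlot(long pollMillis) { this.pollMillis = pollMillis; }

    // ASSUMPTION: the writer deposits without calling notify().
    synchronized void put(int v) { value = v; full = true; }

    synchronized boolean isReaderWaiting() { return readerWaiting; }

    synchronized int take() throws InterruptedException {
        while (!full) {
            readerWaiting = true;
            wait(pollMillis);          // condition is re-checked once per time-out
        }
        readerWaiting = false;
        full = false;
        return value;
    }
}

final class PollingDemo {
    // Milliseconds a reader needs to see a value deposited just after it blocked.
    static long latencyMillis(long pollMillis) {
        final PollingSlot slot = new PollingSlot(pollMillis);
        Thread writer = new Thread(() -> {
            try {
                while (!slot.isReaderWaiting()) Thread.sleep(1);
                slot.put(7);           // the reader is already inside wait()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        writer.start();
        try {
            long start = System.nanoTime();
            slot.take();
            long elapsed = (System.nanoTime() - start) / 1_000_000;
            writer.join();
            return elapsed;
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The value is available almost immediately, yet the reader only notices it
when its timed wait expires, so the latency tracks the polling interval and
not the writer.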

//{{{  effect on the ComsTime benchmark

The ComsTime program has four processes trying to communicate through
these blocking buffers.  The buffers have 1024 elements and never get
full ... there is only one packet flying around the feedback loop in
the benchmark, so the buffers never hold more than one element.  BUT,
they do get empty.  Almost all reads will find an empty buffer and, so,
drop into the once-per-second-waiting-loop.  Fortunately, most of these
waits are in parallel with each other.

To complete a cycle, a packet has to get through four buffers.  This
completely explains the cycle times for ComsTime, which range between
1.2 and 2.5 seconds.

//}}}
//{{{  reducing the wait to the minimum

The minimum wait settable is "wait (0, 1)", which represents one nanosecond.
However, Java on Unix platforms (like KRoC/SPoC on UNIX platforms) will have
to use timers provided by UNIX.  Unless they know something we don't (and
they may), the best UNIX provides are time values that may be expressed in
nanoseconds, but which are actually incremented only every 10 milliseconds.
Therefore, a "wait(0, 1)" will be effectively the same as a wait pretty close
to "wait(10)".  Timings on other platforms (e.g. Windows or the real Java
processor from Sun) will hopefully increment time more frequently ...

On our UNIX platform (a Sun SPARC ???), all times between "wait(0, 1)" and
"wait(9, 99999)" give the same results for the ComsTime cycle ... which is
between 30.3 and 30.8 milliseconds.

There is reason to believe (see below) that Java can do this in around 5.5
milliseconds, so the above results imply that the four parallel waits only
partially overlap giving roughly 2.5 sequential 10 millisecond waits.  This
seems very plausible.

//}}}
//{{{  deleting the wait won't work

Reducing the wait to zero or simply deleting it doesn't work.  Without the
wait, the reader remains in the busy-waiting loop continuously.  During this
wait, it has control of the object monitor ... the wait is (and has to be)
inside a `synchronized' method.  Since it never releases this monitor, the
writer can't get in to change the empty-condition upon which the reader is
waiting ... and we have livelock!

//}}}

Any occamist could program a block properly (i.e. without `busy' waiting).
See below!

//}}}

//}}}
//{{{  quick summary of Java threads

//{{{  disclaimer

This is my understanding of the thread methods used below.  This may be
completely wrong ... if anyone knows, please tell me!  It's based on the
tiny amount of information gleaned from 5 text books and the Java `white
pages' report, an intuitive guess as to what the method names might really
be doing and the fact that it seems to work.

//}}}
//{{{  Java threads/objects versus occam processes

Java threads and objects are distinct things.  An object has data and methods.
A thread can snake around anywhere, calling a method of one object some time
and a method of a different object another time.  From the point of view of
an object, its methods are not (normally) under its control but are potential
fragments of any threads that care to call them.  That doesn't seem a very
`object-oriented' scheme of things.

What we are trying to do is approach the occam model, where an object has its
own data and its own thread (or threads) that live entirely inside the object
and give it its life.  This does seem a much more `object-oriented' scheme of
things!

Reconciling these philosophies is tricky.  What we are trying to do is find
the mechanisms for working with Java that give us the occam model and, hence,
semantic clarity.  That would be a most valuable contribution from the occam
community.

//}}}
//{{{  yield, suspend and resume

Executing `yield ()' is like executing `DESCHEDULE ()', or `PAR (SKIP, SKIP)'
in occam.  It just deschedules the current thread to the back of the run-queue.

Executing `suspend ()' deschedules the current thread.  To reschedule it,
another thread has to execute a `name.resume ()', where `name' is a thread
variable holding the suspended thread (a bit like a transputer process
descriptor).  This other thread has to have access to that `name' variable.

Because Java threads can be descheduled any time the underlying threads
kernel sees fit, data accessible by two or more threads cannot be managed
by those threads carefully using just these methods ... (?)

I've not used them at all in the implementation of the occam channel.

//}}}
//{{{  synchronised methods (and synchronised blocks of code)

Object data being accessed by multiple threads needs protection.  Methods
that may be called by multiple threads to read/update object data should
be defined as `synchronized'.  Synchronized methods for any particular
object are controlled through a monitor -- i.e. only one such synchronized
method (per object) can be executing at any time.  A thread that calls a
synchronized method of some object is put on the SYNC-QUEUE for that object.

So each object maintains a SYNC-QUEUE through which it can control use of
its synchronized methods.

Once a thread gets to the front of an object's SYNC-QUEUE and starts
executing the synchronized method, it normally completes the method and
goes on to do something else.  As it completes the method, the next thread
on the SYNC-QUEUE will be scheduled to execute another of that object's
synchronized methods.
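
A minimal sketch of that protection, with made-up names (Counter and
CounterDemo): several threads hammer one object through a synchronized
method, and no increments are lost.  One caveat on the word QUEUE: as far as
I know, Java makes no promise about the order in which contending threads
are given the monitor.

```java
final class Counter {
    private int count = 0;

    // Only one thread at a time may be inside a synchronized method of this object.
    synchronized void increment() { count++; }

    synchronized int get() { return count; }
}

final class CounterDemo {
    // Starts 'threads' workers that each increment the shared counter 'perThread' times.
    static int run(int threads, int perThread) {
        final Counter counter = new Counter();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) counter.increment();
            });
            workers[t].start();
        }
        try {
            for (Thread w : workers) w.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return counter.get();
    }
}
```

Without `synchronized' on increment, the read-modify-write of count could
interleave between threads and the final total could come up short.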

//}}}
//{{{  wait/notify

However, during execution of a synchronized method, a thread may execute a
`wait ()'.  This -- and here my understanding is guesswork -- causes that
thread to block and to be put on a WAIT-QUEUE (also belonging to the object
whose synchronized method was being executed).  At which point, the monitor
is released and another thread on the SYNC-QUEUE for the object is scheduled.

To unblock a thread on the WAIT-QUEUE of an object, another thread has to
execute a `notify ()' method for that object.  This puts the first thread on
the WAIT-QUEUE of that object back on to the SYNC-QUEUE for that object.
When its turn comes, it regains the monitor and resumes execution of the
synchronised method at the point just after it had executed the `wait ()'.

[Can these semantics really be right ... they seem to work though ???]

The thread executing the `notify ()' usually does it during a synchronized
method -- because it only then has the information necessary to decide on
such a release.  The thread doesn't know which thread is released and had
better not care.  I imagine the thread executing the `notify ()' continues
running -- i.e. retains control of the monitor -- but I'm not sure.  It
would simplify coding if that were definitely true ... (Help please!).
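
On that last point: the documentation for Object.notify says the awakened
thread cannot proceed until the notifying thread gives up the monitor, so
the notifier does keep running.  A sketch that shows the ordering, with
made-up names (NotifyOrderDemo and its fields are for illustration only):

```java
import java.util.ArrayList;
import java.util.List;

final class NotifyOrderDemo {
    private final List<String> log = new ArrayList<>();
    private boolean waiting = false;
    private boolean released = false;

    private synchronized void awaitRelease() throws InterruptedException {
        waiting = true;
        while (!released) wait();           // the monitor is given up while waiting
        log.add("waiter resumed");
    }

    // Returns false if no thread has reached wait() yet.
    private synchronized boolean release() {
        if (!waiting) return false;
        released = true;
        notify();
        log.add("notifier still running");  // still inside the monitor after notify()
        return true;
    }

    static List<String> run() {
        final NotifyOrderDemo demo = new NotifyOrderDemo();
        Thread waiter = new Thread(() -> {
            try { demo.awaitRelease(); }
            catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });
        waiter.start();
        try {
            while (!demo.release()) Thread.sleep(1);
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        synchronized (demo) { return new ArrayList<>(demo.log); }
    }
}
```

The notifier's entry always lands in the log first, because the waiter has
to win the monitor back before it can return from wait.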

//}}}
//{{{  more on wait/notify

A thread may also execute a `wait (millisecs)' or `wait (millisecs, nanosecs)'.
This seems to put the thread on two queues: a TIMER-QUEUE and the WAIT-QUEUE
for the object whose synchronized method was being executed.  Such a thread
gets back on to the SYNC-QUEUE for the object either by the time-out occurring
or by being at the front of the WAIT-QUEUE when someone else executed the
`notify ()' method for that object.

There is also a `notifyAll' method which releases all threads on the WAIT-QUEUE
back on to the SYNC-QUEUE for the object.

Executing a `notify ()' or `notifyAll' when the WAIT-QUEUE is empty is OK,
but is probably done by the desperate (?).

Executing a `wait ()' or `notify ()' outside a synchronized method does ... ?
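
For the record, the documented behaviour is that both calls throw
IllegalMonitorStateException when the calling thread does not own the
object's monitor.  A short sketch (MonitorOwnershipDemo is a made-up name):

```java
final class MonitorOwnershipDemo {
    // Calls notify() on an object whose monitor the caller does not hold.
    static boolean notifyOutsideMonitorThrows() {
        Object lock = new Object();
        try {
            lock.notify();
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    // Calls a timed wait on an object whose monitor the caller does not hold.
    static boolean waitOutsideMonitorThrows() {
        Object lock = new Object();
        try {
            lock.wait(1);
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The same calls made inside a `synchronized (lock)' block are legal.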

//}}}
//{{{  putting it together

A thread may be:

  running : either actually running or on the run-queue (which, from the
            point of view of designing code, is the same thing);

  blocked : on the SYNC-QUEUE for some object (either through trying to
            execute one of its synchronized methods and some other thread
            got there first ... or through having been through a wait/notify
            cycle);

  blocked : on the WAIT-QUEUE for some object (and, possibly, on a global
            TIMER-QUEUE as well).  This happens by calling a wait method
            whilst inside a synchronized method.
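
Later Java releases let a program observe these conditions through
Thread.getState().  My mapping, offered as an assumption and not as anything
official: BLOCKED corresponds to the SYNC-QUEUE case, and WAITING (or
TIMED_WAITING for a timed wait) to the WAIT-QUEUE case.  A sketch, with
ThreadStateDemo a made-up name:

```java
final class ThreadStateDemo {
    // Polls until the thread reaches the wanted state or about two seconds pass.
    private static boolean reaches(Thread t, Thread.State wanted) throws InterruptedException {
        for (int i = 0; i < 2000 && t.getState() != wanted; i++) Thread.sleep(1);
        return t.getState() == wanted;
    }

    // seen[0]: a waiting thread was observed; seen[1]: a blocked thread was observed.
    static boolean[] run() {
        final Object lock = new Object();
        final boolean[] go = { false };
        boolean[] seen = new boolean[2];
        try {
            // (1) A thread inside wait(): the WAIT-QUEUE case.
            Thread waiter = new Thread(() -> {
                synchronized (lock) {
                    try { while (!go[0]) lock.wait(); }
                    catch (InterruptedException e) { Thread.currentThread().interrupt(); }
                }
            });
            waiter.start();
            seen[0] = reaches(waiter, Thread.State.WAITING);

            // (2) A thread kept out of a monitor that we hold: the SYNC-QUEUE case.
            Thread contender = new Thread(() -> { synchronized (lock) { } });
            synchronized (lock) {
                contender.start();
                seen[1] = reaches(contender, Thread.State.BLOCKED);
                go[0] = true;
                lock.notifyAll();      // let the waiter finish once we leave the monitor
            }
            waiter.join();
            contender.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return seen;
    }
}
```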

Humm ... :-) ... ?!!
//}}}

//}}}
//{{{  a zero-buffered synchronised (i.e. occam) channel class for Java

It *is* possible to implement a fully synchronised zero-buffered occam channel
in Java without busy-waiting.  Consider:

//{{{  CHAN_OF_INT.java

import java.lang.InterruptedException;  // needed by the standard wait method

public class CHAN_OF_INT {

  //{{{  COMMENT documentation
  //
  //CHAN_OF_INT implements an occam CHAN OF INT.  There is full synchronisation
  //between the outputting and inputting thread.  It assumes that the connection
  //is point-to-point ... no checks are made on this, but there could be!
  //
  //There is no logical buffering of data in the channel.  However, each int
  //is actually copied three or four times with this implementation!  I can't
  //see how to do this with less copying.  Ideally, we ought to copy the message
  //directly from the outputting thread to the inputting thread ... just like
  //in occam ...
  //
  //}}}

  //{{{  local state
  int channel_hold;
  boolean channel_empty = true;
  //}}}

  //{{{  public synchronized void out (int n) throws InterruptedException {
  public synchronized void out (int n) throws InterruptedException {
    if (channel_empty) {
      //{{{  first to the rendezvous
      channel_empty = false;
      channel_hold = n;                // second copy of the message
      wait ();                         // wait for the input process
      channel_empty = true;
      //}}}
    } else {
      //{{{  second to the rendezvous
      channel_hold = n;                // can we copy this directly to the receiver?
      notify ();                       // schedule the waiting input process
      //}}}
    }
  }
  //}}}
  //{{{  public synchronized int in () throws InterruptedException {
  public synchronized int in () throws InterruptedException {
    if (channel_empty) {
      //{{{  first to the rendezvous
      channel_empty = false;
      wait ();                         // wait for the output process
      channel_empty = true;
      return channel_hold;             // third copy of the message
      //}}}
    } else {
      //{{{  second to the rendezvous
      int temporary = channel_hold;    // can this be avoided?
      notify ();                       // schedule the waiting output process
      return temporary;
      //}}}
    }
  }
  //}}}

}

//}}}

It assumes it is used for point-to-point connection only -- i.e. no multiple
readers or multiple writers.  It could be extended to allow that though.

But as it stands, it is very simple.  It just follows, as best it can, the
way channels are implemented on the transputer.
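
One caution, which is my reading and not something tested here: the class
above relies on each `wait ()' returning only because of the matching
`notify ()', and on the notified thread regaining the monitor before its
partner calls in again.  A more defensive sketch re-checks a condition in a
loop around each wait.  One2OneIntChannel and ChannelDemo are made-up names,
and one writer and one reader are still assumed.

```java
// Hypothetical variant, not the CHAN_OF_INT above: one writer, one reader assumed.
final class One2OneIntChannel {
    private int hold;
    private boolean full = false;      // true while a value awaits the reader

    public synchronized void out(int n) throws InterruptedException {
        hold = n;
        full = true;
        notifyAll();                   // wake the reader if it got here first
        while (full) wait();           // do not return until the value is taken
    }

    public synchronized int in() throws InterruptedException {
        while (!full) wait();          // the loop guards against early wake-ups
        full = false;
        notifyAll();                   // release the writer
        return hold;
    }
}

final class ChannelDemo {
    // Sends 0 .. count-1 through the channel; true if all arrive in order.
    static boolean inOrder(int count) {
        final One2OneIntChannel c = new One2OneIntChannel();
        Thread writer = new Thread(() -> {
            try { for (int i = 0; i < count; i++) c.out(i); }
            catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });
        writer.start();
        try {
            boolean ok = true;
            for (int i = 0; i < count; i++) ok &= (c.in() == i);
            writer.join();
            return ok;
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The writer cannot deposit a second value until `full' has been cleared by
the reader, so back-to-back outputs cannot overwrite one another.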

Oyvind's version of the context-switch benchmark (I've just changed some
names and made the consumer process an object/thread on an equal footing
to the others) becomes:

//{{{  ComsTime.java
import java.util.*;

//{{{  PROC Prefix (VAL INT n, CHAN OF INT in, out)
// Ø.Teig 20.3.96
// modified Welch 8.5.96

class Prefix extends Thread {
  private CHAN_OF_INT in;
  private CHAN_OF_INT out;
  private int n;

  //{{{  constructor
  Prefix (int n, CHAN_OF_INT in, CHAN_OF_INT out) {
    // setName ("Prefix");
    this.n = n;
    this.in = in;
    this.out = out;
    start ();
  }
  //}}}
  //{{{  run
  public void run () {
    // System.out.println ("    " + getName() + " " + toString());
    try {
      //{{{  main code in here
      out.out (n);                      //          SEQ
      while (true)                      //            out ! n
      {                                 //            WHILE TRUE
        int value;                      //              INT value:
        value = in.in ();               //              SEQ
        out.out (value);                //                in ? value
      }                                 //                out ! value
      //}}}
    }
    catch (InterruptedException caught) {
      System.out.println ("    " + getName() + " method run exception");
    }
  }
  //}}}
}
//}}}
//{{{  PROC Delta (CHAN OF INT in, CHAN OF INT out.0, out.1)
// Ø.Teig 20.3.96
// modified Welch 8.5.96

class Delta extends Thread {
  private CHAN_OF_INT in;
  private CHAN_OF_INT out_0;
  private CHAN_OF_INT out_1;

  //{{{  constructor
  Delta (CHAN_OF_INT in, CHAN_OF_INT out_0, CHAN_OF_INT out_1) {
    // setName ("Delta");
    this.in = in;
    this.out_0 = out_0;
    this.out_1 = out_1;
    start ();
  }
  //}}}
  //{{{  run
  public void  run () {
    // System.out.println ("    " + getName() + " " + toString());
    try {
      //{{{  main code in here
      while (true)                      //          WHILE TRUE
      {                                 //            INT value:
        int value;                      //            SEQ
        value = in.in ();               //              in ? value
        out_0.out (value);              //              out.0 ! value  -- should be
        out_1.out (value);              //              out.1 ! value  -- in PAR !!
      }
      //}}}
    }
    catch (InterruptedException caught) {
      System.out.println ("    " + getName() + " method run exception");
    }
  }
  //}}}
}
//}}}
//{{{  PROC Succ (CHAN OF INT in, CHAN OF INT out)
// Ø.Teig 20.3.96
// modified Welch 8.5.96

class Succ extends Thread {
  private CHAN_OF_INT in;
  private CHAN_OF_INT out;

  //{{{  constructor
  Succ (CHAN_OF_INT in, CHAN_OF_INT out) {
    // setName ("Succ");
    this.in = in;
    this.out = out;
    start ();
  }
  //}}}
  //{{{  run
  public void  run () {
    // System.out.println ("    " + getName() + " " + toString());
    try {
      //{{{  main code in here
      while (true)                      //          WHILE TRUE
      {                                 //            INT value:
        int value;                      //            SEQ
        value = in.in ();               //              in ? value
        out.out (value + 1);            //              out ! value + 1
      }
      //}}}
    }
    catch (InterruptedException caught) {
      System.out.println ("    " + getName() + " method run exception");
    }
  }
  //}}}
}
//}}}
//{{{  PROC Consume (INT nLoops, CHAN OF INT in, out)
// Øyvind Teig 20.3.96
// modified Welch 8.5.96

class Consume extends Thread {
  private CHAN_OF_INT in;
  private CHAN_OF_INT out;
  private int nLoops;

  //{{{  constructor
  Consume (int nLoops, CHAN_OF_INT in, CHAN_OF_INT out) {
    // setName ("Consume");
    this.in = in;
    this.out = out;
    this.nLoops = nLoops;
    start ();
  }
  //}}}
  //{{{  run
  public void  run () {
    // System.out.println ("    " + getName() + " " + toString());
    try {
      //{{{  main code in here
      //{{{  warm-up loop
      int warm_up = 16;                          //          VAL INT warm.up IS 16:
      for (int i = 0; i < warm_up; i++) {        //          SEQ i = 0 FOR warm.up
        int value;                               //            INT value:
        value = in.in ();                        //            in ? value
        //{{{  COMMENT System.out.println (value);
        //System.out.println (value);
        //}}}
      }
      //}}}
      //{{{  Time tag
      Date date1 = new Date();
      //}}}
      //{{{  bench-mark loop
      for (int i = 0; i < nLoops; i++) {         //          SEQ i = 0 FOR nLoops
        int value;                               //            INT value:
        value = in.in ();                        //            in ? value
        //{{{  COMMENT System.out.println (value);
        //System.out.println (value);
        //}}}
      }
      //}}}
      //{{{  Time tag
      Date date2 = new Date();
      //}}}
      //{{{  Report
      long microseconds   = ((date2.getTime() - date1.getTime()) * 1000);
      long timePerLoop_us = (microseconds / ((long) nLoops));
      System.out.println ("   " + timePerLoop_us + " microseconds / iteration");
      //}}}
      //{{{  signal main thread that we're done
      out.out (0);
      //}}}
      //}}}
    }
    catch (InterruptedException caught) {
      System.out.println ("    " + getName() + " method run exception");
    }
  }
  //}}}
}
//}}}

//{{{  main program thread
// Øyvind Teig 20.3.96
// modified Welch 8.5.96

class ComsTime {
  //{{{  public static void  main (String argv [])
  public static void  main (String argv []) {
    //{{{  Banner
    System.out.println ("");
    System.out.println ("Test of communication between Java threads");
    System.out.println ("Based on occam ComsTime.occ by Peter Welch, University of Kent at Canterbury");
    System.out.println ("Ported into Java by Oyvind Teig");
    System.out.println ("Now using CHAN_OF_INT (phw)");
    System.out.println ("");
    //}}}
    //{{{  nLoops
    int nLoops = 1000;
    System.out.println (nLoops + " loops:");
    //}}}
    //{{{  CHAN OF INT a, b, c, d, e:
    CHAN_OF_INT a = new CHAN_OF_INT ();
    CHAN_OF_INT b = new CHAN_OF_INT ();
    CHAN_OF_INT c = new CHAN_OF_INT ();
    CHAN_OF_INT d = new CHAN_OF_INT ();
    CHAN_OF_INT e = new CHAN_OF_INT ();
    //}}}
    //{{{  PAR (prefix, delta, succ, consume)         //    PAR
    Prefix prefix = new Prefix (0, b, a);           //      prefix (0, b, a)
    Delta delta = new Delta (a, c, d);              //      delta (a, c, d)
    Succ succ = new Succ (c, b);                    //      succ (c, b)
    Consume consume = new Consume (nLoops, d, e);   //      consume (nLoops, d, e)
    //}}}
    //{{{  wait for the all done signal
    try {
      int done = e.in ();
    }
    catch (InterruptedException caught) {
      System.out.println ("    " + "ComsTime" + " method run exception");
    }
    //}}}
    //{{{  Stop threads
    prefix.stop ();
    delta.stop ();
    succ.stop ();
    consume.stop ();
    //}}}
    //{{{  join
    try {
       prefix.join ();
       delta.join ();
       succ.join ();
       consume.join ();
    }
    catch (InterruptedException caught) {
      System.out.println ("    " + "ComsTime" + " final join interrupted");
    }
    //}}}
  }
  //}}}
}
//}}}
//}}}

Performance times reported are around 5.5 milliseconds per cycle.

//}}}
//{{{  a blocking FIFO class for Java

The following is an attempt to allow multiple readers/writers and cope with
their blocking in a non-busy way.  The attempt is only partially successful.

//{{{  tiny problem of divergence

There's something that looks suspiciously like a busy loop (see the fold
marked "...  wait till there is something/room").  The thread won't go round
this loop a second time UNLESS it's beaten to the monitor by a competitive
reader (if it was a reader) or a competitive writer (if it was a writer).
It is sadly possible for a blocked reader to be overtaken (infinitely often)
by readers that come along after it ... same for a writer.

//}}}

It's safe and non-busy for a single reader/writer though ... need to think
some more on this ...

//{{{  BUFFER_OF_INT.java

import java.io.*;
import java.util.*;
import java.lang.*;

public class BUFFER_OF_INT {

  //{{{  COMMENT documentation
  //
  //BUFFER_OF_INT implements a blocking FIFO buffer of integers.  A fixed size
  //is defined upon initialisation.  There can be any number of concurrent
  //readers and writers.  Readers are blocked when the buffer is empty.  Writers
  //are blocked when the buffer is full.  A non-empty buffer will not refuse
  //a reader.  A non-full buffer will not refuse a writer.
  //
  //The buffer should also be `fair'.
  //
  //CAUTION: above is the specification ... this implementation is not fair!
  //I think a reader may be overtaken by new readers and that this could happen
  //indefinitely ... same for writers ... needs fixing ...
  //
  //Meanwhile, this implementation should be safe for non-competitive readers
  //and writers -- e.g. one of each.
  //
  //}}}

  //{{{  local state
  int[] buffer;
  int max;
  
  int size = 0;              // INVARIANT: (0 <= size <= max)
  int hi = 0;                // INVARIANT: (0 <= hi < max)
  int lo = 0;                // INVARIANT: (0 <= lo < max)
  
  int blocked_readers = 0;
  int blocked_writers = 0;
  //}}}
  //{{{  constructor
  BUFFER_OF_INT (int max) {
    //{{{  
    this.max = max;
    buffer = new int[max];
    //}}}
  }
  //}}}

  //{{{  public synchronized void write (int n) throws InterruptedException {
  public synchronized void write (int n) throws InterruptedException {
    //{{{  
    //{{{  wait till there is room
    while (size == max) {
      blocked_writers++;
      wait ();
    }
    //}}}
    //{{{  update buffer
    buffer[hi] = n;
    hi = (hi + 1) % max;
    size++;
    //}}}
    //{{{  schedule any blocked reader (just one)
    if (blocked_readers > 0) {
      blocked_readers--;
      notify ();
    }
    //}}}
    //}}}
  }
  //}}}
  //{{{  public synchronized int read () throws InterruptedException {
  public synchronized int read () throws InterruptedException {
    //{{{  
    int temporary;
    //{{{  wait till there is something
    while (size == 0) {
      blocked_readers++;
      wait ();
    }
    //}}}
    //{{{  update buffer
    temporary = buffer[lo];
    lo = (lo + 1) % max;
    size--;
    //}}}
    //{{{  schedule any blocked writer (just one)
    if (blocked_writers > 0) {
      blocked_writers--;
      notify ();
    }
    //}}}
    return temporary;
    //}}}
  }
  //}}}

}

//}}}

This can be used in the ComsTime program simply by changing all "CHAN_OF_INT"
to "BUFFER_OF_INT" and by changing the "in" method calls to "read" and the
"out" method calls to "write".

Performance times reported are again around 5.5 milliseconds per cycle.

//}}}
//{{{  discussion

More thinking needs to be done ... discussion postponed!

Except to say:

  o the occam code is much simpler and shorter;
  o the occam code executes ComsTime about 1000 times faster;
  o non-busy-looping Java code for secure fair multiple readers/writers
    needs finding (and is pretty hard to reason through correctly ...);
  o non-busy-looping occam code for secure fair multiple readers/writers
    is trivial (especially so if we implement occam3 SHARED channels).
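
On the third point, a sketch only: later Java releases than the one
discussed here ship java.util.concurrent.ArrayBlockingQueue, whose
constructor takes a fairness flag documented as serving blocked threads in
FIFO order.  Using it sidesteps the hand-written wait/notify logic, so this
is a swapped-in library technique and not a fix to BUFFER_OF_INT.
FairFifoDemo is a made-up name.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class FairFifoDemo {
    // Several producers each put 1 .. perProducer; one consumer sums everything.
    static long run(int producers, int perProducer) {
        // Capacity 2 forces producers to block; 'true' asks for fair (FIFO) access.
        final BlockingQueue<Integer> fifo = new ArrayBlockingQueue<>(2, true);
        Thread[] writers = new Thread[producers];
        for (int p = 0; p < producers; p++) {
            writers[p] = new Thread(() -> {
                try { for (int i = 1; i <= perProducer; i++) fifo.put(i); }
                catch (InterruptedException e) { Thread.currentThread().interrupt(); }
            });
            writers[p].start();
        }
        try {
            long sum = 0;
            for (int i = 0; i < producers * perProducer; i++) sum += fifo.take();
            for (Thread w : writers) w.join();
            return sum;
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The sum check only shows that no value is lost or duplicated under
contention; it does not by itself demonstrate fairness.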

Too early for any conclusions to be drawn ...

//}}}

Peter Welch.