[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

simplified multi-reader/writer BUFFER_OF_INT.java



//{{{}}}

Here's a simplified version of the BUFFER_OF_INT for multiple readers and
writers.  Having to chain the public read method through read_monitor.read,
which called straight back to sync_read did seem odd, but something had to
be done to queue the reader (or writer) on to the necessary monitors.

//{{{  two ways to synchronize threads

Java provides two ways of queueing a thread on a monitor.  One is to call
a synchronised method of the monitor.  The other is to execute a synchronised
block statement referring to that monitor.

I had used the former ... now let's use the latter.  This simplifies the
coding, effectively in-lining the synchronised method calls (which, in this
case, makes what is happening explicit) and removing the need to declare them.

//}}}
//{{{  no reduction in overheads though

I had hoped this might lead to lower overheads being reported by ComsTime,
but there is no noticable difference (i.e. still around 8.8 ms).  Still,
this simplification is worth having for its own sake.

//}}}
//{{{  critical observations still stand

I still stand by my "critical observations" on the non-compositional nature
of Java's thread semantics though (and, by comparison, the WYSIWIG semantics
of occam processes).

//}}}
//{{{  minimalist programming

The new BUFFER_OF_INT declares a class with no data and no methods ... this
should appeal to believers in minimalist programming ... don't blink or you
will miss it!

Instances of this class turn out to be very useful.  They are Java objects
and come equipped with inherited wait/notify methods and their own monitor.

//}}}

//{{{  BUF_OF_INT.java

//{{{  imports
import java.io.*;
import java.util.*;
import java.lang.*;
//}}}

//{{{  class MONITOR {
class MONITOR {
}
//}}}

//{{{  public class BUFFER_OF_INT {
public class BUFFER_OF_INT {

  //{{{  COMMENT specification
  //
  //BUFFER_OF_INT implements a blocking FIFO buffer of integers.  A fixed size
  //is defined upon initialisation.  There can be any number of concurrent
  //readers and writers.  Readers are blocked when the buffer is empty.  Writers
  //are blocked when the buffer is full.  A non-empty buffer will not refuse
  //a reader.  A non-full buffer will not refuse a writer.
  //
  //The buffer is also `fair' -- i.e. readers are dealt with in the order of
  //their arrival ... same for writers.  Contention between readers and writers
  //is dealt with in an arbitrary fashion ... but any unfairness is limited by
  //the size of the buffer (only a finite number of reads can take place without
  //a write and vice-versa).
  //
  //The buffer is `non-busy' -- i.e. there are no polling loops (e.g. by a reader
  //waiting for the buffer to become non-empty).  Blocking is entirely passive.
  //
  //}}}
  //{{{  COMMENT implementation
  //
  //Two local monitors, read_monitor and write_monitor, are declared.
  //
  //Readers call the read method and, then, queue up for the read_monitor.  When
  //a reader acquires this, it queues up for the monitor associated with the
  //BUFFER_OF_INT object itself, where its only competitor may be a single writer
  //that has acquired its write_monitor.  When it acquires the BUFFER_OF_INT
  //monitor, it may have to wait() because the buffer turned out to be empty.
  //This wait() releases the BUFFER_OF_INT monitor, allowing a writer in, but does
  //not release the read_monitor first acquired.  This forces the other readers
  //to wait patiently in line and stops them overtaking (perhaps infinitely often)
  //the waiting reader.  After completing its read, any waiting writer is notified
  //to continue.
  //
  //The writers' story is symmetric to the above.
  //
  //It is important that the public read and write methods are not synchronized.
  //Otherwise, a suspended reader would block all writers and vice-versa!
  //
  //}}}

  //{{{  local state
  int[] buffer;
  int max;
  
  int size = 0;                    // INVARIANT: (0 <= size <= max)
  int hi = 0;                      // INVARIANT: (0 <= hi < max)
  int lo = 0;                      // INVARIANT: (0 <= lo < max)
  
  boolean waiting_reader = false;  // INVARIANT: waiting_reader ==> (size = 0)
  boolean waiting_writer = false;  // INVARIANT: waiting_writer ==> (size = max)
  
  MONITOR read_monitor =           // all readers multiplex through this
    new MONITOR ();
  
  MONITOR write_monitor =          // all writers multiplex through this
    new MONITOR ();
  //}}}
  //{{{  constructor
  BUFFER_OF_INT (int max) {
    this.max = max;
    buffer = new int[max];
  }
  //}}}

  //{{{  public int read () {
  public int read () {
    synchronized (read_monitor) {
      synchronized (this) {
        if (size == 0) {
          waiting_reader = true;
          //{{{  wait ();
          try {
            wait ();
          } catch (InterruptedException e) {
            System.out.println ("BUFFER_OF_INT: InterruptedException exception raised" +
              " whilst waiting to read from an empty buffer ...");
          }
          //}}}
        }
        int tmp = lo;                    // ASSERT: size > 0
        lo = (lo + 1) % max;
        size--;
        if (waiting_writer) {            // ASSERT: size == (max - 1)
          waiting_writer = false;
          notify ();
        }
        return buffer[tmp];
      }
    }
  }
  //}}}
  //{{{  public void write (int n) {
  public void write (int n) {
    synchronized (write_monitor) {
      synchronized (this) {
        if (size == max) {
          waiting_writer = true;
          //{{{  wait ();
          try {
            wait ();
          } catch (InterruptedException e) {
            System.out.println ("BUFFER_OF_INT: InterruptedException exception raised" +
              " whilst waiting to write to a full buffer ...");
          }
          //}}}
        }
        buffer[hi] = n;                  // ASSERT: size < max
        hi = (hi + 1) % max;
        size++;
        if (waiting_reader) {            // ASSERT: size == 1
          waiting_reader = false;
          notify ();
        }
      }
    }
  }
  //}}}

}
//}}}

//}}}

Peter.