Re: new sync primitives



Dear All,

As I was the progenitor of the BUCKET concept, I will send round
a more detailed example of the use of BUCKETs.

Richard is correct: there is a race hazard if a process decides to fall
into a bucket just as that bucket is flushed.

First of all, the buckets form a circular, wrap-around set.  A process
wishing to fall into a bucket determines which bucket it wants and then
puts itself into that bucket.  Now, if the interval between the current
time and the process's target time is very short, the process may attempt
to put itself into the next bucket after the one that has just been
flushed.  It may miss that flush, in which case it will wait for the full
circle of buckets to come round again.  The designer has to be aware of
this.
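To make the wrap-around arithmetic concrete, here is a minimal sketch in
Python (all names here are illustrative; the real mechanism is the occam
BUCKET library used in the code below):

```python
# Minimal sketch of the circular bucket wheel (illustrative names;
# the real mechanism is the occam BUCKET library in the code below).

N_BUCKETS = 10

def target_bucket(current_bucket, delay_ticks):
    # A process sleeping for delay_ticks falls into the bucket that
    # many slots ahead of the current one, wrapping round the circle.
    return (current_bucket + delay_ticks) % N_BUCKETS

# Normal case: from bucket 4, a 3-tick delay lands in bucket 7.
assert target_bucket(4, 3) == 7

# The race hazard: a 1-tick delay aims at the very next bucket.  If
# the flusher has already passed that bucket by the time the process
# falls in, the process misses its flush and sits there until the
# wheel comes all the way round again: roughly N_BUCKETS extra ticks.
worst_case_wait = 1 + N_BUCKETS
assert worst_case_wait == 11
```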

If the target delay is longer than the circle of buckets permits, the
actual delay will in fact be the desired delay MINUS the wrap-around
time.  This is probably worse, so as a designer you must ensure that no
target delay is greater than the wrap-around time.
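A quick worked example of that truncation (hypothetical numbers, assuming
a 10-bucket wheel as in the code below):

```python
# Why an over-long delay is truncated: with 10 buckets, a requested
# delay of 13 ticks wraps to the bucket only 3 slots ahead, so the
# process is released after desired MINUS wrap-around = 13 - 10 = 3.
N_BUCKETS = 10
requested = 13
actual = requested % N_BUCKETS   # bucket index arithmetic wraps
assert actual == 3
assert actual == requested - N_BUCKETS
```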

Yes, the bucket concept is very easy to use and does precisely what you
want.  In particular, it means that controlling time within a processor
becomes trivial, as there is only one process actually accessing the real
timer.  In a multi-processor system where time co-ordination is required
between processors, this makes things much simpler to organise, as you
have far fewer processors than processes.
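The per-processor co-ordination I have in mind can be sketched like this
(plain Python threads, purely for illustration; in the real system it
would be the time.organiser processes that synchronise):

```python
# Illustrative sketch: one time.organiser per "processor", all kept
# in step by synchronising once per full wheel cycle.
import threading

N_PROCESSORS = 3
N_BUCKETS = 10
cycles_done = []

barrier = threading.Barrier(N_PROCESSORS)

def time_organiser(proc_id, n_cycles=2):
    for cycle in range(n_cycles):
        for bucket in range(N_BUCKETS):
            pass  # flush bucket `bucket` on this processor
        # End of one full wheel: wait for every other processor,
        # so no processor's clock drifts more than one cycle ahead.
        barrier.wait()
        cycles_done.append((proc_id, cycle))

threads = [threading.Thread(target=time_organiser, args=(i,))
           for i in range(N_PROCESSORS)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Every processor completed every cycle, in lock-step per cycle.
assert len(cycles_done) == N_PROCESSORS * 2
```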

Jon

--{{{  libraries
#USE "utils.tco"
#INCLUDE "semaphor.inc"
#INCLUDE "event.inc"
#INCLUDE "bucket.inc"
--}}}  
--{{{  constants
VAL INT max.lanes IS 2:
VAL INT max.slots IS 100:
VAL INT max.syncs IS 10:
VAL INT iterations IS 10:
VAL INT ticks.per.second IS 1000000:
VAL INT ticks.per.tenth IS ticks.per.second / 10:
VAL INT speed.up.factor IS 10:
VAL INT timer.delay IS ticks.per.tenth / speed.up.factor :
--}}}  

PROC synchronised.block.timer ( CHAN OF BYTE stdin, stdout, stderr )

  --{{{  COMMENT documentation
  --
  --The aim of this program is to build a system which permits many processes
  --to be placed on synchronising timer processes, without having to place
  --all the processes on the timer queue.
  --
  --The example is based upon an application derived from a road traffic
  --modelling system.
  --
  --A lane comprises a number of slots.  A lane control process
  --determines for how long a vehicle should stay in a particular slot.
  --In this example all vehicles are delayed for a fixed time.
  --Access by the lane processes to the control process is a place
  --where an occam3 resource channel would be ideal as this gives a fair
  --implementation.  We use here a semaphore 'control.s' that is claimed
  --by each slot process.  The control process just returns the 'fixed' delay.
  --
  --The example system contains two such lanes.
  --
  --When a slot process receives the time for which it is to be delayed, it would
  --normally delay itself on a timer queue.  However in this case the number
  --of entries on the timer queue would be large and would involve much
  --processing of the linked lists that are used to implement the timer queue.
  --
  --Instead, the slot process falls into a BUCKET associated with the desired
  --time-out.
  --
  --The time.organiser process simply waits for each tenth of a second
  --to elapse and then flushes the relevant BUCKET, releasing any processes
  --that were inside.
  --
  --The slot and time.organiser processes share a current.bucket variable.
  --For current (and foreseeable) implementations of occam, this does not
  --introduce a race hazard.
  --
  --The delay associated with each 1/10 second tick is scaled by the value
  --of speed.up.factor, so that the system can run faster than real-time
  --(in this case, 10 times faster).
  --
  --Use has been made of the semaphore mechanism in three places:
  --
  --1. to remove an ALT in the access by slots to the control process
     --in each lane process.
  --
  --2. To control access by the slot processes to each of the synchroniser
     --processes.  Effectively giving a claim and grant mechanism of an
     --occam3 shared resource channel to each synchroniser process.
  --
  --3. To write directly to the stdout channel from any of the processes.
  --
  --In a real system, there would need to be 60 such BUCKETs and the number
  --of slot processes per processor is likely to be measured in thousands.
  --
  --It also has the advantage that time on a particular processor is
  --guaranteed to be co-ordinated across all processes that wish to access time.
  --
  --In a multi-processor system time co-ordination will be much easier because
  --now all we have to do is synchronise the time.organiser processes and there
  --is only one of these per processor.  BUT we will need a mechanism which
  --will allow synchronisation across processors.
  --
  --To keep each processor in step we will need to synchronise each processor
  --at the end of each BUCKET cycle.  This will ensure that time remains
  --accurate to the minute across all processors.  For my application this is
  --sufficient!
  --
  --}}}  

  --{{{  #PRAGMA SHARED stdout
  #PRAGMA SHARED stdout
  SEMAPHORE stdout.s:
  #PRAGMA SHARED stdout.s
  --}}}  

  --{{{  PROC generator (CHAN OF INT details, CHAN OF BYTE stdout, SEMAPHORE stdout.s)
  PROC generator (CHAN OF INT details, CHAN OF BYTE stdout, SEMAPHORE stdout.s)
    TIMER time:
    INT now, start :
    SEQ
      time ? start
      SEQ i = 0 FOR iterations
        SEQ
          claim.semaphore (stdout.s)
          out.string ("Starting at ", 0, stdout)
          time ? now
          out.number (now MINUS start, 0, stdout)  -- elapsed ticks since start
          out.number (i, 5, stdout)
          out.string ("*c*n", 0, stdout)
          release.semaphore (stdout.s)
          details ! 0
          time ? AFTER now PLUS 200000
  :
  --}}}  
  --{{{  PROC sink (CHAN OF INT details, CHAN OF BYTE stdout, SEMAPHORE stdout.s )
  PROC sink (CHAN OF INT details, CHAN OF BYTE stdout, SEMAPHORE stdout.s )
    INT now, start, total.delay :
    TIMER time:
    SEQ
      details ? total.delay
      time ? start
      SEQ i = 0 FOR iterations
        SEQ
          time ? now
          claim.semaphore (stdout.s)
          out.string ("Sinking at ", 0, stdout)
          out.number (now MINUS start, 0, stdout)  -- elapsed ticks since start
          out.number (total.delay, 5, stdout)
          out.number (i, 5, stdout)
          out.string ("*c*n", 0, stdout)
          release.semaphore (stdout.s)
          details ? total.delay
  :
  --}}}  

  --{{{  PROC control ( CHAN OF BOOL for.how.long, CHAN OF INT wait.for,
  PROC control ( CHAN OF BOOL for.how.long, CHAN OF INT wait.for,
                 VAL INT lane.delay )
    -- all slots will receive the same delay in this lane
    WHILE TRUE
      BOOL signal:
      SEQ
        for.how.long ? signal  -- this is a shared channel
        wait.for ! lane.delay  -- and the matching response
  :
  --}}}  
  --{{{  PROC slot ( INT current.bucket, []BUCKET b, CHAN OF INT in, out,
  PROC slot ( INT current.bucket, []BUCKET b, CHAN OF INT in, out,
              CHAN OF BOOL for.how.long, CHAN OF INT wait.for,
              SEMAPHORE control.s,
              CHAN OF BYTE stdout, SEMAPHORE stdout.s)
    --{{{  #PRAGMA SHARED
    #PRAGMA SHARED current.bucket
    #PRAGMA SHARED b
    #PRAGMA SHARED stdout.s
    --}}}  
    WHILE TRUE
      INT total.delay, delay, sync.point :
      SEQ
        in ? total.delay
        --{{{  get lane delay
        SEQ
          claim.semaphore (control.s)
          for.how.long ! TRUE
          wait.for ? delay     -- delay > 0 and in 1/10ths of second
          release.semaphore(control.s)
        --}}}  
        sync.point := (current.bucket + delay ) \ max.syncs
        total.delay := total.delay + delay
        --{{{  COMMENT trace
        --SEQ
          --claim.semaphore (stdout.s)
          --out.string ("Fall into ", 0, stdout)
          --out.number (sync.point, 0, stdout)
          --out.string ("*c*n", 0, stdout)
          --release.semaphore (stdout.s)
        --}}}  
        fall.into.bucket (b[sync.point])
        --{{{  COMMENT trace
        --SEQ
          --claim.semaphore (stdout.s)
          --out.string ("Flushed from ", 0, stdout)
          --out.number (current.bucket, 0, stdout)
          --out.number (sync.point, 3, stdout)
          --out.string ("*c*n", 0, stdout)
          --release.semaphore (stdout.s)
        --}}}  
        out ! total.delay
  :
  --}}}  
  --{{{  PROC lane ( INT current.bucket, []BUCKET b, CHAN OF INT in, out,
  PROC lane ( INT current.bucket, []BUCKET b, CHAN OF INT in, out,
              VAL INT lane.delay, CHAN OF BYTE stdout, SEMAPHORE stdout.s)
    --{{{  #PRAGMA SHARED
    #PRAGMA SHARED current.bucket
    #PRAGMA SHARED b
    #PRAGMA SHARED stdout.s
    --}}}  
    [max.slots-1] CHAN OF INT details:
    --{{{  shared channels and semaphore for access to control
    SEMAPHORE control.s:
    #PRAGMA SHARED control.s
    CHAN OF BOOL for.how.long:
    #PRAGMA SHARED for.how.long
    CHAN OF INT wait.for:
    #PRAGMA SHARED wait.for
    --}}}  
    SEQ
      initialise.semaphore (control.s, 1)
      PAR
        control ( for.how.long, wait.for, lane.delay )
        slot ( current.bucket, b, in, details[0],
               for.how.long, wait.for, control.s,
               stdout, stdout.s )
        PAR i = 1 FOR max.slots - 2
          slot ( current.bucket, b, details[i-1], details[i],
                 for.how.long, wait.for, control.s,
                 stdout, stdout.s )
        slot ( current.bucket, b, details[max.slots - 2], out,
               for.how.long, wait.for, control.s,
               stdout, stdout.s )
  :
  --}}}  

  --{{{  PROC time.organiser ( INT current.bucket, []BUCKET b,
  PROC time.organiser ( INT current.bucket, []BUCKET b,
                        CHAN OF BYTE stdout, SEMAPHORE stdout.s )
    --{{{  COMMENT #PRAGMA SHARED current.bucket
    --#PRAGMA SHARED current.bucket
    --#PRAGMA SHARED b
    --}}}  
    INT now, start:
    BOOL run:
    TIMER real.time :  -- this is the ONLY time control point
    VAL []BYTE message IS "Real.time elapsed for bucket - " :
    SEQ
      run := TRUE
      real.time ? now
      start := now
      now := now PLUS timer.delay
      WHILE TRUE
        SEQ
          real.time ? AFTER now   -- the only AFTER used by the bucket mechanism itself
          --{{{  print out number in the current bucket (optional)
          INT count:
          SEQ
            count := number.in.bucket (b[current.bucket])
            IF
              count > 0
                --{{{  
                SEQ
                  claim.semaphore (stdout.s)
                  out.string (message, 0, stdout)
                  out.number (current.bucket, 0, stdout)
                  out.string (" at ", 0, stdout)
                  out.number (now MINUS start, 0, stdout)
                  out.string (" releasing ", 0, stdout)
                  out.number (count, 0, stdout)
                  out.string ("*c*n", 0, stdout)
                  release.semaphore (stdout.s)
                --}}}  
              TRUE
                SKIP
          --}}}  
          flush.bucket (b[current.bucket])
          current.bucket := (current.bucket + 1) \ max.syncs
          now := now PLUS timer.delay
  :
  --}}}  

  --{{{  shared buckets
  [max.syncs]BUCKET b:
  #PRAGMA SHARED b
  
  INT current.bucket:
  #PRAGMA SHARED current.bucket
  --}}}  
  --{{{  channel declarations
  [max.lanes+1]CHAN OF INT details :
  --}}}  
  --{{{  main process
  SEQ
    --{{{  initialise anything that's shared
    SEQ
      current.bucket := 0
      SEQ i = 0 FOR max.syncs
        initialise.bucket ( b[i] )
      initialise.semaphore ( stdout.s, 1 )
    --}}}  
    PAR
      generator (details[0], stdout, stdout.s)
      sink ( details[max.lanes], stdout, stdout.s )
      PAR i = 0 FOR max.lanes
        lane ( current.bucket, b, details[i], details[i+1], 3, stdout, stdout.s )
      time.organiser ( current.bucket, b, stdout, stdout.s )
  --}}}  
: