Re: new sync primitives
Dear All,
As I was the progenitor of the BUCKET concept I will send round
a more detailed example of the use of BUCKETS.
Richard is correct: if a process decides to fall into a bucket just as
that bucket is flushed, then we may have a race hazard.
First of all, the buckets form a circular, wrap-around set. A process
wishing to fall into a bucket determines which bucket it wants and then
puts itself into that bucket. If its target time is very close to the
current time, the process may be trying to enter the bucket that is next
in line after the one that has just flushed. It may miss that flush, in
which case it will wait for the total elapsed time to go round the whole
circle of buckets. The designer has to be aware of this.
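To make the hazard concrete, here is a minimal sketch of the two steps a
process takes, lifted from the slot process in the listing below. The
wrapper name fall.after is hypothetical, and SIZE b stands in for
max.syncs. The includes and the bucket call are the ones the listing uses.

```occam
-- same includes, in the same order, as the listing
#INCLUDE "semaphor.inc"
#INCLUDE "event.inc"
#INCLUDE "bucket.inc"

-- Hypothetical wrapper (not in the listing): the two steps from PROC slot.
PROC fall.after (INT current.bucket, []BUCKET b, VAL INT delay)
  #PRAGMA SHARED current.bucket
  #PRAGMA SHARED b
  INT sync.point:
  SEQ
    -- step 1: choose the target bucket (delay is in ticks, delay > 0)
    sync.point := (current.bucket + delay) \ (SIZE b)
    -- If b[sync.point] were flushed at this point, step 2 would arrive
    -- too late and the process would wait a full circle of buckets.
    -- step 2: block until the organiser flushes that bucket
    fall.into.bucket (b[sync.point])
:
```

For example, with 10 buckets, current.bucket = 8 and delay = 3, step 1
gives (8 + 3) \ 10 = 1, so the process waits in b[1].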
If the target delay is longer than the circle of buckets permits, the
actual delay will in fact be the desired delay MINUS the wrap-around
time. This is probably worse, so as a designer you must ensure that no
target delay is greater than the wrap-around time.
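The wrap-around arithmetic can be checked with a small guard. This is
only a sketch: the function name delay.fits is hypothetical, and the
strict upper bound is an assumption drawn from the listing below, where a
delay equal to the number of buckets maps back onto current.bucket itself.

```occam
-- Hypothetical guard (not in the listing): TRUE when a delay, measured
-- in ticks, can be represented by a circle of n.buckets buckets.
BOOL FUNCTION delay.fits (VAL INT delay, n.buckets) IS (delay > 0) AND (delay < n.buckets):

-- Worked example with n.buckets = 10 and current.bucket = 4:
--   delay =  3  ->  bucket (4 +  3) \ 10 = 7   released after roughly 3 ticks
--   delay = 13  ->  bucket (4 + 13) \ 10 = 7   also roughly 3 ticks (13 MINUS 10)
--   delay = 10  ->  bucket (4 + 10) \ 10 = 4   the bucket due to be flushed next
```

A slot process could evaluate delay.fits (delay, SIZE b) before computing
its sync.point and treat FALSE as a design error.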
Yes, the bucket concept is very easy to use and does precisely what you
want. In particular, it means that controlling time within a processor
becomes trivial, as there is only one process actually accessing the real
timer. In a multi-processor system where time co-ordination is required
between processors, this makes it much simpler to organise, as you have far
fewer processors than processes.
Jon
--{{{ libraries
#USE "utils.tco"
#INCLUDE "semaphor.inc"
#INCLUDE "event.inc"
#INCLUDE "bucket.inc"
--}}}
--{{{ constants
VAL INT max.lanes IS 2:
VAL INT max.slots IS 100:
VAL INT max.syncs IS 10:
VAL INT iterations IS 10:
VAL INT ticks.per.second IS 1000000:
VAL INT ticks.per.tenth IS ticks.per.second / 10:
VAL INT speed.up.factor IS 10:
VAL INT timer.delay IS ticks.per.tenth / speed.up.factor :
--}}}
PROC synchronised.block.timer ( CHAN OF BYTE stdin, stdout, stderr )
--{{{ COMMENT documentation
--
--The aim of this program is to build a system which permits many processes
--to be placed on synchronising timer processes, without having to place
--all the processes on the timer queue.
--
--The example is based upon an application derived from a road traffic
--modelling system.
--
--A lane comprises a number of slots. A lane control process
--determines for how long a vehicle should stay in a particular slot.
--In this example all vehicles are delayed for a fixed time.
--Access by the slot processes in a lane to the control process is a place
--where an occam3 resource channel would be ideal, as this gives a fair
--implementation. Here we use a semaphore 'control.s' that is claimed
--by each slot process. The control process just returns the 'fixed' delay.
--
--The example system contains two such lanes.
--
--When a slot process receives the time for which it is to be delayed, it would
--normally delay itself on a timer queue. However in this case the number
--of entries on the timer queue would be large and would involve much
--processing of the linked lists that are used to implement the timer queue.
--
--Instead, the slot process falls into a BUCKET associated with the desired
--time-out.
--
--The time.organiser process simply waits for each tenth of a second
--to elapse and then flushes the relevant BUCKET, releasing any processes
--that were inside.
--
--The slot and time.organiser processes share a current.bucket variable.
--For current (and foreseeable) implementations of occam, this does not
--introduce a race hazard.
--
--The delay associated with each nominal 1/10 second tick is divided by
--speed.up.factor so that the system can run faster than real-time;
--in this case, 10 times faster than real-time.
--
--Use has been made of the semaphore mechanism in three places:
--
--1. to remove an ALT in the access by slots to the control process
--in each lane process.
--
--2. To control access by the slot processes to each of the synchroniser
--processes. Effectively giving a claim and grant mechanism of an
--occam3 shared resource channel to each synchroniser process.
--
--3. To write directly to the stdout channel from any of the processes.
--
--In a real system, there would need to be 60 such BUCKETs and the number
--of slot processes per processor is likely to be measured in thousands.
--
--It also has the advantage that time on a particular processor is guaranteed
--to be co-ordinated across all processes that wish to access time.
--
--In a multi-processor system time co-ordination will be much easier because
--now all we have to do is synchronise the time.organiser processes and there
--is only one of these per processor. BUT we will need a mechanism which
--will allow synchronisation across processors.
--
--To keep each processor in step we will need to synchronise each processor
--at the end of each BUCKET cycle. This will ensure that time remains
--accurate to the minute across all processors. For my application this is
--sufficient!
--
--}}}
--{{{ #PRAGMA SHARED stdout
#PRAGMA SHARED stdout
SEMAPHORE stdout.s:
#PRAGMA SHARED stdout.s
--}}}
--{{{ PROC generator (CHAN OF INT details, CHAN OF BYTE stdout, SEMAPHORE stdout.s)
PROC generator (CHAN OF INT details, CHAN OF BYTE stdout, SEMAPHORE stdout.s)
TIMER time:
INT now, start :
SEQ
time ? start
SEQ i = 0 FOR iterations
SEQ
claim.semaphore (stdout.s)
out.string ("Starting at ", 0, stdout)
time ? now
out.number (start MINUS now, 0, stdout)
out.number (i, 5, stdout)
out.string ("*c*n", 0, stdout)
release.semaphore (stdout.s)
details ! 0
time ? AFTER now PLUS 200000
:
--}}}
--{{{ PROC sink (CHAN OF INT details, CHAN OF BYTE stdout, SEMAPHORE stdout.s )
PROC sink (CHAN OF INT details, CHAN OF BYTE stdout, SEMAPHORE stdout.s )
INT now, start, total.delay :
TIMER time:
SEQ
details ? total.delay
time ? start
SEQ i = 0 FOR iterations
SEQ
time ? now
claim.semaphore (stdout.s)
out.string ("Sinking at ", 0, stdout)
out.number (start MINUS now, 0, stdout)
out.number (total.delay, 5, stdout)
out.number (i, 5, stdout)
out.string ("*c*n", 0, stdout)
release.semaphore (stdout.s)
details ? total.delay
:
--}}}
--{{{ PROC control ( CHAN OF BOOL for.how.long, CHAN OF INT wait.for,
PROC control ( CHAN OF BOOL for.how.long, CHAN OF INT wait.for,
VAL INT lane.delay )
-- all slots will receive the same delay in this lane
WHILE TRUE
BOOL signal:
SEQ
for.how.long ? signal -- this is a shared channel
wait.for ! lane.delay -- and the matching response
:
--}}}
--{{{ PROC slot ( INT current.bucket, []BUCKET b, CHAN OF INT in, out,
PROC slot ( INT current.bucket, []BUCKET b, CHAN OF INT in, out,
CHAN OF BOOL for.how.long, CHAN OF INT wait.for,
SEMAPHORE control.s,
CHAN OF BYTE stdout, SEMAPHORE stdout.s)
--{{{ #PRAGMA SHARED
#PRAGMA SHARED current.bucket
#PRAGMA SHARED b
#PRAGMA SHARED stdout.s
--}}}
WHILE TRUE
INT total.delay, delay, sync.point :
SEQ
in ? total.delay
--{{{ get lane delay
SEQ
claim.semaphore (control.s)
for.how.long ! TRUE
wait.for ? delay -- delay > 0 and in 1/10ths of second
release.semaphore(control.s)
--}}}
sync.point := (current.bucket + delay ) \ max.syncs
total.delay := total.delay + delay
--{{{ COMMENT trace
--SEQ
--claim.semaphore (stdout.s)
--out.string ("Fall into ", 0, stdout)
--out.number (sync.point, 0, stdout)
--out.string ("*c*n", 0, stdout)
--release.semaphore (stdout.s)
--}}}
fall.into.bucket (b[sync.point])
--{{{ COMMENT trace
--SEQ
--claim.semaphore (stdout.s)
--out.string ("Flushed from ", 0, stdout)
--out.number (current.bucket, 0, stdout)
--out.number (sync.point, 3, stdout)
--out.string ("*c*n", 0, stdout)
--release.semaphore (stdout.s)
--}}}
out ! total.delay
:
--}}}
--{{{ PROC lane ( INT current.bucket, []BUCKET b, CHAN OF INT in, out,
PROC lane ( INT current.bucket, []BUCKET b, CHAN OF INT in, out,
VAL INT lane.delay, CHAN OF BYTE stdout, SEMAPHORE stdout.s)
--{{{ #PRAGMA SHARED
#PRAGMA SHARED current.bucket
#PRAGMA SHARED b
#PRAGMA SHARED stdout.s
--}}}
[max.slots-1] CHAN OF INT details:
--{{{ shared channels and semaphore for access to control
SEMAPHORE control.s:
#PRAGMA SHARED control.s
CHAN OF BOOL for.how.long:
#PRAGMA SHARED for.how.long
CHAN OF INT wait.for:
#PRAGMA SHARED wait.for
--}}}
SEQ
initialise.semaphore (control.s, 1)
PAR
control ( for.how.long, wait.for, lane.delay )
slot ( current.bucket, b, in, details[0],
for.how.long, wait.for, control.s,
stdout, stdout.s )
PAR i = 1 FOR max.slots - 2
slot ( current.bucket, b, details[i-1], details[i],
for.how.long, wait.for, control.s,
stdout, stdout.s )
slot ( current.bucket, b, details[max.slots - 2], out,
for.how.long, wait.for, control.s,
stdout, stdout.s )
:
--}}}
--{{{ PROC time.organiser ( INT current.bucket, []BUCKET b,
PROC time.organiser ( INT current.bucket, []BUCKET b,
CHAN OF BYTE stdout, SEMAPHORE stdout.s )
--{{{ COMMENT #PRAGMA SHARED current.bucket
--#PRAGMA SHARED current.bucket
--#PRAGMA SHARED b
--}}}
INT now, start:
BOOL run:
TIMER real.time : -- this is the ONLY time control point
VAL []BYTE message IS "Real.time elapsed for bucket - " :
SEQ
run := TRUE
real.time ? now
start := now
now := now PLUS timer.delay
WHILE TRUE
SEQ
real.time ? AFTER now -- only place where AFTER is used !
--{{{ print out number in the current bucket (optional)
INT count:
SEQ
count := number.in.bucket (b[current.bucket])
IF
count > 0
--{{{
SEQ
claim.semaphore (stdout.s)
out.string (message, 0, stdout)
out.number (current.bucket, 0, stdout)
out.string (" at ", 0, stdout)
out.number (start MINUS now, 0, stdout)
out.string (" releasing ", 0, stdout)
out.number (count, 0, stdout)
out.string ("*c*n", 0, stdout)
release.semaphore (stdout.s)
--}}}
TRUE
SKIP
--}}}
flush.bucket (b[current.bucket])
current.bucket := (current.bucket + 1) \ max.syncs
now := now PLUS timer.delay
:
--}}}
--{{{ shared buckets
[max.syncs]BUCKET b:
#PRAGMA SHARED b
INT current.bucket:
#PRAGMA SHARED current.bucket
--}}}
--{{{ channel declarations
[max.lanes+1]CHAN OF INT details :
--}}}
--{{{ main process
SEQ
--{{{ initialise anything that's shared
SEQ
current.bucket := 0
SEQ i = 0 FOR max.syncs
initialise.bucket ( b[i] )
initialise.semaphore ( stdout.s, 1 )
--}}}
PAR
generator (details[0], stdout, stdout.s)
sink ( details[max.lanes], stdout, stdout.s )
PAR i = 0 FOR max.lanes
lane ( current.bucket, b, details[i], details[i+1], 3, stdout, stdout.s )
time.organiser ( current.bucket, b, stdout, stdout.s )
--}}}
: