# Monitor Examples
**Source**: https://cforall.uwaterloo.ca/features/monitor.shtml
**Parent**: https://cforall.uwaterloo.ca/features/
- [Synchronization Bounded Buffer](#BoundedBuffer)
- [Synchronization Dating Service](#DatingService)
---
## Mutual Exclusion
Mutual Exclusion provides exclusive access to a shared resource.
---
## Synchronization
Synchronization provides a timing relationship among threads.
### Bounded Buffer
Concurrent generic bounded-buffer blocks producers when buffer full, consumers when buffer empty, and handles multiple producers/consumers inserting/removing simultaneously.
| Internal Scheduling | External Scheduling |
| --- | --- |
| ``` #include <thread.hfa> enum { BufferSize = 5 }; forall( T ) { monitor Buffer { condition full, empty; int front, back, count; T elements[BufferSize]; }; void ?{}( Buffer(T) & buffer ) with( buffer ) { [front, back, count] = 0; } // read-only, no mutual exclusion int query( Buffer(T) & buffer ) { return buffer.count; } void insert( Buffer(T) & mutex buffer, T elem ) with( buffer ) { if ( count == BufferSize ) wait( empty ); elements[back] = elem; back = ( back + 1 ) % BufferSize; count += 1; signal( full ); } T remove( Buffer(T) & mutex buffer ) with( buffer ) { if ( count == 0 ) wait( full ); T elem = elements[front]; front = ( front + 1 ) % BufferSize; count -= 1; signal( empty ); return elem; } } ``` | ``` #include <thread.hfa> enum { BufferSize = 5 }; forall( T ) { monitor Buffer { int front, back, count; T elements[BufferSize]; }; void ?{}( Buffer(T) & buffer ) with( buffer ) { [front, back, count] = 0; } // read-only, no mutual exclusion int query( Buffer(T) & buffer ) { return buffer.count; } void insert( Buffer(T) & mutex buffer, T elem ) with( buffer ) { if ( count == BufferSize ) waitfor( remove : buffer ); elements[back] = elem; back = ( back + 1 ) % BufferSize; count += 1; } T remove( Buffer(T) & mutex buffer ) with( buffer ) { if ( count == 0 ) waitfor( insert : buffer ); T elem = elements[front]; front = ( front + 1 ) % BufferSize; count -= 1; return elem; } } ``` |
### Dating Service
Girl/boy threads with matching compatibility codes exchange phone numbers.
| signal (Explict Exchange) | signal\_block (Implicit Exchange) |
| --- | --- |
| ``` #include <thread.hfa> enum { CompCodes = 20 }; monitor DatingService { unsigned int GirlPhoneNo, BoyPhoneNo; condition Girls[CompCodes], Boys[CompCodes]; condition exchange; }; unsigned int girl( DatingService & mutex ds, unsigned int PhoneNo, unsigned int ccode ) with( ds ) { if ( is_empty( Boys[ccode] ) ) { wait( Girls[ccode] ); GirlPhoneNo = PhoneNo; signal( exchange ); } else { GirlPhoneNo = PhoneNo; signal( Boys[ccode] ); wait( exchange ); } return BoyPhoneNo; } unsigned int boy( DatingService & mutex ds, unsigned int PhoneNo, unsigned int ccode ) with( ds ) { if ( is_empty( Girls[ccode] ) ) { wait( Boys[ccode] ); BoyPhoneNo = PhoneNo; signal( exchange ); } else { BoyPhoneNo = PhoneNo; signal( Girls[ccode] ); wait( exchange ); } return GirlPhoneNo; } ``` | ``` #include <thread.hfa> // condition type enum { CompCodes = 20 }; // number of compatibility codes monitor DatingService { unsigned int GirlPhoneNo, BoyPhoneNo; condition Girls[CompCodes], Boys[CompCodes]; }; unsigned int girl( DatingService & mutex ds, unsigned int PhoneNo, unsigned int ccode ) with( ds ) { if ( is_empty( Boys[ccode] ) ) {// no compatible boy ? wait( Girls[ccode] ); // wait for boy GirlPhoneNo = PhoneNo; // make phone number available } else { GirlPhoneNo = PhoneNo; // make phone number available signal_block( Boys[ccode] );// restart boy to set phone number } return BoyPhoneNo; } unsigned int boy( DatingService & mutex ds, unsigned int PhoneNo, unsigned int ccode ) with( ds ) { if ( is_empty( Girls[ccode] ) ) {// no compatible girl ? wait( Boys[ccode] ); // wait for girl BoyPhoneNo = PhoneNo; // make phone number available } else { BoyPhoneNo = PhoneNo; // make phone number available signal_block( Girls[ccode] );// restart girl to set phone number } return GirlPhoneNo; } ``` |
### Readers Writer Lock
Allow simultaneous readers, only one writer, and maintain temporal order preventing stale or fresh reads.
```
#include <thread.hfa> // condition type
monitor ReadersWriter {
int rcnt, wcnt; // number of readers/writer using resource
};
void EndRead( ReadersWriter & mutex rw ) with(rw) {
rcnt -= 1;
}
void EndWrite( ReadersWriter & mutex rw ) with(rw) {
wcnt = 0;
}
void StartRead( ReadersWriter & mutex rw ) with(rw) {
if ( wcnt > 0 ) waitfor( EndWrite : rw );
rcnt += 1;
}
void StartWrite( ReadersWriter & mutex rw ) with(rw) {
if ( wcnt > 0 ) waitfor( EndWrite : rw );
else while ( rcnt > 0 ) waitfor( EndRead : rw );
wcnt = 1;
}
void ?{}( ReadersWriter & rw ) with(rw) { rcnt = wcnt = 0; }
int readers( ReadersWriter & rw ) { return rw.rcnt; }
void Read( ReadersWriter & rw ) { // compress two step protocol to one
StartRead( rw );
sout | "Reader:" | active_thread() | ", shared:" | SharedRW | " with:" | readers( rw ) | " readers";
yield( 3 );
EndRead( rw );
}
void Write( ReadersWriter & rw ) { // compress two step protocol to one
StartWrite( rw );
SharedRW += 1;
sout | "Writer:" | active_thread() | ", wrote:" | SharedRW;
yield( 1 );
EndWrite( rw );
}
thread Worker {
ReadersWriter &rw;
};
void ?{}( Worker & w, ReadersWriter & rw ) { &w.rw = &rw; }
void main( Worker & w ) with(w) {
for ( 10 ) {
if ( rand() % 100 < 70 ) { // decide to be a reader or writer
Read( rw );
} else {
Write( rw );
} // if
} // for
}
int main() {
enum { MaxTask = 5 };
ReadersWriter rw;
Worker * workers[MaxTask];
for ( i; MaxTask ) workers[i] = new( rw );
for ( i; MaxTask ) delete( workers[i] );
sout | "successful completion";
} // main
```