Think of McDonald’s as a burger pipeline: the cook makes burgers and puts them in the bin, but if the bin fills up, the cook has to stop for a while. Customers come to buy burgers and take them from the bin, but if the bin is empty, they have to wait. The bin has a fixed capacity. Similarly, memory buffers have a fixed size: the "bounded buffer" idea.
How can we solve similar problems with data pipelines? Spinning? No, that wastes CPU. Instead, we want to block the waiting thread.
Solution: semaphores. They can block threads for more than just mutex. Think about how they work: if a semaphore’s count is 3, then down, down, down succeed, and the next down sleeps. If we can arrange for a count that reaches 0 exactly when we need a thread to sleep, semaphores will do the job.
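To see the counting behavior without actually putting a thread to sleep, here is a small sketch using POSIX sem_trywait(), the non-blocking form of down: where sem_wait() would sleep, sem_trywait() fails with errno set to EAGAIN. The helper name downs_before_block is hypothetical, made up for this illustration.

```c
#include <assert.h>
#include <errno.h>
#include <semaphore.h>

/* Hypothetical helper: count how many downs succeed on a semaphore that
 * starts at init_count before the next down would put the caller to sleep.
 * sem_trywait() is a non-blocking down: instead of sleeping when the count
 * is 0, it fails with errno set to EAGAIN. */
int downs_before_block(unsigned int init_count)
{
    sem_t s;
    int downs = 0;

    if (sem_init(&s, /*process-local*/ 0, init_count) != 0)
        return -1;
    while (sem_trywait(&s) == 0)
        downs++;                  /* count was > 0, so this down succeeded */
    assert(errno == EAGAIN);      /* a real sem_wait() would sleep here */
    sem_destroy(&s);
    return downs;
}
```

With an initial count of 3, downs_before_block(3) returns 3: down, down, down, and the fourth attempt is the one that would sleep.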
Here we are considering a data pipeline with a buffer of a fixed number of slots, a “bounded buffer.” Although buffers can be enlarged dynamically, that is often a bad idea: a faster producer could use up a huge amount of memory without getting the job finished any sooner.
The data consumer can be controlled by a semaphore whose count tracks the number of objects in the buffer. Buffer empty, consumer waits. The data producer can be controlled by a semaphore whose count tracks the number of spaces in the buffer, i.e., the number of empty slots. Buffer full, no empties, producer waits.
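The two counters can be traced in a single thread, again using sem_trywait() as a non-blocking down so we can see where a thread would wait instead of actually waiting. This is a minimal sketch; the function trace_bounded_buffer and the names empty_slots and filled_slots are hypothetical. Starting from an empty buffer, the producer side succeeds exactly n_slots times before it would block, and then the consumer side succeeds the same number of times.

```c
#include <assert.h>
#include <semaphore.h>

/* Hypothetical single-threaded trace of the two bounded-buffer semaphores.
 * Returns how many "produce" steps succeed before the producer would wait,
 * after checking that the consumer can then take exactly that many items. */
int trace_bounded_buffer(unsigned int n_slots)
{
    sem_t empty_slots, filled_slots;
    int produced = 0, consumed = 0;

    if (sem_init(&empty_slots, 0, n_slots) != 0 ||   /* all slots empty */
        sem_init(&filled_slots, 0, 0) != 0)          /* no items yet */
        return -1;

    /* Producer side: claim an empty slot, then signal a filled one. */
    while (sem_trywait(&empty_slots) == 0) {
        produced++;
        sem_post(&filled_slots);
    }
    /* Buffer full, no empties: a real producer would block in sem_wait(). */

    /* Consumer side: claim a filled slot, then signal an empty one. */
    while (sem_trywait(&filled_slots) == 0) {
        consumed++;
        sem_post(&empty_slots);
    }
    /* Buffer empty: a real consumer would block in sem_wait(). */
    assert(consumed == produced);

    sem_destroy(&empty_slots);
    sem_destroy(&filled_slots);
    return produced;
}
```

Note that every successful down on one semaphore is paired with an up on the other, so between operations the two counts always add up to n_slots.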
In Tanenbaum, pg. 112, we have some magic C code (perhaps relying on a preprocessor). Let’s use the portable POSIX semaphore API instead, with sem_init(), sem_wait(), sem_post(), and sem_destroy(); sem_wait() is the down operation and sem_post() is the up operation. For more details, see for example “man sem_init”.
/* needs <semaphore.h>, <pthread.h>, <stdio.h>; N, TRUE, produce_item(),
   insert_item(), remove_item() and consume_item() are as in Tanenbaum */
sem_t mutex, empty, full;    /* struct type sem_t is defined in <semaphore.h> */

void *producer(void *arg);
void *consumer(void *arg);

int main(void)
{
    pthread_t prod, cons;

    if (sem_init(&mutex, /*process-local*/ 0, /*init count*/ 1) < 0) {
        perror("Failed to create semaphore 1");
        return 1;
    }
    if (sem_init(&empty, /*process-local*/ 0, /*init count*/ N) < 0) {
        perror("Failed to create semaphore 2");
        return 1;
    }
    if (sem_init(&full, /*process-local*/ 0, /*init count*/ 0) < 0) {
        perror("Failed to create semaphore 3");
        return 1;
    }
    pthread_create(&prod, NULL, producer, NULL);
    pthread_create(&cons, NULL, consumer, NULL);
    getchar();    /* block main thread */
    /* here, could bring down threads and destroy semaphores, but exit will do it for us */
    return 0;
}

/* producer() and consumer() are close to Tanenbaum, pg. 112 */
/* Note: each system call should have its return value tested as shown above for sem_init */

void *producer(void *arg)
{
    int item;
    while (TRUE) {
        item = produce_item();
        sem_wait(&empty);        /* claim empty spot */
        sem_wait(&mutex);
        insert_item(item);       /* insert, protected by mutex */
        sem_post(&mutex);
        sem_post(&full);         /* signal filled-in spot */
    }
    return NULL;                 /* not reached */
}

void *consumer(void *arg)
{
    int item;
    while (TRUE) {
        sem_wait(&full);         /* claim filled spot in buffer */
        sem_wait(&mutex);
        item = remove_item();    /* remove, protected by mutex */
        sem_post(&mutex);
        sem_post(&empty);        /* signal empty spot */
        consume_item(item);
    }
    return NULL;                 /* not reached */
}
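Think about how this will run. Perhaps the producer runs first and produces up to N items before it has to wait, and during this time the consumer starts and consumes some. Or the consumer runs first and blocks on full until the first item arrives. Either way, it works fine, and if the producer and consumer vary in speed, the semaphores keep them in balance: the faster one blocks until the slower one catches up.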
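To actually watch this run, here is a self-contained sketch of the same producer and consumer with the Tanenbaum helpers filled in. Assumptions, all hypothetical: a ring buffer of N = 8 ints, a producer that makes the items 1..items, a consumer that "consumes" by summing, and a driver run_pipeline() that uses pthread_join() instead of getchar() so it can report a result.

```c
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>

#define N 8                        /* buffer slots (value assumed for this sketch) */

static int buffer[N];              /* ring buffer: insert at hi, remove at lo */
static int lo = 0, hi = 0;
static sem_t mutex, empty, full;
static int n_items;                /* how many items the producer makes */
static long total;                 /* sum of everything the consumer took */

static void insert_item(int item) { buffer[hi] = item; hi = (hi + 1) % N; }
static int remove_item(void) { int item = buffer[lo]; lo = (lo + 1) % N; return item; }

static void *producer(void *arg)
{
    (void)arg;
    for (int item = 1; item <= n_items; item++) {
        sem_wait(&empty);          /* claim empty spot */
        sem_wait(&mutex);
        insert_item(item);         /* insert, protected by mutex */
        sem_post(&mutex);
        sem_post(&full);           /* signal filled-in spot */
    }
    return NULL;
}

static void *consumer(void *arg)
{
    (void)arg;
    for (int i = 0; i < n_items; i++) {
        sem_wait(&full);           /* claim filled spot */
        sem_wait(&mutex);
        int item = remove_item();  /* remove, protected by mutex */
        sem_post(&mutex);
        sem_post(&empty);          /* signal empty spot */
        total += item;             /* "consume" the item */
    }
    return NULL;
}

/* Hypothetical driver: one producer, one consumer, `items` items.
 * Returns the sum the consumer saw, or -1 on setup failure
 * (error cleanup omitted for brevity). */
long run_pipeline(int items)
{
    pthread_t prod, cons;

    n_items = items;
    total = 0;
    lo = hi = 0;
    if (sem_init(&mutex, 0, 1) != 0 || sem_init(&empty, 0, N) != 0 ||
        sem_init(&full, 0, 0) != 0)
        return -1;
    if (pthread_create(&prod, NULL, producer, NULL) != 0 ||
        pthread_create(&cons, NULL, consumer, NULL) != 0)
        return -1;
    pthread_join(prod, NULL);      /* unlike getchar(), waits for both threads */
    pthread_join(cons, NULL);
    assert(lo == hi);              /* every inserted item was removed */
    sem_destroy(&mutex);
    sem_destroy(&empty);
    sem_destroy(&full);
    return total;
}
```

With 1000 items, run_pipeline(1000) should return 1 + 2 + ... + 1000 = 500500 no matter how the two threads interleave, because no item is lost or duplicated.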
Semaphore and mutex systems
Solaris: two slightly different APIs, one POSIX (more portable) with sem_init(), etc., the other with sema_init(), etc. See the man pages.
Win32: CreateSemaphore, WaitForSingleObject, ReleaseSemaphore; CreateMutex, WaitForSingleObject, ReleaseMutex
Note on Tanenbaum, pp. 113-114. This discussion is about user-level threads, not kernel-supported threads. User-level threads cannot use simple spin locks because, unlike kernel-supported threads, they cannot be preempted: a waiter spinning on the lock would never give the lock holder a chance to run and release it. To fix this, a call to the user-level thread scheduler is added to the spin-lock code so the waiter doesn't just spin forever. If you want, just ignore user-level threads. You can drop the “call thread_yield” in Fig. 2-25 and get a workable mutex for kernel-supported threads, but it is usually better to use semaphores and block instead of spin-waiting.
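For kernel-supported threads, the Fig. 2-25 idea can be sketched with a C11 atomic_flag as the lock word. This is a hedged illustration, not Tanenbaum's code: sched_yield() stands in for thread_yield, and the names yield_lock_acquire, yield_lock_release and count_with_lock are hypothetical. With kernel threads the lock would also work without the yield, just wasting more CPU while spinning.

```c
#include <assert.h>
#include <pthread.h>
#include <sched.h>
#include <stdatomic.h>
#include <stdbool.h>

static atomic_flag demo_lock = ATOMIC_FLAG_INIT;   /* clear = lock is free */
static long counter;
static int per_thread;

static void yield_lock_acquire(atomic_flag *l)
{
    /* test_and_set returns the old value: true means someone else holds it */
    while (atomic_flag_test_and_set_explicit(l, memory_order_acquire))
        sched_yield();            /* give the CPU away instead of spinning */
}

static void yield_lock_release(atomic_flag *l)
{
    atomic_flag_clear_explicit(l, memory_order_release);
}

static void *adder(void *arg)
{
    (void)arg;
    for (int i = 0; i < per_thread; i++) {
        yield_lock_acquire(&demo_lock);
        counter++;                /* critical region */
        yield_lock_release(&demo_lock);
    }
    return NULL;
}

/* Hypothetical demo: two threads each add 1 to counter `iterations` times. */
long count_with_lock(int iterations)
{
    pthread_t a, b;

    counter = 0;
    per_thread = iterations;
    pthread_create(&a, NULL, adder, NULL);
    pthread_create(&b, NULL, adder, NULL);
    pthread_join(a, NULL);
    pthread_join(b, NULL);

    bool was_held = atomic_flag_test_and_set(&demo_lock);  /* probe the lock */
    atomic_flag_clear(&demo_lock);
    assert(!was_held);            /* both threads released the lock */
    return counter;
}
```

If the lock works, no increments are lost, so count_with_lock(100000) returns exactly 200000.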
Monitors (from the English word monitor, meaning a kind of guard, like a lunch monitor in grade school)
Think of a picture of a stick-figure guard attached to a piece of code, with one thread inside and the others waiting their turn outside.
A monitor is a language construct (dating from 1974) that sets up mutex protection for its enclosed code, and also helps with necessary blocking actions.
Let’s just skip the pidgin Pascal coverage and cover the Java case. See the pg. 120 example, now available by link from the class web page. But let’s focus on the monitor part and put it in its own .java file, OurMonitor0.java. We start by using the monitor (i.e., synchronized methods) just for mutex, as follows:
public class OurMonitor0 {                  // this is a monitor
    private static final int N = 10;        // buffer capacity (value assumed for illustration)
    private int buffer[] = new int[N];      // queue in ring buffer
    private int count = 0, lo = 0, hi = 0;

    public synchronized boolean insert(int val) {
        if (count == N) return false;       // fail if full
        buffer[hi] = val;                   // enqueue val, step 1
        hi = (hi + 1) % N;                  // enqueue, step 2
        count++;                            // enqueue, step 3
        return true;
    }

    // return the oldest value (assumed nonnegative), or -1 if nothing is there
    public synchronized int remove() {
        int val;
        if (count == 0) return -1;
        val = buffer[lo];                   // dequeue val, step 1
        lo = (lo + 1) % N;                  // dequeue, step 2
        count--;                            // dequeue, step 3
        return val;
    }
}
With the OurMonitor0 implementation above, two concurrent inserts or removes or both can’t happen, so the queue data structure should stay unbroken in spite of multithreaded execution. Providing simple mutex is the crux of synchronized methods. Each object with synchronized methods has a hidden mutex variable ensuring that only one thread is executing any of its synchronized methods at once.
Note that count is needed here. Without it, we can’t tell a full buffer from an empty one, as they both have lo == hi. For example, with N = 4, starting empty at lo == hi == 0, four inserts bring hi around to 0 again. Don’t be fooled!
But with the code given above, the caller only gets an error return for a full or empty buffer. What is it supposed to do, spin? For classic producer-consumer, the caller wants to block, so we have more work to do on the code to make the blocking happen. It’s possible to build that blocking into OurMonitor using the Java wait and notify methods.
Note that mutex alone is not powerful enough to block on an arbitrary condition: it is specialized to block only the second and subsequent threads once one thread has been allowed to run in the critical region.
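The Java language gives us wait and notify, not semaphores (the library class java.util.concurrent.Semaphore arrived only in Java 5), although wait and notify (along with mutex) are of equivalent power to semaphores. You can implement semaphores from wait and notify; search for “Java semaphores” to find several implementations.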
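The same equivalence can be seen in C, where POSIX condition variables are the counterpart of wait and notify: pthread_cond_wait() releases the mutex while sleeping, as wait() releases the monitor, and pthread_cond_signal() wakes one waiter, as notify() does. Below is a minimal sketch of a counting semaphore built that way; struct csem and its functions are hypothetical names, not a library API.

```c
#include <assert.h>
#include <pthread.h>

/* Hypothetical counting semaphore built from a mutex and a condition variable. */
struct csem {
    pthread_mutex_t lock;       /* plays the role of the monitor's hidden mutex */
    pthread_cond_t nonzero;     /* signaled when count becomes positive */
    unsigned int count;
};

void csem_init(struct csem *s, unsigned int count)
{
    pthread_mutex_init(&s->lock, NULL);
    pthread_cond_init(&s->nonzero, NULL);
    s->count = count;
}

void csem_destroy(struct csem *s)
{
    pthread_cond_destroy(&s->nonzero);
    pthread_mutex_destroy(&s->lock);
}

void csem_down(struct csem *s)
{
    pthread_mutex_lock(&s->lock);
    while (s->count == 0)                          /* recheck after every wakeup */
        pthread_cond_wait(&s->nonzero, &s->lock);  /* like wait(): sleep, releasing lock */
    assert(s->count > 0);
    s->count--;
    pthread_mutex_unlock(&s->lock);
}

void csem_up(struct csem *s)
{
    pthread_mutex_lock(&s->lock);
    s->count++;
    pthread_cond_signal(&s->nonzero);              /* like notify(): wake one waiter */
    pthread_mutex_unlock(&s->lock);
}

unsigned int csem_value(struct csem *s)
{
    pthread_mutex_lock(&s->lock);
    unsigned int v = s->count;
    pthread_mutex_unlock(&s->lock);
    return v;
}

/* Hypothetical demo: the caller blocks on a zero-count semaphore until a
 * helper thread stores a value and does an up. */
static struct csem ready;
static int shared_value;

static void *helper(void *arg)
{
    (void)arg;
    shared_value = 42;          /* produce something */
    csem_up(&ready);            /* then wake the waiter */
    return NULL;
}

int handoff_demo(void)
{
    pthread_t t;

    csem_init(&ready, 0);
    pthread_create(&t, NULL, helper, NULL);
    csem_down(&ready);          /* sleeps until the helper's up */
    pthread_join(t, NULL);
    csem_destroy(&ready);
    return shared_value;
}
```

The while loop around pthread_cond_wait() matters: a woken thread must recheck the count, since POSIX permits spurious wakeups, just as Java permits them for wait().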
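By the way, we see plain calls wait() and notify(). Where’s the class or object? What class has these methods? Answer: Object. Any Java object is an Object, so by inheritance these are methods of the current class, and the unqualified calls act on this, the monitor object itself. They must be called while holding that object’s monitor, e.g., from inside a synchronized method, or they throw IllegalMonitorStateException.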
public class OurMonitor {                   // this is a monitor
    private static final int N = 10;        // buffer capacity (value assumed for illustration)
    private int buffer[] = new int[N];
    private int count = 0, lo = 0, hi = 0;

    public synchronized void insert(int val) {   // <- no longer returns success/failure
        while (count == N)                  // <- "while", not "if": recheck after waking
            go_to_sleep();                  // <- modified, old returned false
        buffer[hi] = val;                   // enqueue val, step 1
        hi = (hi + 1) % N;                  // enqueue, step 2
        count++;                            // enqueue, step 3
        if (count == 1)
            notify();                       // <- added: was empty, wake a waiting consumer
    }

    public synchronized int remove() {
        int val;
        while (count == 0)                  // <- "while", not "if": recheck after waking
            go_to_sleep();                  // <- modified, old returned -1 here
        val = buffer[lo];                   // dequeue val, step 1
        lo = (lo + 1) % N;                  // dequeue, step 2
        count--;                            // dequeue, step 3
        if (count == N - 1)
            notify();                       // <- added: was full, wake a waiting producer
        return val;
    }

    private void go_to_sleep() {            // <- added
        try { wait(); }                     // sleeps, releasing the monitor meanwhile
        catch (InterruptedException ex) { } // ignored; the caller's loop rechecks
    }
}
But this looks a lot like Tanenbaum’s sleep/wakeup example, pg. 109, with its fatal race condition. How does this code avoid it? What was the race condition? Answer: the lost wakeup problem. (Not to be confused with the lost update problem; both are race conditions.)
Scenario from top of pg. 110, the lost wakeup problem:
--consumer reads count = 0
<thread switch>
--producer inserts an item into the buffer, increments count, calls wakeup
--but the consumer isn’t waiting yet, so the wakeup wakes up nothing
<thread switch>
--consumer acts on its knowledge of count = 0, calls sleep, sleeps forever
This can’t happen with the synchronized methods, because the consumer “has the monitor,” i.e., holds the mutex, at the thread switch, so the producer can’t run inside the synchronized methods. There can be a thread switch, but the producer quickly blocks on the mutex, and the consumer runs again and finishes the full action, going to sleep in wait(). Calling wait() releases the monitor as part of going to sleep, so only then can the producer get in. The producer inserts the item and calls notify(), waking up the sleeping consumer. No lost wakeup.
Ref: Look at class Object in Java docs http://java.sun.com/j2se/1.5.0/docs/api/
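For comparison, the same monitor pattern can be written in C with POSIX threads. This is a hedged sketch, not a translation endorsed by the text: one pthread mutex guards all the fields (the job synchronized does in Java), and two condition variables, hypothetically named not_full and not_empty, stand in for Java's single wait set. pthread_cond_wait() releases the mutex atomically as it sleeps, which is exactly what rules out the lost wakeup above. The functions monitor_insert, monitor_remove and monitor_demo are hypothetical names.

```c
#include <assert.h>
#include <pthread.h>

#define N 4                              /* capacity (value assumed for this sketch) */

static int buffer[N];
static int count = 0, lo = 0, hi = 0;
static pthread_mutex_t monitor = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t not_full = PTHREAD_COND_INITIALIZER;
static pthread_cond_t not_empty = PTHREAD_COND_INITIALIZER;

void monitor_insert(int val)
{
    pthread_mutex_lock(&monitor);                /* enter the monitor */
    while (count == N)
        pthread_cond_wait(&not_full, &monitor);  /* sleep, releasing the monitor */
    buffer[hi] = val;
    hi = (hi + 1) % N;
    count++;
    pthread_cond_signal(&not_empty);             /* wake a waiting consumer, if any */
    pthread_mutex_unlock(&monitor);              /* leave the monitor */
}

int monitor_remove(void)
{
    int val;

    pthread_mutex_lock(&monitor);
    while (count == 0)
        pthread_cond_wait(&not_empty, &monitor);
    val = buffer[lo];
    lo = (lo + 1) % N;
    count--;
    pthread_cond_signal(&not_full);              /* wake a waiting producer, if any */
    pthread_mutex_unlock(&monitor);
    return val;
}

/* Hypothetical demo: a producer thread inserts 1..items while the caller removes. */
static int demo_items;

static void *demo_producer(void *arg)
{
    (void)arg;
    for (int i = 1; i <= demo_items; i++)
        monitor_insert(i);
    return NULL;
}

long monitor_demo(int items)
{
    pthread_t prod;
    long sum = 0;
    int prev = 0;

    demo_items = items;
    pthread_create(&prod, NULL, demo_producer, NULL);
    for (int i = 0; i < items; i++) {
        int v = monitor_remove();
        assert(v == prev + 1);                   /* FIFO order is preserved */
        prev = v;
        sum += v;
    }
    pthread_join(prod, NULL);
    return sum;
}
```

One design choice differs from OurMonitor: this sketch signals on every insert and remove rather than only at count == 1 or count == N-1. Signaling a condition variable that nobody waits on is harmless, and it avoids depending on the exact count at which a waiter went to sleep.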