CS444 Class 13

2 handouts on producer-consumer

Midterm Exam next week, Wed, Oct. 30

Next Monday: Midterm Review

 

Reading, continued:

Sec 2.3 to Sec. 2.3.4, pg. 125

Sec. 2.3.5 Semaphores, to pg. 131 (stop at user level threads)

Sec. 2.3.7 Monitors, to pg. 137 (stop before condition variables)

Sec. 2.3.8 Message Passing, to pg. 144

Start on Sec. 2.4

 

From last time: Semaphore definition, use as mutex mechanism

 

The classic Producer-Consumer or "bounded buffer" problem

 

Think of McDonald’s as a burger pipeline: the cook makes burgers and puts them in the bin, but if the bin fills up, the cook has to stop for a while. The consumers come to buy burgers and take them from the bin, but if the bin is empty, they have to wait a while. The bin has a certain capacity. Similarly, memory buffers have a certain size: the "bounded buffer" idea.

 

How can we solve similar problems with data pipelines?  Spinning?  No, wastes CPU.  Instead, we want to block the waiter.

 

Solution: semaphores: they can block for more than just mutex. Think about how they work: if a sem’s count is 3, then down, down, down, and sleep. If we can arrange a count that goes to 0 at the point we need a thread to sleep, semaphores will do the job.
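For concreteness, here is a tiny sketch (not from the handout; class name is made up) using Java’s java.util.concurrent.Semaphore to show the count running down to 0, at which point a further down would block:

```java
import java.util.concurrent.Semaphore;

public class SemCountDemo {
    public static void main(String[] args) throws InterruptedException {
        Semaphore sem = new Semaphore(3);  // count starts at 3
        sem.acquire();  // down: 3 -> 2
        sem.acquire();  // down: 2 -> 1
        sem.acquire();  // down: 1 -> 0
        System.out.println("permits left: " + sem.availablePermits());
        // A fourth down would block; tryAcquire shows it would not succeed now.
        System.out.println("fourth down would succeed? " + sem.tryAcquire());
    }
}
```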

 

Here we are considering a data pipeline with a buffer of a fixed number of slots, a “bounded buffer.”  Although buffers can be enlarged dynamically, that isn’t such a great idea in many cases: a faster producer could use up a huge amount of memory without the job finishing any faster.  So it’s often best to live with a fixed-size buffer, as is done in this classic problem.

 

<producer thread>
        |            producer adds items to buffer
        v
  [..........]       shared buffer of a certain fixed size
        |
        v            consumer removes items from buffer
<consumer thread>

 

The data consumer can be controlled by a semaphore whose count tracks the number of objects in the data buffer.  Buffer empty, consumer waits.  The data producer can be controlled by a semaphore whose count tracks the number of spaces in the buffer, i.e., the number of empty slots in the buffer.  Buffer full, no empties, producer waits.
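A minimal sketch of this two-semaphore scheme in Java (class name and item counts here are illustrative, not the handout’s): empty counts the free slots, full counts the items, and mutex protects the shared queue.

```java
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Semaphore;

public class TwoSemDemo {
    static final int N = 5;                           // buffer capacity
    static final Semaphore empty = new Semaphore(N);  // counts empty slots
    static final Semaphore full  = new Semaphore(0);  // counts items in buffer
    static final Semaphore mutex = new Semaphore(1);  // protects the queue
    static final Queue<Integer> buffer = new LinkedList<>();

    public static void main(String[] args) throws InterruptedException {
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 20; i++) {
                    empty.acquire();          // wait for an empty slot
                    mutex.acquire();
                    buffer.add(i);
                    mutex.release();
                    full.release();           // announce one more item
                }
            } catch (InterruptedException e) { }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 20; i++) {
                    full.acquire();           // wait for an item
                    mutex.acquire();
                    buffer.remove();
                    mutex.release();
                    empty.release();          // announce one more empty slot
                }
            } catch (InterruptedException e) { }
        });
        producer.start(); consumer.start();
        producer.join(); consumer.join();
        System.out.println("items left: " + buffer.size());
        System.out.println("empty count: " + empty.availablePermits());
    }
}
```

However the two threads interleave, the semaphore counts keep the buffer between 0 and N items, and after both finish the buffer is empty again.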

 

In Tanenbaum, Fig. 2-28, pg. 130, we have some magic C code (perhaps assuming a preprocessor). It’s on the handout too. We could use the portable POSIX semaphore API instead, with sem_init(), sem_wait(), sem_post(), sem_destroy().  You can “man sem_init”, for example, for more details.

 

Interpreting the pg. 130 code: it assumes threads have been created to run the producer and consumer functions, and semaphores created using the initial counts shown in the variable definitions.  For ex., “semaphore mutex = 1;”  means creation of a semaphore with initial count 1.  In reality, this requires a system call.  But OK, let’s go along with this model.  Once all these system objects (threads, semaphores) are created, the ensuing execution is clear.

 

Correction, added later: The pthreads (POSIX threads) library does support mutex variables that don’t need a system call at initialization, just a variable definition of type pthread_mutex_t initialized to value PTHREAD_MUTEX_INITIALIZER. I guess the pthreads library keeps track of addresses in use when it sees the pthread_mutex_lock(&mutex) and/or pthread_mutex_unlock(&mutex) calls, i.e., the system is self-initializing to minimize the number of needed calls.  See Love,  Linux System Programming, 2nd Edition, pg. 236. This mutex uses the new Linux futex system call, see the Wikipedia Futex article if interested.

 

Shared data. Note that the variables represent data in memory shared between the threads, i.e., the data of the process virtual machine of the process that has the producer and consumer threads (and has an original thread too, unless the original thread runs one of producer and consumer.)

 

Example trace of execution, on a uniprocessor, so only one thread can be running at a time.  Suppose the consumer starts first:

Initially: empty: ct = 100, full: ct = 0, mutex: ct = 1

Consumer calls down on full: but 0, so blocks…

Producer calls down on empty: ct: 100->99

Producer calls down on mutex: ct: 1->0

Producer calls up on mutex: ct: 0->1 

Producer calls up on full: ct: 0->1   <- unblocking consumer

Suppose the producer keeps running:

Producer calls down on empty: ct: 99->98

Producer calls down, then up on mutex: ct: 1->0 ->1

Producer calls up on full: ct: 1->2

 

At this point, full = 2 = count of items in buffer, empty = 98 = count of empty spots in buffer. The producer has run twice, depositing 2 items in the buffer.

 

Now the unblocked consumer runs…

…unblocked down on full continues, full ct: 2->1

Consumer calls down, up on mutex: 1->0->1

Consumer calls up on empty: ct 98->99

 

At this point, full = 1 = count of items in buffer, empty = 99 = count of empty spots in buffer.  The consumer has removed one of the two items previously in the buffer.
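The same counts can be replayed with java.util.concurrent.Semaphore.  This small single-threaded sketch (illustration only, not the handout program) performs the producer’s two deposits and the consumer’s one removal and prints the resulting counts:

```java
import java.util.concurrent.Semaphore;

public class TraceDemo {
    public static void main(String[] args) throws InterruptedException {
        Semaphore empty = new Semaphore(100), full = new Semaphore(0), mutex = new Semaphore(1);
        // Producer deposits two items:
        for (int i = 0; i < 2; i++) {
            empty.acquire();                   // down on empty
            mutex.acquire(); mutex.release();  // enter/leave critical region
            full.release();                    // up on full
        }
        System.out.println("full = " + full.availablePermits()
                + ", empty = " + empty.availablePermits());
        // Consumer removes one item:
        full.acquire();
        mutex.acquire(); mutex.release();
        empty.release();
        System.out.println("full = " + full.availablePermits()
                + ", empty = " + empty.availablePermits());
    }
}
```

The printed counts match the trace above: full = 2, empty = 98 after the producer, then full = 1, empty = 99 after the consumer.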

 

Producer Consumer Program in Java

Since Java 5, Java has had a nice Semaphore class, so we’ll use that to turn pg. 130 into real Java code.

 

See handout “Producer Consumer Program Using Java Semaphores” for code and some introduction. The program is available as $cs444/ProdConsSemaphore.java and linked from the class web page.

 

Note that the bounded buffer is implemented by a Queue<Integer>. Queue<E> is an interface in Java, implemented by LinkedList and several other classes in the JDK.  By using the Queue type name, we are indicating that we are using the list as a queue in the data structures sense. We use .add() to enqueue and .remove() to dequeue.  The boundedness of the buffer is ensured by the semaphores—the producer blocks when there are N items in the buffer.
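A tiny illustration of the Queue-as-interface idea (not the handout code):

```java
import java.util.LinkedList;
import java.util.Queue;

public class QueueDemo {
    public static void main(String[] args) {
        // Interface type on the left, concrete class on the right:
        Queue<Integer> buffer = new LinkedList<Integer>();
        buffer.add(1);   // enqueue at the tail
        buffer.add(2);
        System.out.println("removed: " + buffer.remove());  // dequeue from the head
        System.out.println("removed: " + buffer.remove());
    }
}
```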

 

 

Think about how this will run.  The producer may run first and fill up to N items, with the consumer starting during this time and consuming some; or the consumer may start first and immediately block on the empty buffer.

 

Either way, it works fine, and if the producer and consumer vary in speed, the system keeps them in balance.  Study the output printed after the code to see this happening.

 

Semaphore and mutex system calls

Since semaphores and mutex mechanisms used in user code involve blocking, they must enter the kernel and thus involve system calls.

 

POSIX (portable across UNIX variants) supplies sem_init(), sem_wait(), sem_post(), etc.; other systems have their own variants (e.g., sema_init()).  See the man pages.  These calls are used in the producer-consumer program on the handout.

 

Win32: CreateSemaphore, WaitForSingleObject, ReleaseSemaphore

            CreateMutex, WaitForSingleObject, ReleaseMutex

               

Note on Tan., pg. 131.  This discussion is about user-level threads, not kernel-supported threads.  User-level threads cannot use simple spin locks because they have no preemption capability, unlike kernel-supported threads.  To fix this, a call to the user-level thread scheduler is added to the spin-lock code to prevent the waiter from spinning forever. If you want, just ignore user-level threads, and thus this whole page.  You can drop the “call thread_yield” in Fig. 2-25 and get a workable mutex for kernel-supported threads, but it’s usually better to use semaphores and block instead of spin-waiting.

 

Monitors  (from English word monitor, meaning a kind of guard person, like a lunch monitor in grade school)

 

Think of a picture of stick-figure guard attached to piece of code with a stop sign, one thread inside, others waiting their turn outside.

 

A monitor is a language construct (dating from 1974) that sets up mutex protection for its encapsulated code, and also helps with necessary blocking actions.  Just skip the pidgin Pascal coverage and let’s cover the Java case.  Java supplies a built-in mutex, usually not in active use, in each object.  We can use the synchronized keyword on the methods of a class, and then the mutex will be used to ensure that only one of the methods is being executed at each point in time, i.e., turn the class into a monitor.

 

Tanenbaum’s code on pg. 141 uses a monitor in class “our_monitor”, but it uses open-coded queues, like the hw1 provided code, and it doesn’t use semaphores for coordination. We want to use proper Queues in Java.

 

private class Monitor0 {
    private static final int N = 10;
    // How to set up a Queue in Java—Queue interface, LinkedList concrete class
    private Queue<Integer> buffer = new LinkedList<Integer>();

    public synchronized boolean insert(int val) {
        if (buffer.size() == N)
            return false;      // fail if full
        buffer.add(val);       // add to end, i.e., enqueue
        return true;
    }

    // return a queued value, or -1 if nothing is there
    public synchronized int remove() {
        int val;
        if (buffer.size() == 0)
            return -1;         // fail if empty
        val = buffer.remove(); // remove from front, i.e., dequeue
        return val;
    }
}

 

 

With this implementation, two concurrent inserts or removes (or both) can’t happen, so the queue data structure (a LinkedList) should stay unbroken in spite of multithreaded execution.  Note that LinkedList is not itself synchronized, so concurrent access to it can break it. A synchronized version can be created using the following JDK method of Collections:

public static <T> List<T> synchronizedList(List<T> list)

 

 

Providing simple mutex is the crux of synchronized methods.  Each object with synchronized methods has a hidden mutex variable ensuring that only one thread is executing any of its synchronized methods at once.

 

But the caller only gets an error return for a full or empty buffer.  What are they supposed to do, spin?  Clearly they want to block so as not to waste CPU; arranging that blocking is what the full producer-consumer program does.

 

See $cs444/ProdConsSemaphore1.java for a version of the full program that uses Monitor0.  Monitor0 can be replaced by a JDK-generated synchronized LinkedList:

Collections.synchronizedList(new LinkedList<Integer>())

and the whole BoundedBuffer can be replaced by a JDK LinkedBlockingQueue<Integer>. 

It’s amazing what you can find in the JDK these days.
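As a sketch of that LinkedBlockingQueue replacement idea (variable names here are illustrative): a capacity-bounded LinkedBlockingQueue blocks in put() when full and in take() when empty, so it does all the producer-consumer coordination for us.

```java
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        // Capacity-bounded queue: put() blocks when full, take() blocks when empty.
        LinkedBlockingQueue<Integer> buffer = new LinkedBlockingQueue<>(2);
        buffer.put(10);
        buffer.put(20);
        // offer() is the non-blocking probe: false here, where put() would block.
        System.out.println("offer when full: " + buffer.offer(30));
        System.out.println("take: " + buffer.take());
        System.out.println("take: " + buffer.take());
    }
}
```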

 

 

Producer Consumer using a Java Monitor

We just saw how to use Java synchronized methods to provide mutex for a data structure accessed by multiple threads.  It's really easy, since all you have to do is add the keyword "synchronized" to the method declaration, and Java sets up a mutex variable in the object for you and uses it appropriately at each thread entrance into these methods, and exit from them.

Easy mutex is great, but we need more for full producer consumer: we need to block the producer when the buffer is full, and the consumer when the buffer is empty.

It’s possible to build that blocking into Monitor0, using the Java wait and notify methods.  Note that mutex alone is not powerful enough to block on any given condition; it’s specialized to block only the second and subsequent threads after one is allowed into the critical region.  This approach leads to Tanenbaum’s code on pg. 141. But we can do it in a much more comprehensible way by just using Semaphores again: use empty and full as done in the first handout, but replace the mutex Semaphore with the mutex built into Monitor0.
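A sketch of that combination (the class name MonitorBuffer is made up for illustration): empty and full Semaphores do the blocking, while synchronized blocks supply the mutex that the third Semaphore used to provide.

```java
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Semaphore;

public class MonitorBuffer {
    private static final int N = 10;
    private final Semaphore empty = new Semaphore(N);  // counts empty slots
    private final Semaphore full  = new Semaphore(0);  // counts items present
    private final Queue<Integer> buffer = new LinkedList<>();

    public void insert(int val) throws InterruptedException {
        empty.acquire();                            // block if no empty slot
        synchronized (this) { buffer.add(val); }    // object's monitor gives mutex
        full.release();
    }

    public int remove() throws InterruptedException {
        full.acquire();                             // block if nothing is there
        int val;
        synchronized (this) { val = buffer.remove(); }
        empty.release();
        return val;
    }

    public static void main(String[] args) throws InterruptedException {
        MonitorBuffer b = new MonitorBuffer();
        b.insert(7);
        b.insert(8);
        System.out.println("removed: " + b.remove());
        System.out.println("removed: " + b.remove());
    }
}
```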

 

Note that a Semaphore is itself implemented as a monitor.  You can implement semaphores from wait and notify—Google “Java semaphores wait notify” for several implementations.
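One such implementation, sketched here (the class name HomemadeSemaphore is made up; in real code you would just use java.util.concurrent.Semaphore):

```java
// Sketch of a counting semaphore built from Java's wait/notify.
public class HomemadeSemaphore {
    private int count;

    public HomemadeSemaphore(int initial) { count = initial; }

    public synchronized void down() throws InterruptedException {
        while (count == 0)
            wait();          // sleep until an up() raises the count
        count--;
    }

    public synchronized void up() {
        count++;
        notify();            // wake one waiter, if any
    }

    public static void main(String[] args) throws InterruptedException {
        HomemadeSemaphore sem = new HomemadeSemaphore(1);
        sem.down();          // count 1 -> 0; a second down here would block
        sem.up();            // count 0 -> 1
        System.out.println("ok");
    }
}
```

Note the while loop around wait(): a woken thread must re-check the condition, since another thread may have grabbed the count first.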

 

BTW, on pg. 141 we see simple calls wait() and notify().  Where’s the class or object?  What class has these methods? Answer: Object, and any Java object ISA Object, so by inheritance these are methods of the current class.

 

However, I strongly caution against coding with wait and notify—it’s very tricky and usually not needed.  Use semaphores instead—see handout.  In fact, the code on pg. 141 looks a lot like the code on pg. 127, labeled as having a fatal race condition.  Shows how hard it is to make this right.

 

For those interested in synch problems…

Tanenbaum’s sleep/wakeup example, pg. 127, has a fatal race condition.  How does the monitor code on pg. 141 avoid it?  What was this fatal race condition?

Answer:  Lost wakeup problem. (Not to be confused with the lost update problem, both race conditions.)

 

Scenario from pg. 127: lost wakeup problem

--consumer reads count = 0

<thread switch>

--producer inserts an item into the buffer, incs count, calls wakeup

--but consumer isn’t waiting yet, so wakeup wakes up nothing

<thread switch>

--consumer acts on its knowledge of count = 0, calls sleep, sleeps forever

 

This can’t happen with the synchronized methods of pg. 141 because the consumer “has the monitor” i.e. the mutex at the thread switch, and so the producer can’t run inside the synch. methods.  There can be a thread switch, but the producer quickly blocks on the mutex, and the consumer runs again and finishes the full action, going to sleep.  Then the producer inserts the item and calls wakeup, waking up the sleeping consumer.  No lost wakeup.

 

What about our producer-consumer code with semaphores?

 

Consumer tries to get something from empty queue: blocks in sem

Producer inserts an item into queue, OK, consumer already waiting

Producer does release: wakeup, wakes up consumer, no problem.

 

Bottom line: no lost wakeup problem because the semaphore deals with the underlying count, causing blocking along with recognition of needed blocking.