Tuesday, Oct. 17 

Handout

The classic Producer-Consumer or  "bounded buffer" problem

 

Think of McDonald’s as a burger pipeline: the cook makes burgers and puts them in the bin.  But if the bin fills up, the cook has to stop for a while.  The customers come to buy burgers and take them from the bin, but if the bin is empty, they have to wait a while.  The bin has a fixed capacity.  Similarly, memory buffers have a fixed size: the "bounded buffer" idea.

 

How can we solve similar problems with data pipelines?  Spinning?  No, wastes CPU.  Instead, we want to block the waiter.

 

Solution: semaphores.  They can block for more than just mutex.  Think how they work: if a semaphore’s count is 3, then three downs succeed (count goes 3, 2, 1, 0) and the fourth down sleeps.  If we can arrange a count that goes to 0 at the point we need a thread to sleep, semaphores will do the job.
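
The counting behavior is easy to try out.  The sketch below uses Java's java.util.concurrent.Semaphore rather than the POSIX calls, and the names SemaphoreCountDemo and successfulDowns are made up for illustration.  It uses tryAcquire(), a non-blocking down that returns false where a plain down would sleep, so the demo cannot hang.

```java
import java.util.concurrent.Semaphore;

class SemaphoreCountDemo {
    // Counts how many non-blocking "down" operations succeed on a semaphore
    // that starts at initialCount, out of the given number of attempts.
    static int successfulDowns(int initialCount, int attempts) {
        Semaphore sem = new Semaphore(initialCount);
        int succeeded = 0;
        for (int i = 0; i < attempts; i++) {
            // tryAcquire() decrements the count if it is positive; at 0 it
            // returns false, which is where a blocking down would sleep.
            if (sem.tryAcquire()) {
                succeeded++;
            }
        }
        return succeeded;
    }

    public static void main(String[] args) {
        // Count 3, four downs: three succeed, the fourth would sleep.
        System.out.println(successfulDowns(3, 4));   // prints 3
    }
}
```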

 

Here we are considering a data pipeline with a buffer of a fixed number of slots, a “bounded buffer.”  Although buffers can be enlarged dynamically, that isn’t such a great idea in many cases, as a faster producer could use up a huge amount of memory and the job isn’t being finished faster.

 

The data consumer can be controlled by a semaphore whose count tracks the number of objects in the data buffer.  Buffer empty, consumer waits.  The data producer can be controlled by a semaphore whose count tracks the number of spaces in the buffer, i.e., the number of empty slots in the buffer.  Buffer full, no empties, producer waits.

 

In Tanenbaum, pg. 112, we have some magic C code (perhaps relying on a preprocessor).  Let’s use the portable POSIX semaphore API instead, with sem_init(), sem_wait(), sem_post(), sem_destroy().  You can “man sem_init”, for example, for more details.

 

/* needs various headers, including <stdio.h> and <semaphore.h> */

#define N 100              /* number of buffer slots; the value 100 is an assumption */

sem_t mutex, empty, full;  /* type sem_t is defined in <semaphore.h> */

int main()

{

       if (sem_init(&mutex, /*process-local*/ 0, /*init count*/ 1) < 0) {

          perror("Failed to create semaphore 1");

          return 1;

       }

       if (sem_init(&empty, /*process-local*/ 0, /*init count*/ N) < 0) {

          perror("Failed to create semaphore 2");

          return 1;

       }

       if (sem_init(&full, /*process-local*/ 0, /*init count*/ 0) < 0) {

          perror("Failed to create semaphore 3");

          return 1;

       }

       thread_create(producer, ...);

       thread_create(consumer, ...);

       getchar();              /* block main thread */

       /* here, could bring down threads and destroy semaphores, but exit will do it for us */

}

 

/* producer() and consumer() are close to Tanenbaum, pg. 112 */

/* Note: each system call should have its return value tested as shown above for sem_init */

void producer()

{

       int item;

 

       while (TRUE) {

          item = produce_item();

          sem_wait(&empty);        /* claim empty spot */

          sem_wait(&mutex);

          insert_item(item);       /* insert, protected by mutex */

          sem_post(&mutex);

          sem_post(&full);         /* signal filled-in spot */

       }

}

void consumer()

{

       int item;

 

       while (TRUE) {

          sem_wait(&full);         /* claim spot in buffer */

          sem_wait(&mutex);

          item = remove_item();    /* remove, protected by mutex */

          sem_post(&mutex);

          sem_post(&empty);        /* signal empty spot */

          consume_item(item);

       }

}

 

Think about how this will run.  The producer may run first and fill all N slots before it blocks on empty, or the consumer may start while the producer is still filling the buffer and consume some items along the way.

 

Either way, it works fine, and if the producer and consumer vary in speed, the system keeps them in balance.
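
To watch the three semaphores keep the two sides in balance, here is a small runnable sketch.  It is written in Java with java.util.concurrent.Semaphore instead of the POSIX calls used above, so acquire() plays the role of sem_wait() and release() the role of sem_post().  The class name, the run() helper and the buffer sizes are assumptions made for the demo, not part of Tanenbaum's code.

```java
import java.util.concurrent.Semaphore;

class SemaphoreBoundedBuffer {
    private final int n;                 // number of slots in the buffer
    private final int[] slots;
    private int lo = 0, hi = 0;          // remove at lo, insert at hi

    private final Semaphore mutex;       // protects slots, lo, hi
    private final Semaphore empty;       // counts empty slots
    private final Semaphore full;        // counts filled slots

    SemaphoreBoundedBuffer(int n) {
        this.n = n;
        this.slots = new int[n];
        this.mutex = new Semaphore(1);
        this.empty = new Semaphore(n);
        this.full = new Semaphore(0);
    }

    void insert(int item) throws InterruptedException {
        empty.acquire();                 // claim an empty slot, sleep if none
        mutex.acquire();
        slots[hi] = item;                // insert, protected by mutex
        hi = (hi + 1) % n;
        mutex.release();
        full.release();                  // signal a filled slot
    }

    int remove() throws InterruptedException {
        full.acquire();                  // claim a filled slot, sleep if none
        mutex.acquire();
        int item = slots[lo];            // remove, protected by mutex
        lo = (lo + 1) % n;
        mutex.release();
        empty.release();                 // signal an empty slot
        return item;
    }

    // Pushes 1..items through an n-slot buffer using one producer thread and
    // one consumer thread, and returns the sum of everything consumed.
    static long run(int n, int items) throws InterruptedException {
        SemaphoreBoundedBuffer buf = new SemaphoreBoundedBuffer(n);
        long[] sum = {0};
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= items; i++) buf.insert(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) sum[0] += buf.remove();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
        return sum[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(4, 1000));   // 1 + 2 + ... + 1000 = 500500
    }
}
```

Even with only 4 slots, all 1000 items get through: whichever thread gets ahead simply sleeps on its semaphore until the other catches up.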

 

Semaphore and mutex systems

Solaris: two slightly different APIs, the POSIX one (more portable) with sem_init(), etc., and a second one with sema_init(), etc.  See the man pages.

Win32: CreateSemaphore, WaitForSingleObject, ReleaseSemaphore

            CreateMutex, WaitForSingleObject, ReleaseMutex

               

Note on Tan., pp. 113-114: this discussion is about user-level threads, not kernel-supported threads.  User-level threads cannot use simple spin locks because, unlike kernel-supported threads, they are not preempted, so a spinning thread would never let the lock holder run and release the lock.  To fix this, a call to the user-level thread scheduler is added to the spin-lock code to prevent the waiter from just spinning forever.  If you want, just ignore user-level threads.  You can drop the “call thread_yield” in Fig. 2-25 and get a workable mutex for kernel-supported threads, but it is usually better to use semaphores and block instead of spin-waiting.
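
The spin-then-yield idea can be sketched in Java.  This is only an analogy to Fig. 2-25, not a translation of it: AtomicBoolean.compareAndSet() stands in for the TSL instruction, Thread.yield() stands in for thread_yield (in Java it is only a hint to the scheduler), and the SpinLockDemo class and its count() helper are invented for the demo.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class SpinLockDemo {
    private final AtomicBoolean locked = new AtomicBoolean(false);

    void lock() {
        // compareAndSet atomically tests for "unlocked" and sets "locked",
        // which is the job the TSL instruction does in the assembly version.
        while (!locked.compareAndSet(false, true)) {
            Thread.yield();   // lock is busy: offer the CPU to another thread
        }
    }

    void unlock() {
        locked.set(false);
    }

    // Has several threads increment a shared counter under the spin lock and
    // returns the final counter value.
    static int count(int threads, int perThread) throws InterruptedException {
        SpinLockDemo lock = new SpinLockDemo();
        int[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    lock.lock();
                    try {
                        counter[0]++;          // critical region
                    } finally {
                        lock.unlock();
                    }
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            w.join();
        }
        return counter[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(count(4, 10000));   // prints 40000
    }
}
```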

 

Monitors  (from English word monitor, meaning a kind of guard person, like a lunch monitor in grade school)

 

Think of a picture of a stick-figure guard attached to a piece of code: one thread inside, others waiting their turn outside.

 

A monitor is a language construct (dating from 1974) that sets up mutex protection for its enclosed code, and also helps with necessary blocking actions.  Just skip the pidgin Pascal coverage and let’s cover the Java case.

 

See pg 120 example, now available by link from the class web page.

 

But let’s focus on the monitor part, and put it in its own .java file, OurMonitor0.java.  We start by using the monitor (i.e. synchronized methods) just for mutex, as follows:

 

public class OurMonitor0 {  // this is a monitor

      private static final int N = 100;   // buffer capacity; the value 100 is an assumption

      private int buffer[] = new int[N];  // queue in ring buffer

      private int count = 0, lo = 0, hi = 0;

 

      public synchronized boolean insert(int val) {

            if (count == N) return false;  // fail if full

            buffer[hi] = val;      // enqueue val, step 1

            hi = (hi + 1) % N;     // enqueue, step 2

            count++;              // enqueue, step 3

            return true;

      }

      // return positive number, or -1 if nothing is there

      public synchronized int remove() {

            int val;

            if (count == 0) return -1;

            val = buffer[lo];     // dequeue val, step 1

            lo = (lo + 1) % N;    // dequeue, step 2

            count--;              // dequeue, step 3

            return val;

      }

}

 

With the OurMonitor0 implementation above, two concurrent inserts or removes or both can’t happen, so the queue data structure should stay unbroken in spite of multithreaded execution.  Providing simple mutex is the crux of synchronized methods.  Each object with synchronized methods has a hidden mutex variable ensuring that only one thread is executing any of its synchronized methods at once.

 

Note that count was needed there.  Without it, we can’t tell a full buffer from an empty one as they both have lo == hi.  Don’t be fooled!
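
To see the lo == hi ambiguity concretely, the sketch below steps the two ring indices exactly as insert and remove do and ignores the data.  The class name RingIndexDemo, the helper indicesAfter and the tiny capacity of 3 are assumptions chosen to keep the demo short.

```java
class RingIndexDemo {
    static final int N = 3;   // small ring so the wraparound is easy to follow

    // Returns {lo, hi} after the given number of enqueue and dequeue steps
    // on an N-slot ring buffer that starts out empty.
    static int[] indicesAfter(int inserts, int removes) {
        int lo = 0, hi = 0;
        for (int i = 0; i < inserts; i++) {
            hi = (hi + 1) % N;    // enqueue, step 2
        }
        for (int i = 0; i < removes; i++) {
            lo = (lo + 1) % N;    // dequeue, step 2
        }
        return new int[] { lo, hi };
    }

    public static void main(String[] args) {
        int[] empty = indicesAfter(0, 0);   // nothing inserted
        int[] full = indicesAfter(N, 0);    // N inserted, hi has wrapped to 0
        System.out.println("empty: lo=" + empty[0] + " hi=" + empty[1]);   // empty: lo=0 hi=0
        System.out.println("full:  lo=" + full[0] + " hi=" + full[1]);     // full:  lo=0 hi=0
    }
}
```

Both states print the same indices, so only count can tell them apart.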

 

But with the code given above the caller only gets an error return for a full or empty buffer.  What are they supposed to do, spin??  For classic producer-consumer they want to block.  So we have more work to do on the code to make the blocking occur--next time.

Producer Consumer using a Java Monitor with blocking
We just saw how to use Java synchronized methods to provide mutex for a data structure accessed by multiple threads.  It's really easy, since all you have to do is add the keyword "synchronized" to the method declaration, and Java sets up a mutex variable in the object for you and uses it appropriately at each thread entrance into these methods, and exit from them.

Easy mutex is great, but we need more for full producer consumer: we need to block the producer when the buffer is full, and the consumer when the buffer is empty.

It’s possible to build that blocking into OurMonitor, using the Java wait and notify methods.  Note that mutex alone is not powerful enough to block on an arbitrary condition: it only blocks the second and subsequent threads after one is allowed to run in the critical region.

 

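Java’s built-in monitor support gives us wait and notify, not semaphores, although they (along with mutex) are of equivalent power.  You can implement semaphores from wait and notify: Google “Java semaphores” for several implementations.  (Since Java 5 the library also provides one, java.util.concurrent.Semaphore.)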
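
As a rough illustration of that equivalence, here is one way a counting semaphore might be built from synchronized, wait and notify.  The class name CountingSemaphore and the method names down, up and value are our own choices, and this is a minimal sketch rather than a replacement for a library class.

```java
class CountingSemaphore {
    private int count;

    CountingSemaphore(int initial) {
        count = initial;
    }

    // down: sleep while the count is 0, then take one unit.
    synchronized void down() throws InterruptedException {
        while (count == 0) {
            wait();           // releases the object's mutex while sleeping
        }
        count--;
    }

    // up: add one unit and wake one sleeper, if there is one.
    synchronized void up() {
        count++;
        notify();
    }

    synchronized int value() {
        return count;
    }

    public static void main(String[] args) throws InterruptedException {
        CountingSemaphore sem = new CountingSemaphore(0);
        Thread waiter = new Thread(() -> {
            try {
                sem.down();   // blocks until the main thread calls up()
                System.out.println("waiter woke up");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        sem.up();
        waiter.join();
    }
}
```

It does not matter whether up() runs before or after the waiter reaches down(): the count remembers the up, so the wakeup cannot be lost.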

 

BTW, we see simple calls wait() and notify().  Where’s the class or object?  What class has these methods? Answer: Object, and any Java object ISA Object, so by inheritance these are methods of the current class.

 

public class OurMonitor {  // this is a monitor

     private static final int N = 100;   // buffer capacity; the value 100 is an assumption

     private int buffer[] = new int[N];

     private int count =0, lo = 0, hi = 0;

 

     public synchronized void insert(int val) {  // no longer returns success/failure

         if (count == N) go_to_sleep();   // modified: the old version returned false

         buffer[hi] = val;      // enqueue val, step 1

         hi = (hi + 1) % N;     // enqueue, step 2

         count++;               // enqueue, step 3

         if (count == 1) notify();        // added: buffer was empty, wake a waiting consumer

     }

     public synchronized int remove() {

         int val;

         if (count == 0) go_to_sleep();  // modified: the old version returned -1 here

         val = buffer[lo];     // dequeue val, step 1

         lo = (lo + 1) % N;    // dequeue, step 2

         count--;              // dequeue, step 3

         if (count == N-1) notify();     // added: buffer was full, wake a waiting producer

         return val;

     }

     private void go_to_sleep() {        // added

         try { wait(); }

         catch (InterruptedException ex) {}

     }

}

 

But this looks a lot like Tanenbaum’s sleep/wakeup example, pg. 109, with the fatal race condition.  How does this code avoid it?  What was the race condition?

Answer:  Lost wakeup problem. (Not to be confused with the lost update problem, both race conditions.)

 

Scenario from top of pg. 110: lost wakeup problem

--consumer reads count = 0

<thread switch>

--producer inserts an item into the buffer, incs count, calls wakeup

--but consumer isn’t waiting yet, so wakeup wakes up nothing

<thread switch>

--consumer acts on its stale knowledge of count = 0, calls sleep, sleeps forever

 

This can’t happen with the synchronized methods because the consumer “has the monitor,” i.e. holds the mutex, at the thread switch, and so the producer can’t run inside the synchronized methods.  There can be a thread switch, but the producer quickly blocks on the mutex, and the consumer runs again and finishes the full action, going to sleep in wait(), which releases the mutex as part of going to sleep.  Then the producer gets in, inserts the item, and calls notify, waking up the sleeping consumer.  No lost wakeup.
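
To run the blocking monitor with real threads, something like the following sketch could be used.  It is a variant of OurMonitor, not the handout's code: the class name LoopingMonitor, the capacity passed to the constructor and the run() helper are assumptions, and it makes two changes of its own.  It re-checks the condition in a while loop, because the Java documentation for Object.wait() says a thread can wake up without being notified, and it calls notifyAll() after every change so that the same code still works with several producers or consumers.

```java
class LoopingMonitor {
    private final int n;                    // buffer capacity
    private final int[] buffer;
    private int count = 0, lo = 0, hi = 0;

    LoopingMonitor(int capacity) {
        n = capacity;
        buffer = new int[capacity];
    }

    synchronized void insert(int val) throws InterruptedException {
        while (count == n) {
            wait();                         // full: sleep, releasing the mutex
        }
        buffer[hi] = val;
        hi = (hi + 1) % n;
        count++;
        notifyAll();                        // wake anyone waiting for an item
    }

    synchronized int remove() throws InterruptedException {
        while (count == 0) {
            wait();                         // empty: sleep, releasing the mutex
        }
        int val = buffer[lo];
        lo = (lo + 1) % n;
        count--;
        notifyAll();                        // wake anyone waiting for a slot
        return val;
    }

    // Sends 1..items through the monitor with one producer thread and one
    // consumer thread, and returns the sum of the values consumed.
    static long run(int capacity, int items) throws InterruptedException {
        LoopingMonitor mon = new LoopingMonitor(capacity);
        long[] sum = {0};
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= items; i++) mon.insert(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) sum[0] += mon.remove();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
        return sum[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(4, 1000));   // 1 + 2 + ... + 1000 = 500500
    }
}
```

If a wakeup were ever lost, one of the two threads would sleep forever and run() would never return.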

 

Ref: Look at class Object in Java docs http://java.sun.com/j2se/1.5.0/docs/api/