CS444 Producer Consumer Program Using Java Semaphores, following Tanenbaum, pg. 130

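Semaphores were invented by Dijkstra in 1965. A semaphore is a system object with some sort of identifier, here “sem”. The sem object has a count associated with it. The two important operations on sem (after its creation) are (Tanenbaum, pg. 110):

down(sem): if the count is greater than 0, decrement it and continue; otherwise block until the count becomes positive.

up(sem): increment the count; if any threads are blocked in down(sem), one of them is allowed to complete its down.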

To make a real system, we also need to be able to create a semaphore with a certain initial count: sem = createsem(initcount). POSIX semaphores: sem_init(…). Java semaphores: new Semaphore(initcount).
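To make the count, the two operations, and creation with an initial count concrete, here is a minimal sketch of a counting semaphore built on a Java monitor. The class name SimpleSemaphore and its method names are hypothetical, chosen to match the down/up vocabulary; this is a teaching sketch, not how java.util.concurrent.Semaphore is actually implemented.

```java
// Hypothetical teaching class: a counting semaphore built on a Java monitor.
class SimpleSemaphore {
    private int count; // the count associated with the semaphore

    // Plays the role of sem = createsem(initcount).
    SimpleSemaphore(int initCount) {
        if (initCount < 0) {
            throw new IllegalArgumentException("negative initial count");
        }
        count = initCount;
    }

    // down: block while the count is 0, then decrement it.
    synchronized void down() throws InterruptedException {
        while (count == 0) {
            wait(); // gives up the monitor while blocked
        }
        count--;
    }

    // up: increment the count and wake one blocked thread, if any.
    synchronized void up() {
        count++;
        notify();
    }

    synchronized int count() {
        return count;
    }
}

class SimpleSemaphoreDemo {
    public static void main(String[] args) throws InterruptedException {
        SimpleSemaphore sem = new SimpleSemaphore(1);
        sem.down(); // count 1 -> 0, so the worker's down must block
        Thread worker = new Thread(() -> {
            try {
                sem.down(); // blocks until main calls up
                System.out.println("worker got the semaphore");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        Thread.sleep(100); // give the worker time to block
        System.out.println("main calling up");
        sem.up();
        worker.join();
        System.out.println("count = " + sem.count());
    }
}
```

The demo prints "main calling up", then "worker got the semaphore", then "count = 0": the worker cannot get past down until main has done its up.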

Tanenbaum, pg. 130: The textbook program assumes that semaphore creation somehow happens automatically, but in real life we need to create each semaphore explicitly (a system call, or in Java a constructor call). Here is that program rewritten to use Java semaphores. Unlike the textbook version, it is complete: it creates and starts the two threads and implements the helper functions (produce_item, etc.).

According to the Java docs, semaphores work with “permits”: acquire = down = get a permit, release = up = return a permit to the semaphore. In this program, the buffer has N spots, and the semaphore “empty” has a permit for each empty spot in the buffer, while semaphore “full” has a permit for each filled spot in the buffer.
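Before the full program, a small single-threaded sketch of this permit accounting may help. It moves permits between “empty” and “full” the same way the producer and consumer do, but without any buffer or threads. The class name PermitAccounting and the helper simulate are hypothetical; only Semaphore, acquire, release and availablePermits come from java.util.concurrent.

```java
import java.util.concurrent.Semaphore;

// Hypothetical helper: tracks only the permits, not the buffer contents.
class PermitAccounting {
    static final int N = 10; // buffer capacity, as in the program

    // Returns {empty permits, full permits} after the given number of
    // puts followed by takes. Assumes puts <= N and takes <= puts;
    // otherwise an acquire would block forever in this one-thread sketch.
    static int[] simulate(int puts, int takes) throws InterruptedException {
        Semaphore empty = new Semaphore(N); // one permit per empty spot
        Semaphore full = new Semaphore(0);  // one permit per filled spot
        for (int i = 0; i < puts; i++) {
            empty.acquire(); // producer side: claim an empty spot
            full.release();  // ...and announce a filled spot
        }
        for (int i = 0; i < takes; i++) {
            full.acquire();  // consumer side: claim a filled spot
            empty.release(); // ...and announce an empty spot
        }
        return new int[] { empty.availablePermits(), full.availablePermits() };
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = simulate(3, 1);
        System.out.println("empty=" + r[0] + " full=" + r[1]); // empty=8 full=2
    }
}
```

Whenever neither thread is between its acquire and its release, the two permit counts add up to N.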

 

import java.util.LinkedList;

import java.util.Queue;

import java.util.concurrent.Semaphore;

 

public class ProdConsSemaphore {

    private static int N = 10;

    private static producer p = new producer(); // producer thread

    private static consumer c = new consumer(); // consumer thread

    private static Semaphore empty = new Semaphore(N); // start w N permits for empty spots

    private static Semaphore full = new Semaphore(0);  // 0 permits for filled spots

    private static Semaphore mutex = new Semaphore(1); // 1 permit for mutex

    private static Queue<Integer> buffer = new LinkedList<Integer>();

    private static volatile boolean done = false; // volatile: written by main, read by both threads

 

    private static class producer extends Thread {

        private int val = 0;

 

        public void run() {// run method contains the thread code

            while (!done) { // producer loop

                try {

                    int item = produce_item();

                    empty.acquire(); // down, to get a permit for empty spot

                    mutex.acquire(); // down, to get mutex permit

                    System.out.print("p " + item + " ");

                    Thread.sleep(50);  // 50 ms delay to slow system down

                    buffer.add(item); // add item to fill promised empty spot

                    mutex.release(); // up, to release mutex

                    full.release(); // up, to check in permit for filled spot

                } catch (InterruptedException e) {

                    System.out.println("problem with producer");

                }

            }

        }

 

        private int produce_item() {

            return ++val;

        }

    }

 

    private static class consumer extends Thread {

        private int ckval = 0;

 

        public void run() {// run method contains the thread code

            while (!done) { // consumer loop

                try {

                    full.acquire(); // down, to get a permit for filled spot

                    mutex.acquire();

                    int item = buffer.remove(); // get promised filled spot item

                    System.out.print("c " + item + " ");

                    Thread.sleep(50);

                    mutex.release();

                    empty.release(); // up, to check in permit for empty spot

                    consume_item(item);

                } catch (InterruptedException e) {

                    System.out.println("problem with consumer");

                }

            }

        }

 

        private void consume_item(int item) {

            ++ckval; // value we expect to get

            if (item != ckval) {

                System.out.println("unexpected val " + item);

            }

        }

    }

 

    public static void main(String args[]) {

        System.out.println("starting p");

        p.start(); // start the producer thread

        System.out.println("starting c");

        c.start(); // start the consumer thread

        try {

            Thread.sleep(5000); // run 5 secs

        } catch (InterruptedException ex) {

        }

        done = true; // tell threads to quit looping

    }

}

As written: consumer and producer both delay 50 ms:

starting p

starting c

p 1 p 2 p 3 p 4 p 5 p 6 p 7 p 8 p 9 p 10   (partly reformatted to show runs)

c 1 c 2 c 3 c 4

p 11 p 12 p 13 p 14 ← queue full

c 5 c 6 c 7 c 8 ← partly emptied

p 15 p 16 p 17 p 18

c 9 c 10 c 11

 

consumer with 40 ms: faster consumer means often-empty queue:

starting p

starting c

p 1 p 2

c 1

p 3

c 2 c 3 ← queue empty

p 4 p 5 ← a little bit filled

c 4 c 5

p 6 p 7 ← queue empty

c 6 c 7

producer with 40 ms: faster producer means often-full queue:

starting p

starting c

p 1 p 2

c 1

p 3 p 4 p 5 p 6 p 7 p 8 p 9 p 10 p 11 ← fill queue up

c 2 c 3

p 12 p 13 ← queue full again

c 4 c 5
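One thing these runs do not show is shutdown. When main sets done, a thread that is blocked inside acquire() does not look at done until it gets a permit, so a consumer waiting on an empty queue may wait forever if the producer has already quit (and likewise a producer waiting on a full queue). One possible variation, not part of the program above, is for main to interrupt the thread, since Semaphore.acquire() throws InterruptedException when the waiting thread is interrupted. The class and method names in this sketch are hypothetical.

```java
import java.util.concurrent.Semaphore;

// Hypothetical sketch: unblock a thread stuck in acquire() by interrupting it.
class InterruptShutdown {
    // Returns true if the blocked thread ended after being interrupted.
    static boolean stopBlockedThread() throws InterruptedException {
        Semaphore full = new Semaphore(0); // no permits, so acquire() blocks
        Thread consumer = new Thread(() -> {
            try {
                full.acquire(); // waits for a permit that never arrives
            } catch (InterruptedException e) {
                // interrupted while waiting: fall through and let run() end
            }
        });
        consumer.start();
        Thread.sleep(100);    // give the thread time to block
        consumer.interrupt(); // makes the blocked acquire() throw
        consumer.join(1000);  // wait up to 1 sec for the thread to finish
        return !consumer.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("consumer stopped: " + stopBlockedThread());
    }
}
```

In the producer-consumer program, the same idea would mean calling p.interrupt() and c.interrupt() after setting done, and having each catch block leave the loop rather than print a problem message.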