CS444 Producer Consumer Program Using Java Semaphores, following Tanenbaum, pg. 130
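Semaphores were invented by Dijkstra in 1965. A semaphore is a system object with some sort of identifier, here “sem”. The sem object has a count associated with it. The two important operations on sem (after its creation) are (Tanenbaum, pg. 110):
down(sem): if the count is greater than 0, decrement it and continue; otherwise block until the count becomes positive.
up(sem): increment the count; if any threads are blocked in down(sem), one of them can proceed.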
To make a real system, we also need to be able to create a semaphore with a certain initial count: sem = createsem(initcount). With POSIX semaphores, this is done with sem_init(…).
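As a minimal sketch of create/down/up in Java (the language used for the full program in this handout), the following creates a semaphore with an initial count of 2 and records the count after each step. The class name SemaphoreBasics and the method demo() are made up for illustration. acquireUninterruptibly() is used instead of acquire() only so that the sketch needs no exception handling.

```java
import java.util.Arrays;
import java.util.concurrent.Semaphore;

class SemaphoreBasics {
    // Hypothetical helper: returns the values observed at each step.
    static int[] demo() {
        Semaphore sem = new Semaphore(2);    // sem = createsem(2)
        int[] seen = new int[4];
        seen[0] = sem.availablePermits();    // count right after creation
        sem.acquireUninterruptibly();        // down: count 2 -> 1
        sem.acquireUninterruptibly();        // down: count 1 -> 0
        seen[1] = sem.availablePermits();    // count after two downs
        // A third down would block here; tryAcquire() reports failure instead.
        seen[2] = sem.tryAcquire() ? 1 : 0;
        sem.release();                       // up: count 0 -> 1
        seen[3] = sem.availablePermits();    // count after one up
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo())); // prints [2, 0, 0, 1]
    }
}
```

In a real program the third down would normally be a blocking acquire(), and the matching up would come from another thread.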
Tanenbaum, pg. 130: This program assumes semaphore creation somehow automatically happens, but in real life we need to do a system call to create a semaphore. Here is that program rewritten to use POSIX semaphores. It is still incomplete because of the thread creation calls, as well as the various unimplemented functions (produce_item, etc.)
According to the Java documentation, semaphores work with “permits”: acquire = down = get a permit, release = up = return a permit to the semaphore. In this program, the buffer has N spots; the semaphore “empty” holds one permit for each empty spot in the buffer, while the semaphore “full” holds one permit for each filled spot. A third semaphore, “mutex”, has a single permit and protects access to the buffer.
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Semaphore;

public class ProdConsSemaphore {
    private static final int N = 10; // number of spots in the buffer
    private static producer p = new producer(); // producer thread
    private static consumer c = new consumer(); // consumer thread
    private static Semaphore empty = new Semaphore(N); // start w N permits for empty spots
    private static Semaphore full = new Semaphore(0); // 0 permits for filled spots
    private static Semaphore mutex = new Semaphore(1); // 1 permit for mutex
    private static Queue<Integer> buffer = new LinkedList<Integer>();
    // volatile so the worker threads are guaranteed to see main's update
    private static volatile boolean done = false;

    private static class producer extends Thread {
        private int val = 0;

        public void run() { // run method contains the thread code
            while (!done) { // producer loop
                try {
                    int item = produce_item();
                    empty.acquire(); // down, to get a permit for empty spot
                    mutex.acquire(); // down, to get mutex permit
                    System.out.print("p " + item + " ");
                    Thread.sleep(50); // 50 ms delay to slow system down
                    buffer.add(item); // add item to fill promised empty spot
                    mutex.release(); // up, to release mutex
                    full.release(); // up, to check in permit for filled spot
                } catch (InterruptedException e) {
                    System.out.println("problem with producer");
                }
            }
        }

        private int produce_item() {
            return ++val; // items are 1, 2, 3, ...
        }
    }

    private static class consumer extends Thread {
        private int ckval = 0;

        public void run() { // run method contains the thread code
            while (!done) { // consumer loop
                try {
                    full.acquire(); // down, to get a permit for filled spot
                    mutex.acquire(); // down, to get mutex permit
                    int item = buffer.remove(); // get promised filled spot item
                    System.out.print("c " + item + " ");
                    Thread.sleep(50); // 50 ms delay to slow system down
                    mutex.release(); // up, to release mutex
                    empty.release(); // up, to check in permit for empty spot
                    consume_item(item);
                } catch (InterruptedException e) {
                    System.out.println("problem with consumer");
                }
            }
        }

        private void consume_item(int item) {
            ++ckval; // value we expect to get
            if (item != ckval) {
                System.out.println("unexpected val " + item);
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("starting p");
        p.start(); // start the producer thread
        System.out.println("starting c");
        c.start(); // start the consumer thread
        try {
            Thread.sleep(5000); // run 5 secs
        } catch (InterruptedException ex) {
            // ignore: main just stops waiting early
        }
        // tell threads to quit looping; a thread blocked in acquire()
        // re-checks the flag only after it gets a permit
        done = true;
    }
}
As written: consumer and producer both delay 50 ms:
starting p
starting c
p 1 p 2 p 3 p 4 p 5 p 6 p 7 p 8 p 9 p 10 (partly reformatted to show runs)
c 1 c 2 c 3 c 4
p 11 p 12 p 13 p 14 ← queue full
c 5 c 6 c 7 c 8 ← partly emptied
p 15 p 16 p 17 p 18
c 9 c 10 c 11
…
consumer with 40 ms: faster consumer means often-empty queue:
starting p
starting c
p 1 p 2
c 1
p 3
c 2 c 3 ← queue empty
p 4 p 5 ← a little bit filled
c 4 c 5
p 6 p 7 ← queue empty
c 6 c 7
…
producer with 40 ms: faster producer means often-full queue:
starting p
starting c
p 1 p 2
c 1
p 3 p 4 p 5 p 6 p 7 p 8 p 9 p 10 p 11 ← fill queue up
c 2 c 3
p 12 p 13 ← queue full again
c 4 c 5
…
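The three runs above differ only in the two delay values. As a rough sketch of how such an experiment could be parameterized, the following variant takes the buffer capacity and both delays as arguments and reports the longest queue length seen. The names DelayExperiment and run(), and all of its parameters, are assumptions for illustration and not part of the program above. Unlike the listing, this sketch sleeps outside the mutex-protected section and runs for a fixed number of items instead of using a done flag, so its interleaving will not match the traces exactly.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Semaphore;

class DelayExperiment {
    // Hypothetical helper: moves `items` integers through a bounded buffer,
    // appends what the consumer received to `consumed`, and returns the
    // largest queue length observed.
    static int run(int capacity, int items, long producerDelayMs,
                   long consumerDelayMs, List<Integer> consumed) {
        Semaphore empty = new Semaphore(capacity); // permits for empty spots
        Semaphore full = new Semaphore(0);         // permits for filled spots
        Semaphore mutex = new Semaphore(1);        // protects buffer and maxSize
        Queue<Integer> buffer = new ArrayDeque<>();
        int[] maxSize = {0};                       // updated only while holding mutex

        Thread producer = new Thread(() -> {
            try {
                for (int item = 1; item <= items; item++) {
                    Thread.sleep(producerDelayMs); // time to "produce" an item
                    empty.acquire();               // down: wait for an empty spot
                    mutex.acquire();
                    buffer.add(item);
                    maxSize[0] = Math.max(maxSize[0], buffer.size());
                    mutex.release();
                    full.release();                // up: one more filled spot
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) {
                    full.acquire();                // down: wait for a filled spot
                    mutex.acquire();
                    int item = buffer.remove();
                    mutex.release();
                    empty.release();               // up: one more empty spot
                    consumed.add(item);
                    Thread.sleep(consumerDelayMs); // time to "consume" the item
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();                       // join makes the threads' writes visible here
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return maxSize[0];
    }

    public static void main(String[] args) {
        long[][] delays = {{50, 50}, {50, 40}, {40, 50}}; // {producer ms, consumer ms}
        for (long[] d : delays) {
            List<Integer> got = new ArrayList<>();
            int max = run(10, 20, d[0], d[1], got);
            System.out.println("producer " + d[0] + " ms, consumer " + d[1]
                    + " ms: max queue length " + max + ", items consumed " + got.size());
        }
    }
}
```

Whatever the delays, the semaphores guarantee that the queue never holds more than capacity items and that items come out in the order they went in. Only how full the queue tends to be depends on the timing.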