CS444 Example of Monitor in Java, using the synchronized keyword

Monitor: A package of code with encapsulated data and its own built-in mutex, which ensures that only one thread at a time can execute inside it; other threads wait for their turn.  Monitors are not available as a built-in language feature in C or C++.

Example of a monitor in Java.  In Java, the “synchronized” keyword causes an object to use its built-in mutex to ensure that only one thread at a time can execute in any of its synchronized methods.  LinkedList is not itself synchronized, so it can be corrupted when operations from multiple threads interleave. We protect its fields from simultaneous access by multiple threads by declaring our wrapper methods “synchronized”, as follows. Here the mutex is built into the Monitor0 object.

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Semaphore;

 

/**
 * Producer/consumer demonstration: one producer thread and one consumer thread
 * exchange consecutive integers through a bounded buffer of capacity {@code N}.
 *
 * <p>Two counting semaphores make the threads wait while the buffer is full or
 * empty; the buffer itself is a monitor ({@link Monitor0}) whose
 * {@code synchronized} methods keep the underlying list consistent.
 *
 * <p>Shutdown: {@code main} sets the volatile {@code done} flag and then
 * interrupts both threads, so a thread that is parked inside
 * {@code Semaphore.acquire()} wakes up and exits instead of blocking forever.
 */
public class ProdConsSemaphore1 {

    /** Capacity of the shared bounded buffer. */
    private static final int N = 10;

    /** Run time used by {@code main} when no duration argument is given (5 secs). */
    private static final long DEFAULT_RUN_MILLIS = 5000;

    private static final Producer producer = new Producer(); // producer thread

    private static final Consumer consumer = new Consumer(); // consumer thread

    private static final BlockingQueue sharedBuffer = new BlockingQueue();

    // volatile: written by main and read by both worker threads with no other
    // synchronization, so the write must be guaranteed visible to them.
    private static volatile boolean done = false;

    /** Thread that generates 1, 2, 3, ... and inserts each value into the shared buffer. */
    private static class Producer extends Thread {

        private int val = 0; // last value produced; touched only by this thread

        @Override
        public void run() { // run method contains the thread code
            while (!done) { // producer loop
                try {
                    int item = produceItem();
                    System.out.print("i " + item + " ");
                    sharedBuffer.insert(item);
                } catch (InterruptedException e) {
                    // Interruption is how main wakes a blocked thread at shutdown;
                    // only report it when it arrives before shutdown was requested.
                    if (!done) {
                        System.out.println("problem with insert");
                    }
                    Thread.currentThread().interrupt(); // preserve interrupt status
                    return;
                }
            }
        }

        /** Returns the next value of the sequence 1, 2, 3, ... */
        private int produceItem() {
            return ++val;
        }
    }

    /** Thread that removes values from the shared buffer and checks they arrive in order. */
    private static class Consumer extends Thread {

        private int ckval = 0; // last value consumed; touched only by this thread

        @Override
        public void run() { // run method contains the thread code
            while (!done) { // consumer loop
                try {
                    int item = sharedBuffer.remove();
                    System.out.print("c " + item + " ");
                    consumeItem(item);
                } catch (InterruptedException e) {
                    // See Producer.run(): interruption at shutdown is expected.
                    if (!done) {
                        System.out.println("problem with remove");
                    }
                    Thread.currentThread().interrupt(); // preserve interrupt status
                    return;
                }
            }
        }

        /** Verifies that {@code item} is the next value of the sequence 1, 2, 3, ... */
        private void consumeItem(int item) {
            ++ckval; // value we expect to get
            if (item != ckval) {
                System.out.println("unexpected val " + item);
            }
        }
    }

    // not itself a monitor, but uses a monitor for the buffer.
    private static class BlockingQueue {

        private final Monitor0 buffer = new Monitor0();

        // permits == free slots; starts at N because the buffer starts empty
        private final Semaphore empty = new Semaphore(N);

        // permits == filled slots
        private final Semaphore full = new Semaphore(0);

        /**
         * Inserts {@code val}, waiting while the buffer is full.
         *
         * @return always {@code true}
         * @throws InterruptedException if interrupted while waiting for a free slot
         */
        public boolean insert(int val) throws InterruptedException {
            empty.acquire();
            // A permit from "empty" was acquired, so the monitor holds fewer than
            // N items here and its own full-check cannot fail.
            buffer.insert(val);
            full.release();
            return true;
        }

        /**
         * Removes and returns the oldest value, waiting while the buffer is empty.
         *
         * @throws InterruptedException if interrupted while waiting for an item
         */
        public int remove() throws InterruptedException {
            full.acquire();
            // A permit from "full" was acquired, so at least one item is present.
            int val = buffer.remove();
            empty.release();
            return val;
        }
    }

    // a monitor, that is, an encapsulated object that only one thread at a
    // time can execute inside of.
    private static class Monitor0 {

        private final Queue<Integer> buffer = new LinkedList<Integer>();
        // or use buffer = Collections.synchronizedList(new LinkedList(...));

        /** Appends {@code val}; returns {@code false} without adding if N items are already held. */
        public synchronized boolean insert(int val) {
            if (buffer.size() == N) {
                return false; // fail if full
            }
            buffer.add(val);
            return true;
        }

        /** Removes and returns the oldest value, or -1 if nothing is there. */
        public synchronized int remove() {
            if (buffer.size() == 0) {
                return -1; // fail if empty
            }
            return buffer.remove();
        }
    }

    /**
     * Parses a run duration given on the command line.
     *
     * @throws IllegalArgumentException if {@code text} is not a non-negative whole number
     */
    private static long parseRunMillis(String text) {
        final long millis;
        try {
            millis = Long.parseLong(text.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    "run duration must be a whole number of milliseconds: " + text, e);
        }
        if (millis < 0) {
            throw new IllegalArgumentException(
                    "run duration must not be negative: " + millis);
        }
        return millis;
    }

    /**
     * Starts the producer and consumer, lets them run, then stops them and waits
     * for both to finish. Can be run only once per JVM because the two threads
     * are static and a thread cannot be restarted.
     *
     * @param args optional; {@code args[0]} is the run time in milliseconds
     *             (default 5000)
     * @throws IllegalArgumentException if {@code args[0]} is not a non-negative
     *             whole number
     */
    public static void main(String[] args) {
        long runMillis = DEFAULT_RUN_MILLIS;
        if (args != null && args.length > 0) {
            runMillis = parseRunMillis(args[0]);
        }

        System.out.println("starting p");
        producer.start(); // start the producer thread
        System.out.println("starting c");
        consumer.start(); // start the consumer thread

        boolean interrupted = false;
        try {
            Thread.sleep(runMillis);
        } catch (InterruptedException ex) {
            interrupted = true; // shut down early, restore the status below
        }

        done = true; // tell threads to stop looping
        // A thread parked in Semaphore.acquire() never re-checks "done";
        // interrupting it makes acquire() throw so the thread can exit.
        producer.interrupt();
        consumer.interrupt();

        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException ex) {
            interrupted = true;
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}