/* A Brief Introduction to Concurrent Programming in Java Chuck Liang Hofstra University Computer Science No serious study of distributed programming can avoid its sibling: concurrent programming. Distributed programming involves independent processes running on independent platforms and environments, whereas concurrent programming refers to multi-tasking processes or threads running on the same system. Often distributed processes are reflected locally as independent threads of a single program. For example, suppose I write a chat program that allows two remote users to see what each other type. Suppose also that I've created a distributed programming environment (perhaps using RMI) so that all I have to do is to call a remote method "send" in order to send what I typed to the other side, and similarly call "receive" to order to receive a message. But this is still not enough for creating a distributed application. Each side of the application still lack the ability to send and receive at the same time! That is, suppose I wrote my chat program with a simple loop: while (!done) { yourmessage = receive(); send(mymessage); } This would clearly not work, since I will not be able to send anything unless I get a message from you first! That is, the two users of this program must thus alternate between typing and reading. They can't "chat" simultaneously! The solution to this problem is for the application to create two separate but LOCAL threads: one for sending anything new I typed and one for receiving and displaying any new incomming messages. The Java programming language makes concurrent programming simple with an array of built-in support. To create a "thread", all one has to do is to write a class that "implements" the Runnable interface. Such a class must implement a method void run(); Now, whenever an instance of this class is created, it can be used to "start" a new thread, which will execute the code found in its run procedure. The following sample code produces two threads, each printing a different message to System.out. It is simple enough to illustrate the basic thread creation mechanism. Use it as a template for creating other threads. class mythread implements Runnable // thread class { private String s; public mythread(String s0) {s=s0;} public void run() { while (true) System.out.println(s); } } public class javaThreads { public static void main(String[] args) { Thread t1, t2; t1 = new Thread(new mythread("THREAD A")); // thread creation t2 = new Thread(new mythread("thread B")); t1.start(); // thread activation t2.start(); } // main } The seperate threads are executed "concurrently" through multi-tasking. They otherwise behave like any other pair of objects in a program. The specific thread class may also contain other methods than "run". Simple? Not so fast! Since the threads are part of the same program, they can share access to the same data. Suppose they tried to modify a common variable x by each executing x++; Depending on the exact machine-language instruction at which the system decides to multitask, the x may be incremented by once or twice! The program becomes unpredictable. This is called a "critical section" - I.E. a portion of a threads code that it must synchronize with other threads in the program, a portion that it must be able to execute to completion before the system switches to another thread. Fortunately, Java also provides easy-to-use tools to accomplish this synchronization. Every method in a class (not necessarily Runnable) can be tagged as "synchronized", as in public synchronized void incx() { x++; } When a thread calls a synchronized method, it effectively "owns" the object that the method belongs to. No other thread can call a synchronized method of the same object (be it incx or something else) until this thread has exited the synchronized method it's calling. Any thread that attempts to do so will be blocked or suspended by the system. A thread exits a synchronized method in one of the following ways: 1. When it naturally reaches the end of the body of the method 2. When it calls wait(), which can throw an InterruptedException. A thread that calls wait() relinquishes ownership of the object and allows other threads to "come in" and perform synchronized operations. The waiting thread will wake up when some other thread calls notify(); or notifyAll() on the same object. A subtlety must be noted here: when a waiting thread is notified, it does not immediately resume execution. Rather, the system places the thread on a queue. It must still wait for other treads in front of it to finish executing synchronized code before executing. In other words, when we notify a waiting thread, we are allowing the thread to again COMPETE for ownership of the object, but that doesn't mean it'll get it right away! As the name suggests, notifyAll() will inform all threads waiting on an object to start competing for ownership again. notify() will merely waken one (which one may depend on the system's scheduling algorithm, but it's usually first-in, first out). 3. You may also call Thread.sleep(int millesconds) (again throws exception) to momentarily suspend the thread. After the indicated time has elapsed, it would be as if the thread was notified automatically. There're a few other ways to suspend a thread, such as yield(), but I won't enumerate them. These tools can be used to precisely synchronize the behavior of threads. These mechanisms also make it easy to implement in Java what are called "Hoare Monitors" - an extremely flexible synchronization device. However, I will not go into too much detail immediately - I encourage you to consult an operating systems text or instructor for further information. In the following program, I use java concurrency mechanisms to write a solution the famous "producer-consumer" problem. That is, a producer tries to insert items while a consumer tries to take out items from a shared, bounded buffer (implemented below using a circular queue array). A consumer must wait if the queue is empty, and a producer must wait if the queue is full. I will leave it up to you to make it into a RMI class. RMI servers automatically create seperate threads of execution when a client calls a method, so that no client can "clog" the server by simply causing it to go into a loop. However, synchronization mechanisms still must be provided by you! In a RMI distributed setting, producers and consumers will be processes running on different systems on a network - but this distributivity must still be mirrored by the server locally! */ class BoundedBuffer { private int[] buffer; // using a cicular queue for buffer private int front, tail, size, max; // consume from front, produce at tail. size indicates current // size, max is max size (of array). public BoundedBuffer(int m) { max = m; buffer = new int[max]; front = tail = size = 0; } // produce must wait if buffer is full: public synchronized void produce(int x) { try{ while (size==max) wait(); // why "while" and not just "if" ? buffer[tail] = x; tail = (tail+1) % max; // move pointer forward in circular queue size++; if (size==1) notifyAll(); // there could be consumers waiting! } catch (InterruptedException ie) {ie.printStackTrace();} } // consume must wait if buffer empty public synchronized int consume() { int result = 0; try{ while (size==0) wait(); result = buffer[front]; front = (front+1) % max; size--; if (size==max-1) notifyAll(); // wake any waiting producers } catch (InterruptedException ie) {ie.printStackTrace();} return result; } } // BoundedBuffer // producer thread - replaceable with RMI client class producer implements Runnable { private int x = 0; // value to be "produced" private BoundedBuffer buf; // pointer to shared buffer object public producer(BoundedBuffer b) { buf = b; } public void run() { while (true) buf.produce(x++); } } // producer // consumer thread class class consumer implements Runnable { private int x = 0; private BoundedBuffer buf; // pointer to shared buffer object public consumer(BoundedBuffer b) { buf = b; } public void run() { while (true) System.out.println(buf.consume()); } } // consumer public class concurrency // makes 1 producer and 2 consumers { public static void main(String[] args) { BoundedBuffer buf = new BoundedBuffer(10); // size 10 buffer Thread t1, t2, t3; t1 = new Thread(new producer(buf)); // create thread objects t2 = new Thread(new consumer(buf)); t3 = new Thread(new consumer(buf)); t1.start(); t2.start(); t3.start(); // start threads } // main }