Section 12.4
Threads and Networking
In the previous chapter, we looked at several examples of network programming. Those examples showed how to create network connections and communicate through them, but they didn't deal with one of the fundamental characteristics of network programming, the fact that network communication is asynchronous. From the point of view of a program on one end of a network connection, messages can arrive from the other side of the connection at any time; the arrival of a message is an event that is not under the control of the program that is receiving the message. Perhaps an event-oriented networking API would be a good approach to dealing with the asynchronous nature of network communication, but that is not the approach that is taken in Java (or, typically, in other languages). Instead, network programming in Java typically uses threads.
12.4.1 The Blocking I/O Problem
As covered in Section 11.4, network programming uses sockets. A socket, in the sense that we are using the term here, represents one end of a network connection. Every socket has an associated input stream and output stream. Data written to the output stream on one end of the connection is transmitted over the network and appears in the input stream at the other end.
A program that wants to read data from a socket's input stream calls one of that input stream's input methods. It is possible that the data has already arrived before the input method is called; in that case, the input method retrieves the data and returns immediately. More likely, however, the input method will have to wait for data to arrive from the other side of the connection. Until the data arrives, the input method and the thread that called it will be blocked.
It is also possible for an output method in a socket's output stream to block. This can happen if the program tries to output data to the socket faster than the data can be transmitted over the network. (It's a little complicated: a socket uses a "buffer" to hold data that is supposed to be transmitted over the network. A buffer is just a block of memory that is used like a queue. The output method drops its data into the buffer; lower-level software removes data from the buffer and transmits it over the network. The output method will block if the buffer fills up. Note that when the output method returns, it doesn't mean that the data has gone out over the network -- it just means that the data has gone into the buffer and is scheduled for later transmission.)
We say that network communication uses blocking I/O, because input and output operations on the network can block for indefinite periods of time. Programs that use the network must be prepared to deal with this blocking. In some cases, it's acceptable for a program to simply shut down all other processing and wait for input. (This is what happens when a command line program reads input typed by the user. User input is another type of blocking I/O.) However, threads make it possible for some parts of a program to continue doing useful work while other parts are blocked. A network client program that sends requests to a server might get by with a single thread, if it has nothing else to do while waiting for the server's responses. A network server program, on the other hand, can typically be connected to several clients at the same time. While waiting for data to arrive from a client, the server certainly has other things that it can do, namely communicate with other clients. When a server uses different threads to handle the communication with different clients, the fact that I/O with one client is blocked won't stop the server from communicating with other clients.
It's important to understand that using threads to deal with blocking I/O differs in a fundamental way from using threads to speed up computation. When using threads for speed-up in Subsection 12.3.2, it made sense to use one thread for each available processor. If only one processor is available, using more than one thread will yield no speed-up at all; in fact, it would slow things down because of the extra overhead involved in creating and managing the threads.
In the case of blocking I/O, on the other hand, it can make sense to have many more threads than there are processors, since at any given time many of the threads can be blocked. Only the active, unblocked threads are competing for processing time. In the ideal case, to keep all the processors busy, you would want to have one active thread per processor (actually somewhat less than that, on average, to allow for variations over time in the number of active threads). On a network server program, for example, threads generally spend most of their time blocked waiting for I/O operations to complete. If threads are blocked, say, about 90% of the time, you'd like to have about ten times as many threads as there are processors. So even on a computer that has just a single processor, server programs can make good use of large numbers of threads.
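The arithmetic behind the "ten times as many threads" estimate can be written out as a small sketch. The class and method names here are hypothetical and do not come from any of the sample programs; the formula simply assumes that every thread is blocked for the same fraction of the time.

```java
// Hypothetical helper, not part of the book's sample programs.
class ThreadCountEstimate {

    /**
     * Estimates how many threads are needed so that, on average, there is
     * one active (unblocked) thread per processor. Assumes every thread is
     * blocked for the same fraction of the time, 0 <= blockedFraction < 1.
     */
    static int threadsNeeded(int processors, double blockedFraction) {
        if (processors < 1 || blockedFraction < 0 || blockedFraction >= 1)
            throw new IllegalArgumentException("Bad arguments.");
        // A thread is active for (1 - blockedFraction) of the time, so
        // processors / (1 - blockedFraction) threads yield, on average,
        // one active thread per processor.
        return (int) Math.round(processors / (1.0 - blockedFraction));
    }

    public static void main(String[] args) {
        int processors = Runtime.getRuntime().availableProcessors();
        System.out.println("Available processors: " + processors);
        System.out.println("Threads wanted if blocked 90% of the time: "
                + threadsNeeded(processors, 0.9));
    }
}
```

This is only a rough starting point; as the text notes, you would usually want somewhat fewer active threads than this on average.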
12.4.2 An Asynchronous Network Chat Program
As a first example of using threads for network communication, we consider a GUI chat program.
The command-line chat programs, CLChatClient.java and CLChatServer.java, from Subsection 11.4.5 use a straight-through, step-by-step protocol for communication. After a user on one side of a connection enters a message, the user must wait for a reply from the other side of the connection. An asynchronous chat program would be much nicer. In such a program, a user could just keep typing lines and sending messages without waiting for any response. Messages that arrive -- asynchronously -- from the other side would be displayed as soon as they arrive. It's not easy to do this in a command-line interface, but it's a natural application for a graphical user interface. The basic idea for a GUI chat program is to create a thread whose job is to read messages that arrive from the other side of the connection. As soon as the message arrives, it is displayed to the user; then, the message-reading thread blocks until the next incoming message arrives. While it is blocked, however, other threads can continue to run. In particular, the event-handling thread that responds to user actions keeps running; that thread can send outgoing messages as soon as the user generates them.
The GUIChat program can act as either the client end or the server end of a connection. (Refer back to Subsection 11.4.3 for information about how clients and servers work.) The program has a "Listen" button that the user can click to create a server socket that will listen for an incoming connection request; this makes the program act as a server. It also has a "Connect" button that the user can click to send a connection request; this makes the program act as a client. As usual, the server listens on a specified port number. The client needs to know the computer on which the server is running and the port on which the server is listening. There are input boxes in the GUIChat window where the user can enter this information.
Once a connection has been established between two GUIChat windows, each user can send messages to the other. The window has an input box where the user types a message. Pressing return while typing in this box sends the message. This means that the sending of the message is handled by the usual event-handling thread, in response to an event generated by a user action. Messages are received by a separate thread that just sits around waiting for incoming messages. This thread blocks while waiting for a message to arrive; when a message does arrive, it displays that message to the user. The window contains a large transcript area that displays both incoming and outgoing messages, along with other information about the network connection.
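The division of labor between a sending thread and a message-reading thread can be tried without a GUI. The following is a minimal sketch, not code from GUIChat: the class and method names are hypothetical, and both ends of the connection live in one program, connected through the loopback address, purely to keep the example self-contained.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch; GUIChat itself is organized differently.
class ReaderThreadSketch {

    /** Sends the messages over a loopback connection while a separate
     *  thread reads them; returns the lines that the reader received. */
    static List<String> exchange(String... messages) {
        List<String> received = Collections.synchronizedList(new ArrayList<>());
        try (ServerSocket listener = new ServerSocket(0);  // 0 = any free port
             Socket sender = new Socket(InetAddress.getLoopbackAddress(),
                                        listener.getLocalPort());
             Socket receiver = listener.accept()) {
            Thread reader = new Thread(() -> {
                try {
                    BufferedReader in = new BufferedReader(
                            new InputStreamReader(receiver.getInputStream()));
                    String line;
                    // readLine() blocks until a line arrives; it returns
                    // null when the other side closes the connection.
                    while ((line = in.readLine()) != null)
                        received.add(line);
                } catch (IOException e) {
                    received.add("ERROR: " + e);
                }
            });
            reader.start();
            // The calling thread stays free to send whenever it likes.
            PrintWriter out = new PrintWriter(sender.getOutputStream());
            for (String message : messages) {
                out.println(message);
                out.flush();
            }
            sender.close();  // The reader then sees end-of-stream.
            reader.join();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(exchange("Hello", "Are you there?", "Goodbye"));
    }
}
```

In a GUI program, the role of the calling thread is played by the event-handling thread, and the reader would post each line to the transcript instead of collecting it in a list.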
I urge you to compile the source code, GUIChat.java, and try the program. To make it easy to try it on a single computer, you can make a connection between one window and another window on the same computer, using "localhost" or "127.0.0.1" as the name of the computer. (Once you have one GUIChat window open, you can open a second one by clicking the "New" button.) I also urge you to read the source code. I will discuss only parts of it here.
The program uses a nested class, ConnectionHandler, to handle most network-related tasks. ConnectionHandler is a subclass of Thread. The ConnectionHandler thread is responsible for opening the network connection and then for reading incoming messages once the connection has been opened. By putting the connection-opening code in a separate thread, we make sure that the GUI is not blocked while the connection is being opened. (Like reading incoming messages, opening a connection is a blocking operation that can take some time to complete.) The ConnectionHandler handles opening the connection both when the program acts as a server and when it acts as a client. The thread is created when the user clicks either the "Listen" button or the "Connect" button. The "Listen" button should make the thread act as a server, while "Connect" should make it act as a client. To distinguish these two cases, the ConnectionHandler class has two constructors. Note that the postMessage() method posts a message to the transcript area of the window, where it will be visible to the user:
    /**
     * Listen for a connection on a specified port.  The constructor
     * does not perform any network operations; it just sets some
     * instance variables and starts the thread.  Note that the
     * thread will only listen for one connection, and then will
     * close its server socket.
     */
    ConnectionHandler(int port) { // For acting as the "server."
        state = ConnectionState.LISTENING;
        this.port = port;
        postMessage("\nLISTENING ON PORT " + port + "\n");
        start();
    }

    /**
     * Open a connection to a specified computer and port.  The constructor
     * does not perform any network operations; it just sets some
     * instance variables and starts the thread.
     */
    ConnectionHandler(String remoteHost, int port) { // For acting as "client."
        state = ConnectionState.CONNECTING;
        this.remoteHost = remoteHost;
        this.port = port;
        postMessage("\nCONNECTING TO " + remoteHost + " ON PORT " + port + "\n");
        start();
    }

Here, state is an instance variable whose type is defined by an enumerated type:
enum ConnectionState { LISTENING, CONNECTING, CONNECTED, CLOSED };
The values of this enum represent different possible states of the network connection. It is often useful to treat a network connection as a state machine (see Subsection 6.4.4), since the response to various events can depend on the state of the connection when the event occurs. Setting the state variable to LISTENING or CONNECTING tells the thread whether to act as server or as a client when setting up the connection.
Once the thread has been started, it executes the following run() method:
    /**
     * The run() method that is executed by the thread.  It opens a
     * connection as a client or as a server (depending on which
     * constructor was used).
     */
    public void run() {
        try {
            if (state == ConnectionState.LISTENING) {
                    // Open a connection as a server.
                listener = new ServerSocket(port);
                socket = listener.accept();
                listener.close();
            }
            else if (state == ConnectionState.CONNECTING) {
                    // Open a connection as a client.
                socket = new Socket(remoteHost,port);
            }
            connectionOpened();  // Sets up to use the connection (including
                                 // creating a BufferedReader, in, for reading
                                 // incoming messages).
            while (state == ConnectionState.CONNECTED) {
                    // Read one line of text from the other side of
                    // the connection, and report it to the user.
                String input = in.readLine();
                if (input == null)
                    connectionClosedFromOtherSide(); // Close socket and report to user.
                else
                    received(input);  // Report message to user.
            }
        }
        catch (Exception e) {
                // An error occurred.  Report it to the user, but not
                // if the connection has been closed (since the error
                // might be the expected error that is generated when
                // a socket is closed).
            if (state != ConnectionState.CLOSED)
                postMessage("\n\n ERROR:  " + e);
        }
        finally {  // Clean up before terminating the thread.
            cleanUp();
        }
    }
This method calls several other methods to do some of its work, but you can see the general outline of how it works. After opening the connection as either a server or client, the run() method enters a while loop in which it receives and processes messages from the other side of the connection until the connection is closed. It is important to understand how the connection can be closed. The GUIChat window has a "Disconnect" button that the user can click to close the connection. The program responds to this event by closing the socket that represents the connection and by setting the connection state to CLOSED. It is likely that when this happens, the connection-handling thread is blocked in the in.readLine() method, waiting for an incoming message. When the socket is closed by the GUI thread, this method will fail and will throw an exception; this exception causes the thread to terminate. (If the connection-handling thread happens to be between calls to in.readLine() when the socket is closed, the while loop will terminate because the connection state changes from CONNECTED to CLOSED.) Note that closing the window will also close the connection in the same way.
It is also possible for the user on the other side of the connection to close the connection. When that happens, the stream of incoming messages ends, and the in.readLine() on this side of the connection returns the value null, which indicates end-of-stream and acts as a signal that the connection has been closed by the remote user.
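The first way of ending the read loop, closing the socket from another thread while the reader is blocked, can be observed in isolation. This is a minimal sketch with hypothetical names; it uses a loopback connection on which no data is ever sent, so the reader has nothing to do but block.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical sketch; not taken from GUIChat.java.
class CloseUnblocksReader {

    /** Blocks a thread in readLine(), closes the socket from the calling
     *  thread, and reports how the blocked call ended. */
    static String howReadEnded() {
        final String[] outcome = { "reader did not finish" };
        try (ServerSocket listener = new ServerSocket(0);  // 0 = any free port
             Socket remote = new Socket(InetAddress.getLoopbackAddress(),
                                        listener.getLocalPort());
             Socket local = listener.accept()) {
            Thread reader = new Thread(() -> {
                try {
                    BufferedReader in = new BufferedReader(
                            new InputStreamReader(local.getInputStream()));
                    String line = in.readLine();  // Blocks: nothing is ever sent.
                    outcome[0] = (line == null) ? "end-of-stream" : "line";
                } catch (IOException e) {
                    // Expected here: the socket was closed under the reader.
                    outcome[0] = "exception";
                }
            });
            reader.start();
            Thread.sleep(200);  // Give the reader time to block.
            local.close();      // Closed by a different thread.
            reader.join(5000);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return outcome[0];
    }

    public static void main(String[] args) {
        System.out.println("Blocked readLine() ended with: " + howReadEnded());
    }
}
```

Had the remote socket been closed instead, readLine() would have returned null rather than throwing, which is the second case described above.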
For a final look into the GUIChat code, consider the methods that send and receive messages. These methods are called from different threads. The send() method is called by the event-handling thread in response to a user action. Its purpose is to transmit a message to the remote user. (It is conceivable, though not likely, that the data output operation could block, if the socket's output buffer fills up. A more sophisticated program might take this possibility into account.) This method uses a PrintWriter, out, that writes to the socket's output stream. Synchronization of this method prevents the connection state from changing in the middle of the send operation:
    /**
     * Send a message to the other side of the connection, and post the
     * message to the transcript.  This should only be called when the
     * connection state is ConnectionState.CONNECTED; if it is called at
     * other times, it is ignored.
     */
    synchronized void send(String message) {
        if (state == ConnectionState.CONNECTED) {
            postMessage("SEND:  " + message);
            out.println(message);
            out.flush();
            if (out.checkError()) {
                postMessage("\nERROR OCCURRED WHILE TRYING TO SEND DATA.");
                close();  // Closes the connection.
            }
        }
    }
The received() method is called by the connection-handling thread after a message has been read from the remote user. Its only job is to display the message to the user, but again it is synchronized to avoid the race condition that could occur if the connection state were changed by another thread while this method is being executed:
    /**
     * This is called by the run() method when a message is received from
     * the other side of the connection.  The message is posted to the
     * transcript, but only if the connection state is CONNECTED.  (This
     * is because a message might be received after the user has clicked
     * the "Disconnect" button; that message should not be seen by the
     * user.)
     */
    synchronized private void received(String message) {
        if (state == ConnectionState.CONNECTED)
            postMessage("RECEIVE:  " + message);
    }
12.4.3 A Threaded Network Server
Threads are often used in network server programs. They allow the server to deal with several clients at the same time. When a client can stay connected for an extended period of time, other clients shouldn't have to wait for service. Even if the interaction with each client is expected to be very brief, you can't always assume that that will be the case. You have to allow for the possibility of a misbehaving client -- one that stays connected without sending data that the server expects. This can hang up a thread indefinitely, but in a threaded server there will be other threads that can carry on with other clients.
The DateServer.java sample program, from Subsection 11.4.4, is an extremely simple network server program. It does not use threads, so the server must finish with one client before it can accept a connection from another client. Let's see how we can turn DateServer into a threaded server. (This server is so simple that doing so doesn't make a great deal of sense. However, the same techniques will work for more complicated servers. See, for example, Exercise 12.4.)
As a first attempt, consider DateServerWithThreads.java. This sample program creates a new thread every time a connection request is received, instead of handling the connection itself by calling a subroutine. The main program simply creates the thread and hands the connection to the thread. This takes very little time, and in particular will not block. The run() method of the thread handles the connection in exactly the same way that it would be handled by the original program. This is not at all difficult to program. Here's the new version of the program; the significant changes are the creation of a handler thread inside the main loop and the nested ConnectionHandler class. Note again that the constructor for the connection thread does very little and in particular cannot block; this is very important since the constructor runs in the main thread:
    import java.net.*;
    import java.io.*;
    import java.util.Date;

    /**
     * This program is a server that takes connection requests on
     * the port specified by the constant LISTENING_PORT.  When a
     * connection is opened, the program sends the current time to
     * the connected socket.  The program will continue to receive
     * and process connections until it is killed (by a CONTROL-C,
     * for example).
     *
     * This version of the program creates a new thread for
     * every connection request.
     */
    public class DateServerWithThreads {

        public static final int LISTENING_PORT = 32007;

        public static void main(String[] args) {

            ServerSocket listener;  // Listens for incoming connections.
            Socket connection;      // For communication with the connecting program.

            /* Accept and process connections forever, or until some error occurs. */

            try {
                listener = new ServerSocket(LISTENING_PORT);
                System.out.println("Listening on port " + LISTENING_PORT);
                while (true) {
                        // Accept next connection request and create thread to handle it.
                    connection = listener.accept();
                    ConnectionHandler handler = new ConnectionHandler(connection);
                    handler.start();
                }
            }
            catch (Exception e) {
                System.out.println("Sorry, the server has shut down.");
                System.out.println("Error:  " + e);
                return;
            }

        }  // end main()


        /**
         *  Defines a thread that handles the connection with one
         *  client.
         */
        private static class ConnectionHandler extends Thread {
            Socket client;  // The connection to the client.
            ConnectionHandler(Socket socket) {
                client = socket;
            }
            public void run() {
                    // (code copied from the original DateServer program)
                String clientAddress = client.getInetAddress().toString();
                try {
                    System.out.println("Connection from " + clientAddress );
                    Date now = new Date();  // The current date and time.
                    PrintWriter outgoing;   // Stream for sending data.
                    outgoing = new PrintWriter( client.getOutputStream() );
                    outgoing.println( now.toString() );
                    outgoing.flush();  // Make sure the data is actually sent!
                    client.close();
                }
                catch (Exception e){
                    System.out.println("Error on connection with: "
                            + clientAddress + ": " + e);
                }
            }
        }

    } //end class DateServerWithThreads
One interesting change is at the end of the run() method, where I've added the clientAddress to the output of the error message. I did this to identify which connection the error message refers to. Since threads run in parallel, it's possible for outputs from different threads to be intermingled in various orders. Messages from the same thread don't necessarily come together in the output; they might be separated by messages from other threads. This is just one of the complications that you have to keep in mind when working with threads!
12.4.4 Using a Thread Pool
It's not very efficient to create a new thread for every connection, especially when the connections are typically very short-lived. Fortunately, we have an alternative: thread pools (Subsection 12.3.2).
DateServerWithThreadPool.java is an improved version of our server that uses a thread pool. Each thread in the pool runs in an infinite loop. Each time through the loop, it handles one connection. We need a way for the main program to send connections to the threads. It's natural to use a blocking queue named connectionQueue for that purpose. A connection-handling thread takes connections from this queue. Since it is a blocking queue, the thread blocks when the queue is empty and wakes up when a connection becomes available in the queue. No other synchronization or communication technique is needed; it's all built into the blocking queue. Here is the run() method for the connection-handling threads:
    public void run() {
        while (true) {
            Socket client;
            try {
                client = connectionQueue.take();  // Blocks until item is available.
            }
            catch (InterruptedException e) {
                continue; // (If interrupted, just go back to start of while loop.)
            }
            String clientAddress = client.getInetAddress().toString();
            try {
                System.out.println("Connection from " + clientAddress );
                System.out.println("Handled by thread " + this);
                Date now = new Date();  // The current date and time.
                PrintWriter outgoing;   // Stream for sending data.
                outgoing = new PrintWriter( client.getOutputStream() );
                outgoing.println( now.toString() );
                outgoing.flush();  // Make sure the data is actually sent!
                client.close();
            }
            catch (Exception e){
                System.out.println("Error on connection with: "
                        + clientAddress + ": " + e);
            }
        }
    }
The main program, in the meantime, runs in an infinite loop in which connections are accepted and added to the queue:
    while (true) {
            // Accept next connection request and put it in the queue.
        connection = listener.accept();
        try {
            connectionQueue.put(connection); // Blocks if queue is full.
        }
        catch (InterruptedException e) {
        }
    }
The queue in this program is of type ArrayBlockingQueue<Socket>. As such, it has a limited capacity, and the put() operation on the queue will block if the queue is full. But wait -- didn't we want to avoid blocking the main program? When the main program is blocked, the server is no longer accepting connections, and clients who are trying to connect are kept waiting. Would it be better to use a LinkedBlockingQueue, with an unlimited capacity?
In fact, connections in the blocking queue are waiting anyway; they are not being serviced. If the queue grows unreasonably long, connections in the queue will have to wait for an unreasonable amount of time. If the queue keeps growing indefinitely, that just means that the server is receiving connection requests faster than it can process them. That could happen for several reasons: Your server might simply not be powerful enough to handle the volume of traffic that you are getting; you need to buy a new server. Or perhaps the thread pool doesn't have enough threads to fully utilize your server; you should increase the size of the thread pool to match the server's capabilities. Or maybe your server is under a "Denial Of Service" attack, in which some bad guy is deliberately sending your server more requests than it can handle in an attempt to keep other, legitimate clients from getting service.
In any case, ArrayBlockingQueue with limited capacity is the correct choice. The queue should be short enough so that connections in the queue will not have to wait too long for service. In a real server, the size of the queue and the number of threads in the thread pool should be adjusted to "tune" the server to account for the particular hardware and network on which the server is running and for the nature of the client requests that it typically processes. Optimal tuning is, in general, a difficult problem.
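The limited capacity of an ArrayBlockingQueue is easy to observe directly. The sketch below uses hypothetical names and puts strings rather than sockets into the queue; it calls offer(), which returns false at once when the queue is full, where put() would block. Using offer() so that a server can turn away excess connections is one possible design, and it is not what DateServerWithThreadPool does.

```java
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical sketch; strings stand in for Socket objects.
class BoundedQueueDemo {

    /** Tries to add count items to a queue of the given capacity while no
     *  consumer is running; returns the number of items accepted. */
    static int acceptedWithoutConsumer(int capacity, int count) {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        for (int i = 0; i < count; i++) {
            // offer() fails immediately on a full queue; put() would
            // block here until some other thread removed an item.
            if (queue.offer("connection " + i))
                accepted++;
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println("Accepted " + acceptedWithoutConsumer(3, 10)
                + " of 10 items with capacity 3.");
    }
}
```

With a LinkedBlockingQueue created without a capacity argument, every one of the ten items would be accepted, which is exactly the unbounded growth that the discussion above warns about.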
There is, by the way, another way that things can go wrong: Suppose that the server needs to read some data from the client, but the client doesn't send the expected data. The thread that is trying to read the data can then block indefinitely, waiting for the input. If a thread pool is being used, this could happen to every thread in the pool. In that case, no further processing can ever take place! The solution to this problem is to have connections "time out" if they are inactive for an excessive period of time. Typically, each connection thread will keep track of the time when it last received data from the client. The server runs another thread (sometimes called a "reaper thread", after the Grim Reaper) that wakes up periodically and checks each connection thread to see how long it has been inactive. A connection thread that has been waiting too long for input is terminated, and a new thread is started in its place. The question of how long the timeout period should be is another difficult tuning issue.
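A reaper thread is not the only way to keep a silent client from tying up a thread forever. As a simpler alternative (a different technique from the one just described), the standard Socket class has a setSoTimeout() method that makes a blocked read fail with a SocketTimeoutException after a given number of milliseconds. The following minimal sketch, with hypothetical class and method names, shows a read timing out on a loopback connection over which the "client" never sends anything.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

// Hypothetical sketch of a per-read timeout; not from the sample programs.
class ReadTimeoutDemo {

    /** Returns true if a read from a client that sends nothing gives up
     *  with a SocketTimeoutException after about timeoutMillis. */
    static boolean readTimesOut(int timeoutMillis) {
        try (ServerSocket listener = new ServerSocket(0);  // 0 = any free port
             Socket silentClient = new Socket(InetAddress.getLoopbackAddress(),
                                              listener.getLocalPort());
             Socket connection = listener.accept()) {
            connection.setSoTimeout(timeoutMillis);  // Applies to later reads.
            BufferedReader in = new BufferedReader(
                    new InputStreamReader(connection.getInputStream()));
            try {
                in.readLine();  // The client never sends a line.
                return false;
            } catch (SocketTimeoutException e) {
                return true;    // The read gave up instead of blocking forever.
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("Read timed out: " + readTimesOut(500));
    }
}
```

A handler thread that catches the timeout can close the connection and go back to the queue for its next client. Choosing the timeout value remains the same tuning problem as before.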
12.4.5 Distributed Computing
We have seen how threads can be used to do parallel processing, where a number of processors work together to complete some task. So far, we have assumed that all the processors were inside one multi-processor computer. But parallel processing can also be done using processors that are in different computers, as long as those computers are connected to a network over which they can communicate. This type of parallel processing -- in which a number of computers work together on a task and communicate over a network -- is called distributed computing.
In some sense, the whole Internet is an immense distributed computation, but here I am interested in how computers on a network can cooperate to solve some computational problem. There are several approaches to distributed computing that are supported in Java. RMI and CORBA are standards that enable a program running on one computer to call methods in objects that exist on other computers. This makes it possible to design an object-oriented program in which different parts of the program are executed on different computers. RMI (Remote Method Invocation) only supports communication between Java objects. CORBA (Common Object Request Broker Architecture) is a more general standard that allows objects written in various programming languages, including Java, to communicate with each other. As is commonly the case in networking, there is the problem of locating services (where in this case, a "service" means an object that is available to be called over the network). That is, how can one computer know which computer a service is located on and what port it is listening on? RMI and CORBA solve this problem using a "request broker" -- a server program running at a known location keeps a list of services that are available on other computers. Computers that offer services register those services with the request broker; computers that need services must know the location of the broker, and they contact it to find out what services are available and where they are located.
RMI and CORBA are complex systems that are not very easy to use. I mention them here because they are part of Java's standard network API, but I will not discuss them further. Instead, we will look at a relatively simple demonstration of distributed computing that uses only basic networking.
The problem that we will consider is the same one that we used in MultiprocessingDemo1.java, and its variations, in Section 12.2 and Section 12.3, namely the computation of a complex image. This is an application that uses the simplest type of parallel programming, in which the problem can be broken down into tasks that can be performed independently, with no communication between the tasks. To apply distributed computing to this type of problem, we can use one "master" program that divides the problem into tasks and sends those tasks over the network to "worker" programs that do the actual work. The worker programs send their results back to the master program, which combines the results from all the tasks into a solution of the overall problem. In this context, the worker programs are often called "slaves," and the program uses the so-called master/slave approach to distributed computing.
The demonstration program is defined by three source code files: CLMandelbrotMaster.java defines the master program; CLMandelbrotWorker.java defines the worker programs; and CLMandelbrotTask.java defines the class, CLMandelbrotTask, that represents an individual task that is performed by the workers. The master divides the overall problem into a collection of tasks; it distributes those tasks to the workers that will execute the tasks and send the results back to the master; and the master applies the results from all the individual tasks to the overall problem.
To run the demonstration, you must first start the CLMandelbrotWorker program on several computers (probably by running it on the command line). This program uses CLMandelbrotTask, so both class files, CLMandelbrotWorker.class and CLMandelbrotTask.class, must be present on the worker computers. You can then run CLMandelbrotMaster on the master computer. Note that this program also requires the class CLMandelbrotTask. You must specify the host name or IP address of each of the worker computers as command line arguments for CLMandelbrotMaster. The worker programs listen for connection requests from the master program, and the master program must be told where to send those requests. For example, if the worker program is running on three computers with IP addresses 172.21.7.101, 172.21.7.102, and 172.21.7.103, then you can run CLMandelbrotMaster with the command
java CLMandelbrotMaster 172.21.7.101 172.21.7.102 172.21.7.103
The master will make a network connection to the worker at each IP address; these connections will be used for communication between the master program and the workers.
It is possible to run several copies of CLMandelbrotWorker on the same computer, but they must listen for network connections on different ports. It is also possible to run CLMandelbrotWorker on the same computer as CLMandelbrotMaster. You might even see some speed-up when you do this, if your computer has several processors. See the comments in the program source code files for more information, but here are some commands that you can use to run the master program and two copies of the worker program on the same computer. Give these commands in separate command windows:
    java CLMandelbrotWorker                               (Listens on default port)

    java CLMandelbrotWorker 2501                          (Listens on port 2501)

    java CLMandelbrotMaster localhost localhost:2501
Every time CLMandelbrotMaster is run, it solves exactly the same problem. (For this demonstration, the nature of the problem is not important, but the problem is to compute the data needed for a picture of a small piece of the famous "Mandelbrot Set." If you are interested in seeing the picture that is produced, uncomment the call to the saveImage() method at the end of the main() routine in CLMandelbrotMaster.java.)
You can run CLMandelbrotMaster with different numbers of worker programs to see how the time required to solve the problem depends on the number of workers. (Note that the worker programs continue to run after the master program exits, so you can run the master program several times without having to restart the workers.) In addition, if you run CLMandelbrotMaster with no command line arguments, it will solve the entire problem on its own, so you can see how long it takes to do so without using distributed computing. In a trial that I ran on some very old, slow computers, it took 40 seconds for CLMandelbrotMaster to solve the problem on its own. Using just one worker, it took 43 seconds. The extra time represents extra work involved in using the network; it takes time to set up a network connection and to send messages over the network. Using two workers (on different computers), the problem was solved in 22 seconds. In this case, each worker did about half of the work, and their computations were performed in parallel, so that the job was done in about half the time. With larger numbers of workers, the time continued to decrease, but only up to a point. The master program itself has a certain amount of work to do, no matter how many workers there are, and the total time to solve the problem can never be less than the time it takes for the master program to do its part. In this case, the minimum time seemed to be about five seconds.
Let's take a look at how this distributed application is programmed. The master program divides the overall problem into a set of tasks. Each task is represented by an object of type CLMandelbrotTask. These tasks have to be communicated to the worker programs, and the worker programs must send back their results. Some protocol is needed for this communication. I decided to use character streams. The master encodes a task as a line of text, which is sent to a worker. The worker decodes the text (into an object of type CLMandelbrotTask) to find out what task it is supposed to perform. It performs the assigned task. It encodes the results as another line of text, which it sends back to the master program. Finally, the master decodes the results and combines them with the results from other tasks. After all the tasks have been completed and their results have been combined, the problem has been solved.
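The idea of sending a task as one line of text can be sketched as follows. This is only an illustration: the class LineTask, its fields, and the message layout are assumptions made for this example, and the real CLMandelbrotTask class and the encoding used by the sample programs may well differ.

```java
// Hypothetical, simplified stand-in for a task; NOT the real CLMandelbrotTask.
class LineTask {
    int id;             // identifies the task (illustrative field)
    double xmin, xmax;  // range of values to be processed (illustrative fields)

    LineTask(int id, double xmin, double xmax) {
        this.id = id;
        this.xmin = xmin;
        this.xmax = xmax;
    }
}

class LineProtocolSketch {

    static final String TASK_COMMAND = "task";  // assumed command word

    // Encode a task as a single line of text: "task <id> <xmin> <xmax>".
    static String encode(LineTask t) {
        return TASK_COMMAND + " " + t.id + " " + t.xmin + " " + t.xmax;
    }

    // Decode a line produced by encode() back into a task object.
    static LineTask decode(String line) {
        String[] parts = line.trim().split("\\s+");
        if (parts.length != 4 || !parts[0].equals(TASK_COMMAND))
            throw new IllegalArgumentException("Not a task message: " + line);
        return new LineTask(Integer.parseInt(parts[1]),
                            Double.parseDouble(parts[2]),
                            Double.parseDouble(parts[3]));
    }

    public static void main(String[] args) {
        String message = encode(new LineTask(7, -0.5, 0.25));
        System.out.println("Sent:    " + message);
        LineTask received = decode(message);   // what the other side would do
        System.out.println("Decoded: id=" + received.id
                + " xmin=" + received.xmin + " xmax=" + received.xmax);
    }
}
```

Because each message is a complete line, the receiving side can read it with a single call to readLine() and then decide what to do based on the first word of the line.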
A worker receives not just one task, but a sequence of tasks. Each time it finishes a task and sends back the result, it is assigned a new task. After all tasks are complete, the worker receives a "close" command that tells it to close the connection. In CLMandelbrotWorker.java, all this is done in a method named handleConnection() that is called to handle a connection that has already been opened to the master program. It uses a method readTask() to decode a task that it receives from the master and a method writeResults() to encode the results of the task for transmission back to the master. It must also handle any errors that occur:
    private static void handleConnection(Socket connection) {
        try {
            BufferedReader in = new BufferedReader(
                    new InputStreamReader( connection.getInputStream()) );
            PrintWriter out = new PrintWriter(connection.getOutputStream());
            while (true) {
                String line = in.readLine();  // Message from the master.
                if (line == null) {
                        // End-of-stream encountered -- should not happen.
                    throw new Exception("Connection closed unexpectedly.");
                }
                if (line.startsWith(CLOSE_CONNECTION_COMMAND)) {
                        // Represents the normal termination of the connection.
                    System.out.println("Received close command.");
                    break;
                }
                else if (line.startsWith(TASK_COMMAND)) {
                        // Represents a CLMandelbrotTask that this worker is
                        // supposed to perform.
                    CLMandelbrotTask task = readTask(line);  // Decode the message.
                    task.compute();  // Perform the task.
                    out.println(writeResults(task));  // Send back the results.
                    out.flush();  // Make sure data is sent promptly!
                }
                else {
                        // No other messages are part of the protocol.
                    throw new Exception("Illegal command received.");
                }
            }
        }
        catch (Exception e) {
            System.out.println("Client connection closed with error " + e);
        }
        finally {
            try {
                connection.close();  // Make sure the socket is closed.
            }
            catch (Exception e) {
            }
        }
    }
Note that this method is not executed in a separate thread. The worker has only one thing to do at a time and does not need to be multithreaded.
Turning to the master program, CLMandelbrotMaster.java, we encounter a more complex situation. The master program must communicate with several workers over several network connections. To accomplish this, the master program is multithreaded, with one thread to manage communication with each worker. A pseudocode outline of the main() routine is quite simple:
    create a list of all tasks that must be performed
    if there are no command line arguments {
          // The master program does all the tasks itself.
       Perform each task.
    }
    else {
          // The tasks will be performed by worker programs.
       for each command line argument:
          Get information about a worker from command line argument.
          Create and start a thread to communicate with the worker.
       // The threads do all the work...
       Wait for all threads to terminate.
    }
    // All tasks are now complete (assuming no error occurred).
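The "create and start a thread, then wait for all threads" part of this outline might be sketched as below. It is a simplified illustration, not the code from CLMandelbrotMaster.java: the threads only print a message instead of opening a connection, and the names hostOf(), portOf(), startAndWait(), and the value of DEFAULT_PORT are all assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class MasterOutlineSketch {

    static final int DEFAULT_PORT = 2500;  // assumed value, for illustration only

    // A worker is described as "host" or "host:port".
    static String hostOf(String arg) {
        int colon = arg.indexOf(':');
        return colon < 0 ? arg : arg.substring(0, colon);
    }

    static int portOf(String arg) {
        int colon = arg.indexOf(':');
        return colon < 0 ? DEFAULT_PORT : Integer.parseInt(arg.substring(colon + 1));
    }

    // Starts one thread per worker description, waits for all of the
    // threads to terminate, and returns how many of them finished.
    static int startAndWait(String[] workers) {
        AtomicInteger finished = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < workers.length; i++) {
            String host = hostOf(workers[i]);
            int port = portOf(workers[i]);
            int id = i + 1;
            Thread t = new Thread(() -> {
                // A real communication thread would open a socket here.
                System.out.println("Thread " + id + " would connect to "
                        + host + ":" + port);
                finished.incrementAndGet();
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();  // Wait for this thread to terminate.
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return finished.get();
    }

    public static void main(String[] args) {
        int n = startAndWait(new String[] { "localhost", "localhost:2501" });
        System.out.println(n + " threads finished.");
    }
}
```

Calling join() on every thread is what guarantees that the code after the loop runs only when all of the threads have finished their work.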
The tasks are put into a variable of type ConcurrentLinkedQueue<CLMandelbrotTask> named tasks (see Subsection 12.3.2). The communication threads take tasks from this queue and send them to worker programs. The method tasks.poll() is used to remove a task from the queue. If the queue is empty, it returns null, which acts as a signal that all tasks have been assigned and the communication thread can terminate.
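Here is a minimal sketch of the poll-until-null pattern, using java.util.concurrent.ConcurrentLinkedQueue. The tasks in this example are just Integer values standing in for real task objects, and the class and method names are made up for the illustration.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

class QueueDrainSketch {

    // Several threads remove tasks from one shared queue until it is empty.
    // Returns the total number of tasks that were removed.
    static int drain(int taskCount, int threadCount) {
        ConcurrentLinkedQueue<Integer> tasks = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < taskCount; i++)
            tasks.add(i);  // Integers stand in for real task objects.
        AtomicInteger handled = new AtomicInteger();
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(() -> {
                while (true) {
                    Integer task = tasks.poll();  // Never blocks.
                    if (task == null)
                        break;  // Queue is empty; this thread can terminate.
                    handled.incrementAndGet();  // "Perform" the task.
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return handled.get();
    }

    public static void main(String[] args) {
        System.out.println("Tasks handled: " + drain(1000, 4));
    }
}
```

Since poll() is thread-safe, each task is handed to exactly one thread, and no explicit synchronization is needed around the queue.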
The job of a thread is to send a sequence of tasks to a worker program and to receive the results that the worker sends back. The thread is also responsible for opening the connection in the first place. A pseudocode outline for the process executed by the thread might look like:
    Create a socket connected to the worker program.
    Create input and output streams for communicating with the worker.
    while (true) {
       Let task = tasks.poll().
       If task == null
          break;  // All tasks have been assigned.
       Encode the task into a message and transmit it to the worker.
       Read the response from the worker.
       Decode and process the response.
    }
    Send a "close" command to the worker.
    Close the socket.
This would work OK. However, there are a few subtle points. First of all, the thread must be ready to deal with a network error. For example, a worker might shut down unexpectedly. But if that happens, the master program can continue, provided other workers are still available. (You can try this when you run the program: Stop one of the worker programs, with CONTROL-C, and observe that the master program still completes successfully.) A difficulty arises if an error occurs while the thread is working on a task: If the problem as a whole is going to be completed, that task will have to be reassigned to another worker. I take care of this by putting the uncompleted task back into the task list. (Unfortunately, my program does not handle all possible errors. If the last worker thread fails, there will be no one left to take over the uncompleted task. Also, if a network connection "hangs" indefinitely without actually generating an error, my program will also hang, waiting for a response from a worker that will never arrive. A more robust program would have some way of detecting the problem and reassigning the task.)
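One possible way to detect a connection that hangs is to set a read timeout on the socket with the standard method Socket.setSoTimeout(). This is not what the sample program does; it is only a sketch of one option. The example below sets up a local connection whose other side never sends anything, so the read has to time out. The class name and the method silentPeerTimesOut() are invented for the illustration.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

class ReadTimeoutSketch {

    // Connects to a local peer that never sends any data and tries to
    // read a line.  Returns true if the read gave up after timeoutMillis.
    static boolean silentPeerTimesOut(int timeoutMillis) {
        InetAddress local = InetAddress.getLoopbackAddress();
        try (ServerSocket listener = new ServerSocket(0, 1, local);
             Socket socket = new Socket(local, listener.getLocalPort());
             Socket peer = listener.accept()) {
            socket.setSoTimeout(timeoutMillis);  // Limit how long a read can block.
            BufferedReader in = new BufferedReader(
                    new InputStreamReader(socket.getInputStream()));
            try {
                in.readLine();  // The peer is silent, so this cannot succeed.
                return false;
            }
            catch (SocketTimeoutException e) {
                return true;  // No response arrived within the time limit.
            }
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("Read timed out: " + silentPeerTimesOut(500));
    }
}
```

Since SocketTimeoutException is a kind of IOException, a thread that already puts its uncompleted task back into the task list when an exception occurs would treat a timeout the same way as any other network error. Choosing a good timeout is a judgment call, since it has to be longer than the time a worker legitimately needs to complete a task.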
Another defect in the procedure outlined above is that it leaves the worker program idle while the thread is processing the worker's response. It would be nice to get a new task to the worker before processing the response from the previous task. This would keep the worker busy and allow two operations to proceed simultaneously instead of sequentially. (In this example, the time it takes to process a response is so short that keeping the worker waiting while it is done probably makes no significant difference. But as a general principle, it's desirable to have as much parallelism as possible in the algorithm.) We can modify the procedure to take this into account:
    try {
       Create a socket connected to the worker program.
       Create input and output streams for communicating with the worker.
       Let currentTask = tasks.poll().
       Encode currentTask into a message and send it to the worker.
       while (true) {
          Read the response from the worker.
          Let nextTask = tasks.poll().
          If nextTask != null {
                // Send nextTask to the worker before processing the
                // response to currentTask.
             Encode nextTask into a message and send it to the worker.
          }
          Decode and process the response to currentTask.
          currentTask = nextTask.
          if (currentTask == null)
             break;  // All tasks have been assigned.
       }
       Send a "close" command to the worker.
       Close the socket.
    }
    catch (Exception e) {
       Put uncompleted task, if any, back into the task queue.
    }
    finally {
       Close the connection.
    }
Finally, here is how this translates into Java. The pseudocode presented above becomes the run() method in the class that defines the communication threads used by the master program:
    /**
     * This class represents one worker thread.  The job of a worker thread
     * is to send out tasks to a CLMandelbrotWorker program over a network
     * connection, and to get back the results computed by that program.
     */
    private static class WorkerConnection extends Thread {

        int id;        // Identifies this thread in output statements.
        String host;   // The host to which this thread will connect.
        int port;      // The port number to which this thread will connect.

        /**
         * The constructor just sets the values of the instance
         * variables id, host, and port and starts the thread.
         */
        WorkerConnection(int id, String host, int port) {
            this.id = id;
            this.host = host;
            this.port = port;
            start();
        }

        /**
         * The run() method of the thread opens a connection to the host and
         * port specified in the constructor, then sends tasks to the
         * CLMandelbrotWorker program on the other side of that connection.
         * If the thread terminates normally, it outputs the number of tasks
         * that it processed.  If it terminates with an error, it outputs
         * an error message.
         */
        public void run() {

            int tasksCompleted = 0;  // How many tasks has this thread handled.
            Socket socket;           // The socket for the connection.

            try {
                socket = new Socket(host,port);  // open the connection.
            }
            catch (Exception e) {
                System.out.println("Thread " + id +
                        " could not open connection to " + host + ":" + port);
                System.out.println("   Error: " + e);
                return;
            }

            CLMandelbrotTask currentTask = null;
            CLMandelbrotTask nextTask = null;

            try {
                PrintWriter out = new PrintWriter(socket.getOutputStream());
                BufferedReader in = new BufferedReader(
                        new InputStreamReader(socket.getInputStream()) );
                currentTask = tasks.poll();
                if (currentTask != null) {
                        // Send first task to the worker program.
                    String taskString = writeTask(currentTask);
                    out.println(taskString);
                    out.flush();
                }
                while (currentTask != null) {
                    String resultString = in.readLine();  // Get results for currentTask.
                    if (resultString == null)
                        throw new IOException("Connection closed unexpectedly.");
                    if (! resultString.startsWith(RESULT_COMMAND))
                        throw new IOException("Illegal string received from worker.");
                    nextTask = tasks.poll();  // Get next task and send it to worker.
                    if (nextTask != null) {
                            // Send nextTask to worker before processing results for
                            // currentTask, so that the worker can work on nextTask
                            // while the currentTask results are processed.
                        String taskString = writeTask(nextTask);
                        out.println(taskString);
                        out.flush();
                    }
                    readResults(resultString, currentTask);
                    finishTask(currentTask);  // Process results from currentTask.
                    tasksCompleted++;
                    currentTask = nextTask;   // We are finished with old currentTask.
                    nextTask = null;
                }
                out.println(CLOSE_CONNECTION_COMMAND);  // Send close command to worker.
                out.flush();
            }
            catch (Exception e) {
                System.out.println("Thread " + id + " terminated because of an error");
                System.out.println("   Error: " + e);
                e.printStackTrace();
                    // Put uncompleted tasks, if any, back into the task list.
                if (currentTask != null)
                    reassignTask(currentTask);
                if (nextTask != null)
                    reassignTask(nextTask);
            }
            finally {
                System.out.println("Thread " + id + " ending after completing " +
                        tasksCompleted + " tasks");
                try {
                    socket.close();
                }
                catch (Exception e) {
                }
            }

        } //end run()

    } // end nested class WorkerConnection