Controlling the queue with ThreadPoolExecutor

The previous page showed a skeleton of a simple server built around a ThreadPoolExecutor. We used a common paradigm, in which one thread continually sits waiting to accept connections; each incoming connection is then farmed off to be executed by the next available pool thread. One problem that can occur is that incoming connections arrive faster than the available threads can process them. In this case, the connections waiting to be processed will be queued. But we haven't put any bounds on the queue, so in the worst case they will just continue to "pile up". If connections aren't being processed fast enough because the server is overloaded or has some other problem, then we won't help matters by building up an endless backlog of connections that the server has no realistic chance of processing. At some point, we need to accept that "the server is busy" and drop further connections until things have calmed down.

To achieve this goal, we need to:

- specify a job queue with a maximum capacity;
- decide what to do with connections that are rejected when the queue is full.

Specifying a queue with a maximum capacity

In our initial example, for convenience, we just used the Executors helper class to construct a thread pool with default options. However, if we construct a ThreadPoolExecutor object directly via its constructor, we can specify various additional parameters, including the implementation of BlockingQueue that we wish to use as the job queue. In this case, we can use an ArrayBlockingQueue or LinkedBlockingQueue with a maximum capacity. The queue is declared to take objects of type Runnable, since this is what the thread pool deals with:

BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(20);
ThreadPoolExecutor ex = new ThreadPoolExecutor(4, 10, 20, TimeUnit.SECONDS, q);

Note that a side effect of specifying our own queue is that we must also specify the maximum number of threads (10 in this case) and the time-to-live of idle threads (20 seconds in this case). One subtlety is that the pool only expands beyond its "core" size of 4 threads (the first parameter) when the queue is actually full: as long as jobs fit on the queue, they are queued rather than given new threads. When the number of connections (and hence threads needed) decreases, the thread pool will "kill" each spare thread after it has been sitting idle for 20 seconds, until we're back down to the core size.
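
To make this concrete, suppose jobs arrive with the parameters above (core size 4, maximum 10, queue capacity 20) and none of them complete in the meantime:

- jobs 1-4: a new core thread is created for each;
- jobs 5-24: all core threads are busy, so the jobs are queued;
- jobs 25-30: the queue is now full, so extra threads are created up to the maximum of 10;
- from job 31 onwards: with the queue full and the pool at maximum size, jobs are rejected (see below).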

If you specify your own job queue, be careful not to post jobs "manually" to the queue (using the regular queue methods). If you do so, the executor's thread-management logic is bypassed, and the job may never actually be run (for example, if no worker threads have been started yet). Always submit jobs via ThreadPoolExecutor.execute(), even though it's "your own queue".
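
For example, re-using the q and ex variables declared above (and given a freshly accepted socket s):

// Wrong: posting directly to the queue bypasses the executor's
// thread-management logic, so the job may never be run:
q.offer(new ConnectionRunnable(s));

// Right: let the executor decide whether to queue the job,
// start a new thread for it, or reject it:
ex.execute(new ConnectionRunnable(s));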

Rejected execution handlers and RejectedExecutionException

With an upper bound on our queue size, the other issue we need to deal with is what happens when a job cannot be executed because the queue is full. In this case, we'll be left with a "dangling" socket that we should close as soon as possible. By default, ThreadPoolExecutor.execute() throws a RejectedExecutionException when the queue is full, so we can handle the situation by catching that exception:

while (!shutDownRequested()) {
  Socket s = null;
  try {
    s = ss.accept();
    exec.execute(new ConnectionRunnable(s));
  } catch (RejectedExecutionException rej) {
    // Queue full: the job was never scheduled, so close the
    // socket rather than leaving the client hanging.
    try { s.close(); } catch (Exception ignore) {}
  } catch (Exception e) {
    // ... log
  }
}

Another way to handle closing the socket is to pass a RejectedExecutionHandler into the constructor of our ThreadPoolExecutor. RejectedExecutionHandler is an interface specifying a single method that we must implement:

public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
  ...
}

Then, if a connection job won't fit on the queue, the ThreadPoolExecutor will call our rejectedExecution() method rather than throwing an exception. Whether you catch the exception or define a separate handler essentially depends on which makes your design easier. For example, a single rejection handler could be shared by multiple executors.
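
As a concrete illustration, here is a minimal sketch of a handler that closes the rejected connection's socket. It assumes that ConnectionRunnable keeps a reference to its socket and exposes it via a getSocket() method; that method is our invention here for the sake of the example, not something shown on the previous page:

RejectedExecutionHandler handler = new RejectedExecutionHandler() {
  public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    // The rejected job is one of our ConnectionRunnables: close its
    // socket so that the client isn't left dangling.
    if (r instanceof ConnectionRunnable) {
      Socket s = ((ConnectionRunnable) r).getSocket();
      try { s.close(); } catch (Exception ignore) {}
    }
  }
};

BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(20);
ThreadPoolExecutor ex =
    new ThreadPoolExecutor(4, 10, 20, TimeUnit.SECONDS, q, handler);

With the handler installed, the accept loop no longer needs to catch RejectedExecutionException: execute() will simply call the handler and return.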


