BlockingQueue from java.util.concurrent package

There are times when you want, for performance reasons, one thread to produce objects and another one to consume it in parallel. You can definitely achieve this with the bare-bones thread API. But it might get a little messy depending on your exact use case (all that synchronize/wait/notify could drive you insane). And then what if the producer produces too fast and the consumer consumes too slow. Now you have too many objects queuing up in memory all waiting to get processed. Fear not – JDK Concurrent API is here to help.

The java.util.concurrent package contains the BlockingQueue interface and a set of implementations of this interface. In our case we want to ensure that one thread produces objects while the other continuously consumes it. But we do not want too many objects to collect in memory if the threads get out of sync in terms of response times. BlockingQueue implementation ArrayBlockingQueue allows you to set a max size of the internal array thereby allowing you to control the maximum number of objects at any given time on the queue. Once the limit is reached subsequent ‘put’ calls will block until space becomes available. On the consumer side we can use the ‘take’ method to pull objects out of the queue. If there are no objects on the queue the consuming thread blocks until one becomes available.

To end the consuming thread you will need some sort of handshake between the two threads. In the example I use a special indicator to inform the consumer that it can shutdown.

Here is the Producer code:

The Consumer code is:

The test driver is:

As you can see in the Driver class, I have set the maximum queue capacity at two objects. This will allow the producer to put at-most 2 objects on the queue before it blocks. In the example here I use only one consumer, you can however many consumer threads as you wish.  We could also have a design where the consumer threads are implemented using the ExecutorService. The ExecutorService can be configured with a predefined thread pool size. As each request comes in, it can be submitted to the ExecutorService for execution.

Enjoy.