Tuesday, September 18, 2012

Using 0MQ to communicate between threads

In this post I show how 0MQ can help with concurrency in a multithreaded program. To do this, I explore what concurrency means and why it is important. I then focus on in-process concurrency and threaded programming, which is notoriously tricky to do well because threads usually need to share some kind of state. I explore why this is and how it is typically tackled. I then show how threads can communicate without sharing any state by using 0MQ. Finally, I propose that constructing our multithreaded applications using the 0MQ model leads to simpler, more succinct code.

All code can be found in this GitHub project

What is a concurrent program?

The word concurrent means more than one thing working together to achieve a common goal. In computing this usually applies to one of two kinds of work: something computationally expensive, like encoding a video file, or something that waits on IO, like retrieving the sizes of a number of web pages.

The opportunity to employ concurrency has exploded with the arrival of multicore processors and the rise of hosted processing platforms like Amazon EC2 and Windows Azure. These two changes represent the two ends of the concurrency spectrum. To achieve concurrency on a multicore processor we create threads within our application and manage how they share state. Achieving concurrency on something like EC2, by contrast, is network based and requires a communication channel such as TCP. When communicating over the network, state is handled by passing messages.

0MQ recognises that the best way to create a concurrent program is to pass messages and not to share state. Whether it is two threads running within a process or thousands of processes running across the internet, 0MQ uses the same model of sockets and messaging to create very stable and scalable applications.

Multiple threads, shared state and locks

In .NET, any program that must do more than one task at a time must create a thread. Threads are a way for Windows to abstract the management of many different streams of execution. Each thread gets its own stack and set of registers. The OS then decides which thread executes at any given time.

The problem with threads is that when they have to communicate with each other, the typical way is to share some value in memory. This can cause data corruption, as more than one thread could be accessing the data at one time, so the application has to manage access to the shared data. This is done by locking the shared data, ensuring that only one thread can manipulate it at any one time. This mechanism adds complexity to an application, as it must include the locking logic. It also costs performance, since a thread waiting for a lock does no useful work.
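To make the locking approach concrete, here is a minimal sketch in Python rather than the C# used by the original project. The `DirectoryTotal` class and `add_sizes` function are hypothetical names invented for this illustration. Five threads add to one shared total, and every update has to go through the lock.

```python
import threading


class DirectoryTotal:
    """Running total shared by several threads (hypothetical example class)."""

    def __init__(self):
        self.total = 0
        self._lock = threading.Lock()

    def add(self, size):
        # The read-modify-write below is not atomic, so each caller
        # must hold the lock while it updates the shared value.
        with self._lock:
            self.total += size


def add_sizes(shared, sizes):
    """Thread body: add each size to the shared total."""
    for size in sizes:
        shared.add(size)


shared = DirectoryTotal()
threads = [
    threading.Thread(target=add_sizes, args=(shared, [1] * 10_000))
    for _ in range(5)
]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print(shared.total)  # prints 50000
```

Every piece of code that touches `total` has to know about the lock, which is the extra complexity described above.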

0MQ, multiple threads and no shared state

0MQ makes threaded programming simpler by swapping shared state for messaging. To demonstrate this I have created a simple program which calculates the size of a directory by adding up the size of each file it contains.

As we are using 0MQ we have to understand some of the concepts it uses. The first concept is static and dynamic components. Static components are pieces of infrastructure that we can always expect to be there. They usually own an endpoint and bind to it. Dynamic components come and go and generally connect to those endpoints. The next concept is the types of sockets provided by 0MQ. The implementation we’ll be looking at uses two types of sockets, PUSH and PULL. The PUSH socket is designed to distribute the work fairly to all connected workers, whilst the PULL socket collects results evenly from the workers. Using these socket types prevents one thread from being flooded with tasks or left idle waiting for its result to be taken.
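The bind/connect split and the PUSH/PULL pairing can be sketched in a few lines. This is an illustration in Python using the pyzmq binding, not the C# code from the project, and the endpoint name `inproc://tasks` and the file names are arbitrary choices made for the example.

```python
import zmq

context = zmq.Context()

# Static component: owns the endpoint, so it binds.
push = context.socket(zmq.PUSH)
push.bind("inproc://tasks")  # endpoint name is an assumption of this sketch

# Dynamic component: comes and goes, so it connects.
pull = context.socket(zmq.PULL)
pull.connect("inproc://tasks")

# Messages travel one way only, from PUSH to PULL.
for name in ["a.txt", "b.txt", "c.txt"]:
    push.send_string(name)

received = [pull.recv_string() for _ in range(3)]
print(received)  # prints ['a.txt', 'b.txt', 'c.txt']

push.close()
pull.close()
context.term()
```

Both sockets live on one thread here only to keep the sketch short. With several PULL sockets connected, the PUSH socket would hand the messages out between them in turn.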

Finally, the 0MQ guide describes a number of patterns for composing an application, depending on the type of work being done. Our example hands out many independent tasks (getting the size of each file) and gathers the results in one place (adding them together). To achieve this in 0MQ, a good choice is the task ventilator pattern.


In the diagram each box is a component in our application and components communicate with each other using 0MQ sockets. There are two static components in this application, the Ventilator and the Sink. There will only be one instance of each in the application and they will run on the same thread. There is one dynamic component, the Worker. There can be any number of workers and each one runs on its own thread.

To calculate the size of the directory, the Ventilator is given a list of files from the directory. It sends the name of each one out on its message queue.

When the Sink is started, it is told how many results to expect; in this instance we pass in the length of the array that we passed to the Ventilator. The Sink then pulls in the results from each of the workers and adds each one to the running total for the size of the directory. When it has received them all, it returns the total size of the files found.

The Worker connects to the Ventilator and Sink endpoints and sits in an endless loop.

When a message arrives from the Ventilator it triggers an event which causes the Worker to look up the file on disk to find its size. When the operation completes the Worker sends the size to the Sink’s endpoint.

All the components are brought together in the controlling program. We create a 0MQ context which will be shared with all the components. This is an important point when using 0MQ with threads: there must be a single context and it must be shared amongst all the threads, because sockets can only reach each other in-process when they are created from the same context. We then create instances of the Ventilator and Sink, passing in the context.

Next we create five workers, each on its own thread, again passing in the 0MQ context.

We do the work by building an array of files from our directory and passing this to the Ventilator. We tell the Sink how many results to expect and wait for the result to be returned.

When we have the final number we print it on the console. At no point in the process did any thread have to update a shared value.
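The whole flow can be sketched as one short program. This is an illustration in Python with pyzmq, not the C# listings from the project. The endpoint names, the `worker` and `directory_size` functions, the "ready" handshake and the temporary test files are all assumptions of this sketch. The handshake is an addition so that every worker is connected before any work is handed out.

```python
import os
import tempfile
import threading

import zmq

VENTILATOR_ENDPOINT = "inproc://ventilator"  # hypothetical endpoint names
SINK_ENDPOINT = "inproc://sink"


def worker(context):
    """Dynamic component: connect to both endpoints, then loop until shutdown."""
    tasks = context.socket(zmq.PULL)
    tasks.connect(VENTILATOR_ENDPOINT)
    results = context.socket(zmq.PUSH)
    results.connect(SINK_ENDPOINT)
    results.send_string("ready")  # handshake added by this sketch
    try:
        while True:
            path = tasks.recv_string()
            results.send_string(str(os.path.getsize(path)))
    except zmq.ContextTerminated:
        pass  # context.term() was called, so stop looping
    finally:
        tasks.close(linger=0)
        results.close(linger=0)


def directory_size(paths, worker_count=5):
    """Ventilator and Sink on the calling thread, workers on their own threads."""
    context = zmq.Context()  # the single context shared by every thread
    ventilator = context.socket(zmq.PUSH)
    ventilator.bind(VENTILATOR_ENDPOINT)
    sink = context.socket(zmq.PULL)
    sink.bind(SINK_ENDPOINT)

    threads = [
        threading.Thread(target=worker, args=(context,))
        for _ in range(worker_count)
    ]
    for thread in threads:
        thread.start()
    for _ in threads:
        sink.recv_string()  # wait for each worker's "ready" message

    # Ventilator: send out one file name per message.
    for path in paths:
        ventilator.send_string(path)

    # Sink: expect exactly one result per file and add them up.
    total = sum(int(sink.recv_string()) for _ in paths)

    ventilator.close(linger=0)
    sink.close(linger=0)
    context.term()  # wakes the blocked workers so they can exit
    for thread in threads:
        thread.join()
    return total


# Demo: three files of 10, 20 and 30 bytes in a temporary directory.
with tempfile.TemporaryDirectory() as folder:
    for name, size in [("a.bin", 10), ("b.bin", 20), ("c.bin", 30)]:
        with open(os.path.join(folder, name), "wb") as handle:
            handle.write(b"x" * size)
    files = [os.path.join(folder, name) for name in os.listdir(folder)]
    total = directory_size(files)

print(total)
```

The only object the threads have in common is the context. Because the Ventilator and Sink share a thread, all file names are queued before any result is read, so a very large directory could fill the socket buffers. A fuller version would likely need to interleave sending and receiving.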


In this post I investigated the programming challenges faced when dealing with concurrency, focusing on those specific to threaded concurrency. I have shown how 0MQ approaches this problem with the view that concurrent components should not share state and that communication is best handled by passing messages, whether between threads or between processes. To demonstrate how this works I created a simple program to calculate the size of a directory and used the 0MQ task ventilator pattern to structure it. Following this pattern breaks the software down into small parts that each perform one job. All knowledge of how to read the size of a file is held in the Worker; if we discover a better way to read the size of a file, that component can be changed without any impact on the rest of the program. This isolation is a consequence of only allowing the key components to communicate over a message channel, and the code is simpler because each component does only one job.

All code can be found in this github project


Tom Robinson said...

Interesting article, thanks.

You've put the Sink.cs in twice though, the second time is where Worker.cs should be?