All code can be found in this GitHub project.
What is a concurrent program?
The word concurrent means more than one thing working together to achieve a common goal. In computing this usually means one of two things: speeding up something computationally expensive, like encoding a video file, or overlapping work that waits on IO, like retrieving the size of a number of web pages.
The opportunity to employ concurrency has exploded with the arrival of multicore processors and the rise of hosted processing platforms like Amazon EC2 and Windows Azure. These two changes represent the two ends of the concurrency spectrum. To achieve concurrency on a multicore processor we create threads within our application and manage how they share state. Achieving concurrency on something like EC2, by contrast, is network based and requires a communication channel like TCP. When communicating over the network, state is handled by passing messages.
0MQ recognises that the best way to create a concurrent program is to pass messages and not to share state. Whether it is two threads running within a process or thousands of processes running across the internet, 0MQ uses the same model of sockets and messaging to create very stable and scalable applications.
Multiple threads, shared state and locks
In .NET any program that must do more than one task at a time must create a thread. Threads are a way for Windows to abstract the management of many different streams of execution. Each thread gets its own stack and set of registers. The OS then schedules which thread executes at any given moment.
The problem with threads is that when they have to communicate with each other, the typical way is to share some value in memory. This can cause data corruption, as more than one thread could be accessing the data at one time, so the application has to manage access to the shared data. This is done by locking the shared data, ensuring that only one thread can manipulate it at any one time. This mechanism adds complexity to an application, as it must include the locking logic. It also costs performance, because a thread waiting on a lock does no useful work.
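To make the locking approach concrete, here is a minimal sketch using only the .NET base class library. The class and member names (`LockDemo`, `SumWithLock`, `_gate`) are made up for illustration and are not part of the project's code. Several threads increment one shared counter, and the `lock` statement ensures only one of them touches it at a time.

```csharp
using System;
using System.Threading;

public static class LockDemo
{
    // Shared state: every thread reads and writes this field.
    private static long _total;
    // Private object used only as the lock token.
    private static readonly object _gate = new object();

    public static long SumWithLock(int threadCount, int incrementsPerThread)
    {
        _total = 0;
        var threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++)
        {
            threads[i] = new Thread(() =>
            {
                for (int n = 0; n < incrementsPerThread; n++)
                {
                    // Without the lock, two threads could read the same old
                    // value and one of the two updates would be lost.
                    lock (_gate)
                    {
                        _total += 1;
                    }
                }
            });
            threads[i].Start();
        }
        // Wait for every thread to finish before reading the result.
        foreach (var thread in threads) thread.Join();
        return _total;
    }

    public static void Main()
    {
        Console.WriteLine(SumWithLock(4, 100000)); // prints 400000
    }
}
```

Every increment has to take the lock, so the threads spend much of their time queuing behind each other. This is the complexity and performance cost described above.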
0MQ, multiple threads and no shared state
0MQ makes threaded programming simpler by swapping shared state for messaging. To demonstrate this I have created a simple program which calculates the size of a directory by adding up the size of each file it contains.
As we are using 0MQ we have to understand some of the concepts it uses. The first concept is static and dynamic components. Static components are pieces of infrastructure that we can always expect to be there. They usually own an endpoint and bind to it. Dynamic components come and go and generally connect to those endpoints. The next concept is the types of sockets provided by 0MQ. The implementation we'll be looking at uses two types of sockets, PUSH and PULL. The PUSH socket distributes messages round-robin across all connected peers, whilst the PULL socket fair-queues incoming messages from all of its peers. Using these socket types prevents one thread from being flooded with tasks or left idle waiting for its result to be taken.
Finally, the 0MQ guide describes a number of patterns for composing an application, depending on the type of work being done. For the directory-size example, where each file can be measured independently and the results added together, a good choice is the task ventilator pattern.
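As a rough illustration of the socket roles, the sketch below sends one message from a PUSH socket to a PULL socket over an `inproc` endpoint. It assumes the clrzmq 3.x binding (the `ZeroMQ` namespace), which appears to be what the code in this post uses. The endpoint name `inproc://demo` and the `PushPullDemo` class are invented for the example. The static side calls `Bind` and the dynamic side calls `Connect`.

```csharp
using System;
using System.Text;
using ZeroMQ; // clrzmq 3.x (assumed to be the binding used in this post)

public static class PushPullDemo
{
    // Sends one message through a PUSH/PULL pair and returns what arrived.
    public static string RoundTrip(string message)
    {
        using (var context = ZmqContext.Create())
        using (var push = context.CreateSocket(SocketType.PUSH))
        using (var pull = context.CreateSocket(SocketType.PULL))
        {
            // Static side: owns the endpoint. With inproc the bind
            // should happen before any connect.
            push.Bind("inproc://demo");

            // Dynamic side: connects to the existing endpoint.
            pull.Connect("inproc://demo");

            push.Send(message, Encoding.Unicode);
            return pull.Receive(Encoding.Unicode);
        }
    }

    public static void Main()
    {
        Console.WriteLine(RoundTrip("hello"));
    }
}
```

Both sockets share one context, which is what lets the `inproc` transport work between threads of the same process.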
In the diagram each box is a component in our application and components communicate with each other using 0MQ sockets. There are two static components in this application, the Ventilator and the Sink. There will only be one instance of each in the application and they will run on the same thread. There is one dynamic component, the Worker. There can be any number of workers and each one runs on its own thread.
To calculate the size of the directory, the Ventilator is given a list of files from the directory. It sends the name of each one out on its message queue.
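The shape of this pattern can be tried out without 0MQ at all. The sketch below is an analogy, not the project's code: it swaps the 0MQ sockets for two `BlockingCollection` queues from the base class library, and uses the length of a string as a stand-in for the size of a file. All names (`PipelineSketch`, `TotalLength`) are hypothetical.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public static class PipelineSketch
{
    public static long TotalLength(string[] items, int workerCount)
    {
        // Stand-ins for the two message channels.
        var jobs = new BlockingCollection<string>();   // ventilator -> workers
        var results = new BlockingCollection<long>();  // workers -> sink

        // Workers (dynamic): each runs on its own thread, takes a job,
        // does the work and hands the result on. No state is shared
        // except through the two queues.
        var workers = new Thread[workerCount];
        for (int i = 0; i < workerCount; i++)
        {
            workers[i] = new Thread(() =>
            {
                foreach (var item in jobs.GetConsumingEnumerable())
                {
                    results.Add(item.Length);
                }
            });
            workers[i].Start();
        }

        // Ventilator (static): hands out one job per item.
        foreach (var item in items) jobs.Add(item);
        jobs.CompleteAdding(); // lets the worker loops end

        // Sink (static): collects exactly one result per job.
        long total = 0;
        for (int i = 0; i < items.Length; i++) total += results.Take();

        foreach (var worker in workers) worker.Join();
        return total;
    }

    public static void Main()
    {
        Console.WriteLine(TotalLength(new[] { "a", "bb", "ccc" }, 2)); // prints 6
    }
}
```

The 0MQ version has the same three roles. The difference is that its channels are sockets, so the workers could later move to other processes or machines by changing the endpoint.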
    public void Start()
    {
        _ventilator = _context.CreateSocket(SocketType.PUSH);
        _ventilator.Bind("inproc://ventilator");
    }

    public void Run(string[] fileList)
    {
        foreach (var fileName in fileList)
        {
            _ventilator.Send(fileName, Encoding.Unicode);
        }
    }
    // Sink: binds a PULL socket that the workers push their results to.
    public void Start()
    {
        _receiver = _context.CreateSocket(SocketType.PULL);
        _receiver.Bind("inproc://sink");
    }

    // Receives one result per file and adds them up.
    public Int64 Run(int length)
    {
        Int64 sizeOfDirectory = 0;
        for (var i = 0; i < length; i++)
        {
            var size = _receiver.Receive(Encoding.Unicode);
            Int64 temp;
            if (Int64.TryParse(size, out temp))
            {
                sizeOfDirectory += temp;
            }
        }
        return sizeOfDirectory;
    }
    // Worker: pulls a file name, measures the file, pushes the size on.
    private void RecieverPollInHandler(ZmqSocket reciever, ZmqSocket sender)
    {
        Thread.Sleep(100);
        // Pull the job from the Ventilator
        var fileToMeasure = reciever.Receive(Encoding.Unicode);
        Int64 fileLength = 0;
        FileStream fs = null;
        try
        {
            fs = File.OpenRead(fileToMeasure);
            fileLength = fs.Length;
        }
        catch (IOException) { } // a file that cannot be opened counts as zero bytes
        finally
        {
            if (fs != null) fs.Dispose();
        }
        Console.Write(".");
        // Push the result to the Sink
        sender.Send(fileLength.ToString(), Encoding.Unicode);
    }
    // Create the static components first so their endpoints are bound
    // before any worker connects.
    var context = ZmqContext.Create();
    var ventilator = new Ventilator(context);
    var sink = new Sink(context);
    ventilator.Start();
    sink.Start();
    const int workersCount = 4;
    var workers = new Thread[workersCount];
    for (int i = 0; i < workersCount; i++)
    {
        (workers[i] = new Thread(() => new TaskWorker(context).Run())).Start();
    }
    var fileList = EnumerateDirectory(@"C:\example\directory", "*.*", SearchOption.AllDirectories);
    ventilator.Run(fileList);
    var result = sink.Run(fileList.Length);
    Console.WriteLine("Found the length of {0} files in {1} milliseconds.\nDirectory size is {2}",
        fileList.Length, stopWatch.ElapsedMilliseconds, result);
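The snippet above relies on an `EnumerateDirectory` helper and a `stopWatch` that are not shown in the post. One way to sanity-check the 0MQ result is a single-threaded baseline like the sketch below. Its `EnumerateDirectory` is only a guess at what the helper does (a thin wrapper over `Directory.GetFiles`), and `DirectorySizeBaseline` and `SumFileSizes` are hypothetical names.

```csharp
using System;
using System.Diagnostics;
using System.IO;

public static class DirectorySizeBaseline
{
    // Hypothetical stand-in for the post's EnumerateDirectory helper.
    public static string[] EnumerateDirectory(string path, string pattern, SearchOption option)
    {
        return Directory.GetFiles(path, pattern, option);
    }

    // Adds up the file sizes on the calling thread, with no messaging.
    public static long SumFileSizes(string[] fileList)
    {
        long total = 0;
        foreach (var fileName in fileList)
        {
            try
            {
                total += new FileInfo(fileName).Length;
            }
            catch (IOException) { } // skip files that cannot be read
        }
        return total;
    }

    public static void Main(string[] args)
    {
        // Use the first argument as the directory, else the current one.
        var path = args.Length > 0 ? args[0] : Directory.GetCurrentDirectory();

        var stopWatch = Stopwatch.StartNew();
        var fileList = EnumerateDirectory(path, "*.*", SearchOption.AllDirectories);
        var result = SumFileSizes(fileList);
        stopWatch.Stop();

        Console.WriteLine("Found the length of {0} files in {1} milliseconds.\nDirectory size is {2}",
            fileList.Length, stopWatch.ElapsedMilliseconds, result);
    }
}
```

If a file changes or is locked between the two runs, the totals can differ slightly, since both versions count an unreadable file as zero bytes.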
Conclusion
In this post I investigated the programming challenges faced when dealing with concurrency, focusing on those specific to threaded concurrency. I have shown how 0MQ approaches this problem with the view that concurrency should never involve sharing state and that communication is best handled by passing messages between processes. To demonstrate how this works I created a simple program to calculate the size of a directory and used the 0MQ task ventilator pattern to structure it. Following this pattern breaks the software down into very specific parts, each performing one job. All knowledge of how to read the size of a file is held in the worker. If we discover a better way to read the size of a file, this component can be changed without any impact on the rest of the program. This isolation is a consequence of only allowing communication between the key components over a message channel, and the code is simpler because each component does only one job.
All code can be found in this GitHub project.
1 comment:
Interesting article, thanks.
You've put the Sink.cs in twice though, the second time is where Worker.cs should be?