Multi-threading with blocking queues



What Will I Learn?

In this tutorial, I'll discuss multi-threading and how it relates to queues; show you a simple implementation of the airline check-in queue pattern that can be very helpful for your own multi-threaded applications; and provide some guidelines for analyzing and tuning the performance of the queue.

Requirements

  • .NET Framework

Difficulty

Intermediate

Tutorial Contents

  • Multi-threading
  • Thread Pool Multi-threading 
  • Blocking Queue 
  • Predicting Queue Behavior 
  • Measuring Queue Behavior 

Multi-threading

There are different types of multi-threading, so let’s distinguish the one we’re looking at. Multi-threading can be used to execute two different lines of code at the same time, or the same line of code multiple times. An example of the former is a background calculation, or grid loading thread in a GUI app. The latter is the type we’re concentrating on today.

You execute the same code multiple times because you need parallel activities executing concurrently to increase the throughput of your application. Let's take the example of a server handling requests that each include some database access (or perhaps a call to an external web service). If we assume that the database work takes a total of 500ms, then a single-threaded server would only be able to sustain a peak load of 2 transactions per second (tps). Here's the relevant extract from the code accompanying this tutorial:

while (true)
{
    Socket s = acceptingSocket.Accept();   // wait for the next client connection
    SocketHelper.ProcessConnection(s);     // handle it on this (the only) thread
}

The call to ProcessConnection() reads a message from the TCP socket, simulates the database access by delaying for 500ms, then sends a response back and closes the socket. I've included a test client program that can simulate 50 senders to drive this server at up to 200 requests per second. The results are as expected: the single-threaded server tops out at around 2 requests per second.
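For reference, here's a minimal sketch of what ProcessConnection() might look like; the exact message format and error handling in the accompanying code may differ (assumes using System.Net.Sockets and System.Threading):

// Hypothetical sketch only - the real SocketHelper.ProcessConnection in the
// accompanying code may differ. Takes object so it also fits the WaitCallback
// signature used with the thread pool later on.
public static void ProcessConnection(object state)
{
    Socket s = (Socket)state;
    try
    {
        byte[] buffer = new byte[1024];
        int received = s.Receive(buffer);           // read the request message
        Thread.Sleep(500);                          // simulate 500ms of database latency
        s.Send(buffer, received, SocketFlags.None); // send a response back
    }
    finally
    {
        s.Close();                                  // always release the socket
    }
}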

Now how can we improve this? Neither the server machine nor the database is busy – the limit in throughput is due to the (simulated) latency as the database requests fly back and forth between the server and database machine. While the single thread is stuck waiting for a response from the database, no other requests can be handled. Our machine can definitely handle more throughput, so let’s modify the server code to support multiple simultaneous threads. The easiest way is to use the .Net thread pool.

Thread Pool Multi-threading

Here's the relevant code:    

while (true)
{
    Socket s = acceptingSocket.Accept();
    // hand the connection to a thread-pool thread and keep accepting
    ThreadPool.QueueUserWorkItem(new WaitCallback(SocketHelper.ProcessConnection), s);
}

You'll need to update two of the TCP/IP parameters in your registry (and probably restart your machine). A .Reg file you can simply double-click is included with the accompanying code.

The results of running the test using the .Net ThreadPool Server are interesting:

It takes about 26 seconds in total to reach a rate of 100 requests per second, and then the rate doesn't increase any more – even though our client tool is configured to generate 200 requests per second. Where does this limit come from? If we watch Performance Monitor during the test (add the Thread Count counter of the Process category for the process called QueueSample), we can see the .Net thread pool growing until it reaches its default maximum of 50 threads. Because each request takes 500ms, we have a maximum throughput of 50 threads * 2 requests per second per thread = 100 rps.

More interesting still is the behavior of the server during this time – look at the server output:

This strange output is caused by the RateCounter code, which is also driven off the thread pool (via a Timer whose callbacks run on pool threads). Because the thread pool is completely busy during the test, the Timer's behavior becomes unpredictable: each callback is scheduled on time, but it is queued for execution on the next available pool thread, which might take a while to appear (more than 5 seconds in some cases). Running the .Net thread pool at full capacity is something you should avoid if you want consistent, predictable behavior from your applications. MSDN suggests avoiding the thread pool when:

  • You require a foreground thread.
  • You require a thread to have a particular priority.
  • You have tasks that cause the thread to block for long periods of time. The thread pool has a maximum number of threads, so a large number of blocked thread pool threads might prevent tasks from starting.
  • You need to place threads into a single-threaded apartment. All thread pool threads are in the multithreaded apartment.
  • You need to have a stable identity associated with the thread, or to dedicate a thread to a task.
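As an aside, you can inspect the pool's ceiling on your own machine; the exact numbers vary by CLR version and CPU count:

// Query the thread pool's configured maximums (values vary by machine).
int workerThreads, completionPortThreads;
ThreadPool.GetMaxThreads(out workerThreads, out completionPortThreads);
Console.WriteLine("Max worker threads: {0}, max I/O threads: {1}",
    workerThreads, completionPortThreads);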

We need another design, one that allows us to schedule activities for a collection of threads that we create ourselves (with appropriate priority, apartment model, identity and so on). In my company we produce applications that must be stable under loads of hundreds of requests per second, and we make extensive use of a queue component that offers custom thread pools, a simple interface and predictable behavior. We're going to look at a slightly simplified version of this component now; it is included (along with all the samples) in the code accompanying this tutorial.

Blocking Queue

The BlockingQueue class is a derivative of the Framework's System.Collections.Queue class. It differs from the standard class in that the Dequeue() method causes the caller to block until there's something in the queue.
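The accompanying code contains the full implementation; as a rough sketch of the idea, a blocking Dequeue() can be built from Monitor.Wait() and Monitor.Pulse() along these lines (simplified – the real component also handles statistics and shutdown):

using System.Collections;
using System.Threading;

// Simplified sketch of the idea; see the accompanying code for the
// complete BlockingQueue.
public class BlockingQueue : Queue
{
    public override void Enqueue(object item)
    {
        lock (this.SyncRoot)
        {
            base.Enqueue(item);
            Monitor.Pulse(this.SyncRoot); // wake one blocked reader, if any
        }
    }

    public override object Dequeue()
    {
        lock (this.SyncRoot)
        {
            while (this.Count == 0)
                Monitor.Wait(this.SyncRoot); // block until Enqueue pulses
            return base.Dequeue();
        }
    }
}

Let's see how that works in our sample application, the BlockingQueue Server: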

while (true)
{
    Socket s = acceptingSocket.Accept();
    queue.Enqueue(s);   // hand off to the reader threads; returns immediately
}

As each socket is accepted, it is added to the blocking queue. The accepting thread is then immediately free to continue accepting new sockets as quickly as they are connected. Any .Net object can be added to the queue – but you must be sure that the threads that read the objects back out of the queue know what to do with them.

The threads are created when the application starts. Note that we can configure the threads any way we want here; giving each thread a name helps a great deal with debugging later.

threads = new Thread[serverThreadCount];
for (int i = 0; i < serverThreadCount; ++i)
{
    threads[i] = new Thread(new ThreadStart(BlockingQueueServer.ThreadLoop));
    threads[i].Name = "Sample Reader Thread " + i; // a unique name per thread
    threads[i].IsBackground = true;
    threads[i].Start();
}

The last aspect is to implement the Reader thread itself:    

while (true)
{
    Socket s = (Socket) queue.Dequeue(); // blocks while the queue is empty
    SocketHelper.ProcessConnection(s);
}

The key here is that the threads we've created will run at full speed as long as there's something in the queue for them to process. When the queue is empty, the threads block, waiting for something to do. Think of it as a super-efficient, in-process version of MSMQ. Let's run the third sample server:

As hoped, the server is able to run at full speed almost immediately. The behavior of the application is also smoother, because the thread pool remains free for .Net's own use – including our Timer.

Because the BlockingQueue is so simple and transparent, it's easy to see how we might extend it. For example, if we want a mechanism to stop the threads when there is no more work to do, we can change the readers as follows:

while (true)
{
    Socket s = (Socket) queue.Dequeue();
    if (s == null)
    {
        queue.Enqueue(s); // put the null back so the other readers see it too
        break;            // stop this reader
    }
    SocketHelper.ProcessConnection(s);
}

When a reader reads a null reference from the queue, it takes this as a signal to stop. Before it does so, however, it re-enqueues the null, so that eventually every other live thread will also see the null and stop. On the producing side, triggering a shutdown is as simple as calling queue.Enqueue(null).
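For example, a graceful shutdown from the controlling thread might look like this (reusing the threads array created earlier):

queue.Enqueue(null);    // poison pill: readers drain the queue, then stop

foreach (Thread t in threads)
    t.Join();           // wait for every reader to finish its current request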

Another useful application of blocking queues is relative prioritization of requests – traditionally a difficult feature to provide. Let's say we have two request types (A and B) to handle, and we want to ensure that B-type requests have some guaranteed throughput – say one third of the capacity of the system. This is simple with blocking queues: create one queue for each request type, as follows:

  • Request A queue – 10 threads
  • Request B queue – 5 threads

As requests come into the system they should be enqueued on the appropriate blocking queue. When the system is full of type A requests or full of type B requests, it will run at full speed, as required. When the system has a mix of request types, the ratio of threads determines the relative throughput of the request types: queue A will process 66% of the requests and queue B the remainder (these numbers will shift if the service times differ). As an extension of this idea, if you want type A requests to run preferentially, reduce the priority of the threads servicing the B queue.
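A sketch of that setup might look like this (the PriorityServer class and StartReaders helper are hypothetical; the thread counts are the ones from the list above, and it assumes using System.Net.Sockets and System.Threading):

// Hypothetical setup: one blocking queue per request type. The 10:5 thread
// ratio controls relative throughput when both types are queued.
public static class PriorityServer
{
    public static readonly BlockingQueue QueueA = new BlockingQueue();
    public static readonly BlockingQueue QueueB = new BlockingQueue();

    public static void Start()
    {
        StartReaders(QueueA, 10, ThreadPriority.Normal);
        StartReaders(QueueB, 5, ThreadPriority.Normal); // BelowNormal to favour A
    }

    static void StartReaders(BlockingQueue queue, int count, ThreadPriority priority)
    {
        for (int i = 0; i < count; ++i)
        {
            Thread t = new Thread(delegate()
            {
                while (true)
                {
                    Socket s = (Socket) queue.Dequeue();
                    SocketHelper.ProcessConnection(s);
                }
            });
            t.Name = "Reader " + i;
            t.Priority = priority;
            t.IsBackground = true;
            t.Start();
        }
    }
}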

Here's another useful application of blocking queues. In some circumstances we wish to serialize requests from multiple threads (when writing a log file, for example). The traditional way to achieve this is to take a lock in the log writer implementation. This has the drawback, though, that threads may be stuck waiting for the lock for as long as it takes to process a single request. Using a blocking queue avoids this problem: the lock the blocking queue uses internally is held only for as long as it takes to Enqueue() the request – which is negligible. The log writer itself becomes a simple, single-threaded loop that Dequeue()s requests from the blocking queue and processes them.
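A minimal sketch of such a log writer, assuming the BlockingQueue sketched above (and using System.IO and System.Threading):

// Single-threaded log writer: callers only hold the queue's internal lock
// for the duration of Enqueue(), never for the duration of a file write.
public class QueuedLogWriter
{
    private readonly BlockingQueue queue = new BlockingQueue();

    public QueuedLogWriter(string path)
    {
        StreamWriter writer = new StreamWriter(path);
        Thread t = new Thread(delegate()
        {
            while (true)
            {
                string line = (string) queue.Dequeue(); // blocks when idle
                writer.WriteLine(line);
                writer.Flush();
            }
        });
        t.Name = "Log Writer";
        t.IsBackground = true;
        t.Start();
    }

    // Called from any thread; returns almost immediately.
    public void Write(string line)
    {
        queue.Enqueue(line);
    }
}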

Note that there is no explicit lock shared between the readers and writers of the blocking queue. Thanks to this, a whole range of often-tricky thread synchronization issues is avoided.

Finally, the demo application itself is a useful example of how to transfer .Net objects from one machine to another at a very high rate.

Predicting Queue Behavior

Our airline check-in desk / blocking queue model is approximated quite well by the following assumptions:

  • There are an infinite number of requesters.
  • The requesters are independent of each other.
  • The service time is not affected by the load on the system.
  • Requests are serviced on a first-in, first-out (FIFO) basis.

The mathematical queuing model that results from these assumptions is characterized as "M/M/c". The accompanying code includes a class that calculates a number of metrics for these queues (such as the average queue length). An alternative is to use the excellent and very complete Qts Plus Excel-based toolbox.

Let's take the example of a queue running at a steady rate of 90 rps, with 10 servers each capable of 10 rps. In this case the server utilization, or load, is 90%. The average queue length will be 6 requests, and the average response time will have increased from 0.1s (when the system is empty) to 0.167s. If the load on the system increases to 95%, the response time increases to 0.26s. These are the best possible results.
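As a cross-check, here's a minimal M/M/c calculator based on the standard Erlang C formula (a sketch, not the class from the accompanying code) that reproduces the figures above:

// Minimal M/M/c calculator using the Erlang C formula.
// lambda = arrival rate (rps), mu = service rate per thread (rps), c = threads.
public static void PrintQueueMetrics(double lambda, double mu, int c)
{
    double a = lambda / mu;   // offered load in Erlangs
    double rho = a / c;       // server utilization; must be < 1 for a stable queue

    // Erlang C: the probability that an arriving request has to wait.
    double term = 1.0, sum = 1.0;                // k = 0 term of the series
    for (int k = 1; k < c; ++k)
    {
        term *= a / k;                           // a^k / k!
        sum += term;
    }
    double last = term * (a / c) / (1.0 - rho);  // (a^c / c!) * 1/(1 - rho)
    double pWait = last / (sum + last);

    double lq = pWait * rho / (1.0 - rho);       // average queue length
    double w = lq / lambda + 1.0 / mu;           // average response time (s)

    Console.WriteLine("util={0:F0}%  Lq={1:F1}  W={2:F3}s", rho * 100, lq, w);
}

// PrintQueueMetrics(90, 10, 10) prints: util=90%  Lq=6.0   W=0.167s
// PrintQueueMetrics(95, 10, 10) prints: util=95%  Lq=15.7  W=0.265s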

In practice, the assumptions above begin to break down quickly as the system gets busy. The service times are not independent (each thread contends for CPU and memory), and the queue and the threads themselves are not free (more queue entries and more threads give the garbage collector more work). From experience, I recommend configuring a system to be no more than about 75% busy at peak load, and not creating more threads than necessary (Microsoft suggests no more than two active threads per CPU – but that guidance is for threads that are actually running, unlike our example threads, which spend most of their time idle).

Most importantly of all, the models and predictions you make should be backed up by measurements in your production environment.

Measuring Queue Behavior

Measuring queue behavior is extremely important: often the first indication that something is going wrong in the system is one or more of the queues filling up. A 15-second database lock timeout can cause real problems at 200 rps. Usefully, the blocking queue offers a natural measurement point at which to collect some critical statistics about the queue's dynamic behavior:

  • Queue Length
  • Enqueue Rate
  • Dequeue Rate
  • Response Time
  • Number of Blocked Readers

At Codescent we use a component based on BlockingQueue that exposes these statistics as performance counters. These can easily be viewed for a running system using Performance Monitor, and they provide a very interesting insight into how the system is performing – something that is often very difficult to achieve by other means.

The statistics are all fairly self-explanatory apart from the Response Time measurement. It is measured from the exit of one invocation of the Dequeue() method to the entry of the following invocation of Dequeue() by the same thread. With this in place, you can measure the response time while the system is quiet to derive the service rate for the mathematical model.
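A sketch of how that measurement could be wired into the queue (a hypothetical subclass; RecordResponseTime stands in for whatever counter-update mechanism you use, and it assumes using System, System.Diagnostics):

// Hypothetical instrumented queue: times each reader thread from the moment
// it leaves Dequeue() until it calls Dequeue() again.
public class InstrumentedQueue : BlockingQueue
{
    [ThreadStatic] private static Stopwatch serviceTimer;

    public override object Dequeue()
    {
        if (serviceTimer != null)
        {
            serviceTimer.Stop();                      // entry of the next Dequeue()
            RecordResponseTime(serviceTimer.Elapsed);
        }
        object item = base.Dequeue();                 // may block here while idle
        serviceTimer = Stopwatch.StartNew();          // exit of this Dequeue()
        return item;
    }

    private static void RecordResponseTime(TimeSpan elapsed)
    {
        // Placeholder: update a performance counter or log the value.
    }
}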

Conclusion

I've presented a small component that supports multi-threading using a simple, highly efficient queue mechanism. We've seen the situations in which this mechanism is preferable to the .Net Thread Pool, and explored a small but useful sample application that demonstrates all three approaches (single-threaded, thread pool and blocking queue). We've also seen how to analyze the theoretical behavior of such a queue, and how to monitor its dynamic behavior in a live system.


