Tallan's Technology Blog

Tallan's Top Technologists Share Their Thoughts on Today's Technology Challenges

Rate Limited Threading in C#

Jeremy Mill

Introduction

I recently ran into an issue while working on a project that depended on an external, rate-limited REST API. We needed to perform an action on several thousand local objects, and each one required a request to this external service and a response back. Processing the objects one at a time wasn’t an option for us because it would take too long, and guessing how many requests to send per minute wasn’t an option either because exceeding the limit might incur extra charges from the provider of the external API. The solution was to run the requests in rate-limited batches on the ThreadPool in our C# application, and to use a BlockingCollection to store the responses.

Technologies

Only a few technologies and concepts in this post may be unfamiliar. They are outlined here.

  • ThreadPool – a collection of threads which are idle until given work to do. In .NET the ThreadPool is a static, process-wide class. ThreadPool.SetMaxThreads sets the maximum number of concurrent threads, and returns false (leaving the limit unchanged) if the requested value is smaller than the number of processors on the machine.
  • ManualResetEvent – a synchronization flag which starts out unsignaled when constructed with ‘false’. A worker calls Set() on it when its task is finished, and any thread waiting on the event (here, the main thread) is then released. The event stays signaled until it is reset manually.
  • BlockingCollection – the .NET thread-safe collection for producer/consumer scenarios. It implements IEnumerable, and for this example you can treat it like a list. It handles the synchronization needed when tasks add items in parallel, which is simpler than managing a lock or mutex yourself and avoids accidental deadlock.
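
To make the last two concepts concrete, here is a minimal sketch of a single work item that signals a ManualResetEvent and writes to a BlockingCollection. The class and method names (PrimitivesDemo, DoubleOnPoolThread) are made up for illustration and are not part of the project described in this post.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical demo class, not taken from the original project.
static class PrimitivesDemo
{
    // Queues one work item that doubles 'input', then blocks until it signals completion.
    public static int DoubleOnPoolThread(int input)
    {
        var results = new BlockingCollection<int>();
        using (var done = new ManualResetEvent(false)) // starts unsignaled
        {
            ThreadPool.QueueUserWorkItem(_ =>
            {
                results.Add(input * 2); // thread-safe add, no explicit lock needed
                done.Set();             // release whoever is waiting on the event
            });
            done.WaitOne();             // block the caller until Set() has been called
        }
        return results.Take();          // remove and return the single stored result
    }

    static void Main()
    {
        Console.WriteLine(DoubleOnPoolThread(21)); // prints 42
    }
}
```

The rest of this post uses the same idea, only with many events and one shared collection.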

Components

There are three components in this example. The first is the main class, which manages the ThreadPool, the ManualResetEvent array and the BlockingCollection. The next is the worker method, which the threads in the ThreadPool will run. In this example, the worker method exists inside the main class, but it doesn’t have to. The last is the State class, which holds an integer from our dummy input, a reference to a ManualResetEvent, and a reference to our BlockingCollection. Let’s start with the State class, as it’s the simplest.

State Class

class State
{
    public int x { get; set; }
    public ManualResetEvent manualEvent { get; set; }
    public BlockingCollection<int> bc { get; set; }
}

The state class is simply a holder for our variables.

Worker Method

public static void DoSomething(object input)
{
    //cast input to a state class
    var state = (State)input;
    //write x in our thread
    Console.WriteLine(state.x);
    //add x to the output collection
    state.bc.Add(state.x * 2);
    //sleep for 100ms to simulate more work
    Thread.Sleep(100);
    Console.WriteLine(String.Format("Thread {0} finishing", state.x));
    //signal that we're done doing our thing, so the main thread can stop waiting on this work item
    state.manualEvent.Set();
}

The worker method is static in this case because it is called from our static Main method. To be queued on the ThreadPool it must return void and take a single ‘object’ argument. We then cast the object to an instance of our State class and perform some work. In the case of our project, this is where the HTTP requests to the external API took place. We also passed an HTTP client to the method via the State class to avoid creating and destroying a client for every request. The method puts its response into the BlockingCollection. Finally, it calls ‘Set()’ on the ManualResetEvent to signal that it is finished, after which the thread returns to the threadpool.

The Main Class

static void Main(string[] args)
{
    //make a garbage list, simulates our input
    List<int> SomeList = new List<int>();
    for (int x = 0; x < 101; x++)
    {
        SomeList.Add(x);
    }

For the purposes of this article the list of objects we want to work on is a list of integers, and the final result will be those integers multiplied by 2. This simulates work done by an HTTP request. The first thing we do is create our dummy list of inputs named SomeList, and populate it with the integers 0 to 100 (101 items in total).

//user variables to set
TimeSpan MaxTime = new TimeSpan(0, 0, 3); //some time in hour,min,seconds
var MaxTasksInTimeFrame = 10; //the maximum number of tasks to complete in 3 seconds
var MaxConcurrentThreads = 5; //how many threads you want to run at the same time
//end user variables

Next are our user-defined variables. MaxTime is the timeframe defined by your external API, so if you’re allowed 100 requests per minute, MaxTime would be set to TimeSpan(0,1,0), or one minute. MaxTasksInTimeFrame is the number of requests allowed in that timeframe. Using the same hypothetical, you would set this value to 100. Finally, MaxConcurrentThreads is the number of threads that you want to allow to run at one time. Because HTTP calls are inherently I/O-bound operations, during which the OS puts the thread to sleep pending a response from the remote server, this variable can safely be set to more than the number of logical CPUs or CPU cores. However, if you are using this code for a task that is not I/O-driven, such as performing calculations, you would not want to set it higher than the number of logical CPUs that the machine possesses. The number of logical CPUs available can be obtained from the ‘Environment.ProcessorCount’ property.
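
As a rough sanity check on these settings, the arithmetic below estimates how many batches a job needs and the shortest time it can take when each batch occupies one full timeframe. The names RateMath, BatchCount and MinimumDuration are hypothetical helpers written for this illustration only.

```csharp
using System;

// Hypothetical helper class for illustration; not part of the original example.
static class RateMath
{
    // Number of batches needed when at most 'maxTasksInTimeFrame' items run per timeframe.
    public static int BatchCount(int itemCount, int maxTasksInTimeFrame)
    {
        return (itemCount + maxTasksInTimeFrame - 1) / maxTasksInTimeFrame; // ceiling division
    }

    // Lower bound on total run time if every batch takes up one full timeframe.
    public static TimeSpan MinimumDuration(int itemCount, int maxTasksInTimeFrame, TimeSpan maxTime)
    {
        return TimeSpan.FromTicks(maxTime.Ticks * BatchCount(itemCount, maxTasksInTimeFrame));
    }

    static void Main()
    {
        // The values used in this post: 101 items, 10 tasks per 3 seconds.
        Console.WriteLine(BatchCount(101, 10)); // prints 11
        Console.WriteLine(MinimumDuration(101, 10, TimeSpan.FromSeconds(3)).TotalSeconds); // prints 33
    }
}
```

With the values used in this post, the 101 dummy inputs need 11 batches, so the example should run for at least 33 seconds.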

//the output collection
BlockingCollection<int> bc = new BlockingCollection<int>();
//ManualResetEvents are the events each work item uses to signal the main thread that it is done
ManualResetEvent[] manualEvents = new ManualResetEvent[MaxTasksInTimeFrame];

//set our threadpool variables
ThreadPool.SetMaxThreads(MaxConcurrentThreads, MaxConcurrentThreads);

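The next section instantiates our BlockingCollection and our array of ManualResetEvents, and applies the threadpool settings. The array only needs to be as large as the number of tasks in our timeframe because we never queue more work items than that in a single batch. The threadpool settings define the number of threads which can run concurrently. Be aware that, according to the documentation, SetMaxThreads returns false and changes nothing if the value is smaller than the number of processors on the machine, so on a machine with more than 5 logical CPUs the call above will not take effect as written.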
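
Since SetMaxThreads reports failure through its return value rather than an exception, it is worth checking that value. The following is a small sketch of such a check. PoolConfig and TryCapThreads are hypothetical names, and the behaviour you see depends on the machine it runs on.

```csharp
using System;
using System.Threading;

// Hypothetical wrapper for illustration; not part of the original example.
static class PoolConfig
{
    // Tries to cap the pool and returns false when the runtime rejects the request.
    public static bool TryCapThreads(int maxConcurrentThreads)
    {
        bool applied = ThreadPool.SetMaxThreads(maxConcurrentThreads, maxConcurrentThreads);
        if (!applied)
        {
            Console.WriteLine("SetMaxThreads({0}) was rejected; this machine reports {1} logical CPUs",
                maxConcurrentThreads, Environment.ProcessorCount);
        }
        return applied;
    }

    static void Main()
    {
        int workerThreads, ioThreads;
        ThreadPool.GetMaxThreads(out workerThreads, out ioThreads);
        Console.WriteLine("current max worker threads: " + workerThreads);
        Console.WriteLine("cap applied: " + TryCapThreads(5));
    }
}
```

If the cap is rejected, the batches still respect the rate limit, because the limit comes from the batch size and the sleep, not from the thread count.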

while (SomeList.Count > 0)
{
    var startTime = DateTime.Now;
    //never queue more items than remain in the list
    int batchSize = Math.Min(MaxTasksInTimeFrame, SomeList.Count);

    for (int x = 0; x < batchSize; x++)
    {
        //pop an element out of the list
        var element = SomeList[0];
        SomeList.RemoveAt(0);
        //create an unsignaled manual reset event for this work item
        manualEvents[x] = new ManualResetEvent(false);
        //create a new state which is made up of our input (x), the reset event (manualEvent), and the output collection
        State state = new State()
        {
            x = element,
            manualEvent = manualEvents[x],
            bc = bc
        };
        //queue the work item; a pool thread will call the DoSomething method with 'state' as an input
        ThreadPool.QueueUserWorkItem(new WaitCallback(DoSomething), state);
    }
    //wait only on the events created for this batch, so a short batch never waits on null or stale entries
    var pending = new WaitHandle[batchSize];
    Array.Copy(manualEvents, pending, batchSize);
    WaitHandle.WaitAll(pending);
    //if the elapsedTime is less than our rate limit, sleep for the remainder
    var elapsedTime = DateTime.Now - startTime;
    if (elapsedTime < MaxTime)
    {
        Console.WriteLine("sleeping for " + (MaxTime - elapsedTime).TotalSeconds.ToString());
        Thread.Sleep(MaxTime - elapsedTime);
    }
}

Next we begin to iterate over our list. We record a start time so that we know how long to wait before starting the next batch of tasks. We don’t want this to take any longer than it has to! We then create the tasks which will run in this time block, assign them their ‘State’ settings, and queue them in the ThreadPool. Note that QueueUserWorkItem takes a WaitCallback delegate plus a state object; the state object is how each task receives its ManualResetEvent.

We’re treating our input list like a queue, but if you need to hold onto your inputs to this method, simply change the manner in which you iterate over your inputs.
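
One way to keep the inputs intact is to read each batch by index instead of removing elements. This is only a sketch of that idea, and Batching and GetBatch are hypothetical names introduced for it.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper for illustration; not part of the original example.
static class Batching
{
    // Returns a copy of the items belonging to batch number 'batchIndex' without modifying 'items'.
    public static List<int> GetBatch(List<int> items, int batchIndex, int maxTasksInTimeFrame)
    {
        int start = batchIndex * maxTasksInTimeFrame;
        if (start >= items.Count)
        {
            return new List<int>(); // past the end: nothing left to process
        }
        int count = Math.Min(maxTasksInTimeFrame, items.Count - start);
        return items.GetRange(start, count);
    }

    static void Main()
    {
        var someList = new List<int>();
        for (int x = 0; x < 101; x++)
        {
            someList.Add(x);
        }
        // The last batch of the 101 dummy inputs holds a single element.
        Console.WriteLine(GetBatch(someList, 10, 10).Count); // prints 1
        Console.WriteLine(someList.Count);                   // prints 101, the input is untouched
    }
}
```

The outer loop would then count batches until GetBatch returns an empty list.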

After creating and queuing (and starting!) all of our work items in the threadpool, call ‘WaitHandle.WaitAll’ to ensure that we stop and wait for all of them to finish before our main method continues. Keep in mind that WaitAll accepts at most 64 handles, so with this approach MaxTasksInTimeFrame cannot exceed 64 per batch. Finally, check how much time has elapsed from start to finish, and if it is less than MaxTime, sleep for the difference.
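
If your rate limit allows more than 64 requests per timeframe, one alternative is a single CountdownEvent in place of the array of ManualResetEvents. This is a swapped-in technique rather than what our project used, and CountdownBatch and RunBatch are hypothetical names.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical alternative for illustration; the original example uses ManualResetEvents.
static class CountdownBatch
{
    // Doubles every input on the thread pool and returns once all work items have signaled.
    public static int[] RunBatch(int[] inputs)
    {
        var output = new BlockingCollection<int>();
        using (var remaining = new CountdownEvent(inputs.Length))
        {
            foreach (int value in inputs)
            {
                ThreadPool.QueueUserWorkItem(state =>
                {
                    output.Add((int)state * 2); // same dummy work as DoSomething
                    remaining.Signal();         // one work item finished
                }, value);
            }
            remaining.Wait(); // returns when the count reaches zero
        }
        return output.ToArray();
    }

    static void Main()
    {
        int[] inputs = new int[100]; // more than the 64 handles WaitAll can take
        for (int i = 0; i < inputs.Length; i++)
        {
            inputs[i] = i;
        }
        Console.WriteLine(RunBatch(inputs).Length); // prints 100
    }
}
```

The timing logic around the batch stays the same: record the start time, run the batch, then sleep for whatever remains of the timeframe.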

    //print our output!
    foreach(var item in bc)
    {
        Console.WriteLine(item);
    }
    //we're done
    Console.WriteLine("DONE!");
    Console.ReadLine();
}

Lastly, our example prints the values placed into our BlockingCollection, which represent the output from all of our threads.


Tags: .net, C++, threading,
