
# Best way in .NET to manage queue of tasks on a separate (single) thread

**Josh** asked on 2014-09-05 18:12:48Z:
I know that asynchronous programming has seen a lot of changes over the years. I'm somewhat embarrassed that I let myself get this rusty at just 34 years old, but I'm counting on Stack Overflow to bring me up to speed.

What I am trying to do is manage a queue of "work" on a separate thread, but in such a way that only one item is processed at a time. I want to post work to this thread, and it doesn't need to pass anything back to the caller. Of course I could simply spin up a new `Thread` object and have it loop over a shared `Queue`, using sleeps, interrupts, wait handles, etc. But I know things have gotten better since then. We have `BlockingCollection`, `Task`, `async`/`await`, not to mention NuGet packages that probably abstract a lot of that.

I know that "What's the best..." questions are generally frowned upon, so I'll rephrase it by asking: "What is the currently recommended way" to accomplish something like this, preferably using built-in .NET mechanisms? But if a third-party NuGet package simplifies things a bunch, that's just as well.

I considered a `TaskScheduler` instance with a fixed maximum concurrency of 1, but it seems there is probably a much less clunky way to do that by now.

**Background**

Specifically, what I am trying to do in this case is queue an IP geolocation task during a web request. The same IP might wind up getting queued for geolocation multiple times, but the task will know how to detect that and skip out early if it's already been resolved. The request handler is just going to throw these `() => LocateAddress(context.Request.UserHostAddress)` calls into a queue and let the `LocateAddress` method handle duplicate-work detection. The geolocation API I am using doesn't like to be bombarded with requests, which is why I want to limit it to a single concurrent task at a time. However, it would be nice if the approach could easily scale to more concurrent tasks with a simple parameter change.
**Servy** replied:
To create an asynchronous single-degree-of-parallelism queue of work, you can simply create a `SemaphoreSlim` initialized to one, and then have the enqueuing method `await` the acquisition of that semaphore before starting the requested work.

```csharp
public class TaskQueue
{
    private readonly SemaphoreSlim semaphore;

    public TaskQueue()
    {
        semaphore = new SemaphoreSlim(1);
    }

    public async Task<T> Enqueue<T>(Func<Task<T>> taskGenerator)
    {
        await semaphore.WaitAsync();
        try
        {
            return await taskGenerator();
        }
        finally
        {
            semaphore.Release();
        }
    }

    public async Task Enqueue(Func<Task> taskGenerator)
    {
        await semaphore.WaitAsync();
        try
        {
            await taskGenerator();
        }
        finally
        {
            semaphore.Release();
        }
    }
}
```

Of course, to have a fixed degree of parallelism other than one, simply initialize the semaphore to some other number.
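A quick self-contained check of the serialization behavior: the `Demo` harness below is mine, and it repeats a minimal version of the `TaskQueue` (non-generic overload only) so the sketch compiles on its own. Five items are enqueued at once, yet the observed concurrency never exceeds one.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Minimal TaskQueue, repeated here so the sketch is self-contained.
public class TaskQueue
{
    private readonly SemaphoreSlim semaphore = new SemaphoreSlim(1);

    public async Task Enqueue(Func<Task> taskGenerator)
    {
        await semaphore.WaitAsync();
        try { await taskGenerator(); }
        finally { semaphore.Release(); }
    }
}

class Demo
{
    static async Task Main()
    {
        var queue = new TaskQueue();
        int concurrent = 0, maxConcurrent = 0;

        // Fire off several items at once; the semaphore serializes them.
        var tasks = new Task[5];
        for (int i = 0; i < tasks.Length; i++)
        {
            tasks[i] = queue.Enqueue(async () =>
            {
                int now = Interlocked.Increment(ref concurrent);
                maxConcurrent = Math.Max(maxConcurrent, now);
                await Task.Delay(20);
                Interlocked.Decrement(ref concurrent);
            });
        }
        await Task.WhenAll(tasks);

        Console.WriteLine(maxConcurrent); // 1
    }
}
```

Passing a different count to the `SemaphoreSlim` constructor is the "simple parameter change" the question asks for.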
**i3arnon** replied:
Your best option, as I see it, is using TPL Dataflow's `ActionBlock`:

```csharp
var actionBlock = new ActionBlock<string>(address =>
{
    if (!IsDuplicate(address))
    {
        LocateAddress(address);
    }
});

actionBlock.Post(context.Request.UserHostAddress);
```

TPL Dataflow is a robust, thread-safe, async-ready and very configurable actor-based framework (available as a NuGet package).

Here's a simple example for a more complicated case. Let's assume you want to:

- Enable concurrency (limited to the available cores).
- Limit the queue size (so you won't run out of memory).
- Have both `LocateAddress` and the queue insertion be async.
- Cancel everything after an hour.

```csharp
var actionBlock = new ActionBlock<string>(async address =>
{
    if (!IsDuplicate(address))
    {
        await LocateAddressAsync(address);
    }
}, new ExecutionDataflowBlockOptions
{
    BoundedCapacity = 10000,
    MaxDegreeOfParallelism = Environment.ProcessorCount,
    CancellationToken = new CancellationTokenSource(TimeSpan.FromHours(1)).Token
});

await actionBlock.SendAsync(context.Request.UserHostAddress);
```
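One detail the examples above leave out is shutdown: an `ActionBlock` keeps accepting posts until `Complete()` is called, after which `Completion` can be awaited to know that all queued work has drained. A minimal self-contained sketch (summing integers instead of geolocating addresses, so it runs standalone):

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Demo
{
    static async Task Main()
    {
        int sum = 0;

        // MaxDegreeOfParallelism defaults to 1, so items run one at a
        // time and no locking is needed around `sum`.
        var block = new ActionBlock<int>(n => sum += n);

        for (int i = 1; i <= 10; i++)
            block.Post(i);

        block.Complete();        // no more items will be accepted
        await block.Completion;  // wait for queued items to drain

        Console.WriteLine(sum);  // 55
    }
}
```

That single-at-a-time default is exactly the behavior the question asks for; raising `MaxDegreeOfParallelism` is the one-parameter scaling knob.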
Use `BlockingCollection<Action>` to create a producer/consumer pattern with one consumer (only one thing running at a time, like you want) and one or many producers.

First define a shared queue somewhere:

```csharp
BlockingCollection<Action> queue = new BlockingCollection<Action>();
```

In your consumer `Thread` or `Task` you take from it:

```csharp
// This will block until there's an item available
Action itemToRun = queue.Take();
```

Then from any number of producers on other threads, simply add to the queue:

```csharp
queue.Add(() => LocateAddress(context.Request.UserHostAddress));
```
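The consumer side is usually written as a loop over `GetConsumingEnumerable()`, which blocks internally like `Take` and exits cleanly once `CompleteAdding()` is called. A self-contained sketch of the whole pattern (the `Demo` harness and counter are mine):

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class Demo
{
    static void Main()
    {
        var queue = new BlockingCollection<Action>();
        int count = 0;

        // Single consumer: items run one at a time, in order.
        var consumer = Task.Run(() =>
        {
            foreach (var work in queue.GetConsumingEnumerable())
                work();
        });

        // Any number of producers can add work from other threads.
        for (int i = 0; i < 3; i++)
            queue.Add(() => count++);

        queue.CompleteAdding();   // signal no more work; the loop above ends
        consumer.Wait();

        Console.WriteLine(count); // 3
    }
}
```

Because only the single consumer thread ever invokes the queued actions, they need no synchronization among themselves.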