Best way in .NET to manage queue of tasks on a separate (single) thread

1# Josh (posted 2014-09-05 18:12:48Z)

I know that asynchronous programming has seen a lot of changes over the years. I'm somewhat embarrassed that I let myself get this rusty at just 34 years old, but I'm counting on StackOverflow to bring me up to speed.

What I am trying to do is manage a queue of "work" on a separate thread, but in such a way that only one item is processed at a time. I want to post work on this thread and it doesn't need to pass anything back to the caller. Of course I could simply spin up a new Thread object and have it loop over a shared Queue object, using sleeps, interrupts, wait handles, etc. But I know things have gotten better since then. We have BlockingCollection, Task, async/await, not to mention NuGet packages that probably abstract a lot of that.

I know that "What's the best..." questions are generally frowned upon, so I'll rephrase it as "What is the currently recommended..." way to accomplish something like this, preferably using built-in .NET mechanisms. But if a third-party NuGet package simplifies things a bunch, that's fine too.

I considered a TaskScheduler instance with a fixed maximum concurrency of 1, but it seems there is probably a much less clunky way to do that by now.
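
For illustration, that scheduler idea would look roughly like the sketch below, using the built-in ConcurrentExclusiveSchedulerPair, whose ExclusiveScheduler runs at most one task at a time (LocateAddress is my geolocation method, described below):

// Rough sketch of the TaskScheduler idea: ExclusiveScheduler runs at most one
// task at a time. LocateAddress is my geolocation method.
var exclusiveScheduler = new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler;
var factory = new TaskFactory(exclusiveScheduler);

// Each call queues work onto the single-concurrency scheduler.
factory.StartNew(() => LocateAddress(context.Request.UserHostAddress));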

Background

Specifically, what I am trying to do in this case is queue an IP geolocation task during a web request. The same IP might wind up getting queued for geolocation multiple times, but the task will know how to detect that and skip out early if it's already been resolved. The request handler is just going to throw these () => LocateAddress(context.Request.UserHostAddress) calls into a queue and let the LocateAddress method handle duplicate work detection. The geolocation API I am using doesn't like to be bombarded with requests, which is why I want to limit it to a single concurrent task at a time. However, it would be nice if the approach could easily scale to more concurrent tasks with a simple parameter change.

2# Servy (replied 2016-11-03 20:41:45Z)

To create an asynchronous, single-degree-of-parallelism queue of work you can simply create a SemaphoreSlim initialized to one, and then have the enqueuing method await the acquisition of that semaphore before starting the requested work.

using System;
using System.Threading;
using System.Threading.Tasks;

public class TaskQueue
{
    // Initialized to 1, so only one piece of work can hold the semaphore at a time.
    private readonly SemaphoreSlim semaphore;

    public TaskQueue()
    {
        semaphore = new SemaphoreSlim(1);
    }

    public async Task<T> Enqueue<T>(Func<Task<T>> taskGenerator)
    {
        // Wait for our turn without blocking a thread.
        await semaphore.WaitAsync();
        try
        {
            return await taskGenerator();
        }
        finally
        {
            semaphore.Release();
        }
    }

    public async Task Enqueue(Func<Task> taskGenerator)
    {
        await semaphore.WaitAsync();
        try
        {
            await taskGenerator();
        }
        finally
        {
            semaphore.Release();
        }
    }
}

Of course, to have a fixed degree of parallelism other than one, simply initialize the semaphore to some other number.
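
For instance, a request handler could use it along these lines (a rough usage sketch; LocateAddressAsync stands in for an async version of the asker's geolocation call):

// Shared queue; with the constructor above, only one item runs at a time.
private static readonly TaskQueue geoQueue = new TaskQueue();

public async Task HandleRequestAsync(string address)
{
    // LocateAddressAsync is assumed to be an async version of LocateAddress.
    await geoQueue.Enqueue(() => LocateAddressAsync(address));
}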

3# i3arnon (replied 2014-09-05 22:38:48Z)

Your best option as I see it is using TPL Dataflow's ActionBlock:

var actionBlock = new ActionBlock<string>(address =>
{
    if (!IsDuplicate(address))
    {
        LocateAddress(address);
    }
});

actionBlock.Post(context.Request.UserHostAddress);

TPL Dataflow is a robust, thread-safe, async-ready and very configurable actor-based framework (available as a NuGet package).

Here's a simple example for a more complicated case. Let's assume you want to:

  • Enable concurrency (limited to the available cores).
  • Limit the queue size (so you won't run out of memory).
  • Have both LocateAddress and the queue insertion be async.
  • Cancel everything after an hour.

var actionBlock = new ActionBlock<string>(async address =>
{
    if (!IsDuplicate(address))
    {
        await LocateAddressAsync(address);
    }
}, new ExecutionDataflowBlockOptions
{
    BoundedCapacity = 10000,
    MaxDegreeOfParallelism = Environment.ProcessorCount,
    CancellationToken = new CancellationTokenSource(TimeSpan.FromHours(1)).Token
});

await actionBlock.SendAsync(context.Request.UserHostAddress);
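
If you ever need to shut the block down cleanly, you can also mark it complete and await the remaining queued items:

// Optional: stop accepting new addresses and wait for the queued ones to finish.
actionBlock.Complete();
await actionBlock.Completion;
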
4# Zer0 (replied 2014-09-05 18:33:48Z)

Use BlockingCollection<Action> to create a producer/consumer pattern with one consumer (so only one thing runs at a time, like you want) and one or many producers.

First define a shared queue somewhere:

BlockingCollection<Action> queue = new BlockingCollection<Action>();

In your consumer Thread or Task you take from it:

// This will block until there's an item available.
Action itemToRun = queue.Take();

Then from any number of producers on other threads, simply add to the queue:

queue.Add(() => LocateAddress(context.Request.UserHostAddress));
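
Putting the consumer together, it can be a long-running task that drains the queue, along these lines (just a sketch; GetConsumingEnumerable blocks until items arrive and finishes once CompleteAdding is called on the queue):

// Single consumer: runs queued actions one at a time until CompleteAdding()
// is called on the queue.
Task.Factory.StartNew(() =>
{
    foreach (Action itemToRun in queue.GetConsumingEnumerable())
    {
        itemToRun();
    }
}, TaskCreationOptions.LongRunning);
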
5# Alexander Danilov (replied 2018-01-28 07:29:35Z)

Actually you don't need to run the tasks on one thread; you need them to run serially (one after another) and FIFO. TPL doesn't have a class for that, but here is my implementation, with tests: https://github.com/Gentlee/SerialQueue

The @Servy implementation is also there; tests show it is about twice as slow as mine, and it doesn't guarantee FIFO.
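
The core idea (a simplified sketch of the concept, not the library's actual code) is to chain each new work item onto the previous task, so items run one after another in FIFO order without a dedicated thread:

using System;
using System.Threading.Tasks;

public class SerialTaskQueueSketch
{
    private readonly object _gate = new object();
    private Task _tail = Task.CompletedTask;

    public Task Enqueue(Func<Task> work)
    {
        lock (_gate)
        {
            // Chain onto the previous item; Unwrap turns the Task<Task> into the inner task.
            _tail = _tail.ContinueWith(_ => work(), TaskScheduler.Default).Unwrap();
            return _tail;
        }
    }
}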
