Using TPL Dataflow Library for Concurrency Testing
The Task Parallel Library contains a very interesting set of dataflow components that for some reason they didn’t get the attention they deserve. This set is called TPL Dataflow Library and in few words, it is a robust in-process actor library that worth spending some time learning it! In this post, we will learn how to use it for concurrency testing.
What is the Task Parallel Library
The TPL is a set of types and APIs that allow developers to write parallel programs that target multi-core machines. The goal is to simplify the process of adding parallelism and concurrency to applications, by scaling the degree of concurrency dynamically and handling the partitioning of the work, the thread scheduling and other low-level details internally. This allows the developer to maximize performance while focusing on the actual work to be done.
Parallel Programming is taking advantage of multiple CPU cores to execute multiple threads simultaneously. Learn more about it here.
What is the TPL Dataflow Library
It is a set of dataflow components that promote actor-based programming by providing in-process message passing. This dataflow is extremely useful in cases of multiple operations that need to asynchronously talk to each other, for example parsing chunks of data as they arrive. In essence, the TPL Dataflow Library provides a foundation for message passing and parallelizing CPU and IO intensive applications.
The easiness of use can be demonstrated following these simple steps:
- Create an
ActionBlock
e.g.var block = new ActionBlock<Uri>(_ => _.DoSomething(uri));
- Sent items to it
e.g.block.SendAsync(new Uri("..."));
- Wait until it’s done
e.g.block.Completion.Wait();
And that’s it!
Of course this is far away from any working solution, but it illustrates perfectly the simplicity upon the dataflow library is build.
Going a bit deeper with ActionBlock
Production code of anything useful should include more than a simple method call: TPL Dataflow supports out of the box cancellation, capacity bound that protects from memory growth, degree of parallelism capping and of course async/awaits. Following, an example that uses the most important of the ActionBlock
goodies:
var actionBlock = new ActionBlock<Uri>(
async uri =>
{
var response = await client.GetAsync(uri);
response.EnsureSuccessStatusCode();
Assert.Equal("application/json; charset=utf-8",
response.Content.Headers.ContentType.ToString());
},
new ExecutionDataflowBlockOptions
{
BoundedCapacity = 5000, //Item count limitation
CancellationToken = ... //Cancelation
MaxDegreeOfParallelism = Environment.ProcessorCount //All cores
});
Are tests for concurrency issues possible?
It is theoretically impossible to create a test that proves there are no concurrency issues in a black-box system, and the reason is simple: Maybe the developer made it thread-unsafe on her/his birthday! Nevertheless, in a white-box system where the code that does the work and the code that does the threading are separate, concurrency tests are feasible by mocking the code that does the work. This guide on Practical Testing has C++ as codebase, but it is an interesting read.
In a more simplistic approach though, let’s assume a WeekendCounter
class with a Count()
method that returns the number of all weekend since the start of time till the DateTime to
. The code would need various improvements but it perfectly serves the reason of this post:
public class WeekendCounter
{
private DateTime current = DateTime.MinValue;
public int Count(DateTime to)
{
var count = 0;
//find first
while (current.DayOfWeek != DayOfWeek.Sunday)
current = current.AddDays(1);
//count rest
while (current <= to)
{
count++;
current = current.AddDays(7);
}
return count;
}
}
Using TPL Dataflow in a unit test
Essentially, a unit test is nothing more than a method that instantiates a small portion of our application and verifies its behavior independently from other parts. In the following test method one instance of the WeekendCounter
class can be tested for concurrency issues, by calling in parallel the Count()
method. The limitation here would only be the number of logical cores available, as more than one is of course needed.
From Microsoft Docs: If the current machine contains multiple processor groups, this property returns the number of logical processors that are available for use by the common language runtime (CLR).
The code that follows uses xUnit and TPL Dataflow to test the Count()
method, asserting the expected count on each block:
[Fact]
public async Task WeekendCounter_Parallelism_ShouldHaveNoConcurrencyIssues()
{
// You need at least 2 processors!
Assert.True(Environment.ProcessorCount > 1, "Not a chance!");
// Single instance of WeekendCounter to be tested
var counter = new WeekendCounter();
// Create an ActionBlock with the code to be executed
var actionBlock = new ActionBlock<Tuple<DateTime, int>>(
test =>
{
var result = counter.Count(test.Item1);
Assert.Equal(test.Item2, result);
},
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount
});
// Add items to the block
for (var i = 0; i < Environment.ProcessorCount; i++)
await actionBlock.SendAsync(
Tuple.Create(new DateTime(1983, 4, 26), 103432));
// Requests completion of the ActionBlock object.
actionBlock.Complete();
// Wait for the ActionBlock object to assert the result.
actionBlock.Completion.Wait();
}
Conclusion
Although ActionBlock
is the most useful block and alone covers almost all cases, there are more that adding together as puzzle pieces create a flow of information from one block to another, thus the name data flow. The reason behind why this library didn’t get traction it is surely puzzling, especially for the simple ActionBlock. Nevertheless, the applications are potentially unlimited, from just a few lines of code that test your app to biggest complex applications like the Microsoft Advanced Threat Analytics