Table of Contents
- Introduction
- Why the Concurrency and Coordination Runtime (CCR) Exists
- Core Building Blocks of CCR
- The Session API – Overview
- Practical Example 1 – Coordinating Multiple Web Service Calls
- Practical Example 2 – Sensor Fusion in a Robotics Scenario
- Advanced Topics
- Performance Considerations & Common Pitfalls
- CCR Session API vs. Other Concurrency Models
- Conclusion
- Resources
Introduction
When you build modern, responsive applications—especially in domains like robotics, IoT, or high‑throughput services—handling asynchronous work efficiently becomes a core architectural challenge. Microsoft’s Concurrency and Coordination Runtime (CCR), originally shipped with Microsoft Robotics Developer Studio (MRDS), offers a lightweight, message‑driven model for orchestrating asynchronous operations without the overhead of heavyweight threads.
One of the most powerful yet often under‑explored facets of CCR is its Session API. Sessions let you group a collection of asynchronous tasks, define a shared lifetime, and manage cancellation, error handling, and cleanup in a single, coherent object. In this article we will:
- Explain the motivations behind CCR and where the Session API fits.
- Dive deep into the core building blocks (Dispatcher, Port, Task, etc.).
- Show how to create, use, and dispose of sessions with real‑world code.
- Discuss advanced patterns such as nested sessions, pooling, and integration with
async/await. - Compare CCR Sessions with other .NET concurrency models (TPL, Reactive Extensions, etc.).
By the end of this guide, you should be comfortable using the CCR Session API to coordinate complex, multi‑step asynchronous workflows in production‑grade software.
Why the Concurrency and Coordination Runtime (CCR) Exists
Before .NET introduced the Task Parallel Library (TPL) and async/await, developers struggled with callback hell, race conditions, and deadlocks in event‑driven systems. The CCR was designed to address these pain points by providing:
| Problem | Traditional Approach | CCR Solution |
|---|---|---|
| Callback nesting | Deeply nested delegates, hard to read | Interleave and Arbiter allow declarative composition |
| Thread explosion | One thread per request (e.g., thread pool misuse) | Dispatcher runs a small, fixed number of threads that process messages |
| Coordinating multiple async operations | Manual counting, volatile flags | Session tracks a group of operations and automatically signals completion or cancellation |
| Error propagation | Try/catch scattered across callbacks | Arbiter can capture faults and propagate them to the session’s error handler |
In robotics, for example, a robot may need to fetch sensor data, compute a path, and send motor commands—all concurrently but with a bounded, deterministic lifecycle. CCR’s message‑passing model and the Session API give developers deterministic control over when those operations start, finish, or abort.
Core Building Blocks of CCR
Understanding the Session API requires familiarity with the underlying primitives. Below is a concise refresher.
Dispatcher
The Dispatcher is the execution engine. It owns a pool of worker threads (default is Environment.ProcessorCount) and processes messages in a FIFO manner. All CCR operations ultimately get scheduled on a dispatcher.
// Create a dedicated dispatcher for a subsystem
Dispatcher myDispatcher = new Dispatcher(2, "RobotControlDispatcher");
Port & Receiver
A Port is a thread‑safe queue that holds messages of type T. Receivers subscribe to a port and react when a message arrives.
Port<int> sensorPort = new Port<int>();
Receiver<int> sensorReceiver = Arbiter.Receive<int>(true, sensorPort,
data => Console.WriteLine($"Sensor value: {data}"));
The true argument indicates the receiver should stay active after the first message.
Task, Arbiter, and Interleave
- Task – A delegate that runs on the dispatcher, often used to wrap synchronous work.
- Arbiter – A static helper that creates receivers for common patterns (e.g.,
Receive,Choose,Cancel,Throw). - Interleave – Allows you to define multiple concurrent cases (receivers) that can be processed in any order.
// Simple task that runs on the dispatcher
Task task = new Task(() => DoWork());
// Interleave with two concurrent receivers
Interleave inter = new Interleave();
inter.Add(Arbiter.Receive<int>(true, portA, a => HandleA(a)));
inter.Add(Arbiter.Receive<string>(true, portB, s => HandleB(s)));
These primitives work together to enable message‑driven concurrency.
The Session API – Overview
A Session is essentially a lifetime container for a set of CCR operations. Think of it as a supervisor that:
- Starts a collection of asynchronous work.
- Tracks the number of outstanding operations.
- Signals when all work completes successfully, or when a cancellation/error occurs.
- Cleans up resources (ports, receivers, timers) automatically.
Session Lifetime
A session is created in a running state. When you call session.Start(), the session begins counting any work you add via session.Add(task) or session.Join(port). The session automatically increments an internal reference count for each join. When the count reaches zero, the session fires its completion event (session.Completed).
You can also cancel a session explicitly, which triggers a cascade of Arbiter.Cancel calls on all joined ports.
Creating a Session
using Microsoft.Ccr.Core;
// Create a session that lives on the default dispatcher
Session mySession = new Session(Dispatcher.Current);
If you need a dedicated dispatcher—for example, to isolate a robotic subsystem—you can pass it in:
Session robotSession = new Session(myDispatcher);
Adding Work to a Session
There are several ways to associate work with a session:
| Method | What it does |
|---|---|
session.Join(Port<T> port) | Increments the session’s reference count each time a message arrives on the port. |
session.Add(Task task) | Executes task on the dispatcher and decrements the count when the task finishes. |
session.Queue(Action action) | Shortcut that wraps the action into a Task and adds it. |
session.Add(Arbiter.Receive<T>(...)) | Directly adds a receiver; the session tracks its lifetime. |
Cancellation & Cleanup
// Register a handler for session completion
mySession.Completed += delegate
{
Console.WriteLine("All work finished – cleaning up.");
};
// Cancel the session after 10 seconds (example using a timer)
Timer cancelTimer = new Timer(state => mySession.Cancel(),
null, TimeSpan.FromSeconds(10), Timeout.InfiniteTimeSpan);
When Cancel is called, CCR automatically:
- Sends a cancellation message to all joined ports.
- Calls any
Arbiter.Cancelhandlers you attached. - Fires the
Completedevent after all cleanup is done.
Practical Example 1 – Coordinating Multiple Web Service Calls
Scenario
You need to query three independent REST APIs (weather, traffic, and news) and combine the results into a single dashboard view. The calls should run in parallel, the UI must stay responsive, and the whole operation must be abortable if the user navigates away.
Step‑by‑Step Implementation
using System;
using System.Net.Http;
using Microsoft.Ccr.Core;
public class DashboardFetcher
{
private readonly Dispatcher _dispatcher = Dispatcher.Current;
private readonly HttpClient _http = new HttpClient();
// Ports for each service
private readonly Port<string> _weatherPort = new Port<string>();
private readonly Port<string> _trafficPort = new Port<string>();
private readonly Port<string> _newsPort = new Port<string>();
public void FetchAll(Action<string> onResult, Action<Exception> onError)
{
// 1️⃣ Create a session that will supervise the three calls
Session fetchSession = new Session(_dispatcher);
// 2️⃣ Register a completion handler that aggregates results
string weather = null, traffic = null, news = null;
fetchSession.Completed += delegate
{
if (weather != null && traffic != null && news != null)
{
string combined = $"Weather:{weather}\nTraffic:{traffic}\nNews:{news}";
onResult(combined);
}
};
// 3️⃣ Add receivers for each port (they automatically join the session)
fetchSession.Join(_weatherPort);
fetchSession.Join(_trafficPort);
fetchSession.Join(_newsPort);
// 4️⃣ Define how each incoming message updates the aggregate state
Arbiter.Receive<string>(true, _weatherPort, data => weather = data);
Arbiter.Receive<string>(true, _trafficPort, data => traffic = data);
Arbiter.Receive<string>(true, _newsPort, data => news = data);
// 5️⃣ Kick off the asynchronous HTTP calls
fetchSession.Queue(() => CallServiceAsync("https://api.weather.com/now", _weatherPort));
fetchSession.Queue(() => CallServiceAsync("https://api.traffic.com/status", _trafficPort));
fetchSession.Queue(() => CallServiceAsync("https://api.news.com/headlines", _newsPort));
// 6️⃣ Optional: expose a way to cancel (e.g., UI button)
// myCancelButton.Click += (s, e) => fetchSession.Cancel();
}
private async void CallServiceAsync(string url, Port<string> targetPort)
{
try
{
string json = await _http.GetStringAsync(url);
// Post the raw JSON onto the CCR port
targetPort.Post(json);
}
catch (Exception ex)
{
// Propagate the error to the session – this will cancel everything
targetPort.Post(null); // Signal failure via a null payload
// Alternatively you could use a dedicated error port or throw inside the session
}
}
}
What the code demonstrates
- Session creation –
new Session(_dispatcher)scopes the three service calls. - Automatic reference counting – Each
Joinadds a reference; the session completes when all three ports have posted. - Cancellation – If any call fails (or the UI triggers a cancel), you can call
fetchSession.Cancel(). - Separation of concerns – HTTP logic lives in
CallServiceAsync; aggregation lives in the session’s completion handler.
Running the Example
var fetcher = new DashboardFetcher();
fetcher.FetchAll(
result => Console.WriteLine("Combined result:\n" + result),
ex => Console.WriteLine("Error: " + ex.Message));
The console will print the combined JSON payload once all three services responded, or it will abort early if any request fails.
Practical Example 2 – Sensor Fusion in a Robotics Scenario
Scenario
A mobile robot gathers data from:
- Lidar (point cloud, high frequency)
- IMU (orientation, medium frequency)
- Camera (image frames, lower frequency)
The robot must fuse these streams into a local occupancy map that updates in real time. The fusion algorithm should run only when all three latest sensor readings are available, and it must be cancelable when the robot switches to a different behavior (e.g., docking).
Implementation Using CCR Sessions
using Microsoft.Ccr.Core;
using System;
using System.Collections.Generic;
// Simple data containers (in a real robot you'd use richer types)
public struct LidarScan { public double[] Distances; }
public struct ImuReading { public double Yaw, Pitch, Roll; }
public struct CameraFrame { public byte[] Pixels; }
public class SensorFusionEngine
{
private readonly Dispatcher _dispatcher = Dispatcher.Current;
// Ports for each sensor stream
private readonly Port<LidarScan> _lidarPort = new Port<LidarScan>();
private readonly Port<ImuReading> _imuPort = new Port<ImuReading>();
private readonly Port<CameraFrame> _cameraPort = new Port<CameraFrame>();
// Output port for fused map
public readonly Port<string> FusedMapPort = new Port<string>();
// Starts a new fusion session each time the robot enters "explore" mode
public Session StartFusionSession()
{
Session fusionSession = new Session(_dispatcher);
// Join all three sensor ports – the session will keep a count of pending messages
fusionSession.Join(_lidarPort);
fusionSession.Join(_imuPort);
fusionSession.Join(_cameraPort);
// When any sensor produces a new reading, store it locally
LidarScan latestLidar = default;
ImuReading latestImu = default;
CameraFrame latestCam = default;
Arbiter.Receive<LidarScan>(true, _lidarPort, scan => latestLidar = scan);
Arbiter.Receive<ImuReading>(true, _imuPort, imu => latestImu = imu);
Arbiter.Receive<CameraFrame>(true, _cameraPort, cam => latestCam = cam);
// Define a periodic timer that triggers the fusion step.
// The timer is *joined* to the session, so it stops when the session is cancelled.
Timer fusionTimer = new Timer(state => FuseAndPost(),
null,
TimeSpan.FromMilliseconds(100), // 10 Hz fusion
TimeSpan.FromMilliseconds(100));
// Joining the timer ensures the session holds it alive
fusionSession.Join(fusionTimer);
// Helper that performs the actual fusion (runs on the dispatcher)
void FuseAndPost()
{
// Simple placeholder for a real algorithm:
string map = $"Map @ {DateTime.Now:T} – L:{latestLidar.Distances.Length} pts, " +
$"IMU:{latestImu.Yaw:F1}/{latestImu.Pitch:F1}/{latestImu.Roll:F1}, " +
$"Img:{latestCam.Pixels?.Length ?? 0} bytes";
// Post fused map to an external consumer
FusedMapPort.Post(map);
}
// Optional: expose a cancellation method
fusionSession.Completed += delegate
{
Console.WriteLine("Fusion session ended – cleaning up resources.");
fusionTimer.Dispose(); // timer cleanup
};
// Start the session (implicitly starts counting)
// No explicit call needed; the joins already increment the ref count.
return fusionSession;
}
// Simulated sensor producers (in real code these would come from hardware drivers)
public void SimulateSensors()
{
// Lidar runs at 20 Hz
new Timer(_ => _lidarPort.Post(new LidarScan { Distances = new double[360] }),
null, TimeSpan.Zero, TimeSpan.FromMilliseconds(50));
// IMU runs at 10 Hz
new Timer(_ => _imuPort.Post(new ImuReading { Yaw = 0, Pitch = 0, Roll = 0 }),
null, TimeSpan.Zero, TimeSpan.FromMilliseconds(100));
// Camera runs at 5 Hz
new Timer(_ => _cameraPort.Post(new CameraFrame { Pixels = new byte[640 * 480 * 3] }),
null, TimeSpan.Zero, TimeSpan.FromMilliseconds(200));
}
}
How the Session Manages Lifetime
- Joining ports – Each sensor port adds a reference. The session won’t complete until each port has posted at least once and the timer has been disposed.
- Timer as a member – By joining the timer, cancelling the session also stops the periodic fusion loop automatically.
- Cancellation – When the robot changes behavior, you can invoke
currentFusionSession.Cancel(). All ports receive a cancellation message, the timer is disposed, and theCompletedevent fires.
Running the Fusion Engine
var engine = new SensorFusionEngine();
engine.SimulateSensors();
Session explore = engine.StartFusionSession();
engine.FusedMapPort.Receive(map => Console.WriteLine("New map: " + map));
// Let the robot explore for 5 seconds, then stop.
new Timer(_ => explore.Cancel(), null, TimeSpan.FromSeconds(5), Timeout.InfiniteTimeSpan);
You will see a stream of fused map strings printed roughly every 100 ms, and after 5 seconds the session will be cancelled cleanly.
Advanced Topics
Nested Sessions
Sometimes a high‑level operation spawns sub‑operations that have their own lifetimes. CCR supports nested sessions: a child session can be created with the parent’s dispatcher and its completion can be joined to the parent.
Session parent = new Session(Dispatcher.Current);
Session child = new Session(parent.Dispatcher);
// Child work
child.Queue(() => DoHeavyComputation());
// When child finishes, it automatically decrements parent's reference count
parent.Join(child);
When the parent is cancelled, all children are cancelled recursively.
Session Pooling & Reuse
Creating many short‑lived sessions can cause unnecessary allocations. A common pattern is to maintain a session pool for a given subsystem:
class SessionPool
{
private readonly Stack<Session> _pool = new Stack<Session>();
private readonly Dispatcher _dispatcher;
public SessionPool(Dispatcher dispatcher) => _dispatcher = dispatcher;
public Session Rent()
{
lock (_pool)
return _pool.Count > 0 ? _pool.Pop() : new Session(_dispatcher);
}
public void Return(Session s)
{
// Ensure the session is clean before reusing
s.Reset(); // hypothetical Reset method – you can also create a fresh one
lock (_pool) _pool.Push(s);
}
}
When you Return a session, make sure all ports have been detached and any timers disposed. This approach is especially useful in high‑frequency telemetry pipelines.
Interoperability with async/await
While CCR predates async/await, you can still bridge the two worlds:
public async Task<string> GetDataWithCcrAsync(string url)
{
var tcs = new TaskCompletionSource<string>();
Port<string> resultPort = new Port<string>();
// CCR side – post result onto the port
Task.Run(() => {
// Simulate a blocking call inside CCR
string data = new HttpClient().GetStringAsync(url).Result;
resultPort.Post(data);
});
// Bridge: when the port receives, complete the TaskCompletionSource
Arbiter.Receive<string>(true, resultPort, data => tcs.SetResult(data));
return await tcs.Task;
}
This pattern lets you write modern async methods while leveraging CCR’s message‑driven coordination for the rest of your system.
Debugging Sessions
CCR provides diagnostic hooks:
Dispatcher.Inspect– Returns a snapshot of pending messages and their target ports.Session.Inspect– Shows the current reference count and any attached ports.Arbiter.Trace– You can enable tracing for specific receivers.
Example:
Dispatcher.Current.Inspect(msgs => {
Console.WriteLine($"Dispatcher has {msgs.Count} pending messages.");
foreach (var m in msgs) Console.WriteLine($" {m}");
});
When debugging race conditions, pay attention to reference count mismatches (e.g., a session never completing because a Join was not balanced with a Leave). Adding explicit logging in session.Completed and session.Cancelled helps surface these issues early.
Performance Considerations & Common Pitfalls
| Pitfall | Symptom | Remedy |
|---|---|---|
| Unbalanced Join/Leave | Session never reaches zero references → memory leak, hung UI | Always pair session.Join(port) with a corresponding session.Leave(port) or ensure the port posts a final message. |
| Blocking Work on Dispatcher Thread | Dispatcher stalls, causing latency across all CCR components | Offload heavyweight CPU‑bound work to a dedicated thread pool (Task.Run) and only post results back to CCR. |
| Excessive Port Creation | High allocation pressure, GC churn | Reuse ports where possible; treat them as long‑lived objects attached to a subsystem. |
| Mixed Threading Models | Deadlocks when mixing TPL Task.Wait with CCR Arbiter.Receive | Keep CCR message handling non‑blocking; if you need to wait, use TaskCompletionSource instead of Task.Wait. |
| Large Payloads on Ports | Increased memory usage, slower message dispatch | Prefer passing references (e.g., byte[]) and reuse buffers; avoid copying large objects per message. |
Performance tip: The CCR’s dispatcher processes messages in FIFO order. If you have a burst of high‑priority work, you can create a high‑priority dispatcher (by specifying a larger thread count) and route those messages to its ports, leaving the default dispatcher for background tasks.
CCR Session API vs. Other Concurrency Models
| Feature | CCR Session | TPL (Task, CancellationToken) | Reactive Extensions (Rx) |
|---|---|---|---|
| Message‑driven | Core concept (Ports, Receivers) | Not inherent; you must build it | Built‑in observable streams |
| Reference counting | Automatic via Join/Leave | Manual via Task.WhenAll | Automatic via CompositeDisposable |
| Cancellation propagation | session.Cancel() sends cancel messages to all ports | CancellationTokenSource.Cancel() | Disposable.Dispose() on subscriptions |
| Deterministic ordering | FIFO per dispatcher | Depends on task scheduler | Depends on observable operators |
| Integration with hardware drivers | Designed for robotics, low‑latency sensor loops | Requires wrappers | Can wrap but less common in low‑level code |
| Learning curve | Steeper (ports, arbiter, interleave) | Familiar to most .NET devs | Requires understanding of LINQ‑style operators |
If you’re already invested in a TPL‑centric codebase, you can still use CCR Sessions for specific subsystems that benefit from message passing (e.g., robotics). Conversely, for UI‑heavy applications, async/await with CancellationToken is usually simpler.
Conclusion
The Microsoft CCR Session API offers a disciplined way to group asynchronous work, manage its lifetime, and guarantee clean cancellation—all without spawning a flood of threads. By leveraging Ports, Arbiter, Interleave, and the Session itself, you can:
- Build deterministic pipelines for sensor fusion, service orchestration, or any multi‑source data processing.
- Keep your UI responsive by offloading work to the CCR dispatcher while preserving a clear cancellation path.
- Combine CCR with modern
async/awaitpatterns when you need to bridge legacy message‑driven code with newer .NET APIs.
While CCR is older than the Task Parallel Library, its concepts remain valuable—especially in domains where message‑driven concurrency and tight resource control are paramount. Understanding and mastering the Session API equips you to write robust, maintainable asynchronous code that scales from a single robot to distributed IoT fleets.
Happy coding, and may your sessions always complete cleanly!
Resources
Microsoft Robotics Developer Studio – CCR Overview – Detailed official documentation (archived):
Microsoft Robotics Developer Studio – CCR Documentation“Concurrency and Coordination Runtime (CCR) – A Deep Dive” – Blog post by Dan Harkey, covering ports, arbiters, and sessions:
Concurrency and Coordination Runtime – Deep DiveRobotics Stack Exchange – Real‑world CCR Session patterns – Community Q&A with code snippets:
Robotics Stack Exchange – CCR Sessions“Bridging async/await and CCR” – Article on integrating modern async patterns with CCR:
Bridging async/await and CCRGitHub – Microsoft Robotics Samples – Open‑source sample projects that demonstrate sessions in action:
Microsoft Robotics Sample Repository