多线程任务协调:从基础事件到生产者-消费者模式
立即解锁
发布时间: 2025-08-21 02:36:01 阅读量: 3 订阅数: 9 


深入理解C#并行编程精髓
### 多线程任务协调:从基础事件到生产者 - 消费者模式
在多线程编程中,任务协调是一个至关重要的主题。它涉及到如何让多个任务有序地执行,避免冲突,并高效地完成工作。本文将介绍几种常见的任务协调机制,包括 `AutoResetEvent`、`SemaphoreSlim` 以及并行生产者/消费者模式。
#### 1. AutoResetEvent
`AutoResetEvent` 与 `ManualResetEventSlim` 类似,但有一个重要的区别:每次调用 `Set()` 方法后,事件会自动重置,并且每次设置事件时只会释放一个等待的工作任务。
以下是 `AutoResetEvent` 类的关键成员:
| 成员 | 描述 |
| ---- | ---- |
| `Set()` | 设置事件,释放一个等待的任务。 |
| `WaitOne()` <br> `WaitOne(int)` <br> `WaitOne(TimeSpan)` | 等待事件被发出信号,或者指定的时间段过去。 |
下面是一个使用 `AutoResetEvent` 类的示例代码:
```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Listing_17 {
class Listing_17 {
static void Main(string[] args) {
// create the primtive
AutoResetEvent arEvent = new AutoResetEvent(false);
// create the cancellation token source
CancellationTokenSource tokenSource
= new CancellationTokenSource();
// create and start the task that will wait on the event
for (int i = 0; i < 3; i++) {
Task.Factory.StartNew(() => {
while (!tokenSource.Token.IsCancellationRequested) {
// wait on the primtive
arEvent.WaitOne();
// print out a message when we are released
Console.WriteLine("Task {0} released", Task.CurrentId);
}
// if we reach this point, we know the task has been cancelled
tokenSource.Token.ThrowIfCancellationRequested();
}, tokenSource.Token);
}
// create and start the signalling task
Task signallingTask = Task.Factory.StartNew(() => {
// loop while the task has not been cancelled
while (!tokenSource.Token.IsCancellationRequested) {
// go to sleep for a random period
tokenSource.Token.WaitHandle.WaitOne(500);
// set the event
arEvent.Set();
Console.WriteLine("Event set");
}
// if we reach this point, we know the task has been cancelled
tokenSource.Token.ThrowIfCancellationRequested();
}, tokenSource.Token);
// ask the user to press return before we cancel
// the token and bring the tasks to an end
Console.WriteLine("Press enter to cancel tasks");
Console.ReadLine();
// cancel the token source and wait for the tasks
tokenSource.Cancel();
// wait for input before exiting
Console.WriteLine("Press enter to finish");
Console.ReadLine();
}
}
}
```
在这个示例中,我们创建了三个工作任务,每个任务都调用 `AutoResetEvent` 的 `WaitOne()` 方法等待事件。一个监督任务每 500 毫秒设置一次事件,每次设置事件时会释放一个等待的工作任务。需要注意的是,`AutoResetEvent` 类并不保证在设置事件时会释放哪个等待的任务,因此在使用该类时,不要对工作任务的释放顺序做出假设。
#### 2. SemaphoreSlim
`System.Threading.SemaphoreSlim` 类允许你指定在设置事件时释放多少个等待的工作任务,这在你想要限制一组任务的并发程度时非常有用。监督者通过调用 `Release()` 方法来释放工作任务。默认版本释放一个任务,你也可以通过提供一个整数参数来指定释放的任务数量。
以下是 `SemaphoreSlim` 类的关键成员:
| 成员 | 描述 |
| ---- | ---- |
| `Release()` <br> `Release(int)` | 释放一个或指定数量的任务。 |
| `Wait()` <br> `Wait(CancellationToken)` <br> `Wait(int)` <br> `Wait(TimeSpan)` <br> `Wait(int, CancellationToken)` <br> `Wait(TimeSpan, CancellationToken)` | 调用此方法会阻塞,直到事件被设置、指定的时间段过去或指定的令牌被取消。 |
| `CurrentCount` | 返回将被释放的任务数量,或者如果没有等待的任务,则返回可以无阻塞调用 `Wait()` 方法的次数。 |
下面是一个使用 `SemaphoreSlim` 类的示例代码:
```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Listing_18 {
class Listing_18 {
static void Main(string[] args) {
// create the primtive
SemaphoreSlim semaphore = new SemaphoreSlim(2);
// create the cancellation token source
CancellationTokenSource tokenSource
= new CancellationTokenSource();
// create and start the task that will wait on the event
for (int i = 0; i < 10; i++) {
Task.Factory.StartNew(() => {
while (true) {
semaphore.Wait(tokenSource.Token);
// print out a message when we are released
Console.WriteLine("Task {0} released", Task.CurrentId);
}
}, tokenSource.Token);
}
// create and start the signalling task
Task signallingTask = Task.Factory.StartNew(() => {
// loop while the task has not been cancelled
while (!tokenSource.Token.IsCancellationRequested) {
// go to sleep for a random period
tokenSource.Token.WaitHandle.WaitOne(500);
// signal the semaphore
semaphore.Release(2);
Console.WriteLine("Semaphore released");
}
// if we reach this point, we know the task has been cancelled
tokenSource.Token.ThrowIfCancellationRequested();
}, tokenSource.Token);
// ask the user to press return before we cancel
// the token and bring the tasks to an end
Console.WriteLine("Press enter to cancel tasks");
Console.ReadLine();
// cancel the token source and wait for the tasks
tokenSource.Cancel();
// wait for input before exiting
Console.WriteLine("Press enter to finish");
Console.ReadLine();
}
}
}
```
在这个示例中,我们创建了十个工作任务,每个任务都调用 `SemaphoreSlim` 的 `Wait()` 方法等待信号。一个监督任务每 500 毫秒调用 `Release(2)` 方法释放两个线程。同样,调用 `Release()` 方法时,并不保证会释放哪些等待的任务。如果在调用 `Release()` 时没有任务等待,事件将保持设置状态,直到 `Wait()` 方法被调用。
#### 3. 并行生产者/消费者模式
并行生产者/消费者模式是一种非常常见的协调技术。一组任务(生产者)创建数据项,另一组任务(消费者)处理这些数据项。生产者和消费者之间的工作流程由一个集合(通常是队列)来协调:生产者将工作项放入集合中,消费者从集合中取出并处理这些工作项。
`System.Collections.Concurrent.BlockingCollection` 类将集合和同步原语组合成一个类,非常适合实现生产者/消费者模式。
##### 3
0
0
复制全文
相关推荐










