并行计算中的任务处理与同步机制
发布时间: 2025-08-17 02:17:25 阅读量: 3 订阅数: 3 

# 并行计算中的任务处理与同步机制
## 1. 任务列表累积操作
在并行计算中,当所有任务被创建后,可以通过累积(也称为归约)操作顺序遍历任务列表,合并并使用每个子任务的结果。以斐波那契数列计算为例,`FibVL` 类展示了一种实现方式。
### 1.1 `FibVL` 类代码实现
```java
class FibVL extends FJTask {
volatile int number; // 与之前相同
final FibVL next; // 兄弟任务的嵌入式链表
FibVL(int n, FibVL list) { number = n; next = list; }
public void run() {
int n = number;
if(n <= sequentialThreshold)
number = seqFib(n);
else {
FibVL forked = null; // 子任务列表
forked = new FibVL(n - 1, forked); // 前置到列表
forked.fork();
forked = new FibVL(n - 2, forked);
forked.fork();
number = accumulate(forked);
}
}
// 遍历列表,合并每个子任务并将结果相加
int accumulate(FibVL list) {
int sum = 0;
for (FibVL f = list; f != null; f = f.next) {
f.join();
sum += f.number;
}
return sum;
}
}
```
### 1.2 操作步骤
1. **任务创建**:当 `n` 大于顺序阈值时,创建两个子任务 `n - 1` 和 `n - 2`,并将它们添加到任务列表中。
2. **任务执行**:使用 `fork` 方法启动子任务。
3. **结果累积**:通过 `accumulate` 方法遍历任务列表,使用 `join` 方法等待子任务完成,并将结果累加到 `sum` 中。
这种方式适用于动态创建子任务的场景,尤其是在不同方法中创建子任务的情况。由于结果的处理顺序在这里并不重要,所以使用了最简单的链表算法(前置),这会在遍历过程中反转任务的顺序。只要累积步骤对于结果是可交换和可结合的,任务就可以按任意顺序处理。
## 2. 回调机制
递归任务的 `fork/join` 并行性可以扩展到使用其他局部同步条件而不是 `join` 的情况。在 `FJTask` 框架中,`t.join()` 可以实现为以下优化版本:
```java
while (!t.isDone()) yield();
```
`yield` 方法允许底层工作线程处理其他任务。除了 `isDone` 条件,还可以使用其他条件,只要确保等待的谓词最终会因为子任务的操作而变为真。
### 2.1 基于计数器的任务控制
任务控制可以依赖于计数器来跟踪任务的创建和完成。计数器在每次 `fork` 时递增,在分叉任务产生结果时递减。这种基于计数器的方案在子任务通过回调而不是访问结果字段来返回结果时是有吸引力的选择。
### 2.2 `FibVCB` 类代码实现
```java
class FibVCB extends FJTask {
volatile int number = 0; // 与之前相同
final FibVCB parent; // 最外层调用时为 null
int callbacksExpected = 0;
volatile int callbacksReceived = 0;
FibVCB(int n, FibVCB p) { number = n; parent = p; }
// 子任务完成时调用的回调方法
synchronized void addToResult(int n) {
number += n;
++callbacksReceived;
}
public void run() { // 与基于 join 的版本结构相同
int n = number;
if (n <= sequentialThreshold)
number = seqFib(n);
else {
// 清空 number 以便子任务填充
number = 0;
// 确定预期的回调数量
callbacksExpected = 2;
new FibVCB(n - 1, this).fork();
new FibVCB(n - 2, this).fork();
// 等待子任务的回调
while (callbacksReceived < callbacksExpected) yield();
}
// 回调父任务
if (parent != null) parent.addToResult(number);
}
}
```
### 2.3 注意事项
- **锁的使用**:所有互斥锁都限制在保护字段访问的小代码段中,任务不允许阻塞,除非确定能很快继续执行。
- **计数器拆分**:回调计数拆分为 `callbacksExpected` 和 `callbacksReceived` 两个计数器,当它们相等时任务完成。
- **`callbacksExpected` 计数器**:仅由当前任务使用,不需要同步和 `volatile` 修饰。
- **`addToResult` 方法**:必须同步以避免子任务同时回调时的干扰问题。
- **`yield` 循环测试**:由于 `number` 和 `callbacksReceived` 声明为 `volatile`,且 `callbacksReceived` 在 `addToResult` 方法的最后更新,`yield` 循环测试不需要同步。
基于回调的 `fork/join` 设计在问题解决算法、游戏、搜索和逻辑编程等领域常见。它允许更大的异步性,在子任务预期持续时间不同时可能带来更好的性能,但放弃了结果排序保证,仅适用于子任务结果处理与结果产生顺序完全无关的情况。
## 3. 任务取消
在某些设计中,不需要记录回调计数或详尽遍历子任务列表。当任何子任务(或其子任务等)得到合适结果时,任务就完成了。此时,可以通过取消正在产生不需要结果的子任务来避免浪费计算资源。
### 3.1 取消方式
- **子任务检查父任务状态**:子任务可以定期调用父任务的方法(如 `isDone`)来判断是否已经找到答案,如果是则提前返回,并设置自己的状态,以便其子任务也能这样做。可以使用 `FJTask.cancel` 方法提前设置 `isDone` 状态,这会抑制尚未开始的任务的执行,但对正在执行的任务无效,除非任务的 `run` 方法检测到更新的状态并处理。
- **检查全局变量**:当一组任务都在计算单个结果时,任务可以定期检查一个全局(静态)变量来指示完成情况。但当有许多任务和 CPU 时,更本地化的策略可能更可取,以避免对底层系统造成过大压力。
### 3.2 `NQueens` 类代码实现
```java
class NQueens extends FJTask {
static int boardSize; // 在 main 中初始化后固定
static class Result { // 最终结果的持有者
private int[] board = null; // 解决时不为 null
synchronized boolean solved() { return board != null; }
synchronized void set(int[] b) { // 支持非任务使用
if (board == null) { board = b; notifyAll(); }
}
synchronized int[] await() throws InterruptedException {
while (board == null) wait();
return board;
}
}
static final Result result = new Result();
public static void main(String[] args) {
boardSize = ...;
FJTaskRunnerGroup tasks = new FJTaskRunnerGroup(...);
int[] initialBoard = new int[0]; // 从空棋盘开始
tasks.execute(new NQueens(initialBoard));
int[] board = result.await();
// ...
}
final int[] sofar; // 初始配置
NQueens(int[] board) { this.sofar = board; }
public void run() {
```
0
0