响应式编程:RxJava操作符全解析
立即解锁
发布时间: 2025-08-19 00:06:53 阅读量: 2 订阅数: 7 


Java 12编程实战指南
### 响应式编程:RxJava 操作符全解析
#### 1. 操作符概述
在响应式编程中,有众多操作符可用于处理 Observable 发出的值。需要注意的是,这里介绍的并非全部可用操作符,更多操作符可查看在线文档(http://reactivex.io/RxJava/2.x/javadoc/index.html)。
#### 2. 转换操作符
这些操作符用于转换 Observable 发出的值:
- `buffer()`:根据提供的参数或函数将发出的值收集成束,并定期逐个发出这些束。
- `flatMap()`:根据当前 Observable 生成新的 Observable 并插入当前流中,是最常用的操作符之一。
- `groupBy()`:将当前 Observable 划分为多个 Observable 组(GroupedObservables 对象)。
- `map()`:使用提供的函数转换发出的值。
- `scan()`:将提供的函数应用于每个值,并结合前一次应用该函数的结果。
- `window()`:类似于 `buffer()`,但发出的是 Observable 组,每个 Observable 发出原始 Observable 的一部分值,然后以 `onCompleted()` 终止。
以下代码演示了 `map()`、`flatMap()` 和 `groupBy()` 的使用:
```java
Observable<String> obs = Observable.fromArray("one", "two");
obs.map(s -> s.contains("w") ? 1 : 0)
.forEach(System.out::print); //prints: 01
List<String> os = new ArrayList<>();
List<String> noto = new ArrayList<>();
obs.flatMap(s -> Observable.fromArray(s.split("")))
.groupBy(s -> "o".equals(s) ? "o" : "noto")
.subscribe(g -> g.subscribe(s -> {
if (g.getKey().equals("o")) {
os.add(s);
} else {
noto.add(s);
}
}));
System.out.println(os); //prints: [o, o]
System.out.println(noto); //prints: [n, e, t, w]
```
#### 3. 过滤操作符
这些操作符(及其多个重载版本)用于选择哪些值将继续通过管道流动:
- `debounce()`:仅在指定时间跨度内没有 Observable 发出其他值时才发出一个值。
- `distinct()`:仅选择唯一值。
- `elementAt(long n)`:仅发出流中指定位置 `n` 的一个值。
- `filter()`:仅发出符合指定条件的值。
- `firstElement()`:仅发出第一个值。
- `ignoreElements()`:不发出值,仅 `onComplete()` 信号通过。
- `lastElement()`:仅发出最后一个值。
- `sample()`:发出指定时间间隔内最近发出的值。
- `skip(long n)`:跳过前 `n` 个值。
- `take(long n)`:仅发出前 `n` 个值。
以下是一些过滤操作符的使用示例:
```java
Observable<String> obs = Observable.just("onetwo")
.flatMap(s -> Observable.fromArray(s.split("")));
// obs emits "onetwo" as characters
obs.map(s -> {
if("t".equals(s)){
NonBlockingOperators.pauseMs(15);
}
return s;
})
.debounce(10, TimeUnit.MILLISECONDS)
.forEach(System.out::print); //prints: eo
obs.distinct().forEach(System.out::print); //prints: onetw
obs.elementAt(3).subscribe(System.out::println); //prints: t
obs.filter(s -> s.equals("o"))
.forEach(System.out::print); //prints: oo
obs.firstElement().subscribe(System.out::println); //prints: o
obs.ignoreElements().subscribe(() ->
System.out.println("Completed!")); //prints: Completed!
Observable.interval(5, TimeUnit.MILLISECONDS)
.sample(10, TimeUnit.MILLISECONDS)
.subscribe(v -> System.out.print(v + " ")); //prints: 1 3 4 6 8
pauseMs(50);
```
#### 4. 组合操作符
这些操作符(及其多个重载版本)使用多个源 Observable 创建新的 Observable:
- `concat(src1, src2)`:创建一个 Observable,先发出 `src1` 的所有值,然后发出 `src2` 的所有值。
- `combineLatest(src1, src2, combiner)`:创建一个 Observable,发出两个源中任意一个发出的值,并结合每个源最近发出的值,使用提供的组合函数。
- `join(src2, leftWin, rightWin, combiner)`:根据组合函数,在 `leftWin` 和 `rightWin` 时间窗口内组合两个 Observable 发出的值。
- `merge()`:将多个 Observable 合并为一个,与 `concat()` 不同,它可能会交错发出值,而 `concat()` 不会交错不同 Observable 发出的值。
- `startWith(T item)`:在发出源 Observable 的值之前添加指定的值。
- `startWith(Observable<T> other)`:在发出源 Observable 的值之前添加指定 Observable 的值。
- `switchOnNext(Observable<Observable> observables)`:创建一个新的 Observable,发出指定 Observable 中最近发出的值。
- `zip()`:使用提供的函数组合指定 Observable 的值。
以下代码演示了一些组合操作符的使用:
```java
Observable<String> obs1 = Observable.just("one")
.flatMap(s -> Observable.fromArray(s.split("")));
Observable<String> obs2 = Observable.just("two")
.flatMap(s -> Observable.fromArray(s.split("")));
Observable.concat(obs2, obs1, obs2)
.subscribe(System.out::print); //prints: twoonetwo
Observable.combineLatest(obs2, obs1, (x,y) -> "("+x+y+")")
.subscribe(System.out::print); //prints: (oo)(on)(oe)
System.out.println();
obs1.join(obs2, i -> Observable.timer(5, TimeUnit.MILLISECONDS),
i -> Observable.timer(5, TimeUnit.MILLISECONDS),
(x,y) -> "("+x+y+")").subscribe(System.out::print);
//prints: (ot)(nt)(et)(ow)(nw)(ew)(oo)(no)(eo)
Observable.merge(obs2, obs1, obs2)
.subscribe(System.out::print); //prints: twoonetwo
obs1.startWith("42")
.subscribe(System.out::print); //prints: 42one
Observable.zip(obs1, obs2, obs1, (x,y,z) -> "("+x+y+z+")")
.subscribe(System.out::print); //prints: (oto)(nwn)(eoe)
```
#### 5. 转换操作符(从 XXX 转换)
这些操作符相对简单,以下是 Observable 类的从 XXX 转换的操作符列表:
| 操作符 | 描述 |
| ---- | ---- |
| `fromArray(T... items)` | 从可变参数创建 Observable |
| `fromCallable(Callable<T> supplier)` | 从 Callable 函数创建 Observable |
| `fromFuture(Future<T> future)` | 从 Future 对象创建 Observable |
| `fromFuture(Future<T> future, long timeout, TimeUnit unit)` | 从 Future 对象创建 Observable,并应用超时参数 |
| `fromFuture(Future<T> future, long timeout, TimeUnit unit, Scheduler scheduler)` | 从 Future 对象创建 Observable,应用超时参数并指定调度器 |
| `fromFuture(Future<T> future, Scheduler scheduler)` | 在指定调度器上从 Future 对象创建 Observable |
| `fromIterable(Iterable<T> source)` | 从可迭代对象(如 List)创建 Observable |
| `fromPublisher(Publisher<T> publisher)` | 从 Publisher 对象创建 Observable |
#### 6. 异常处理
`subscribe()` 操作符有一个重载版本,接受 `Consumer<Throwable>` 函数,用于处理管道中任何地方抛出的异常,其工作方式类似于全面的 `try-catch` 块。如果将此函数传递给 `subscribe()` 操作符,可以确保所有异常都将在此处处理。
如果需要在管道中间处理异常,以便在抛出异常的操作符之后恢复值流并由其余操作符处理,以下操作符(及其多个重载版本)可以提供帮助:
- `onErrorXXX()`:在捕获到异常时恢复提供的序列,`XXX` 表示操作符的具体行为,如 `onErrorResumeNext()`、`onErrorReturn()` 或 `onErrorReturnItem()`。
- `retry()`:创建一个 Observable,重复源 Observable 发出的排放,如果源 Observable 调用 `onError()`,则重新订阅。
以下是演示代码:
```java
Observable<String> obs = Observable.just("one")
.flatMap(s -> Observable.fromArray(s.split("")));
Observable.error(new RuntimeException("MyException"))
.flatMap(x -> Observable.fromArray("two".split("")))
.subscribe(System.out::print,
e -> System.out.println(e.getMessage())//prints: MyException
)
```
0
0
复制全文
相关推荐










