flink中outputag测流的用法
时间: 2025-06-25 12:14:02 浏览: 20
### Flink 中 OutputTag 侧输出流的使用方法
在 Apache Flink 的数据处理流程中,`OutputTag` 是用于定义和获取侧输出流的关键组件。通过 `OutputTag`,可以从主数据流 (`DataStream`) 中分离出一部分满足特定条件的数据作为侧输出流。
以下是关于如何使用 `OutputTag` 创建和访问侧输出流的具体说明:
#### 定义 OutputTag
`OutputTag` 需要指定一个唯一的名称以便后续识别。通常情况下,它是一个泛型类实例,表示侧输出流中的数据类型。例如,在 Scala 中可以这样定义:
```scala
val outputTag = new OutputTag[String]("side-output")
```
而在 Java 中则为:
```java
final OutputTag<String> outputTag = new OutputTag<>("side-output") {};
```
#### 添加到 Side Output 流程
要在某个算子 (Operator) 中向侧输出写入数据,需调用其上下文对象的方法 `output()` 并传入对应的 `OutputTag` 和目标数据项。下面展示了一个简单的过滤器逻辑实现方式——当某些记录不符合标准时将其发送至侧输出而非丢弃它们。
##### Scala 实现例子
```scala
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.OutputTag
object SideOutputExample {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Define the side output tag.
val outputTag = new OutputTag[(String, Int)]("filtered-elements")
val input: DataStream[Int] = env.fromElements(1, 2, 3, 4)
val processedStream = input.process(new ProcessFunction[Int, String]() {
override def processElement(value: Int,
ctx: ProcessFunction[Int, String]#Context,
out: Collector[String]): Unit = {
if (value % 2 == 0){
out.collect(s"Even number: $value")
} else {
ctx.output(outputTag, ("Odd", value))
}
}
})
// Get the side output stream using getSideOutput().
val sideOutputStream: DataStream[(String, Int)] = processedStream.getSideOutput(outputTag)
processedStream.print()
sideOutputStream.print()
env.execute("Side Output Example")
}
}
```
##### Java 实现例子
```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
public class SideOutputJavaExample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Create an OutputTag with a name 'odd-numbers'.
final OutputTag<Tuple2<String, Integer>> oddNumbersTag =
new OutputTag<>("odd-numbers"){};
DataStream<Integer> inputStream = env.fromElements(1, 2, 3, 4);
DataStream<String> evenNumbersStream = inputStream.process(
new ProcessFunction<Integer, String>() {
@Override
public void processElement(Integer value, Context ctx, Collector<String> out) throws Exception {
if (value % 2 == 0) {
out.collect("Even Number:" + value);
} else {
ctx.output(oddNumbersTag, Tuple.of("Odd Number:", value));
}
}
});
// Retrieve and work on the side output data set by calling getSideOutput() method.
DataStream<Tuple2<String,Integer>> oddNumberStream = evenNumbersStream.getSideOutput(oddNumbersTag);
evenNumbersStream.print();
oddNumberStream.print();
env.execute("Flink Side Outputs");
}
}
```
以上代码展示了如何利用 `ProcessFunction` 将不匹配条件的数据重定向到由 `OutputTag` 标识的另一条路径上去[^1][^2]。
### 注意事项
- **唯一性**: 每个 `OutputTag` 应具有全局范围内独一无二的名字字符串以防混淆不同的分支。
- **性能考量**: 过度依赖于复杂模式下的多重分流可能会影响整体程序效率因此建议谨慎设计架构图结构并评估实际需求后再决定是否采用此特性[^1].
阅读全文
相关推荐
















