Flink 双流 Join 的3种操作示例
发布网友
发布时间:2024-10-23 22:50
我来回答
共1个回答
热心网友
时间:2024-10-27 23:05
在数据库进行OLAP分析时,两表JOIN操作是常见操作。在流式处理中,有时也需要在两条流上进行JOIN操作以获取更多信息。Flink DataStream API提供了3个算子实现双流JOIN,分别是:
join() 算子
coGroup() 算子
intervalJoin() 算子
本文将举例说明它们的使用方法,并简单介绍interval join的原理。
准备数据
从Kafka接入点击流和订单流,并转化为POJO。
join() 算子
join() 算子提供"Window join"语义,按照指定字段和(滚动/滑动/会话)窗口进行inner join,支持处理时间和事件时间两种时间特征。以下示例以10秒滚动窗口,通过商品ID关联两个流,取得订单流中的售价相关字段。
简单易用。
coGroup() 算子
coGroup() 算子实现left/right outer join,需要开窗,CoGroupFunction比JoinFunction更灵活,可以按照用户指定逻辑匹配左流和/或右流的数据并输出。以下例子实现了点击流left join订单流的功能,采用nested loop join思想。
intervalJoin() 算子
join() 和coGroup()都是基于窗口进行关联的。但在某些情况下,两条流的数据步调可能不一致。Flink提供了"Interval join"语义,按照指定字段以及右流相对左流偏移的时间区间进行关联,即:
right.timestamp ∈ [left.timestamp + lowerBound; left.timestamp + upperBound]
interval join是inner join,不需要开窗,需要用户指定偏移区间的上下界,只支持事件时间。示例代码如下,需要分别在两个流上应用assignTimestampsAndWatermarks()方法获取事件时间戳和水印。
interval join的实现原理
KeyedStream.process(ProcessJoinFunction)方法调用的重载方法的逻辑如下:
先对两条流执行connect()和keyBy()操作,利用IntervalJoinOperator算子进行转换。在IntervalJoinOperator中,会利用两个MapState分别缓存左流和右流的数据。
当左流和右流有数据到达时,会分别调用processElement1()和processElement2()方法,它们都调用了processElement()方法。这段代码的思路是:
1.取得当前流StreamRecord的时间戳,判断是否是迟到数据,是则丢弃。
2.将时间戳和数据插入当前流对应的MapState。
3.遍历另外一个流的MapState,如果数据满足前述的时间区间条件,则调用collect()方法将该条数据投递给用户定义的ProcessJoinFunction进行处理。
4.注册时间戳为timestamp + relativeUpperBound的定时器,负责在水印超过区间的上界时执行状态的清理逻辑,防止数据堆积。