Window意为窗口。在流处理系统中数据源源不断流入到系统,我们可以逐条处理流入的数据,也可以按一定规则一次处理流中的多条数据。当处理数据时程序需要知道什么时候开始处理、处理哪些数据。窗口提供了这样一种依据,决定了数据何时开始处理。
Flink内置Window
Flink有3个内置Window
- 以事件数量驱动的Count Window
- 以会话间隔驱动的Session Window
- 以时间驱动的Time Window
本文围绕这3个内置窗口展开讨论,我们首先了解这3个窗口在运行时产生的现象,最后再讨论它们的实现原理。
Count Window
计数窗口,采用 事件数量 作为窗口处理依据。计数窗口分为滚动和滑动两类,使用keyedStream.countWindow
实现计数窗口定义。
- Tumbling Count Window 滚动计数窗口
例子
:以用户分组,当每位用户有3次付款事件时计算一次该用户付款总金额。下图中“消息A、B、C、D”代表4位不同用户,我们以A、B、C、D分组并计算金额。
1 | /** 每3个事件,计算窗口内数据 */ |
- Sliding Count Window 滑动计数窗口
例子
:一位用户每3次付款事件计算最近4次付款事件总金额。
1 | /** 每3个事件,计算最近4个事件消息 */ |
Session Window
会话窗口,采用 会话持续时长 作为窗口处理依据。设置指定的会话持续时长时间,在这段时间中不再出现会话则认为超出会话时长。
例子
:每只股票超过2秒没有交易事件时计算窗口内交易总金额。下图中“消息A、消息B”代表两只不同的股票。
1 | /** 会话持续2秒。当超过2秒不再出现会话认为会话结束 */ |
Time Window
时间窗口,采用 时间 作为窗口处理依据。时间窗分为滚动和滑动两类,使用keyedStream.timeWindow
实现时间窗定义。
- Tumbling Time Window 滚动时间窗口:
1
2/** 每1分钟,计算窗口数据 */
keyedStream.timeWindow(Time.minutes(1));
- Sliding Time Window 滑动时间窗口:
1
2/** 每半分钟,计算最近1分钟窗口数据 */
keyedStream.timeWindow(Time.minutes(1), Time.seconds(30));
Flink Window组件
Flink Window使用3个组件协同实现了内置的3个窗口。通过对这3个组件不同的组合,可以满足许多场景的窗口定义。
WindowAssigner组件 为数据分配窗口、Trigger组件决定 如何处理窗口中的数据、借助Evictor组件实现 灵活清理窗口中数据时机。
WindowAssigner
当有数据流入到Window Operator时需要按照一定规则将数据分配给窗口,WindowAssigner
为数据分配窗口。下面代码片段是WindowAssigner
部分定义,assignWindows
方法定义返回的结果是一个集合,也就是说数据允许被分配到多个窗口中。1
2
3
4
5
6/*** WindowAssigner关键接口定义 ***/
public abstract class WindowAssigner<T, W extends Window> implements Serializable {
/** 分配数据到窗口集合并返回 */
public abstract Collection<W> assignWindows(T element, long timestamp, WindowAssignerContext context);
}
Flink内置WindowAssigner
Flink针对不同窗口类型实现了相应的WindowAssigner。Flink 1.7.0继承关系如下图
Assigner | 说明 |
---|---|
GlobalWindows | 所有数据都分配到同一个窗口(GlobalWindow) |
TumblingProcessingTimeWindows | 基于处理时间的滚动窗口分配处理 |
TumblingEventTimeWindows | 基于事件时间的滚动窗口分配处理 |
SlidingEventTimeWindows | 基于事件时间的滑动窗口分配处理 |
SlidingProcessingTimeWindows | 基于处理时间的滑动窗口分配处理 |
MergingWindowAssigner | 一个抽象类,本身是一个WindowAssigner。内部定义了Window可以Merge的特性。 |
EventTimeSessionWindows | 基于事件时间可Merge的窗口分配处理 |
ProcessingTimeSessionWindows | 基于处理时间可Merge的窗口分配处理 |
Trigger
Trigger触发器,它定义了3个触发动作,并且定义了触发动作处理完毕后的返回结果。返回结果交给Window Operator后由Window Operator决定后续操作。也就是说,Trigger通过具体的动作处理结果决定窗口是否应该被处理、被清除、被处理+清除、还是什么都不做。1
2
3
4
5
6
7
8
9
10
11
12/** Trigger关键接口定义 */
public abstract class Trigger<T, W extends Window> implements Serializable {
/*** 新的数据进入窗口时触发 ***/
public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;
/*** 处理时间计数器触发 ***/
public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;
/*** 事件时间计数器触发 ***/
public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;
}
当有数据流入Window Operator时会触发onElement
方法、当处理时间和事件时间生效时会触发onProcessingTime
和onEventTime
方法。每个触发动作的返回结果用TriggerResult
定义。
TriggerResult返回类型及说明
Trigger触发运算后返回处理结果,处理结果使用TriggerResult
枚举表示。1
2
3public enum TriggerResult {
CONTINUE,FIRE,PURGE,FIRE_AND_PURGE;
}
返回类型 | 说明 |
---|---|
CONTINUE | 不做任何操作 |
FIRE | 处理窗口数据,窗口计算后不做清理。这意味着下次FIRE时候可以再次用来计算(比如滑动计数窗口)。 |
PURGE | 移除窗口和窗口中的数据 |
FIRE_AND_PURGE | FIRE+PURGE的组合处理,即处理并移除窗口中的数据 |
Flink内置Trigger
Flink的内置窗口(Counter、Session、Time)有自己的触发器实现。下表为不同窗口使用的触发器。
窗口 | 触发器 | 说明 |
---|---|---|
Session Window | EventTimeTrigger | 基于事件时间/摄取时间的触发器实现 |
Session Window | ProcessingTimeTrigger | 基于处理时间的触发器实现 |
Time Window | EventTimeTrigger | 基于滚动、事件时间/摄取时间的触发器实现 |
Time Window | ProcessingTimeTrigger | 基于滚动、处理时间的触发器实现 |
Time Window | EventTimeTrigger | 基于滑动、事件时间/摄取时间的触发器实现 |
Time Window | ProcessingTimeTrigger | 基于滑动、处理时间的触发器实现 |
- | NeverTrigger | GlobalWindows提供,每个触发逻辑不执行任何处理 |
- | ContinuousEventTimeTrigger | 根据设置的时间间隔连续触发,时间间隔依赖于水印时间戳 |
- | ContinuousProcessingTimeTrigger | 根据设置的时间间隔连续触发,时间间隔依赖于运行程序所在机器系统时钟 |
Evictor
Evictor驱逐者,如果定义了Evictor当执行窗口处理前会删除窗口内指定数据再交给窗口处理,或等窗口执行处理后再删除窗口中指定数据。1
2
3
4
5
6public interface Evictor<T, W extends Window> extends Serializable {
/** 在窗口处理前删除数据 */
void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
/** 在窗口处理后删除数据 */
void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
}
Flink内置Evictor
Evictor | 说明 |
---|---|
CountEvictor | 可以保持窗口中一定数量的事件数据。在滑动计数窗口中实现evictBefore方法,以保持窗口中存在最近N次事件,从而达到了滑动效果。 |
TimeEvictor | 可以让元素在窗口中存在一定的时间,较老的数据会被删除。 |
DeltaEvictor | 根据DeltaFunction实现和阀值决定如何清理数据 |
实现原理
通过KeyedStream
可以直接创建Count Window和Time Window。他们最终都是基于window(WindowAssigner)
方法创建,在window方法中创建WindowedStream
实例,参数使用当前的KeyedStream对象和指定的WindowAssigner。1
2
3
4/** 依据WindowAssigner实例化WindowedStream */
public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {
return new WindowedStream<>(this, assigner);
}
1 | /** WindowedStream构造器 */ |
构造器执行完毕后,WindowedStream创建完成。构造器中初始化了3个属性。默认情况下trigger属性使用WindowAssigner提供的DefaultTrigger作为初始值。
同时,WindowedStream提供了trigger方法用来覆盖默认的trigger。Flink内置的计数窗口就使用windowedStream.trigger
方法覆盖了默认的trigger。1
2
3
4
5
6
7
8
9
10public WindowedStream<T, K, W> trigger(Trigger<? super T, ? super W> trigger) {
if (windowAssigner instanceof MergingWindowAssigner && !trigger.canMerge()) {
throw new UnsupportedOperationException();
}
if (windowAssigner instanceof BaseAlignedWindowAssigner) {
throw new UnsupportedOperationException();
}
this.trigger = trigger;
return this;
}
在WindowedStream中还有一个比较重要的属性evictor
,可以通过evictor
方法设置。1
2
3
4
5
6
7public WindowedStream<T, K, W> evictor(Evictor<? super T, ? super W> evictor) {
if (windowAssigner instanceof BaseAlignedWindowAssigner) {
throw new UnsupportedOperationException();
}
this.evictor = evictor;
return this;
}
WindowedStream实现中根据evictor属性是否空(null == evictor
)决定是创建WindowOperator
还是EvictingWindowOperator
。EvictingWindowOperator
继承自WindowOperator
,它主要扩展了evictor属性以及相关的逻辑处理。1
2
3public class EvictingWindowOperator extends WindowOperator {
private final Evictor evictor;
}
Evictor定义了清理数据的时机。在EvictingWindowOperator的emitWindowContents
方法中,实现了清理数据逻辑调用。这也是EvictingWindowOperator与WindowOperator的主要区别。「在WindowOperator中压根就没有evictor的概念」1
2
3
4
5
6
7
8private void emitWindowContents(W window, Iterable<StreamRecord<IN>> contents, ListState<StreamRecord<IN>> windowState) throws Exception {
/** Window处理前数据清理 */
evictorContext.evictBefore(recordsWithTimestamp, Iterables.size(recordsWithTimestamp));
/** Window处理 */
userFunction.process(triggerContext.key, triggerContext.window, processContext, projectedContents, timestampedCollector);
/** Window处理后数据清理 */
evictorContext.evictAfter(recordsWithTimestamp, Iterables.size(recordsWithTimestamp));
}
Count Window API
下面代码片段是KeyedStream
提供创建Count Window的API。1
2
3
4
5
6
7
8
9
10
11/** 滚动计数窗口 */
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
}
/** 滑动计数窗口 */
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
return window(GlobalWindows.create())
.evictor(CountEvictor.of(size))
.trigger(CountTrigger.of(slide));
}
滚动计数窗口与滑动计数窗口有几个差异
- 入参不同
- 滑动窗口使用了evictor组件
- 两者使用的trigger组件不同
下面我们对这几点差异做深入分析,看一看他们是如何影响滚动计数窗口和滑动计数窗口的。
Count Window Assigner
通过方法window(GlobalWindows.create())
创建WindowedStream实例,滚动计数窗口处理和滑动计数窗口处理都是基于GlobalWindows
作为WindowAssigner来创建窗口处理器。GlobalWindows
将所有数据都分配到同一个GlobalWindow
中。「这里需要注意GlobalWindows
是一个WindowAssigner,而GlobalWindow
是一个Window」1
2
3
4
5
6
7/** GlobalWindows是一个WindowAssigner实现,这里只展示实现assignWindows的代码片段 */
public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
/** 返回一个GlobalWindow */
public Collection<GlobalWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
return Collections.singletonList(GlobalWindow.get());
}
}
GlobalWindow继承了Window,表示为一个窗口。对外提供get()
方法返回GlobalWindow
实例,并且是个全局单例。所以当使用GlobalWindows
作为WindowAssigner时,所有数据将被分配到一个窗口中。1
2
3
4
5
6
7
8/** GlobalWindow是一个Window */
public class GlobalWindow extends Window {
private static final GlobalWindow INSTANCE = new GlobalWindow();
/** 永远返回GlobalWindow单例 */
public static GlobalWindow get() {
return INSTANCE;
}
}
Count Window Trigger
滚动计数窗口创建时使用PurgingTrigger.of(CountTrigger.of(size))
覆盖了GlobalWindows默认的Trigger,而滑动计数窗口创建时使用CountTrigger.of(size)
覆盖了GlobalWindows默认的Trigger。
PurgingTrigger是一个代理模式的Trigger实现,在计数窗口中PurgingTrigger代理了CountTrigger。1
2
3
4
5
6
7
8
9
10
11/** PurgingTrigger代理的Trigger */
private Trigger<T, W> nestedTrigger;
/** PurgingTrigger私有构造器 */
private PurgingTrigger(Trigger<T, W> nestedTrigger) {
this.nestedTrigger = nestedTrigger;
}
/** 为代理的Trigger构造一个PurgingTrigger实例 */
public static <T, W extends Window> PurgingTrigger<T, W> of(Trigger<T, W> nestedTrigger) {
return new PurgingTrigger<>(nestedTrigger);
}
在这里比较一下PurgingTrigger.onElement
和CountTrigger.onElement
方法实现,帮助理解PurgingTrigger的作用。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15/** CountTrigger实现 */
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
count.add(1L);
if (count.get() >= maxCount) {
count.clear();
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
}
/** PurgingTrigger实现 */
public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
TriggerResult triggerResult = nestedTrigger.onElement(element, timestamp, window, ctx);
return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
}
在CountTrigger实现中,当事件流入窗口后计数+1,之后比较窗口中事件数是否大于设定的最大数量,一旦大于最大数量返回FIRE。也就是说只处理窗口数据,不做清理。
在PurgingTrigger实现中,依赖CountTrigger的处理逻辑,但区别在于当CounterTrigger返回FIRE时PurgingTrigger返回FIRE_AND_PURGE。也就是说不仅处理窗口数据,还做数据清理。通过这种方式实现了滚动计数窗口数据不重叠。
Count Window Evictor
滚动计数窗口和滑动计数窗口另一个区别在于滑动计数窗口通过windowedStream.evictor(CountEvictor.of(size))
方法设置了Evictor,而滚动窗口并没有设置Evictor。
滑动计数窗口依赖Evictor组件在窗口处理前清除了指定数量以外的数据,再交给窗口处理。通过这种方式实现了窗口计算最近指定次数的事件数量。
总结
计数窗口 | WindowAssigner | Evictor | Trigger | 说明 |
---|---|---|---|---|
滚动计数窗口 | GlobalWindows | - | PurgingTrigger | 窗口处理数据前后不清理数据,由Trigger返回值声明直接清理数据,清理数据依赖Trigger返回结果 |
滑动计数窗口 | GlobalWindows | CountEvictor | CountTrigger | Trigger返回结果不能清理数据(返回结果不带PURGE),在窗口处理完后数据会被保留下来,为下一个滑动窗口使用。因为使用了CountEvictor,会在窗口处理前清除不需要的数据 |
Time Window API
下面代码片段是KeyedStream
中提供创建Time Window的API。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16/** 创建滚动时间窗口 */
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {
if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
return window(TumblingProcessingTimeWindows.of(size));
} else {
return window(TumblingEventTimeWindows.of(size));
}
}
/** 创建滑动时间窗口 */
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) {
if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
return window(SlidingProcessingTimeWindows.of(size, slide));
} else {
return window(SlidingEventTimeWindows.of(size, slide));
}
}
创建TimeWindow时会根据Flink应用当前时间类型environment.getStreamTimeCharacteristic()
来决定使用哪个WindowAssigner创建窗口。
Flink对时间分成了3类。处理时间、摄入时间、事件时间。使用TimeCharacteristic
枚举定义。1
2
3
4
5
6
7
8public enum TimeCharacteristic {
/** 处理时间 */
ProcessingTime,
/** 摄入时间 */
IngestionTime,
/** 事件时间 */
EventTime
}
对于Flink的3个时间概念,我们目前只需要了解
- 处理时间(TimeCharacteristic.ProcessingTime)就是运行Flink环境的系统时钟产生的时间
- 事件时间(TimeCharacteristic.EventTime)是业务上产生的时间,由数据自身携带
- 摄入时间(TimeCharacteristic.IngestionTime)是数据进入到Flink的时间,它在底层实现上与事件时间相同。
Time Window Assigner
下面的表格中展示了窗口类型和时间类型对应的WindowAssigner
的实现类
时间窗口类型 | 时间类型 | WindowAssigner |
---|---|---|
滚动时间窗 | ProcessingTime | TumblingProcessingTimeWindows |
滚动时间窗 | IngestionTime | TumblingEventTimeWindows |
滚动时间窗 | EventTime | TumblingEventTimeWindows |
滑动时间窗 | ProcessingTime | SlidingProcessingTimeWindows |
滑动时间窗 | IngestionTime | SlidingEventTimeWindows |
滑动时间窗 | EventTime | SlidingEventTimeWindows |
我们以一个TumblingProcessingTimeWindows
和一个SlidingEventTimeWindows
为例,讨论它的实现原理。
TumblingProcessingTimeWindowsTumblingProcessingTimeWindows
基于处理时间的滚动时间窗口分配器,它是一个WindowAssigner。Flink提供两个接口初始化TumblingProcessingTimeWindows
1
2
3
4
5
6public static TumblingProcessingTimeWindows of(Time size) {
return new TumblingProcessingTimeWindows(size.toMilliseconds(), 0);
}
public static TumblingProcessingTimeWindows of(Time size, Time offset) {
return new TumblingProcessingTimeWindows(size.toMilliseconds(), offset.toMilliseconds());
}
不管使用哪种方式初始化TumblingProcessingTimeWindows
,最终都会调用同一个构造方法初始化,构造方法初始化size和offset两个属性。
1 | /** TumblingProcessingTimeWindows构造器 */ |
TumblingProcessingTimeWindows
是一个WindowAssigner,所以它实现了assignWindows
方法来为流入的数据分配窗口。1
2
3
4
5public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
final long now = context.getCurrentProcessingTime();
long start = TimeWindow.getWindowStartWithOffset(now, offset, size);
return Collections.singletonList(new TimeWindow(start, start + size));
}
- 第一步
assignWindows
首先获得系统当前时间戳,context.getCurrentProcessingTime();
最终实现实际是调用System.currentTimeMillis()
。 - 第二步执行
TimeWindow.getWindowStartWithOffset(now, offset, size);
这个方法根据当前时间、偏移量、设置的间隔时间最终计算窗口起始时间。 - 第三步根据起始时间和结束时间创建一个新的窗口
new TimeWindow(start, start + size)
并返回。
比如,希望每10秒处理一次窗口数据keyedStream.timeWindow(Time.seconds(10))
。当数据源源不断的流入Window Operator时,它会按10秒切割一个时间窗。
我们假设数据在2019年1月1日 12:00:07到达,那么窗口以下面方式切割(请注意,窗口是左闭右开
)。1
Window[2019年1月1日 12:00:00, 2019年1月1日 12:00:10)
如果在2019年1月1日 12:10:09又一条数据到达,窗口是这样的1
Window[2019年1月1日 12:10:00, 2019年1月1日 12:10:10)
如果我们希望从第15秒开始,每过1分钟计算一次窗口数据,这种场景需要用到offset。基于处理时间的滚动窗口可以这样写keyedStream.window(TumblingProcessingTimeWindows.of(Time.minutes(1), Time.seconds(15)))
我们假设数据从2019年1月1日 12:00:14到达,那么窗口以下面方式切割1
Window[2019年1月1日 11:59:15, 2019年1月1日 12:00:15)
如果在2019年1月1日 12:00:16又一数据到达,那么窗口以下面方式切割1
Window[2019年1月1日 12:00:15, 2019年1月1日 12:01:15)
TumblingProcessingTimeWindows.assignWindows
方法每次都会返回一个新的窗口,也就是说窗口是不重叠的。但因为TimeWindow实现了equals
方法,所以通过计算后start, start + size相同的数据,在逻辑上是同一个窗口。1
2
3
4
5
6
7
8
9
10public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
TimeWindow window = (TimeWindow) o;
return end == window.end && start == window.start;
}
SlidingEventTimeWindowsSlidingEventTimeWindows
基于事件时间的滑动时间窗口分配器,它是一个WindowAssigner。Flink提供两个接口初始化SlidingEventTimeWindows
1
2
3
4
5
6public static SlidingEventTimeWindows of(Time size, Time slide) {
return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), 0);
}
public static SlidingEventTimeWindows of(Time size, Time slide, Time offset) {
return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(),offset.toMilliseconds() % slide.toMilliseconds());
}
同样,不管使用哪种方式初始化SlidingEventTimeWindows
,最终都会调用同一个构造方法初始化,构造方法初始化三个属性size、slide和offset。1
2
3
4
5
6
7
8protected SlidingEventTimeWindows(long size, long slide, long offset) {
if (offset < 0 || offset >= slide || size <= 0) {
throw new IllegalArgumentException();
}
this.size = size;
this.slide = slide;
this.offset = offset;
}
SlidingEventTimeWindows
是一个WindowAssigner,所以它实现了assignWindows
方法来为流入的数据分配窗口。1
2
3
4
5
6
7
8
9
10
11
12public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
if (timestamp > Long.MIN_VALUE) {
List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
for (long start = lastStart; start > timestamp - size;start -= slide) {
windows.add(new TimeWindow(start, start + size));
}
return windows;
} else {
throw new RuntimeException();
}
}
与基于处理时间的WindowAssigner不同,基于事件时间的WindowAssigner不依赖于系统时间,而是依赖于数据本身的事件时间。在assignWindows
方法中第二个参数timestamp
就是数据的事件时间。
- 第一步
assignWindows
方法会先初始化一个List<TimeWindow>
,大小是size / slide
。这个集合用来存放时间窗对象并作为返回结果。 - 第二步执行
TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
计算窗口起始时间。 - 第三步根据事件时间、滑动大小和窗口大小计算并生成数据能落入的窗口
new TimeWindow(start, start + size)
,最后加入到List集合并返回。「因为是滑动窗口一个数据可能落在多个窗口」
比如,希望每5秒滑动一次处理最近10秒窗口数据keyedStream.timeWindow(Time.seconds(10), Time.seconds(5))
。当数据源源不断流入Window Operator时,会按10秒切割一个时间窗,5秒滚动一次。
我们假设一条付费事件数据付费时间是2019年1月1日 17:11:24,那么这个付费数据将落到下面两个窗口中(请注意,窗口是左闭右开
)。1
2Window[2019年1月1日 17:11:20, 2019年1月1日 17:11:30)
Window[2019年1月1日 17:11:15, 2019年1月1日 17:11:25)
Time Window Trigger
Flink API在创建Time Window时没有使用windowStream.trigger
方法覆盖默认Trigger。
TumblingProcessingTimeWindows使用ProcessingTimeTrigger作为默认Trigger。ProcessingTimeTrigger
在onElement
的策略是永远返回CONTINUE,也就是说它不会因为数据的流入触发窗口计算和清理。在返回CONTINUE前调用registerProcessingTimeTimer(window.maxTimestamp());
注册一个定时器,并且逻辑相同窗口只注册一次,事件所在窗口的结束时间与系统当前时间差决定了定时器多久后触发。1
ScheduledThreadPoolExecutor.schedule(new TriggerTask(), timeEndTime - systemTime, TimeUnit.MILLISECONDS);
定时器一旦触发会回调Trigger的onProcessingTime
方法。ProcessingTimeTrigger
中实现的onProcessingTime
直接返回FIRE。也就是说系统时间大于等于窗口最大时间时,通过回调方式触发窗口计算。但因为返回的是FIRE只是触发了窗口计算,并没有做清除。
SlidingEventTimeWindows使用EventTimeTrigger作为默认Trigger。事件时间、摄入时间与处理时间在时间概念上有一点不同,处理时间处理依赖的是系统时钟生成的时间,而事件时间和摄入时间依赖的是Watermark(水印)
。我们现在只需要知道水印是一个时间戳,可以由Flink以固定的时间间隔发出,或由开发人员根据业务自定义。水印用来衡量处理程序的时间进展。
EventTimeTrigger
的onElement
方法中比较窗口的结束时间与当前水印时间,如果窗口结束时间已小于或等于当前水印时间立即返回FIRE。
「个人理解这是由于时间差问题导致的窗口时间小于或等于当前水印时间,正常情况下如果窗口结束时间已经小于水印时间则数据不会被处理,也不会调用onElement」
如果窗口结束时间大于当前水印时间,调用registerEventTimeTimer(window.maxTimestamp())
注册一个事件后直接返回CONTINUE。EventTime注册事件没有使用Scheduled,因为它依赖水印时间。所以在注册时将逻辑相同的时间窗封装为一个特定对象添加到一个排重队列,并且相同窗口对象只添加一次。
上面提到水印是以固定时间间隔发出或由开发人员自定义的,Flink处理水印时从排重队列头获取一个时间窗对象与水印时间戳比较,一旦窗口时间小于或等于水印时间回调trigger的onEventTime
。
EventTimeTrigger中onEventTime
并不是直接返回FIRE,而是判断窗口结束时间与获取的时间窗对象时间做比较,仅当时间相同时才返回FIRE,其他情况返回CONTINUE。「个人理解这么做是为了满足滑动窗口的需求,因为滑动窗口在排重队列中存在两个不同的对象,而两个窗口对象的时间可能同时满足回调条件」
Time Window Evictor
Flink内置Time Window实现没有使用Evictor。
Session Window API
KeyedStream
中没有为Session Window提供类似Count Windown和Time Window一样能直接使用的API。我们可以使用window(WindowAssigner assigner)
创建Session Window。
比如创建一个基于处理时间,时间间隔为2秒的SessionWindow可以这样实现1
keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(2)))
Assigner
Flink内置的Session Window Assigner全部继承MergingWindowAssigner。下图展示了MergingWindowAssigner的上下结构关系。
MergingWindowAssigner继承了WindowAssigner,所以它具备分配时间窗的能力。MergingWindowAssigner自身是一个可以merge的Window,它的内部定义了一个mergeWindows抽象方法以及merge时的回调定义。1
2
3
4
5public abstract void mergeWindows(Collection<W> windows, MergeCallback<W> callback);
public interface MergeCallback<W> {
void merge(Collection<W> toBeMerged, W mergeResult);
}
我们以ProcessingTimeSessionWindows
为例介绍Session Window。ProcessingTimeSessionWindows
提供了一个静态方法用来初始化ProcessingTimeSessionWindows
1
2
3public static ProcessingTimeSessionWindows withGap(Time size) {
return new ProcessingTimeSessionWindows(size.toMilliseconds());
}
静态方法withGap接收一个时间参数,用来描述时间间隔。并调用构造方法将时间间隔赋值给sessionTimeout属性。1
2
3
4
5
6protected ProcessingTimeSessionWindows(long sessionTimeout) {
if (sessionTimeout <= 0) {
throw new IllegalArgumentException();
}
this.sessionTimeout = sessionTimeout;
}
ProcessingTimeSessionWindows
是一个WindowAssigner,所以它实现了数据分配窗口的能力。1
2
3
4public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
long currentProcessingTime = context.getCurrentProcessingTime();
return Collections.singletonList(new TimeWindow(currentProcessingTime, currentProcessingTime + sessionTimeout));
}
ProcessingTimeSessionWindows
会为每个数据都分配一个新的时间窗口。由于是基于处理时间,所以窗口的起始时间就是系统当前时间,而结束时间是系统当前时间+设置的时间间隔。通过起始时间和结束时间确定了窗口的时间范围。
Trigger
如果在代码中我们不手动覆盖Trigger,那么将使用ProcessingTimeSessionWindows
默认的ProcessingTimeTrigger
1
2
3public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
return ProcessingTimeTrigger.create();
}
ProcessingTimeTrigger
在基于处理时间的Time Window介绍过,它通过注册、onProcessorTime回调方式触发窗口计算,这里不再讨论。
Evictor
Session Window不由Flink API控制生成,完全取决于客户端如何创建。在创建Window实例后可以通过调用evictor
方法并传入Flink内置的Evictor或自己实现的Evictor。
Merging
Session Window继承MergingWindowAssigner
,MergingWindowAssigner
继承WindowAssigner
。所以本质上Session Window还是一个WindowAssigner
,但因继承了MergingWindowAssigner
使得自己具有了一个「可以合并时间窗口」的特性。1
2
3public void mergeWindows(Collection<TimeWindow> windows, MergeCallback<TimeWindow> c) {
TimeWindow.mergeWindows(windows, c);
}
Session Window处理流程大致是这样
- 使用WindowAssigner为流入的数据分配窗口
- Merge窗口,将存在交集的窗口合并,取最小时间和最大时间作为窗口的起始和关闭。假设有两条数据流入系统后,通过WindowAssigner分配的窗口分别是
数据A:Window[2019年1月1日 10:00:00, 2019年1月1日 10:20:00)
数据B:Window[2019年1月1日 10:05:00, 2019年1月1日 10:25:00)
经过合并后,使用数据A的起始时间和数据B的结束时间作为节点,窗口时间变为了
[2019年1月1日 10:00:00, 2019年1月1日 10:25:00) - 执行
Trigger.onMerge
,为合并后的窗口注册回调事件 - 移除其他注册的回调事件
- Window State合并
- 开始处理数据,执行
Trigger.onElement
…后续与其他Window处理一样
可以看到,Session Window与Time Window类似,通过注册回调方式触发数据处理。但不同的是Session Window通过不断为新流入的数据做Merge操作来改变回调时间点,以实现Session Window的特性。
总结
- Window Operator创建
Window处理流程由WindowOperator
或EvictingWindowOperator
控制,他们的关系及区别体现在以下几点
EvictingWindowOperator
继承自WindowOperator
,所以EvictingWindowOperator
是一个WindowOperator
,具备WindowOperator
的特性。- 清理窗口数据的机制不同,
EvictingWindowOperator
内部依赖Evictor组件,而WindowOperator
内部不使用Evictor。这也导致它们两个Operator初始化时的差异
MergeWindow特殊处理
可以合并窗口的WindowAssigner
会继承MergingWindowAssigner
。当数据流入Window Operator后,根据WindowAssigner
是否为一个MergingWindowAssigner
决定了处理流程。窗口生命周期
Flink内置的窗口生命周期是不同的,下表描述了他们直接的差异
窗口类型 | 窗口周期 |
---|---|
Count Window | 依赖事件数量,当事件数量达不到设定上限时永远不会触发。浪费内存、数据处理延时 |
Session Window | 依赖了会话时间间隔,超过设置的时间时窗口结束,触发计算 |
Time Window | 时间是一直在变的,所以时间窗口总会触发 |
侧路输出
当Flink应用采用EventTime作为时间机制时,Window不会处理延迟到达的数据,也就是说不处理在水印时间戳之前的数据。Flink提供了一个SideOutput机制可以处理这些延迟到达的数据。通过WindowedStream.sideOutputLateData
方法实现侧路输出。自定义窗口
Flink内置窗口利用WindowAssigner、Trigger、Evictor3个组件的相互组合实现了多种非常强大的功能,我们也可以尝试通过组件实现一个自定义的Window。由于篇幅原因,自定义窗口下篇再细聊。