三、Flink关于source源码分享
一、什么是source
ddd
source是flink面向流计算的数据源头,所有的数据都是通过source进入我们的程序内部,详见下图source所在整个流程位置.
二、如何使用一个source
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamdataStream= env.addSource(SourceFunctionfunction);
从上面的代码来看,使用一个source非常的简单,创建一个环境对象env,然后将SourceFunction传入addSource.
三、SourceFunction的源码
SourceFunction是Flink中所有流数据源的基本接口(源码注释里面有说明)
1、 SourceFunction接口继承关系
SourceFunction接口继承了Function接口,其实Function接口就是一个空接口(java里面的基本功能是:实现类似多重继承的功能)
Function是一个实现自定义函数的基础接口(The base interface for all user-defined functions) ,既然是所以类的基础类,那么我们的SourceFunction继承当然也需要继承它.
2、 SourceFunction接口内部方法
SourceFunction里面有run、cancel接口和一个内部接口SourceContext,
2.1 : run方法的入参数是SourceContext,run方法是数据的接受入口.即对接一个外部数据源然后emit元素形成stream(大部分情况下会通过在该方法里运行一个while循环的形式来产生stream)
2.2 : cancel方法:取消一个source. 比较典型的是将run方法里面while(run) 甚至为run=false. 取消一个source,也即将run中的循环emit元素的行为终止
2.3 : SourceContext是摄取源数据和水位线的的接口.
collect方法不指定时间
collectWithTimestamp 自定义EventTime时间
四、flink内置的Source方法
介绍几个常用source内
1、RichSourceFunction(抽象类) : 他是一个抽象类继承了AbstractRichFunction ,用于实现可访问上下文信息的并行数据源的基类
2、ParallelSourceFunction(接口): source并行接口
常用的kafkasource就属于并行的
3、RichParallelSourceFunction : 继承AbstractRichFunction,并且实现RichSourceFunction (此类具有并行功能).
4、SocketTextStreamFunction :数据源是socket的 \n或者\r作为分隔符
五、自定义source
SourceFunction.java类里面有一个demo,代码如下
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
/**
* @author zhuhuipei
* @Description:
* @date 2020-04-18
* @time 17:21
*/
public class DemoSource implements SourceFunction, CheckpointedFunction {
private long count = 0L;
private volatile boolean isRunning = true;
private transient ListStatecheckPointedCount;
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
this.checkPointedCount.clear();
this.checkPointedCount.add(count);
System.out.println("snapshotState>>>>>>>");
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
System.out.println("initializeState>>>>>>>");
this.checkPointedCount = context
.getOperatorStateStore()
.getListState(new ListStateDescriptor<>("count", Long.class));
if (context.isRestored()) {
for (Long count : this.checkPointedCount.get()) {
this.count = count;
}
}
}
@Override
public void run(SourceContext ctx) throws Exception {
while (isRunning && count < 500) {
Thread.sleep(1000);
synchronized (ctx.getCheckpointLock()) {
ctx.collect(count);
count++;
}
}
}
@Override
public void cancel() {
System.out.println("cancel>>>>>>>");
isRunning = false;
}
}
需要实现SourceFunction接口,如果需要checkpoint还需要实现CheckpointedFunction接口
我的标签
随笔档案
- 2020-04 (3)
- 2019-09 (2)
- 2019-08 (1)
- 2019-01 (1)
- 2017-09 (1)
- 2017-06 (1)
- 2017-05 (3)
- 2017-03 (1)
- 2017-02 (2)
- 2017-01 (3)
- 2016-12 (2)
- 2016-10 (3)
- 2016-09 (2)
- 2016-08 (3)
- 2016-07 (2)
- 2016-06 (2)
- 2016-05 (3)
- 2016-04 (2)
- 2016-03 (2)
- 2016-01 (3)
- 2015-12 (3)
- 2015-11 (2)
- 2015-10 (3)
- 2015-09 (1)
- 2015-08 (1)
- 2015-07 (4)
- 2015-05 (3)
- 2015-04 (4)
- 2015-02 (2)
- 2015-01 (4)
- 2014-12 (4)
- 2014-11 (2)
- 2014-10 (19)