DataStream api(一) 在了解DataStream API之前我们先来了解一下flink API的构成。Flink API是分层的。由最底层的Stateful Stream Process到最顶层的sql分为四层。如下
在了解DataStream API之前我们先来了解一下flink API的构成。Flink API是分层的。由最底层的Stateful Stream Process到最顶层的sql分为四层。如下图:
DataStream API 顾名思义,就是DataStream类的API,DataStream表示Flink程序中的流式数据集合。它是一个包含重复项的不可变数据集合,这些数据可以是有界的也可以是无界的,处理他们的API是相同的。
DataStream是不可变的,这意味着一旦它们被创建,就不能添加或删除元素。也不能简单地检查内部的元素,而只能使用DataStream API(TransfORM)来处理它们。
Flink程序基本部分组成:
下面我们一起来了解一下Flink DataStream的执行环境。
Flink的执行环境包括两种,分别是StreamExecutionEnvironment和ExecutionEnvironment,他们分别对应StreamData和DataSet。StreamData是流式数据集,DataSet是批量数据集。
StreamExecutionEnvironment是所有Flink流式处理程序的基础。它为我们提供了三种实例化的方法,分别是:
getExecutionEnvironment()
createLocalEnvironment()
createRemoteEnvironment(String host, int port, String... jarFiles)
这个方法是获取本地的执行环境。它有两个重载,分别是:
//parallelism表示并行度
createLocalEnvironment(int parallelism)
createLocalEnvironment(int parallelism, Configuration configuration)
这个方法是获取集群的执行环境。与createLocalEnvironment()类似,它也有两个重载,分别是:
createRemoteEnvironment(String host, int port, int parallelism, String... jarFiles)
createRemoteEnvironment(String host, int port, Configuration clientConfig, String... jarFiles)
getExecutionEnvironment()可以自行判断我们当前程序的执行环境并为我们返回与之相对应的实例。换句话说,通常情况下我们不需要自己判断到底是使用createLocalEnvironment还是使用createRemoteEnvironment,一律用getExecutionEnvironment就OK了。
通过Environment实例我们可以做很多事情,比如:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置并行度
env.setParallelism(1);
//获取数据源
//当然获取数据源的方式有很多种,下面会一一介绍
env.readTextFile("FilePath");
//执行程序
env.Execute();
//...
Data Source的核心组件包括三个,分别是Split、SourceReader、SplitEnumerator。
具体的过程可以参考以下图片:
下面我们来介绍一下几个常用的获取数据源的方式。
ArrayList strList = new ArrayList();
strList.add("are");
strList.add("you");
strList.add("ok");
env.fromCollection(strList);
DataStreamSource inputData = env.readTextFile("FilePath");
引入kafka依赖
org.apache.flink
flink-connector-kafka_2.11
1.11.0
代码示例
//kafka配置
Properties pro = new Properties();
pro.setProperty("bootstrap.servers", "localhost:9092");
pro.setProperty("group.id", "consumer-group");
pro.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
pro.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
pro.setProperty("auto.offset.reset", "latest");
//消费kafka数据
DataStreamSource inputStream = env.addSource(new FlinkKafkaConsumer("topic", new SimpleStringSchema(), pro));
当然,出了kafka之外Flink还为我们提供了elasticsearch、hdfs、RabbitMQ、JDBC等作为数据源的接口。此处就不一一介绍了。
其实深入研究之后我们会发现,FlinkKafkaConsumer其实是实现了一个SourceFunction接口。so,我们可以通过实现SourceFunction的方式来自定义我们自己的数据源。
有了这个功能我们可以很轻松的模拟真实的业务场景。毕竟,绝大多数的项目在开发阶段并不会有真实的业务场景来提供数据源。
DataStreamSource inputStream = env.addSource(new MyDataSource());
private static class MyDataSource implements SourceFunction {
private boolean running = true;
public void run(SourceContext sourceContext) throws Exception {
while(running){
//读取数据,可以从cvs文件...自定义数据源读取数据
Thread.sleep(100);
}
}
public void cancel() {
running = false;
}
}
--结束END--
本文标题: DataStream API(一)
本文链接: https://lsjlt.com/news/7259.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
2024-10-23
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0