返回顶部
首页 > 资讯 > 数据库 >DataStream API(一)
  • 642
分享到

DataStream API(一)

DataStreamAPI(一) 2018-10-20 16:10:50 642人浏览 猪猪侠
摘要

DataStream api(一) 在了解DataStream API之前我们先来了解一下flink API的构成。Flink API是分层的。由最底层的Stateful Stream Process到最顶层的sql分为四层。如下

DataStream API(一)

DataStream api(一)

在了解DataStream API之前我们先来了解一下flink API的构成。Flink API是分层的。由最底层的Stateful Stream Process到最顶层的sql分为四层。如下图:

API-Level

DataStream API 顾名思义,就是DataStream类的API,DataStream表示Flink程序中的流式数据集合。它是一个包含重复项的不可变数据集合,这些数据可以是有界的也可以是无界的,处理他们的API是相同的。

DataStream是不可变的,这意味着一旦它们被创建,就不能添加或删除元素。也不能简单地检查内部的元素,而只能使用DataStream API(TransfORM)来处理它们。

Flink程序基本部分组成:

  1. 获得执行环境(Environment),
  2. 加载/创建初始数据(Source),
  3. 指定此数据的转换(Transform),
  4. 指定计算结果的存放位置(Sink),
  5. 触发程序执行(Execut)

下面我们一起来了解一下Flink DataStream的执行环境。

Environment

Flink的执行环境包括两种,分别是StreamExecutionEnvironment和ExecutionEnvironment,他们分别对应StreamData和DataSet。StreamData是流式数据集,DataSet是批量数据集。

StreamExecutionEnvironment是所有Flink流式处理程序的基础。它为我们提供了三种实例化的方法,分别是:

getExecutionEnvironment()
createLocalEnvironment()
createRemoteEnvironment(String host, int port, String... jarFiles)

createLocalEnvironment()

这个方法是获取本地的执行环境。它有两个重载,分别是:

//parallelism表示并行度
createLocalEnvironment(int parallelism)
createLocalEnvironment(int parallelism, Configuration configuration)

createRemoteEnvironment(String host, int port, String... jarFiles)

这个方法是获取集群的执行环境。与createLocalEnvironment()类似,它也有两个重载,分别是:

createRemoteEnvironment(String host, int port, int parallelism, String... jarFiles)
createRemoteEnvironment(String host, int port, Configuration clientConfig, String... jarFiles)

getExecutionEnvironment()

getExecutionEnvironment()可以自行判断我们当前程序的执行环境并为我们返回与之相对应的实例。换句话说,通常情况下我们不需要自己判断到底是使用createLocalEnvironment还是使用createRemoteEnvironment,一律用getExecutionEnvironment就OK了。

Environment实例

通过Environment实例我们可以做很多事情,比如:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置并行度
env.setParallelism(1);
//获取数据源
//当然获取数据源的方式有很多种,下面会一一介绍
env.readTextFile("FilePath");
//执行程序
env.Execute();
//...

Data Sources

Data Source的核心组件包括三个,分别是Split、SourceReader、SplitEnumerator。

  • Split:Split是数据源的一部分,例如:文件的一部分或者一条日志。Splits是Source并行读取数据和分发作业的基础。
  • SourceReader:SourceReader向SplitEnumerator请求Splits并处理请求到的Splits。SourceReader位于TaskManager中,这意味着它是并行运行的,同时,它可以产生并行的事件流/记录流。
  • SplitEnumerator:SplitEnumerator负责管理Splits并且将他们发送给SourceReader。它是运行在JobManager上的单个实例。负责维护正在进行的分片的备份日志并将这些分片均衡的分发到SourceReader中。

具体的过程可以参考以下图片:

Source

下面我们来介绍一下几个常用的获取数据源的方式。

从集合中读取

ArrayList strList = new ArrayList();
strList.add("are");
strList.add("you");
strList.add("ok");
env.fromCollection(strList);

从文件中读取

DataStreamSource inputData = env.readTextFile("FilePath");

消费kafka中的数据

引入kafka依赖


    org.apache.flink
    flink-connector-kafka_2.11
    1.11.0

代码示例

//kafka配置
Properties pro = new Properties();
pro.setProperty("bootstrap.servers", "localhost:9092");
pro.setProperty("group.id", "consumer-group");
pro.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
pro.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
pro.setProperty("auto.offset.reset", "latest");
//消费kafka数据
DataStreamSource inputStream = env.addSource(new FlinkKafkaConsumer("topic", new SimpleStringSchema(), pro));

当然,出了kafka之外Flink还为我们提供了elasticsearchhdfsRabbitMQ、JDBC等作为数据源的接口。此处就不一一介绍了。

自定义数据源

其实深入研究之后我们会发现,FlinkKafkaConsumer其实是实现了一个SourceFunction接口。so,我们可以通过实现SourceFunction的方式来自定义我们自己的数据源。

有了这个功能我们可以很轻松的模拟真实的业务场景。毕竟,绝大多数的项目开发阶段并不会有真实的业务场景来提供数据源。

DataStreamSource inputStream = env.addSource(new MyDataSource());

private static class MyDataSource implements SourceFunction {
    private boolean running = true;

    public void run(SourceContext sourceContext) throws Exception {
        while(running){
            //读取数据,可以从cvs文件...自定义数据源读取数据
            Thread.sleep(100);
        }
    }
    public void cancel() {
        running = false;
    }
}
您可能感兴趣的文档:

--结束END--

本文标题: DataStream API(一)

本文链接: https://lsjlt.com/news/7259.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • DataStream API(一)
    DataStream API(一) 在了解DataStream API之前我们先来了解一下Flink API的构成。Flink API是分层的。由最底层的Stateful Stream Process到最顶层的SQL分为四层。如下...
    99+
    2018-10-20
    DataStream API(一)
  • Flink DataStream API
    1.  API基本概念 Flink程序可以对分布式集合进行转换(例如: filtering, mapping, updating state, joining, grouping, defining windows, aggreg...
    99+
    2019-07-15
    Flink DataStream API
  • 如何掌握Table与DataStream之间的互转
    本篇内容介绍了“如何掌握Table与DataStream之间的互转”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有...
    99+
    2024-04-02
  • Zookeeper的Java API操作(一)
    环境搭建 创建一个普通的Maven项目 导入log4j.properties日志文件到项目的根目录或者resource文件下。 在pom.xml中添加Zookeeper的相关依赖: org.apache.zookeeper...
    99+
    2021-02-15
    Zookeeper的Java API操作(一)
  • 一些斗鱼TV Web API [Some
      去年TI5前开发了dotaonly.com,网站需要用到各个直播平台API。不像国外网站Twitch那样开放,都有现成的API可用,国内网站都很封闭,对开发者不太友好。本文所涉及API皆抓取自斗鱼IOS手机客户端。 获取当前全部...
    99+
    2023-01-31
    TV Web API
  • 一文详解React组件API
    目录setState参数说明注意事项使用replaceState参数说明setProps参数说明replaceProps参数说明forceUpdate参数说明findDOMNodei...
    99+
    2023-05-17
    React组件API React组件 React API
  • java中如何写出一个api
    要编写一个Java API,你需要按照以下步骤进行操作:1. 设计API:首先,你需要明确定义你的API的功能和目标。确定你的API...
    99+
    2023-10-18
    java
  • python编程之API入门: (一)使
    在网络编程中,我们会和API打交道。那么,什么是API如何使用API呢?本文分享了一下我对API的理解以及百度地图API的使用。 API是"Application Programming Interface(应用程序编程接口)"的缩写。如...
    99+
    2023-01-31
    入门 python API
  • Python C API的使用详解(一)
    简介 介绍一下Python虚拟机的初始化及退出,Python基本数据类型的对象创建以及C和Python之间的数据类型互相转换。 Python虚拟机的初始化及退出 初始化Python虚拟机需要调用Py_Initialize()来实现。 Py_...
    99+
    2023-01-31
    详解 Python API
  • Wordpress教程(一) Rest Api简单入门
    Wordpress是当今最强大的博客+cms系统,最近在用wordpress给客户搭建一款小程序,涉及到Rest Api,于是有了本篇,本篇根据官方文档和个人的理解编写,如有错误或者疏漏,请同学指...
    99+
    2023-09-04
    php
  • ASP和Linux的API之争:对象API是哪一方的必备武器?
    随着人工智能技术的快速发展,云计算、物联网等技术的普及,企业对于高效、稳定的应用程序接口(API)的需求也越来越高。而在ASP和Linux两大操作系统中,对于API的选择也成为了一个热门话题。ASP采用的是COM对象API,而Linux则...
    99+
    2023-09-07
    linux 对象 api
  • ASP Web API 与其他 Web API 框架对比:哪一个更适合你?
    ASP Web API 是一个使用 C# 编写的开源 Web API 框架,它专为构建 RESTful 服务而设计,ASP Web API 是一个轻量级框架,易于使用和学习,它支持多种数据格式,包括 JSON、XML 和 HTML,ASP...
    99+
    2024-02-24
    ASP.NET Web API, Flask, Django REST framework, Node.js Express
  • Flask如何搭建一个API服务器
    小编给大家分享一下Flask如何搭建一个API服务器,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!一、API列表 1. 获取作品列表① 获取手工制作作品列表。请求格式:http://api.mculover666...
    99+
    2023-06-15
  • 用Python写一个简单的api接口
    python框架有很多,例如:Flask,Django,FastAPI 等。本文将使用 Flask 来编写 API 接口。 安装Flask 首先,您需要安装 Flask: pip install fl...
    99+
    2023-10-08
    python 后端 flask
  • 怎么用Windows api添加一个图片
    要使用Windows API在窗口中添加图片,你可以按照以下步骤进行操作:1. 首先,确保你已经包含了 `windows.h` 头文...
    99+
    2023-10-09
    Windows
  • 手写PHP API框架(一)之PSR规范
    本篇文章给大家带来了关于php的相关知识,其中主要跟大家聊一聊编码规范,也建议大家在开发中尽量遵循规范,感兴趣的朋友下面一起来看一下吧,希望对大家有帮助。创作初衷有没有用烦了CURD?各种框架是不是有点头大?有没有尝试自己设计一个框架?学了...
    99+
    2023-05-14
    php 编码规范
  • 一篇文章看懂Vue组合式API
    目录一. 为什么要使用Composition API1.1.一个Options API实例1.2.Options API存在的问题1.3.Composition API简介二.Com...
    99+
    2023-05-14
    vue 组合式api的意义 vue3组合式api教程 vue2使用组合式api
  • 一文学会使用Remix写API接口
    目录本文提要接口种类RESTful API 特点Remix 中如何处理 api 特点Loader 函数处理 Get 请求action 处理非 GET 方法添加控制层和服务层暴露 lo...
    99+
    2023-05-15
    Remix API接口 Remix API
  • 一文带你了解ChatGPT API的使用
    目录1.概述2.内容2.1 ChatGPT优点2.2 ChatGPT的应用场景2.3 ChatGPT的发展前景3.API应用4.API代码实现4.1 Python...
    99+
    2023-02-27
    ChatGPT API使用 ChatGPT API
  • docker remote api一键TLS加密的实现
    目录1、修改docker的2375端口为另外的端口,这只是权宜之计。2、给docker进行tls加密最近公司服务器被挖矿了,最后原因定位到docker的2375端口。 让我们来理一下...
    99+
    2024-04-02
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作