返回顶部
首页 > 资讯 > 数据库 >「Flink」事件时间与水印
  • 220
分享到

「Flink」事件时间与水印

「Flink」事件时间与水印 2019-07-13 00:07:50 220人浏览 猪猪侠
摘要

我们先来以滚动时间窗口为例,来看一下窗口的几个时间参数与flink流处理系统时间特性的关系。获取窗口开始时间Flink源代码获取窗口的开始时间为以下代码:org.apache.flink.streaming.api.windowi

「Flink」事件时间与水印

我们先来以滚动时间窗口为例,来看一下窗口的几个时间参数与flink流处理系统时间特性的关系。

获取窗口开始时间Flink源代码

获取窗口的开始时间为以下代码:

org.apache.flink.streaming.api.windowing.windows.TimeWindow

public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

这一段代码,我们可以认为Flink并不是把时间戳直接作为窗口的开始时间,而是做了一些“对齐”操作,确保时间能够整除8。

不同时间类型的窗口时间计算

1、当TimeCharacteristic为ProcessingTime时

窗口的开始时间:与窗口接收到的第一条消息的处理时间有关。例如:window operator是2020-02-06 22:02:33接收到的第一条消息,那么窗口的开始时间就是2020-02-06 22:02:33。

窗口的结束时间:一旦窗口的开始时间确定了,因为窗口的长度是固定的。那么窗口的结束时间就确定下来了,例如:假设这里的时间窗口是3秒,那么窗口的结束时间就是2020-02-06 22:02:36。

窗口的触发计算时间:假设有一条新的消息到达window operator,此时如果对应operator的系统时间,大于结束时间,就会触发计算。

一旦窗口的开始时间确定了,那么后续窗口的开始时间,也就都确定下来了。

问题:

假设某个时间窗口,2020-2-6 22:12:20 - 2020-2-6 22:12:23,之间没有任何一条数据进来。Flink会如何处理?

Flink会直接抛弃掉这个时间窗口,新来的事件消息会到其他的时间窗口中计算。


2、当TimeCharacteristic为IngestionTime时

窗口的开始时间:与source operator接收到的第一条消息有关。例如:source接收到这条消息的时间是2020-2-6 22:14:50,那么窗口的开始时间就是2020-2-6 22:14:50

窗口的结束时间:与ProcessTime一致

窗口的触发计算时间:假设有一条新的消息到达source operator,那么此时的时间如果大于结束时间,就会触发计算。


除了窗口的开始时间、触发时间都是与source operator算子有关,其他与Processing Time是类似的。


3、但TimeCharacteristic为EventTime时

窗口的开始时间:与window operator接收到的第一条消息的事件时间有关,例如:如果这条消息的水印时间是2020-2-6 22:17:50,那么窗口的的开始时间就是2020-2-6 22:17:50

窗口的结束时间:与ProcessTime一致

窗口的触发计算时间:假设有一条新的消息到达window operator,如果该事件的水印时间大于窗口的结束时间,就会触发计算。

通常,我们会让水印时间比事件时间允许延迟几秒钟。这样,如果是因为网络延迟消息晚到了几秒,也不会影响到统计结果了。

public class WordCountWindow {
    public static void main(String[] args) throws Exception {
        // 1. 初始化流式运行环境
        Configuration conf = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWEBUI(conf);

        // 2. 设置时间处理类型,这里设置的方式处理时间
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // 3. 定义数据源,每秒发送一个hadoop单词
        SingleOutputStreamOperator> wordDSWithWaterMark = env.addSource(new RichSourceFunction>() {

            private boolean isCanaled = false;
            private int TOTAL_NUM = 20;

            @Override
            public void run(SourceContext> ctx) throws Exception {
                while (!isCanaled) {
                    ctx.collect(Tuple2.of("hadooop", System.currentTimeMillis()));

                    // 打印窗口开始、结束时间
                    SimpleDateFORMat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                    System.out.println("事件发送时间:" + sdf.format(System.currentTimeMillis()));
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                isCanaled = true;
            }
        }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor>(Time.seconds(5)) {
            @Override
            public long extractTimestamp(Tuple2 element) {
                return element.f1;
            }
        });

        // 4. 每5秒进行一次,分组统计
        // 4.1 转换为元组
        wordDSWithWaterMark.map(word -> {
            return Tuple2.of(word.f0, 1);

        })
                // 指定返回类型
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                // 按照单词进行分组
                .keyBy(t -> t.f0)
                // 滚动窗口,3秒计算一次
                .timeWindow(Time.seconds(3))
                .reduce(new ReduceFunction>() {
                    @Override
                    public Tuple2 reduce(Tuple2 value1, Tuple2 value2) throws Exception {
                        return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                    }
                }, new RichWindowFunction, Tuple2, String, TimeWindow>() {
                    @Override
                    public void apply(String word, TimeWindow window, Iterable> input, Collector> out) throws Exception {

                        // 打印窗口开始、结束时间
                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        System.out.println("窗口开始时间:" + sdf.format(window.getStart())
                                + " 窗口结束时间:" + sdf.format(window.getEnd())
                                + " 窗口计算时间:" + sdf.format(System.currentTimeMillis()));

                        int sum = 0;
                        Iterator> iterator = input.iterator();
                        while(iterator.hasNext()) {
                            Integer count = iterator.next().f1;
                            sum += count;
                        }
                        out.collect(Tuple2.of(word, sum));
                    }
                }).print();

        env.execute("app");
    }
}

输出结果如下:

事件发送时间:2020-02-06 22:35:08
事件发送时间:2020-02-06 22:35:09
事件发送时间:2020-02-06 22:35:10
事件发送时间:2020-02-06 22:35:11
事件发送时间:2020-02-06 22:35:12
事件发送时间:2020-02-06 22:35:13
事件发送时间:2020-02-06 22:35:14
窗口开始时间:2020-02-06 22:35:06 窗口结束时间:2020-02-06 22:35:09 窗口计算时间:2020-02-06 22:35:14
4> (hadooop,1)

事件发送时间:2020-02-06 22:35:15
事件发送时间:2020-02-06 22:35:16
事件发送时间:2020-02-06 22:35:17
窗口开始时间:2020-02-06 22:35:09 窗口结束时间:2020-02-06 22:35:12 窗口计算时间:2020-02-06 22:35:17
4> (hadooop,3)


参考文件:

https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/event_time.html

您可能感兴趣的文档:

--结束END--

本文标题: 「Flink」事件时间与水印

本文链接: https://lsjlt.com/news/4102.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • 「Flink」事件时间与水印
    我们先来以滚动时间窗口为例,来看一下窗口的几个时间参数与Flink流处理系统时间特性的关系。获取窗口开始时间Flink源代码获取窗口的开始时间为以下代码:org.apache.flink.streaming.api.windowi...
    99+
    2019-07-13
    「Flink」事件时间与水印
  • Flink中怎么处理事件时间
    在Flink中处理事件时间有两种方式:Event Time和Processing Time。 Event Time:事件时间是数据...
    99+
    2024-04-02
  • Android实现拍照添加时间水印
    本文实例为大家分享了Android实现拍照添加时间水印的具体代码,供大家参考,具体内容如下 效果如下图 : 1、拍照 // 非空判断 拍照  if (mCamera0 != nul...
    99+
    2024-04-02
  • Android视频处理之动态时间水印效果
    最近的项目中遇到一个非常头痛的需求,在Android端录制视频的时候动态添加像监控画面一样的精确到秒的时间信息,关键是,并不是说只在播放器的界面显示时间就可以了,而是录制到视频...
    99+
    2022-06-06
    动态 Android
  • redis文件事件和时间事件是什么
    今天小编给大家分享一下redis文件事件和时间事件是什么的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来...
    99+
    2024-04-02
  • 什么是JavaScript时间事件
    本篇内容主要讲解“什么是JavaScript时间事件”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“什么是JavaScript时间事件”吧!一、前言setTime...
    99+
    2024-04-02
  • JavaScript中如何实现事件冒泡与时间捕获
    小编给大家分享一下JavaScript中如何实现事件冒泡与时间捕获,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!一、事件冒泡与事件捕获当我们在Web页面单击某一个元素的时候,比如某个p元素...
    99+
    2024-04-02
  • python获取文件修改时间与创建时间
    转载自:  http://blog.csdn.net/liyuan_669/article/details/25347037 import os import time  y = time.ctie(os.stat("D:\\test\...
    99+
    2023-01-31
    时间 文件 python
  • Redis 事务与过期时间详细介绍
    Redis 事务与过期时间详细介绍 一、Redis事务: Redis中支持事务,事务即为当我们需要执行几条命令时,要么这几条命令都不执行,要么都执行: 1、开始事务写入: multi 2、然后...
    99+
    2022-06-04
    详细介绍 事务 时间
  • 阿里云服务器泡水事件回顾与解决方案
    阿里云服务器是一种高可用性的云计算产品,提供稳定可靠的计算能力。然而,近日有一份关于阿里云服务器泡水的报告,引起了人们的关注。这篇文章将详细回顾该事件,分析其原因,并提出解决方案。 事件回顾:近日,一份关于阿里云服务器泡水的报告在网络上流传...
    99+
    2023-11-02
    阿里 解决方案 事件
  • mybatis 插件: 打印 sql 及其执行时间实现方法
    Plugins摘一段来自MyBatis官方文档的文字。MyBatis允许你在某一点拦截已映射语句执行的调用。默认情况下,MyBatis允许使用插件来拦截方法调用:Executor(update、query、flushStateme...
    99+
    2023-05-31
    mybatis 打印 执行sql
  • Beam中Watermark是怎么处理事件时间的
    在Beam中,Watermark是用来衡量事件时间进展的一种机制,它通常会被用来处理延迟数据,保证数据的完整性和正确性。Waterm...
    99+
    2024-03-15
    Beam Watermark
  • MySQLFlinkWatermark实现事件时间处理的关键技术
    目录1.概述2.SQL案例-演示Watermark为零的情况3.SQL案例-演示Watermark不为零的情况1.概述 生活中有种场景: 车辆进入隧道,信号不好,出了隧道后,信号就正...
    99+
    2023-05-19
    MySQL Flink Watermark事件时间处理 Flink Watermark
  • Beam中事件时间处理是怎么工作的
    Beam中的事件时间处理是通过Watermark和Timestamp来实现的。Watermark是用来表示事件时间进度的指示器,它表...
    99+
    2024-04-02
  • web分布式系统时间、时钟和事件顺序是什么
    这篇文章主要介绍“web分布式系统时间、时钟和事件顺序是什么”,在日常操作中,相信很多人在web分布式系统时间、时钟和事件顺序是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”web分布式系统时间、时钟和事...
    99+
    2023-06-04
  • Linux下与文件相关的时间有哪些
    这篇文章将为大家详细讲解有关Linux下与文件相关的时间有哪些,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。与文件相关的几个时间:   1、访问时间,读一次这个文件的内容,这个时间就会更新。比如对这个文件...
    99+
    2023-06-13
  • web开发中测试是不是一件浪费时间的事
    本篇文章为大家展示了测试是一件浪费时间的事,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。让我们详细地说明作为开发人员,我们都知道我们应...
    99+
    2024-04-02
  • Android中日期与时间设置控件用法实例
    本文实例讲述了Android中日期与时间设置控件用法。分享给大家供大家参考。具体如下: 1、日期设置控件:DatePickerDialog 2、时间设置控件:TimePicke...
    99+
    2022-06-06
    Android
  • 我们如何为不同类型的 MySQL 事件制定时间表?
    基本上,我们需要为两种事件指定时间表:一次性事件一次性事件意味着它将在特定时间表上仅执行一次。如果我们想创建一个一次性事件,那么我们需要在 ON SCHEDULE 子句之后放置以下语法:AT Timestamp[+INTERVAL]重复事件...
    99+
    2023-10-22
  • linux下获取文件的创建时间与实战教程
    背景 有时候我们需要获取文件的创建时间。 例如: 我在研究 《xtrabackup 原理图》的时候,想通过观察确认 xtrabackup_log 是最早创建 并且是 最晚保存的文件。我们就需要知道 xtrabacku...
    99+
    2022-06-04
    linux如何查看文件的创建时间 linux查询文件创建时间 linux 文件创建时间
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作