返回顶部
首页 > 资讯 > 精选 >Kafka中如何将数据导入到Elasticsearch
  • 838
分享到

Kafka中如何将数据导入到Elasticsearch

2023-06-02 18:06:29 838人浏览 八月长安
摘要

本篇文章为大家展示了kafka中如何将数据导入到elasticsearch,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。Elasticsearch作为当前主流的全文检索引擎,除了强大的全文检索能力和

本篇文章为大家展示了kafka中如何将数据导入到elasticsearch,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。

Elasticsearch作为当前主流的全文检索引擎,除了强大的全文检索能力和高扩展性之外,对多种数据源的兼容能力也是其成功的秘诀之一。而Elasticsearch强大的数据源兼容能力,主要来源于其核心组件之一的Logstash, Logstash通过插件的形式实现了对多种数据源的输入和输出。Kafka是一种高吞吐量的分布式发布订阅消息系统,是一种常见的数据源,也是Logstash支持的众多输入输出源的其中一个。将从实践的角度,研究使用Logstash Kafka Input插件实现将Kafka中数据导入到Elasticsearch的过程。

Kafka中如何将数据导入到Elasticsearch

使用Logstash Kafka插件连接Kafka和Elasticsearch

1 Logstash Kafka input插件简介

Logstash Kafka Input插件使用Kafka api从Kafka topic中读取数据信息,使用时需要注意Kafka的版本及对应的插件版本是否一致。该插件支持通过SSL和Kerveros SASL方式连接Kafka。另外该插件提供了group管理,并使用默认的offset管理策略来操作Kafka topic。

Logstash默认情况下会使用一个单独的group来订阅Kafka消息,每个Logstash Kafka Consumer会使用多个线程来增加吞吐量。当然也可以多个Logstash实例使用同一个group_id,来均衡负载。另外建议把Consumer的个数设置为Kafka分区的大小,以提供更好的性能。

2 测试环境准备 2.1 创建Elasticsearch集群

为了简化搭建过程,本文使用了腾讯云Elasticsearch service。腾讯云Elasticsearch service不仅可以实现Elasticsearch集群的快速搭建,还提供了内置Kibana,集群监控,专用主节点,Ik分词插件等功能,极大的简化了Elasticsearch集群的创建和管理工作。

2 创建Kafka服务

Kafka服务的搭建采用腾讯云CKafka来完成。与Elasticsearch Service一样,腾讯云CKafka可以实现Kafka服务的快速创建,100%兼容开源Kafka API(0.9版本)。

3 服务器

除了准备Elasticsearch和Kafka,另外还需要准备一台服务器,用于运行Logstash以连接Elasticsearch和Kafka。本文采用腾讯云CVM服务器

4 注意事项

1) 需要将Elasticsearch、Kafka和服务器创建在同一个网络下,以便实现网络互通。由于本文采用的是腾讯云相关的技术服务,因此只需要将Elasticsearch service,CKafka和CVM创建在同一个私有网路(VPC)下即可。

2) 注意获取Elasticsearch serivce,CKafka和CVM的内网地址和端口,以便后续服务使用

本次测试中:

服务 ip port




Elasticsearch service192.168.0.89200
Ckafka192.168.13.109092
CVM192.168.0.13-

3 使用Logstash连接Elasticsearch和Kafka 3.1 Kafka准备

可以参考[CKafka 使用入门]

按照上面的教程

1) 创建名为kafka_es_test的topic

2) 安装jdk

3) 安装Kafka工具

4) 创建producer和consumer验证kafka功能

2 安装Logstash

Logstash的安装和使用可以参考[一文快速上手Logstash]

3 配置Logstash Kafka input插件

创建kafka_test_pipeline.conf文件内容如下:

input{        kafka{                bootstrap_servers=>"192.168.13.10:9092"                topics=>["kafka_es_test"]                group_id=>"logstash_kafka_test"        }}output{        elasticsearch{                hosts=>["192.168.0.8:9200"]        }}

其中定义了一个kafka的input和一个elasticsearch的output

对于Kafka input插件上述三个参数为必填参数,除此之外还有一些对插件行为进行调整的一些参数如:

auto_commit_interval_ms 用于设置Consumer提交offset给Kafka的时间间隔

consumer_threads 用于设置Consumer的线程数,默认为1,实际中应设置与Kafka Topic分区数一致

fetch_max_wait_ms 用于指定Consumer等待一个fetch请求达到fetch_min_bytes的最长时间

fetch_min_bytes 用于指定Consumer fetch请求应返回的最小数据量

topics_pattern 用于通过正则订阅符合某一规则的一组topic

更多参数参考:[Kafka Input Configuration Options]

4 启动Logstash

以下操作在Logstash根目录中进行

1) 验证配置

./bin/logstash -f kafka_test_pipeline.conf --config.test_and_exit

如有错误,根据提示修改配置文件。若配置正确会得到如下结果

Sending Logstash's logs to /root/logstash-5.6.13/logs which is now configured via log4j2.properties[2018-11-11T15:24:01,598][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/root/logstash-5.6.13/modules/netflow/configuration"}[2018-11-11T15:24:01,603][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/root/logstash-5.6.13/modules/fb_apache/configuration"}Configuration OK[2018-11-11T15:24:01,746][INFO ][logstash.runner          ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash

2) 启动Logstash

./bin/logstash -f kafka_test_pipeline.conf --config.reload.automatic

观察日志是否有错误提示,并及时处理

4 启动Kafka Producer

以下操作在Kafka工具包根目录下进行

./bin/kafka-console-producer.sh --broker-list 192.168.13.10:9092 --topic kafka_es_test

写入测试数据

This is a message

5 Kibana验证结果

登录Elasticsearch对应Kibana, 在Dev Tools中进行如下操作

1) 查看索引

GET _cat/indices

可以看到一个名为logstash-xxx.xx.xx的索引被创建成功

green open .kibana             QUw45tN0SHqeHbF9-QVU6A 1 1 1 0 5.5kb 2.7kbgreen open logstash-2018.11.11 DejRdNJVQ1e1MwbyJjJjLw 5 1 1 0 8.7kb 4.3kb

2) 查看写入的数据

GET logstash-2018.11.11/_search

可以看到数据已经被成功写入

{  "took": 0,  "timed_out": false,  "_shards": {    "total": 5,    "successful": 5,    "skipped": 0,    "failed": 0  },  "hits": {    "total": 1,    "max_score": 1,    "hits": [      {        "_index": "logstash-2018.11.11",        "_type": "logs",        "_id": "AWcBsEegMu-Dkjm1ap3H",        "_score": 1,        "_source": {          "message": "This is a message",          "@version": "1",          "@timestamp": "2018-11-11T07:33:09.079Z"        }      }    ]  }}

Logstash作为Elastic Stack中数据采集和处理的核心组件,为Elasticsearch提供了强大的数据源兼容能力。从测试过程可以看出,使用Logstash实现kafka和Elaticsearch的连接过程相当简单方便。另外Logstash的数据处理功能,也使得采用该架构的系统对数据映射和处理有天然的优势。

上述内容就是Kafka中如何将数据导入到Elasticsearch,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注编程网精选频道。

--结束END--

本文标题: Kafka中如何将数据导入到Elasticsearch

本文链接: https://lsjlt.com/news/230990.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • Kafka中如何将数据导入到Elasticsearch
    本篇文章为大家展示了Kafka中如何将数据导入到Elasticsearch,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。Elasticsearch作为当前主流的全文检索引擎,除了强大的全文检索能力和...
    99+
    2023-06-02
  • 如何将kafka中的数据快速导入Hadoop
    如何将kafka中的数据快速导入Hadoop,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。Kafka是一个分布式发布—订阅系统,由于其强大的分布式和性能特性,迅...
    99+
    2023-06-02
  • elasticsearch数据如何导入导出
    Elasticsearch 中数据的导入导出可以通过以下几种方式实现: 使用 Elasticsearch 提供的 API: 使...
    99+
    2024-04-02
  • 如何将sqlite3中数据导入到mysql中
    小编给大家分享一下如何将sqlite3中数据导入到mysql中,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!方法如下:从sqli...
    99+
    2024-04-02
  • 如何从mysql中将数据导入到oracle数据库中
    这篇文章主要讲解了“如何从mysql中将数据导入到oracle数据库中”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“如何从mysql中将数据导入到oracl...
    99+
    2024-04-02
  • 如何将matlab数据导入到Python中使用
    相信不少小伙伴都遇到过和我一样的问题,就是在尝试使用scipy.io.loadmat将matlab类型的数据导入python中的时候遇到如下错误提示。 import scipy as...
    99+
    2022-12-15
    matlab数据导入到Python中使用 matlab导入Python
  • 如何将 .sql 文件导入到 MySQL 数据库?
    导入 SQL 文件到 MySQL 数据库是一个常见的任务,本文将介绍如何执行该操作。我们将逐步讲解具体的步骤,帮助您完成这个过程。 步骤 1: 打开命令提示符或终端 首先,在您的计算机上打开命令提示符或终端窗口。这将是执行导入操作的地方。 ...
    99+
    2023-09-18
    数据库 mysql sql
  • MapReduce将文本数据导入到HBase中
    整体描述:将本地文件的数据整理之后导入到hbase中在HBase中创建表数据格式MapReduce程序map程序package com.hadoop.mapreduce.test.map; im...
    99+
    2024-04-02
  • 怎么将oracle数据导入到mysql中
    本篇文章为大家展示了怎么将oracle数据导入到mysql中,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。    1、在PL/SQL中用sele...
    99+
    2024-04-02
  • 使用Logstash和JDBC将MySQL的数据导入到Elasticsearch(ES)的过程
    使用Logstash和JDBC将MySQL的数据导入到Elasticsearch(ES)的过程包含多个步骤。请注意,首先你需要准备好的JDBC驱动,Logstash实例,Elasticsearch实例...
    99+
    2023-09-11
    mysql elasticsearch jenkins
  • Python将数据库数据导入到EXCEL
          每次给运营导数据的时候,如果不用工具的话,就是直接生成.csv格式的文件,这样的文件不支持'sheet',每次还有手工进行,相当的不科学,今天试试Python生成excel文件。        写的糙了点,但是这是一个很好的开始...
    99+
    2023-01-31
    导入到 数据库 数据
  • 如何将RRD数据库中数据导入MYSQL中
    本篇文章为大家展示了如何将RRD数据库中数据导入MYSQL中,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。将RRD数据库中数据导入MYSQL中一、RRD数据库及RR...
    99+
    2024-04-02
  • 如何使用python批量导入数据进Elasticsearch中
    本文小编为大家详细介绍“如何使用python批量导入数据进Elasticsearch中”,内容详细,步骤清晰,细节处理妥当,希望这篇“如何使用python批量导入数据进Elasticsearch中”文章能帮助大家解决疑惑,下面跟着小编的思路...
    99+
    2023-06-05
  • Impala中如何导入数据到表中
    要在Impala中导入数据到表中,可以使用IMPALA LOAD DATA语句。以下是一个示例: LOAD DATA INPATH ...
    99+
    2024-03-07
    Impala
  • 如何将数据库导入navicat
    小编给大家分享一下如何将数据库导入navicat,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!在开发时,数据库的设计与程序的设计是分开进行的。你在开发时经常会需要导入别的给你的数据库到你的...
    99+
    2024-04-02
  • Sqoop怎么将MySQL数据导入到hive中
    这篇文章主要讲解了“Sqoop怎么将MySQL数据导入到hive中”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Sqoop怎么将MySQL数据导入到hive中”吧!MySQL表:mysql&...
    99+
    2023-06-02
  • 将excel中数据导入到指定的数据库表中
    1、先在数据库中查看表的结构:2、根据表的结构整理excel表结构及数据:3、右击数据库-任务-导入数据:4、选择源数据:5、选择目标:6、选择复制数据模式:7、选择目标数据表:8、点击下一步,然后执行。 ...
    99+
    2024-04-02
  • 将excel文件导入到数据库
    参考:http://blog.csdn.net/jayxujia123/article/details/13684313 参考:http://kevin850115.iteye.com/blog/57814...
    99+
    2024-04-02
  • 怎么将Mysql数据导入到MongoDB
    今天就跟大家聊聊有关怎么将Mysql数据导入到MongoDB,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。(1)从mysql导出数据位csv文件,如...
    99+
    2024-04-02
  • PHP如何将MySQL中数据导入表单
    要将MySQL中的数据导入到表单中,可以使用PHP和MySQL的组合来实现。以下是一个简单的例子: 首先,创建一个连接到MySQL...
    99+
    2024-04-09
    PHP MySQL
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作