返回顶部
首页 > 资讯 > 精选 >如何进行storm1.1.3与kafka1.0.0整合
  • 564
分享到

如何进行storm1.1.3与kafka1.0.0整合

2023-06-02 20:06:38 564人浏览 薄情痞子
摘要

本篇文章给大家分享的是有关如何进行stORM1.1.3与kafka1.0.0整合,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。package hgs.core.sk;

本篇文章给大家分享的是有关如何进行stORM1.1.3与kafka1.0.0整合,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。

package hgs.core.sk;import java.util.Map;import org.apache.storm.Config;import org.apache.storm.LocalCluster;import org.apache.storm.StormSubmitter;import org.apache.storm.kafka.BrokerHosts;import org.apache.storm.kafka.KafkaSpout;import org.apache.storm.kafka.SpoutConfig;import org.apache.storm.kafka.ZkHosts;import org.apache.storm.task.OutputCollector;import org.apache.storm.task.TopologyContext;import org.apache.storm.topology.OutputFieldsDeclarer;import org.apache.storm.topology.TopologyBuilder;import org.apache.storm.topology.base.BaseRichBolt;import org.apache.storm.tuple.Tuple;@SuppressWarnings("deprecation")public class StormKafkaMainTest {public static void main(String[] args) {TopologyBuilder builder = new TopologyBuilder();//ZooKeeper链接地址BrokerHosts hosts = new ZkHosts("bigdata01:2181,bigdata02:2181,bigdata03:2181");//KafkaSpout需要一个config,参数代表的意义1:zookeeper链接,2:消费kafka的topic,3,4:记录消费offset的zookeeper地址 ,这里会保存在 zookeeper//集群的/test7/consume下面SpoutConfig sconfig = new SpoutConfig(hosts, "test7", "/test7", "consume");//消费的时候忽略offset从头开始消费,这里可以注释掉,因为消费的offset在zookeeper中可以找到sconfig.ignoreZkOffsets=true;//sconfig.scheme = new SchemeAsMultiScheme( new StringScheme() );builder.setSpout("kafkaspout", new KafkaSpout(sconfig), 1);builder.setBolt("mybolt1", new MyboltO(), 1).shuffleGrouping("kafkaspout");     Config config = new Config();     config.setNumWorkers(1);     try {StormSubmitter.submitTopology("storm----kafka--test", config, builder.createTopology());} catch (Exception e) {e.printStackTrace();}      }}class  MyboltO extends  BaseRichBolt{private static final long serialVersionUID = 1L;OutputCollector collector = null;public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {this.collector = collector;}public void execute(Tuple input) {//这里把消息大一出来,在对应的woker下面的日志可以找到打印的内容//因为得到的内容是byte数组,所以需要转换String out = new String((byte[])input.getValue(0));System.out.println(out);collector.ack(input);}public void declareOutputFields(OutputFieldsDeclarer declarer) {}}
pom.xml文件的依赖
<project xmlns="Http://Maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  <modelVersion>4.0.0</modelVersion>  <groupId>hgs</groupId>  <artifactId>core.sk</artifactId>  <version>1.0.0-SNAPSHOT</version>  <packaging>jar</packaging>  <name>core.sk</name>  <url>http://maven.apache.org</url>  <properties>    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>  </properties>  <dependencies>    <dependency>      <groupId>junit</groupId>      <artifactId>junit</artifactId>      <version>3.8.1</version>      <scope>test</scope>    </dependency>        <dependency>    <groupId>org.apache.storm</groupId>    <artifactId>storm-kafka</artifactId>    <version>1.1.3</version></dependency><dependency>  <groupId>org.apache.storm</groupId>  <artifactId>storm-core</artifactId>  <version>1.1.3</version>  <scope>provided</scope></dependency><dependency>    <groupId>org.apache.kafka</groupId>    <artifactId>kafka_2.11</artifactId>    <version>1.0.0</version>    <exclusions>    <exclusion>          <groupId>org.slf4j</groupId>          <artifactId>slf4j-log4j12</artifactId>        </exclusion>        <exclusion>            <groupId>org.apache.zookeeper</groupId>            <artifactId>zookeeper</artifactId>       </exclusion>    </exclusions></dependency><!-- <dependency>    <groupId>org.apache.storm</groupId>    <artifactId>storm-kafka-monitor</artifactId>    <version>1.2.2</version></dependency> --><!-- <dependency>    <groupId>org.apache.kafka</groupId>    <artifactId>kafka-clients</artifactId>    <version>0.8.2.1</version></dependency> --><dependency>    <groupId>org.clojure</groupId>    <artifactId>clojure</artifactId>    <version>1.7.0</version></dependency><!-- 尝试了很多次 都会有这个错误:java.lang.NullPointerException at org.apache.storm.kafka.monitor.KafkaOffsetLagUtil.getOffsetLags(KafkaOffsetLagUtil.java:272)最后修改为kafka相应的kafka-clients版本后问题得到解决,应该是该出的问题--><dependency>    <groupId>org.apache.kafka</groupId>    <artifactId>kafka-clients</artifactId>    <version>1.0.0</version></dependency> </dependencies>        <build>        <plugins>            <plugin>                <artifactId>maven-assembly-plugin</artifactId>                <version>2.2</version>                <configuration>                    <arcHive>                        <manifest>                            <!-- 我运行这个jar所运行的主类 -->                            <mainClass>hgs.core.sk.StormKafkaMainTest</mainClass>                        </manifest>                    </archive>                    <descriptorRefs>                        <descriptorRef>                            <!-- 必须是这样写 -->                            jar-with-dependencies                        </descriptorRef>                    </descriptorRefs>                </configuration>                                <executions>                    <execution>                        <id>make-assembly</id>                        <phase>package</phase>                        <Goals>                            <goal>single</goal>                        </goals>                    </execution>                </executions>            </plugin>                         <plugin>                <groupId>org.apache.maven.plugins</groupId>                <artifactId>maven-compiler-plugin</artifactId>                <configuration>                    <source>1.8</source>                    <target>1.8</target>                </configuration>            </plugin>        </plugins>    </build></project>

以上就是如何进行storm1.1.3与kafka1.0.0整合,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注编程网精选频道。

--结束END--

本文标题: 如何进行storm1.1.3与kafka1.0.0整合

本文链接: https://lsjlt.com/news/231215.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • 如何进行storm1.1.3与kafka1.0.0整合
    本篇文章给大家分享的是有关如何进行storm1.1.3与kafka1.0.0整合,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。package hgs.core.sk;...
    99+
    2023-06-02
  • 如何将spring与quartz进行整合
    如何将spring与quartz进行整合?相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。第0步:在spring配置包扫描以及在 pom导入包spring.xml:pom.xml1...
    99+
    2023-05-31
    quartz spring art
  • springboot如何进行整合mongodb
    springboot如何进行整合mongodb,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。准备工作安装 MongoDBjdk 1.8maven 3.0idea环境依赖在p...
    99+
    2023-06-19
  • 如何进行SpringBoot 整合JPA
    如何进行SpringBoot 整合JPA,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。JPA全称Java Persistence API.JPA通过JDK 5....
    99+
    2023-06-05
  • 使用spring如何实现springmvc与mybatis进行整合
    使用spring如何实现springmvc与mybatis进行整合?针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。jar包 引入web.xml文件<conte...
    99+
    2023-05-31
    spring springmvc mybatis
  • 如何对SSM框架进行整合
    今天就跟大家聊聊有关如何对SSM框架进行整合,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。SSM(Spring+SpringMVC+Mybatis)是目前较为主流的企业级架构方案,不...
    99+
    2023-05-31
    ssm
  • 如何进行SpringBoot整合JWT的实现
    这篇文章跟大家分析一下“如何进行SpringBoot整合JWT的实现”。内容详细易懂,对“如何进行SpringBoot整合JWT的实现”感兴趣的朋友可以跟着小编的思路慢慢深入来阅读一下,希望阅读后能够对大家有所帮助。下面跟着小编一起深入学习...
    99+
    2023-06-26
  • 使用MongoDB如何对Spring进行整合
    本篇文章给大家分享的是有关使用MongoDB如何对Spring进行整合,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。添加依赖<dependency> &n...
    99+
    2023-05-31
    mongodb spring
  • 怎么对struts、spring与hibernate进行整合
    怎么对struts、spring与hibernate进行整合?相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。准备三个框架结合的lib包Spring3结合Struts2的步骤如下:...
    99+
    2023-05-31
    struts hibernate spring
  • 使用Spring Boot如何对Mybatis进行整合
    今天就跟大家聊聊有关使用Spring Boot如何对Mybatis进行整合,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。依赖配置结合前面的内容,这里我们要嵌入数据库的操作,这里以操作...
    99+
    2023-05-31
    springboot mybatis
  • 如何利用VSTS跟Kubernetes整合进行CI/CD
    这篇文章主要讲解了“如何利用VSTS跟Kubernetes整合进行CI/CD”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“如何利用VSTS跟Kubernetes整合进行CI/CD”吧!为什么...
    99+
    2023-06-19
  • SpringBoot详解如何进行整合Druid数据源
    目录1.自定义方式1.添加依赖2.编写配置3.测试2.starter方式(推荐)1.添加依赖2.编写配置3.测试Druid是数据库连接池,它能够提供强大的监控和扩展功能。官方文档 S...
    99+
    2024-04-02
  • 使用SpringBoot如何实现对ElasticSearch进行整合
    这篇文章给大家介绍使用SpringBoot如何实现对ElasticSearch进行整合,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。一、实体设计:Tutorial.javapublic class Tutorial i...
    99+
    2023-05-31
    springboot elasticsearch
  • 使用springboot如何实现对freemarker进行整合
    这篇文章将为大家详细讲解有关使用springboot如何实现对freemarker进行整合,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。前提:开发工具:idea框架:spring boot、...
    99+
    2023-05-31
    springboot freemarker
  • 利用Android如何实现对 ToolBar进行整合
    本篇文章给大家分享的是有关利用Android如何实现对 ToolBar进行整合,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。CustomeToolBar继承原生ToolBarp...
    99+
    2023-05-31
    android toolbar roi
  • 使用springboot如何实现对mongodb进行整合
    使用springboot如何实现对mongodb进行整合?相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。环境依赖在pom文件引入spring-boot-starter-data-...
    99+
    2023-05-31
    springboot mongodb
  • 使用springboot如何实现对 beatlsql进行整合
    这篇文章给大家介绍使用springboot如何实现对 beatlsql进行整合,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。BeetSql是一个全功能DAO工具, 同时具有hibernate 优点 & Myba...
    99+
    2023-05-31
    springboot beatlsql bea
  • 使用spring如何对cxf框架进行整合
    使用spring如何对cxf框架进行整合?很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。1.创建动态web项目2.导入cxf和spring相关jar包(CXF核心...
    99+
    2023-05-31
    cxf spring
  • 使用Spring4如何实现对Hibernate5进行整合
    使用Spring4如何实现对Hibernate5进行整合?很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。Spring与Hiberante整合通过hibernate...
    99+
    2023-05-31
    spring hibernate5
  • redis与ssm如何整合
    这篇文章主要介绍redis与ssm如何整合,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!SSM+redis整合ssm框架之前已经搭建过了,这里不再做代码复制工作。这里主要是利用redis去做mybatis的二级缓存,...
    99+
    2023-05-30
    ssm redis
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作