本篇文章给大家分享的是有关如何进行stORM1.1.3与kafka1.0.0整合,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。package hgs.core.sk;
本篇文章给大家分享的是有关如何进行stORM1.1.3与kafka1.0.0整合,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。
package hgs.core.sk;import java.util.Map;import org.apache.storm.Config;import org.apache.storm.LocalCluster;import org.apache.storm.StormSubmitter;import org.apache.storm.kafka.BrokerHosts;import org.apache.storm.kafka.KafkaSpout;import org.apache.storm.kafka.SpoutConfig;import org.apache.storm.kafka.ZkHosts;import org.apache.storm.task.OutputCollector;import org.apache.storm.task.TopologyContext;import org.apache.storm.topology.OutputFieldsDeclarer;import org.apache.storm.topology.TopologyBuilder;import org.apache.storm.topology.base.BaseRichBolt;import org.apache.storm.tuple.Tuple;@SuppressWarnings("deprecation")public class StormKafkaMainTest {public static void main(String[] args) {TopologyBuilder builder = new TopologyBuilder();//ZooKeeper链接地址BrokerHosts hosts = new ZkHosts("bigdata01:2181,bigdata02:2181,bigdata03:2181");//KafkaSpout需要一个config,参数代表的意义1:zookeeper链接,2:消费kafka的topic,3,4:记录消费offset的zookeeper地址 ,这里会保存在 zookeeper//集群的/test7/consume下面SpoutConfig sconfig = new SpoutConfig(hosts, "test7", "/test7", "consume");//消费的时候忽略offset从头开始消费,这里可以注释掉,因为消费的offset在zookeeper中可以找到sconfig.ignoreZkOffsets=true;//sconfig.scheme = new SchemeAsMultiScheme( new StringScheme() );builder.setSpout("kafkaspout", new KafkaSpout(sconfig), 1);builder.setBolt("mybolt1", new MyboltO(), 1).shuffleGrouping("kafkaspout"); Config config = new Config(); config.setNumWorkers(1); try {StormSubmitter.submitTopology("storm----kafka--test", config, builder.createTopology());} catch (Exception e) {e.printStackTrace();} }}class MyboltO extends BaseRichBolt{private static final long serialVersionUID = 1L;OutputCollector collector = null;public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {this.collector = collector;}public void execute(Tuple input) {//这里把消息大一出来,在对应的woker下面的日志可以找到打印的内容//因为得到的内容是byte数组,所以需要转换String out = new String((byte[])input.getValue(0));System.out.println(out);collector.ack(input);}public void declareOutputFields(OutputFieldsDeclarer declarer) {}}
pom.xml文件的依赖
<project xmlns="Http://Maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>hgs</groupId> <artifactId>core.sk</artifactId> <version>1.0.0-SNAPSHOT</version> <packaging>jar</packaging> <name>core.sk</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-kafka</artifactId> <version>1.1.3</version></dependency><dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>1.1.3</version> <scope>provided</scope></dependency><dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>1.0.0</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> <exclusion> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> </exclusion> </exclusions></dependency><!-- <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-kafka-monitor</artifactId> <version>1.2.2</version></dependency> --><!-- <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.8.2.1</version></dependency> --><dependency> <groupId>org.clojure</groupId> <artifactId>clojure</artifactId> <version>1.7.0</version></dependency><!-- 尝试了很多次 都会有这个错误:java.lang.NullPointerException at org.apache.storm.kafka.monitor.KafkaOffsetLagUtil.getOffsetLags(KafkaOffsetLagUtil.java:272)最后修改为kafka相应的kafka-clients版本后问题得到解决,应该是该出的问题--><dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>1.0.0</version></dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-assembly-plugin</artifactId> <version>2.2</version> <configuration> <arcHive> <manifest> <!-- 我运行这个jar所运行的主类 --> <mainClass>hgs.core.sk.StormKafkaMainTest</mainClass> </manifest> </archive> <descriptorRefs> <descriptorRef> <!-- 必须是这样写 --> jar-with-dependencies </descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <Goals> <goal>single</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build></project>
以上就是如何进行storm1.1.3与kafka1.0.0整合,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注编程网精选频道。
--结束END--
本文标题: 如何进行storm1.1.3与kafka1.0.0整合
本文链接: https://lsjlt.com/news/231215.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0