返回顶部
首页 > 资讯 > 后端开发 > Python >SpringBoot整合Pulsar的实现示例
  • 353
分享到

SpringBoot整合Pulsar的实现示例

2024-04-02 19:04:59 353人浏览 独家记忆

Python 官方文档:入门教程 => 点击学习

摘要

目录一、添加pom.xml依赖二、Pulsar 参数类三、Pulsar 配置类四、不同消费数据类型的监听器五、Pulsar的核心服务类六、Pulsar整合spring cloud一、

一、添加pom.xml依赖

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.7.0</version>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-WEB</artifactId>
    </dependency>

    <dependency>
        <groupId>org.apache.pulsar</groupId>
        <artifactId>pulsar-client</artifactId>
        <version>2.10.0</version>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.24</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.Maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>8</source>
                <target>8</target>
            </configuration>
        </plugin>
    </plugins>
</build>    

二、Pulsar 参数类

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

import java.util.Map;



@Component
@ConfigurationProperties(prefix = "tdMQ.pulsar")
@Data
public class PulsarProperties {

    
    private String serviceurl;

    
    private String tdcNamespace;

    
    private String tdcToken;

    
    private String cluster;

    
    private Map<String, String> topicMap;

    
    private Map<String, String> subMap;

    
    private String onOff;
}

三、Pulsar 配置类

import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;



@Configuration
@EnableConfigurationProperties(PulsarProperties.class)
public class PulsarConfig {

    @Autowired
    PulsarProperties pulsarProperties;

    @Bean
    public PulsarClient getPulsarClient() {

        try {
            return PulsarClient.builder()
                    .authentication(AuthenticationFactory.token(pulsarProperties.getTdcToken()))
                    .serviceUrl(pulsarProperties.getServiceurl())
                    .build();
        } catch (PulsarClientException e) {
            System.out.println(e);
            throw new RuntimeException("初始化Pulsar Client失败");
        }
    }

}

四、不同消费数据类型的监听器

import com.yibo.pulsar.pojo.User;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.springframework.stereotype.Component;



@Component
public class UserMessageListener implements MessageListener<User> {

    @Override
    public void received(Consumer<User> consumer, Message<User> msg) {
        try {
            User user = msg.getValue();
            System.out.println(user);
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    }
}
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.springframework.stereotype.Component;



@Component
public class StringMessageListener implements MessageListener<String> {

    @Override
    public void received(Consumer<String> consumer, Message<String> msg) {
        try {
            System.out.println(msg.getValue());
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    }
}

五、Pulsar的核心服务类

import com.yibo.pulsar.common.listener.StringMessageListener;
import com.yibo.pulsar.common.listener.UserMessageListener;
import com.yibo.pulsar.pojo.User;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;



@Component
public class PulsarCommon {

    @Autowired
    private PulsarProperties pulsarProperties;

    @Autowired
    private PulsarClient client;

    @Autowired
    private UserMessageListener userMessageListener;

    @Autowired
    private StringMessageListener stringMessageListener;


    
    public <T> Producer<T> createProducer(String topic, Schema<T> schema) {

        try {
            return client.newProducer(schema)
                    .topic(pulsarProperties.getCluster() + "/" + pulsarProperties.getTdcNamespace() + "/" + topic)
                    .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
                    .sendTimeout(10, TimeUnit.SECONDS)
                    .blockIfQueueFull(true)
                    .create();
        } catch (PulsarClientException e) {
            throw new RuntimeException("初始化Pulsar Producer失败");
        }
    }


    
    public <T> Consumer<T> createConsumer(String topic, String subscription,
                                   MessageListener<T> messageListener, Schema<T> schema) {
        try {
            return client.newConsumer(schema)
                    .topic(pulsarProperties.getCluster() + "/" + pulsarProperties.getTdcNamespace() + "/" + topic)
                    .subscriptionName(subscription)
                    .ackTimeout(10, TimeUnit.SECONDS)
                    .subscriptionType(SubscriptionType.Shared)
                    .messageListener(messageListener)
                    .subscribe();
        } catch (PulsarClientException e) {
            throw new RuntimeException("初始化Pulsar Consumer失败");
        }
    }

    
    
    public <T> void sendAsyncMessage(T message, Producer<T> producer) {
        producer.sendAsync(message).thenAccept(msgId -> {
        });
    }
    
    
    
    public <T> void sendSyncMessage(T message, Producer<T> producer) throws PulsarClientException {
        MessageId send = producer.send(message);
        System.out.println();
        System.out.println();
        System.out.println();
        System.out.println();
        System.out.println(send);
    }

    
    //-----------consumer-----------
    @Bean(name = "comment-publish-topic-consumer")
    public Consumer<String> getCommentPublishTopicConsumer() {
        return this.createConsumer(pulsarProperties.getTopicMap().get("comment-publish-topic"),
                pulsarProperties.getSubMap().get("comment-publish-topic-test"),
                stringMessageListener, Schema.STRING);
    }


    @Bean(name = "reply-publish-topic-consumer")
    public Consumer<User> getReplyPublishTopicConsumer() {
        return this.createConsumer(pulsarProperties.getTopicMap().get("reply-publish-topic"),
                pulsarProperties.getSubMap().get("reply-publish-topic-test"),
                userMessageListener, AvroSchema.of(User.class));
    }


    //-----------producer-----------
    @Bean(name = "comment-publish-topic-producer")
    public Producer<String> getCommentPublishTopicProducer() {
        return this.createProducer(pulsarProperties.getTopicMap().get("comment-publish-topic"),Schema.STRING);
    }


    @Bean(name = "reply-publish-topic-producer")
    public Producer<User> getReplyPublishTopicProducer() {
        return this.createProducer(pulsarProperties.getTopicMap().get("reply-publish-topic"), AvroSchema.of(User.class));
    }
}

六、Pulsar整合Spring Cloud

后来发现如上代码会导致BUG-> 在更新Nacos配置之后 Consumer会挂掉
经排查发现结果是由于@RefreshScope注解导致,此注解将摧毁Bean,PulsarConsumer和Producer都将被摧毁,只是说Producer将在下⼀次调⽤中完成重启,Consumer则不能重启,因为没有调⽤,那么怎么解决呢?

就是发布系列事件以刷新容器

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;



@Component
@Slf4j
public class RefreshPulsarListener implements ApplicationListener {

    @Autowired
    ApplicationContext applicationContext;

    @Override
    public void onApplicationEvent(ApplicationEvent event) {

        if (event.getSource().equals("__refreshAll__")) {
            log.info("Nacos配置中心配置修改 重启Pulsar====================================");
            log.info("重启PulsarClient,{}", applicationContext.getBean("getPulsarClient"));
            log.info("重启PulsarConsumer,{}", applicationContext.getBean("comment-publish-topic-consumer"));
            log.info("重启PulsarConsumer,{}", applicationContext.getBean("reply-publish-topic-consumer"));
        }
    }

}

参考:

https://wenku.baidu.com/view/4d3337ab6b0203D8ce2f0066f5335a8102d266a7.html

Https://gitee.com/zhaoyuxuan66/pulsar-SpringCloud_boot-demo/tree/master/

https://blog.csdn.net/weixin_56227932/article/details/122897075

http://www.zzvips.com/article/219361.html

https://mp.weixin.qq.com/s/4w0eucDNcrYrsiDXHzLwuQ

到此这篇关于SpringBoot整合Pulsar的实现示例的文章就介绍到这了,更多相关SpringBoot整合Pulsar内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

--结束END--

本文标题: SpringBoot整合Pulsar的实现示例

本文链接: https://lsjlt.com/news/153223.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • SpringBoot整合Pulsar的实现示例
    目录一、添加pom.xml依赖二、Pulsar 参数类三、Pulsar 配置类四、不同消费数据类型的监听器五、Pulsar的核心服务类六、Pulsar整合Spring Cloud一、...
    99+
    2024-04-02
  • SpringBoot整合Apache Pulsar教程示例
    目录正文准备工作创建 SpringBoot 项目添加 Maven 依赖编写消息生产者编写消息消费者测试总结正文 推荐一个基于SpringBoot开发的全平台数据(数据库管理工具)功...
    99+
    2023-03-10
    SpringBoot整合Apache Pulsar SpringBoot 整合
  • SpringBoot怎么整合Pulsar
    这篇文章主要介绍了SpringBoot怎么整合Pulsar的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇SpringBoot怎么整合Pulsar文章都会有所收获,下面我们一起来看看吧。一、添加pom.xml依赖...
    99+
    2023-07-02
  • SpringBoot怎么整合Apache Pulsar
    这篇文章主要介绍了SpringBoot怎么整合Apache Pulsar的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇SpringBoot怎么整合Apache Pulsar文章都会有所收获,...
    99+
    2023-07-05
  • Springboot整合FreeMarker的实现示例
    目录一、项目搭建1、新建模块2、导入依赖 :将不相关的依赖删掉3、新建软件包,放入student实体类4、新建StudentMapper接口5、Springboot04Applica...
    99+
    2024-04-02
  • springboot 整合 clickhouse的实现示例
    目录前言前置准备使用jdbc方式操作clickhouse与springboot的整合代码完整整合步骤前言 了解了clickhouse的基础概念和相关的理论之后,本篇将通过实例代码演示...
    99+
    2024-04-02
  • SpringBoot整合Redis的实现示例
    目录1.需求说明2.整合实现2.1.创建Springboot工程2.2.redis配置3.编写测试类4.注意事项和细节1.需求说明 在 springboot 中 , 整合 redis...
    99+
    2023-01-28
    SpringBoot整合Redis SpringBoot Redis整合
  • SpringBoot整合WebService的实现示例
    目录SpringBoot搭建WebService程序一、定义规范接口二、搭建WebService服务端三、搭建WebService客户端WebService是一种传统的SOA技术架构...
    99+
    2024-04-02
  • SpringBoot整合JWT的实现示例
    目录一. JWT简介二. Java实现JWT(SpringBoot方式整合)JWT总结一. JWT简介 1. 什么是JWT? JWT(JSON Web Token)是为了在...
    99+
    2024-04-02
  • springboot整合spring-retry的实现示例
    目录1、背景2、解决方案2.1 pom文件2.2 applicat启动类2.3 controller类2.4 service测试类(重点)2.5 项目启动2.6 使用swagger进...
    99+
    2024-04-02
  • springboot 整合canal实现示例解析
    目录前言环境准备一、快速搭建canal服务搭建步骤1、服务器使用docker快速安装一个mysql并开启binlog日志2、上传canal安装包并解压3、进入到第二步解压后的文件目录...
    99+
    2024-04-02
  • SpringBoot整合Netty实现WebSocket的示例代码
    目录一、pom.xml依赖配置二、代码2.1、NettyServer 类2.2、SocketHandler 类2.3、ChannelHandlerPool 类2.4、Applicat...
    99+
    2024-04-02
  • SpringBoot整合MybatisPlus实现基本CRUD的示例代码
    目录一、引入相应的依赖二、进行配置三、新建数据库表四、配置 Mybatis Plus 自动填充五、实现User实体类、UserMapper、UserService六、使用Restfu...
    99+
    2023-05-19
    SpringBoot MybatisPlus实现CRUD SpringBoot MybatisPlus CRUD
  • SpringBoot整合Mybatis Plus多数据源的实现示例
    目录导读添加依赖application.properties 2种方式创建DataSource Master配置,使用druid连接池 Slave配置 启动类演示导读   有一个这样...
    99+
    2024-04-02
  • SpringBoot整合SpringDataRedis的示例代码
      本文介绍下SpringBoot如何整合SpringDataRedis框架的,SpringDataRedis具体的内容在前面已经介绍过了,可自行参考。 1....
    99+
    2024-04-02
  • springboot 整合sentinel的示例代码
    目录1. 安装sentinel2.客户端连接1. 安装sentinel         下载地址:https://github.com/ali...
    99+
    2024-04-02
  • SpringBoot整合ShardingSphere的示例代码
    目录一、相关依赖二、Nacos数据源配置三、项目配置四、验证概要: ShardingSphere是一套开源的分布式数据库中间件解决方案组成的生态圈,它由Sharding-JDBC、S...
    99+
    2024-04-02
  • Springboot整合kafka的示例代码
    目录1.整合kafka2.消息发送2.1发送类型2.2序列化2.3分区策略3.消息消费3.1消息组别3.2位移提交1. 整合kafka 1、引入依赖 <dependency&...
    99+
    2024-04-02
  • SpringBoot整合Liquibase的示例代码
    目录整合1整合2SpringBoot整合Liquibase虽然不难但坑还是有一点的,主要集中在配置路径相关的地方,在此记录一下整合的步骤,方便以后自己再做整合时少走弯路,当然也希望能...
    99+
    2024-04-02
  • SpringBoot整合logback的示例代码
    Logback简介 1、logback和log4j是同一个作者,logback可以看作是log4j的升级版 2、logback分为三个模块, logback-core, logbac...
    99+
    2024-04-02
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作