返回顶部
首页 > 资讯 > 后端开发 > 其他教程 >springboot整合mongodb changestream的示例代码
  • 433
分享到

springboot整合mongodb changestream的示例代码

2024-04-02 19:04:59 433人浏览 独家记忆
摘要

目录前言Change Stream 介绍环境准备Java客户端操作changestream1、引入Maven依赖2、测试类核心代码下面来看看具体的整合步骤1、引入核心依赖2、核心配置

前言

changestream是mongGodb的3.6版本之后出现的一种基于collection(数据库集合)的变更事件流,应用程序通过db.collection.watch()这样的命令可以获得被监听对象的实时变更

想必对Mysql主从复制原理比较熟悉的同学应该知道,其根本就是从节点通过监听binlog日志,然后解析binlog日志数据达到数据同步的目的,于是,基于mysql主从复制原理,阿里开源了canal这样的数据同步中间件工具

Change Stream 介绍

Chang Stream(变更记录流) 是指collection(数据库集合)的变更事件流,应用程序通过db.collection.watch()这样的命令可以获得被监听对象的实时变更。

关于changestream做如下说明,提供参考

  • 在该特性出现之前,开发者可通过拉取 oplog达到同样的目的;
  • 但 oplog 的处理及解析相对复杂,而且存在被回滚的风险,如果使用不当的话还会带来性能问题;
  • Change Stream 可以与aggregate framework结合使用,对变更集进行进一步的过滤或转换;
  • 由于Change Stream 利用了存储在 oplog 中的信息,因此对于单进程部署的mongoDB无法支持Change Stream功能,其只能用于启用了副本集的独立集群或分片集群

changestream可用于监听的mongodb目标类型

  • 单个集合,除系统库(admin/local/config)之外的集合,3.6版本支持
  • 单个数据库,除系统库(admin/local/config)之外的数据库集合,4.0版本支持
  • 整个集群,整个集群内除去系统库( (admin/local/config)之外的集合 ,4.0版本支持

一个Change Stream Event的基本结构如下所示:

{
   _id : { <BSON Object> },
   "operationType" : "<operation>",
   "fullDocument" : { <document> },
   "ns" : {
      "db" : "<database>",
      "coll" : "<collection"
   },
   "documenTKEy" : { "_id" : <ObjectId> },
   "updateDescription" : {
      "updatedFields" : { <document> },
      "removedFields" : [ "<field>", ... ]
   }
   "clusterTime" : <Timestamp>,
   "txnNumber" : <NumberLong>,
   "lsid" : {
      "id" : <UUID>,
      "uid" : <BinData>
   }
}

关于上面的数据结构,做简单的解释说明,

  • _id,变更事件的Token对象
  • operationType,变更类型(见下面介绍)
  • fullDocument,文档内容
  • ns,监听的目标
  • ns.db,变更的数据库
  • ns.coll,变更的集合
  • documentKey,变更文档的键值,含_id字段
  • updateDescription,变更描述
  • updateDescription.updatedFields,变更中更新字段
  • updateDescription.removedFields,变更中删除字段
  • clusterTime,对应oplog的时间戳
  • txnNumber,事务编号,仅在多文档事务中出现,4.0版本支持
  • lsid,事务关联的会话编号,仅在多文档事务中出现,4.0版本支持

Change Steram支持的变更类型,对于上面的operationType 这个参数,主要包括有以下几个:

  • insert,插入文档
  • delete,删除文档
  • replace,替换文档,当执行replace操作指定upsert时,可能是insert事件
  • update,更新文档,当执行update操作指定upsert时,可能是insert事件
  • invalidate,失效事件,比如执行了collection.drop或collection.rename

以上的几种类型,可以简单理解为,监听的mongo用户操作的事件类型,比如新增数据,删除数据,修改数据等

以上为changestream的必备理论知识,想要深入学习的话无比要了解,下面通过实操来展示下changestream的使用

环境准备

mongdb复制集群,本例的复制集群对应的mongodb版本为 4.0.X

登录primary节点,创建一个数据库

友情提醒:数据库需要提前创建

1、启动两个Mongo shell,一个操作数据库,一个watch

在其中一个窗口执行如下命令,开启监听

cursor = db.comment.watch()

2、在另一个窗口下,给上面的articledb插入一条数据

数据写入成功后,在第一个窗口下,执行下面的命令:

cursor.next()

说明已经成功监听到新增的数据,修改、删除事件可以做类似的操作即可

以上先通过shell窗口展示了一下changestream的使用效果,接下来,将通过程序演示下如何在客户端集成并使用changestream

Java客户端操作changestream

1、引入maven依赖

<dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongo-java-driver</artifactId>
            <version>3.12.2</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastJSON</artifactId>
            <version>1.2.75</version>
        </dependency>

2、测试类核心代码

import com.mongodb.*;
import com.mongodb.client.MongoDatabase;
import org.bson.conversions.Bson;

import java.util.List;
import static java.util.Collections.singletonList;
import com.alibaba.fastjson.JSONObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static java.util.Arrays.asList;
public class MongoTest {
    private static Logger logger = LoggerFactory.getLogger(MongoTest.class);
    public static void main(String[] args) {
        showmogodbdata();
    }
    private static void showmogodbdata() {
		
        String sURI = "mongodb://IP:27017";
        MongoClient mongoClient = new MongoClient(new MongoClientURI(sURI));
        MongoDatabase database = mongoClient.getDatabase("articledb");
        MongoCollection<Document> collec = database.getCollection("comment");
        List<Bson> pipeline = singletonList(Aggregates.match(Filters.or(
                Document.parse("{'fullDocument.articleid': '100007'}"),
                Filters.in("operationType", asList("insert", "update", "delete")))));
        MongoCursor<ChangeStreamDocument<Document>> cursor = collec.watch(pipeline).iterator();
        while (cursor.hasNext()) {
            ChangeStreamDocument<Document> next = cursor.next();
            logger.info("输出mogodb的next的对应的值" + next.toString());
            String Operation = next.getOperationType().getValue();
            String tableNames = next.getNamespace().getCollectionName();
            System.out.println(tableNames);
            //获取主键id的值
            String pk_id = next.getDocumentKey().toString();
            //同步修改数据的操作
            if (next.getUpdateDescription() != null) {
                JSONObject jsonObject = JSONObject.parseObject(next.getUpdateDescription().getUpdatedFields().toJson());
                System.out.println(jsonObject);
            }
            //同步插入数据的操作
            if (next.getFullDocument() != null) {
                JSONObject jsonObject = JSONObject.parseObject(next.getFullDocument().toJson());
            //同步删除数据的操作
            if (next.getUpdateDescription() == null && Operation.matches("delete")) {
                JSONObject jsonObject = JSONObject.parseObject(pk_id);
        }
}

这段程序主要分为几个核心部分,做如下解释说明,

  • 连接mogodb服务端及相关配置
  • 通过pipline开启watch监听
  • 监听到特定数据库下集合的数据变化,然后打印出变化的数据

启动这段程序,观察控制台日志数据

在未对articledb数据库下的comment集合做任何操作之前,由于watch为检测到任何数据变化,所以无法进入到while循环中,接下来,从shell端给comment集合新增一条数据,然后再次观察控制台数据变化

可以看到,控制台很快就检测到变化的数据

以下为完整的日志数据

{ operationType=OperationType{value='insert'}, resumeToken={"_data": "8262138891000000022B022C0100296E5A1004B9065629412942F8852D592B9FD441B946645F696400646213889158B116A29C3FD1140004"}, namespace=articledb.comment, destinationNamespace=null, fullDocument=Document{{_id=6213889158b116a29c3fd114, articleid=100010, content=hello kafka, userid=1010, nickname=marry}}, documentKey={"_id": {"$oid": "6213889158b116a29c3fd114"}}, clusterTime=Timestamp{value=7067142396626075650, seconds=1645447313, inc=2}, updateDescription=null, txnNumber=null, lsid=null}

至于在业务中的具体使用,可以结合自身的情况,举例来说,应用程序只想监听修改数据的事件,那么就可以在修改数据事件的监听逻辑中,解析变化后的数据做后续的操作

SpringBoot整合changestream

在实际开发中,更通用的场景是整合到springboot工程中使用,有过一定的开发经验的同学应该很容易想到核心的逻辑长什么样了,和canal的客户端操作类似,需要在一个配置类去监听即可

下面来看看具体的整合步骤

1、引入核心依赖

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-WEB</artifactId>
        </dependency>

        <dependency>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <artifactId>spring-boot-starter</artifactId>

2、核心配置文件

本例演示的是基于上文搭建的mongodb复制集群

server.port=8081

#mongodb配置
spring.data.mongodb.uri=mongodb://IP:27017,IP:27018,IP:27019/articledb?maxPoolSize=512

3、编写实体类,映射comment集合中的字段

import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

@Document(collection="comment")
public class Comment {
    @Id
    private String articleid;
    private String content;
    private String userid;
    private String nickname;
    private Date createdatetime;
    public String getArticleid() {
        return articleid;
    }
    public void setArticleid(String articleid) {
        this.articleid = articleid;
    public String getContent() {
        return content;
    public void setContent(String content) {
        this.content = content;
    public String getUserid() {
        return userid;
    public void setUserid(String userid) {
        this.userid = userid;
    public String getNickname() {
        return nickname;
    public void setNickname(String nickname) {
        this.nickname = nickname;
    public Date getCreatedatetime() {
        return createdatetime;
    public void setCreatedatetime(Date createdatetime) {
        this.createdatetime = createdatetime;
}

4、编写一个服务类

简单的添加2个用接口测试的方法

import com.congge.entity.Comment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.stereotype.Service;

import java.util.List;
@Service
public class MongoDbService {
    private static final Logger logger = LoggerFactory.getLogger(MongoDbService.class);
    @Autowired
    private MongoTemplate mongoTemplate;
    
    public List<Comment> findAll() {
        return mongoTemplate.findAll(Comment.class);
    }
    
    }
}

ChangeStream 类 ,事件注册类,即开篇中提到的那几种事件类型的操作等

import com.congge.entity.Comment;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest;
import org.springframework.data.mongodb.core.messaging.MessageListenerContainer;
import org.springframework.data.mongodb.core.query.Criteria;

@Configuration
public class ChangeStream implements CommandLineRunner {
    @Autowired
    private MongoMessageListener mongoMessageListener;
    private MessageListenerContainer messageListenerContainer;
    @Override
    public void run(String... args) throws Exception{
        ChangeStreamRequest<Comment> request = ChangeStreamRequest.builder(mongoMessageListener)
                .collection("comment")
                .filter(Aggregation.newAggregation(Aggregation.match(Criteria.where("operationType").in("insert","update","replace"))))
                .build();
        messageListenerContainer.reGISter(request,Comment.class);
    }
}

MongoConfig 配置MessageListenerContainer 容器的相关参数

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.messaging.DefaultMessageListenerContainer;
import org.springframework.data.mongodb.core.messaging.MessageListenerContainer;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
@Configuration
public class MongoConfig {
    @Bean
    MessageListenerContainer messageListenerContainer(MongoTemplate mongoTemplate){
        Executor executor = Executors.newFixedThreadPool(5);
        return new DefaultMessageListenerContainer(mongoTemplate,executor){
            @Override
            public boolean isAutoStartup(){
                return true;
            }
        };
    }
}

3个类添加完成后,再次启动程序,并观察控制台数据日志

测试1:通过shell窗口登录primary节点,并给comment集合添加一条数据

几乎是实时的监听到事件操作的数据变化,下面是完整的输出日志

测试2:通过shell窗口删除上面新增的这条数据

典型应用场景

数据迁移

如果一个系统的数据需要迁移到另一个系统,可以考虑使用mongodb changestream这种方式,试想,如果老系统数据非常杂乱,并且文档中存在一些脏数据时,为了确保迁移后的数据能较快的投产,通过应用程序的方式,能够原始的数据做类似ETL的处理,这样更加方便

应用监控

如果您的系统对数据监管较为严格,可以考虑使用changestream这种方式,订阅特定事件的数据操作,比如修改和删除数据的事件,然后及时的发送告警通知

对接大数据应用

我们知道,mongodb作为一款性能优秀的分布式文档型数据库,其实是可以存储海量数据的,在一些大数据场景下,比如下游其他的应用采用大数据技术,需要对mongo中的数据做轨迹行为分析,changestream就是一种不错的选择,当监听到特定事件的数据变化时,向消息队列,比如kafka推送相应的消息,下游相关的大数据应用就可以做后续的业务处理了

到此这篇关于springboot整合mongodb changestream的示例代码的文章就介绍到这了,更多相关springboot整合mongodb changestream内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

--结束END--

本文标题: springboot整合mongodb changestream的示例代码

本文链接: https://lsjlt.com/news/140063.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • springboot整合mongodb changestream的示例代码
    目录前言Change Stream 介绍环境准备Java客户端操作changestream1、引入maven依赖2、测试类核心代码下面来看看具体的整合步骤1、引入核心依赖2、核心配置...
    99+
    2024-04-02
  • SpringBoot整合MongoDB完整实例代码
    目录一、新建项目二、docker-compose 配置mongoDB三、SpringBoot配置MongoDB问题:Exception authenticating MongoCre...
    99+
    2024-04-02
  • SpringBoot整合SpringDataRedis的示例代码
      本文介绍下SpringBoot如何整合SpringDataRedis框架的,SpringDataRedis具体的内容在前面已经介绍过了,可自行参考。 1....
    99+
    2024-04-02
  • springboot 整合sentinel的示例代码
    目录1. 安装sentinel2.客户端连接1. 安装sentinel         下载地址:https://github.com/ali...
    99+
    2024-04-02
  • SpringBoot整合ShardingSphere的示例代码
    目录一、相关依赖二、Nacos数据源配置三、项目配置四、验证概要: ShardingSphere是一套开源的分布式数据库中间件解决方案组成的生态圈,它由Sharding-JDBC、S...
    99+
    2024-04-02
  • Springboot整合kafka的示例代码
    目录1.整合kafka2.消息发送2.1发送类型2.2序列化2.3分区策略3.消息消费3.1消息组别3.2位移提交1. 整合kafka 1、引入依赖 <dependency&...
    99+
    2024-04-02
  • SpringBoot整合Liquibase的示例代码
    目录整合1整合2SpringBoot整合Liquibase虽然不难但坑还是有一点的,主要集中在配置路径相关的地方,在此记录一下整合的步骤,方便以后自己再做整合时少走弯路,当然也希望能...
    99+
    2024-04-02
  • SpringBoot整合logback的示例代码
    Logback简介 1、logback和log4j是同一个作者,logback可以看作是log4j的升级版 2、logback分为三个模块, logback-core, logbac...
    99+
    2024-04-02
  • springboot 整合hbase的示例代码
    目录前言HBase 定义HBase 数据模型物理存储结构数据模型1、Name Space2、Region3、Row4、Column5、Time Stamp6、Cell搭建步骤1、官网...
    99+
    2024-04-02
  • SpringBoot整合jersey的示例代码
    这篇文章主要从以下几个方面来介绍。简单介绍下jersey,springboot,重点介绍如何整合springboot与jersey。 什么是jersey 什么是springboot 为什么要使用springboot+jersey 如...
    99+
    2023-05-31
    springboot jersey ers
  • SpringBoot整合ElasticSearch的示例代码
    ElasticSearch作为基于Lucene的搜索服务器,既可以作为一个独立的服务部署,也可以签入Web应用中。SpringBoot作为Spring家族的全新框架,使得使用SpringBoot开发Spring应用变得非常简单。本文要介绍如...
    99+
    2023-05-31
    spring boot elasticsearch
  • SpringBoot整合Minio的示例代码
    SpringBoot整合Minio 进入Minio官网,下载对应的Minio版本 官网安装文档 下载完成之后,启动(windows版) minio.exe server D:\m...
    99+
    2022-12-27
    SpringBoot整合Minio SpringBoot Minio整合 SpringBoot Minio
  • SpringBoot整合aws的示例代码
    业务需求 将本地的一些文件保存到aws上 引入依赖 创建client 工具类 引入依赖 <dependency> ...
    99+
    2024-04-02
  • SpringBoot整合JdbcTemplate的示例代码
    目录前言初始化SpringBoot项目使用IDEA创建项目导入JDBC依赖导入数据库驱动修改配置文件数据库sys_user表结构测试类代码查询sys_user表数据量查询sys_us...
    99+
    2024-04-02
  • SpringBoot整合MyBatis-Plus的示例代码
    目录前言源码环境开发工具 SQL脚本 正文单工程POM文件(注意) application.properties(注意)自定义配置(注意)实体类(注意)...
    99+
    2024-04-02
  • springboot整合xxl-job的示例代码
    目录关于xxl-job调度中心执行器关于xxl-job 在我看来,总体可以分为三大块: 调度中心执行器配置定时任务 调度中心 简单来讲就是 xxl-job-admin那个模块,配置:...
    99+
    2024-04-02
  • SpringBoot 整合mongoDB并自定义连接池的示例代码
    目录Maven 依赖配置文件MongoConfig配置类得力于SpringBoot的特性,整合mongoDB是很容易的,我们整合mongoDB的目的就是想用它给我们提供的mongoT...
    99+
    2024-04-02
  • SpringBoot整合MongoDB的实现代码
    目录一、添加Maven依赖二、application.yml配置三、代码中使用Mongo1.Mongo适用场景2.以我最近博客开发的一个联系我为例(这里我使用Mongo)MongoD...
    99+
    2024-04-02
  • SpringBoot整合Redis管道的示例代码
    目录1. Redis 之管道(pipeline)2. SpringBoot 整合 Redis 管道实例1. Redis 之管道(pipeline) 执行一个Redis命令,Redis...
    99+
    2024-04-02
  • SpringBoot框架整合SwaggerUI的示例代码
    整合swagger进行模块测试 注意事项:为方便SpringBoot更好的整合Swagger,需要专门放置在一个模块中(maven子工程) 创建公共模块,整合swagger,为了所有...
    99+
    2024-04-02
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作