目录前言Change Stream 介绍环境准备Java客户端操作changestream1、引入Maven依赖2、测试类核心代码下面来看看具体的整合步骤1、引入核心依赖2、核心配置
changestream是mongGodb的3.6版本之后出现的一种基于collection(数据库集合)的变更事件流,应用程序通过db.collection.watch()这样的命令可以获得被监听对象的实时变更
想必对Mysql主从复制原理比较熟悉的同学应该知道,其根本就是从节点通过监听binlog日志,然后解析binlog日志数据达到数据同步的目的,于是,基于mysql主从复制原理,阿里开源了canal这样的数据同步中间件工具
Chang Stream(变更记录流) 是指collection(数据库集合)的变更事件流,应用程序通过db.collection.watch()这样的命令可以获得被监听对象的实时变更。
关于changestream做如下说明,提供参考
changestream可用于监听的mongodb目标类型
一个Change Stream Event的基本结构如下所示:
{
_id : { <BSON Object> },
"operationType" : "<operation>",
"fullDocument" : { <document> },
"ns" : {
"db" : "<database>",
"coll" : "<collection"
},
"documenTKEy" : { "_id" : <ObjectId> },
"updateDescription" : {
"updatedFields" : { <document> },
"removedFields" : [ "<field>", ... ]
}
"clusterTime" : <Timestamp>,
"txnNumber" : <NumberLong>,
"lsid" : {
"id" : <UUID>,
"uid" : <BinData>
}
}
关于上面的数据结构,做简单的解释说明,
Change Steram支持的变更类型,对于上面的operationType 这个参数,主要包括有以下几个:
以上的几种类型,可以简单理解为,监听的mongo用户操作的事件类型,比如新增数据,删除数据,修改数据等
以上为changestream的必备理论知识,想要深入学习的话无比要了解,下面通过实操来展示下changestream的使用
mongdb复制集群,本例的复制集群对应的mongodb版本为 4.0.X
登录primary节点,创建一个数据库
友情提醒:数据库需要提前创建
1、启动两个Mongo shell,一个操作数据库,一个watch
在其中一个窗口执行如下命令,开启监听
cursor = db.comment.watch()
2、在另一个窗口下,给上面的articledb插入一条数据
数据写入成功后,在第一个窗口下,执行下面的命令:
cursor.next()
说明已经成功监听到新增的数据,修改、删除事件可以做类似的操作即可
以上先通过shell窗口展示了一下changestream的使用效果,接下来,将通过程序演示下如何在客户端集成并使用changestream
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
<version>3.12.2</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastJSON</artifactId>
<version>1.2.75</version>
</dependency>
import com.mongodb.*;
import com.mongodb.client.MongoDatabase;
import org.bson.conversions.Bson;
import java.util.List;
import static java.util.Collections.singletonList;
import com.alibaba.fastjson.JSONObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static java.util.Arrays.asList;
public class MongoTest {
private static Logger logger = LoggerFactory.getLogger(MongoTest.class);
public static void main(String[] args) {
showmogodbdata();
}
private static void showmogodbdata() {
String sURI = "mongodb://IP:27017";
MongoClient mongoClient = new MongoClient(new MongoClientURI(sURI));
MongoDatabase database = mongoClient.getDatabase("articledb");
MongoCollection<Document> collec = database.getCollection("comment");
List<Bson> pipeline = singletonList(Aggregates.match(Filters.or(
Document.parse("{'fullDocument.articleid': '100007'}"),
Filters.in("operationType", asList("insert", "update", "delete")))));
MongoCursor<ChangeStreamDocument<Document>> cursor = collec.watch(pipeline).iterator();
while (cursor.hasNext()) {
ChangeStreamDocument<Document> next = cursor.next();
logger.info("输出mogodb的next的对应的值" + next.toString());
String Operation = next.getOperationType().getValue();
String tableNames = next.getNamespace().getCollectionName();
System.out.println(tableNames);
//获取主键id的值
String pk_id = next.getDocumentKey().toString();
//同步修改数据的操作
if (next.getUpdateDescription() != null) {
JSONObject jsonObject = JSONObject.parseObject(next.getUpdateDescription().getUpdatedFields().toJson());
System.out.println(jsonObject);
}
//同步插入数据的操作
if (next.getFullDocument() != null) {
JSONObject jsonObject = JSONObject.parseObject(next.getFullDocument().toJson());
//同步删除数据的操作
if (next.getUpdateDescription() == null && Operation.matches("delete")) {
JSONObject jsonObject = JSONObject.parseObject(pk_id);
}
}
这段程序主要分为几个核心部分,做如下解释说明,
启动这段程序,观察控制台日志数据
在未对articledb数据库下的comment集合做任何操作之前,由于watch为检测到任何数据变化,所以无法进入到while循环中,接下来,从shell端给comment集合新增一条数据,然后再次观察控制台数据变化
可以看到,控制台很快就检测到变化的数据
以下为完整的日志数据
{ operationType=OperationType{value='insert'}, resumeToken={"_data": "8262138891000000022B022C0100296E5A1004B9065629412942F8852D592B9FD441B946645F696400646213889158B116A29C3FD1140004"}, namespace=articledb.comment, destinationNamespace=null, fullDocument=Document{{_id=6213889158b116a29c3fd114, articleid=100010, content=hello kafka, userid=1010, nickname=marry}}, documentKey={"_id": {"$oid": "6213889158b116a29c3fd114"}}, clusterTime=Timestamp{value=7067142396626075650, seconds=1645447313, inc=2}, updateDescription=null, txnNumber=null, lsid=null}
至于在业务中的具体使用,可以结合自身的情况,举例来说,应用程序只想监听修改数据的事件,那么就可以在修改数据事件的监听逻辑中,解析变化后的数据做后续的操作
SpringBoot整合changestream
在实际开发中,更通用的场景是整合到springboot工程中使用,有过一定的开发经验的同学应该很容易想到核心的逻辑长什么样了,和canal的客户端操作类似,需要在一个配置类去监听即可
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-WEB</artifactId>
</dependency>
<dependency>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<artifactId>spring-boot-starter</artifactId>
本例演示的是基于上文搭建的mongodb复制集群
server.port=8081
#mongodb配置
spring.data.mongodb.uri=mongodb://IP:27017,IP:27018,IP:27019/articledb?maxPoolSize=512
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
@Document(collection="comment")
public class Comment {
@Id
private String articleid;
private String content;
private String userid;
private String nickname;
private Date createdatetime;
public String getArticleid() {
return articleid;
}
public void setArticleid(String articleid) {
this.articleid = articleid;
public String getContent() {
return content;
public void setContent(String content) {
this.content = content;
public String getUserid() {
return userid;
public void setUserid(String userid) {
this.userid = userid;
public String getNickname() {
return nickname;
public void setNickname(String nickname) {
this.nickname = nickname;
public Date getCreatedatetime() {
return createdatetime;
public void setCreatedatetime(Date createdatetime) {
this.createdatetime = createdatetime;
}
简单的添加2个用接口测试的方法
import com.congge.entity.Comment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.stereotype.Service;
import java.util.List;
@Service
public class MongoDbService {
private static final Logger logger = LoggerFactory.getLogger(MongoDbService.class);
@Autowired
private MongoTemplate mongoTemplate;
public List<Comment> findAll() {
return mongoTemplate.findAll(Comment.class);
}
}
}
ChangeStream 类 ,事件注册类,即开篇中提到的那几种事件类型的操作等
import com.congge.entity.Comment;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest;
import org.springframework.data.mongodb.core.messaging.MessageListenerContainer;
import org.springframework.data.mongodb.core.query.Criteria;
@Configuration
public class ChangeStream implements CommandLineRunner {
@Autowired
private MongoMessageListener mongoMessageListener;
private MessageListenerContainer messageListenerContainer;
@Override
public void run(String... args) throws Exception{
ChangeStreamRequest<Comment> request = ChangeStreamRequest.builder(mongoMessageListener)
.collection("comment")
.filter(Aggregation.newAggregation(Aggregation.match(Criteria.where("operationType").in("insert","update","replace"))))
.build();
messageListenerContainer.reGISter(request,Comment.class);
}
}
MongoConfig 配置MessageListenerContainer 容器的相关参数
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.messaging.DefaultMessageListenerContainer;
import org.springframework.data.mongodb.core.messaging.MessageListenerContainer;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
@Configuration
public class MongoConfig {
@Bean
MessageListenerContainer messageListenerContainer(MongoTemplate mongoTemplate){
Executor executor = Executors.newFixedThreadPool(5);
return new DefaultMessageListenerContainer(mongoTemplate,executor){
@Override
public boolean isAutoStartup(){
return true;
}
};
}
}
3个类添加完成后,再次启动程序,并观察控制台数据日志
测试1:通过shell窗口登录primary节点,并给comment集合添加一条数据
几乎是实时的监听到事件操作的数据变化,下面是完整的输出日志
测试2:通过shell窗口删除上面新增的这条数据
如果一个系统的数据需要迁移到另一个系统,可以考虑使用mongodb changestream这种方式,试想,如果老系统数据非常杂乱,并且文档中存在一些脏数据时,为了确保迁移后的数据能较快的投产,通过应用程序的方式,能够原始的数据做类似ETL的处理,这样更加方便
如果您的系统对数据监管较为严格,可以考虑使用changestream这种方式,订阅特定事件的数据操作,比如修改和删除数据的事件,然后及时的发送告警通知
我们知道,mongodb作为一款性能优秀的分布式文档型数据库,其实是可以存储海量数据的,在一些大数据场景下,比如下游其他的应用采用大数据技术,需要对mongo中的数据做轨迹行为分析,changestream就是一种不错的选择,当监听到特定事件的数据变化时,向消息队列,比如kafka推送相应的消息,下游相关的大数据应用就可以做后续的业务处理了
到此这篇关于springboot整合mongodb changestream的示例代码的文章就介绍到这了,更多相关springboot整合mongodb changestream内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!
--结束END--
本文标题: springboot整合mongodb changestream的示例代码
本文链接: https://lsjlt.com/news/140063.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
2024-03-01
2024-03-01
2024-02-29
2024-02-29
2024-02-29
2024-02-29
2024-02-29
2024-02-29
2024-02-29
2024-02-29
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0