目录需求分析及确定方案实现部分代码踩坑总结需求 将HBase数据,解析后推送到RocketMQ。Redis使用list数据类型,存储了需要推送的数据的RowKey及表名。 简单画个流
简单画个流程图就是:
Redis
{"rowkey":rowkey,"table":table}
解析出rowkey;LRANGE
;LTRIM
;LLEN
;RPUSH
;删除LREM
等等;Hbase
Get
,没用scan
;CellUtil.clone
相关方法;RocketMQ
配置
#server configuration
server.port=8896
#log config
logging.file.path=./logs
#redis-standalone
redis.standalone.host=
redis.standalone.port=6379
redis.standalone.passWord=
redis.standalone.enable=true
#redis-cluster
redis.cluster.nodes=
redis.cluster.password=
redis.cluster.timeout=30000
redis.cluster.enable=false
# ZooKeeper 集群地址,逗号分隔
hbase.zookeeper.quorum=
# Zookeeper 端口
hbase.zookeeper.property.clientPort=2181
# 消息目的rocketmq地址
rocketmq.server.host=
# 发送消息间隔时间,防止发送过快mq受不了
rocketmq.send.interval.millisec=10
# 每次从redis读取数据量限制。
data.access.redisDataSize=100
# 失败数据重试次数,超过的直接丢弃
data.access.retryNum=10
# 需要接入的表,需要发送到rocketmq的topic和在redis中的key的映射。xxx.xxx.xxx[topic]=redisKey
data.access.topicKeyMap[weibo_hbase]=data:sync:notice:suanzi:weibo:back
data.access.topicKeyMap[wechat_hbase]=data:sync:notice:suanzi:wechat:back
获取配置,其余的直接@Value("${}")
:
@Setter
@Getter
@Configuration
@ConfigurationProperties(prefix = "data.access")
public class AccessRedisMqConfig {
private Map<String, String> topicKeyMap = new HashMap<>();
private long redisDataSize = 50;
private int retryNum = 10;
}
开启接入:
@Component
public class AdapterRunner implements ApplicationRunner {
@Resource
private DataAccessService dataAccessService;
@Override
public void run(ApplicationArguments args) {
System.out.println("项目已启动,开始接入数据到RocketMQ……");
dataAccessService.accessData2Mq();
}
}
其他代码其实也在分析里了。
mq发送问题
org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: invokeAsync call timeout
at org.apache.rocketmq.remoting.Netty.NettyRemotinGClient.invokeAsync(NettyRemotingClient.java:525)
at org.apache.rocketmq.client.impl.MQClientapiImpl.sendMessageAsync(MQClientAPIImpl.java:523)
at org.apache.rocketmq.client.impl.MQClientAPIImpl.onExceptionImpl(MQClientAPIImpl.java:610)
at org.apache.rocketmq.client.impl.MQClientAPIImpl.access$100(MQClientAPIImpl.java:167)
at org.apache.rocketmq.client.impl.MQClientAPIImpl$1.operationComplete(MQClientAPIImpl.java:572)
at org.apache.rocketmq.remoting.netty.ResponseFuture.executeInvokeCallback(ResponseFuture.java:54)
at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract$2.run(NettyRemotingAbstract.java:319)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Wo
上面分析也说了,注意发送速度,有多少资源就接入多快。还有注意相关三个端口是否开放。
程序很简单,主要涉及方案的是,获取redis的list数据时,是考虑效率,及加入重试策略,保证数据不丢失等。
--结束END--
本文标题: Redis+Hbase+RocketMQ 实际使用问题案例讲解
本文链接: https://lsjlt.com/news/178312.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
2024-03-01
2024-03-01
2024-02-29
2024-02-29
2024-02-29
2024-02-29
2024-02-29
2024-02-29
2024-02-29
2024-02-29
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0