这篇文章主要讲解了“如何将数据按指定格式存入ZooKeeper”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“如何将数据按指定格式存入zookeeper”吧!环境: Scala版本
这篇文章主要讲解了“如何将数据按指定格式存入ZooKeeper”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“如何将数据按指定格式存入zookeeper”吧!
环境:
Scala版本:2.11.8
zookeeper版本:3.4.5-cdh6.7.0
package com.ruozedata.zkimport java.util.concurrent.TimeUnitimport org.apache.curator.framework.CuratorFrameworkFactoryimport org.apache.curator.framework.recipes.locks.InterProceSSMuteximport org.apache.curator.retry.ExponentialBackoffRetryimport org.slf4j.LoggerFactoryimport scala.collection.JavaConversions._import scala.collection.mutableobject ZkConnectApp{ val LOG = LoggerFactory.getLogger(ZkConnectApp.getClass) val client = { val client = CuratorFrameworkFactory .builder .connectString("172.16.100.31:2181") .retryPolicy(new ExponentialBackoffRetry(1000, 3)) .namespace("consumers") .build() client.start() client } def lock(path: String)(body: => Unit) { val lock = new InterProcessMutex(client, path) lock.acquire() try { body } finally { lock.release() } } def tryDo(path: String)(body: => Unit): Boolean = { val lock = new InterProcessMutex(client, path) if (!lock.acquire(10, TimeUnit.SECONDS)) { LOG.info(s"不能获得锁 {$path},已经有任务在运行,本次任务退出") return false } try { LOG.info("获准运行") body true } finally { lock.release() LOG.info(s"释放锁 {$path}") } } //zookeeper创建路径 def ensurePathExists(path: String): Unit = { if (client.checkExists().forPath(path) == null) { client.create().creatingParentsIfNeeded().forPath(path) } } case class OffsetRange( val topic:String, // 主题 val partition:Int, // 分区 val fromOffset:Long, // 起始偏移量 val utilOffset:Long // 终止偏移量 ) def storeOffsets(OffsetsRanges:Array[OffsetRange],groupName:String)={ val offsetRootPath = s"/"+groupName if (client.checkExists().forPath(offsetRootPath) == null) { client.create().creatingParentsIfNeeded().forPath(offsetRootPath) } for(els <- OffsetsRanges ){ val data = String.valueOf(els.utilOffset).getBytes val path = s"$offsetRootPath/offsets/${els.topic}/partition/${els.partition}" // 创建路径 ensurePathExists(path) // 写入数据 client.setData().forPath(path, data) } } case class TopicAndPartition( topic:String, // 主题 partition:Int // 分区 ) def obtainOffsets(topic:String,groupName:String):Map[TopicAndPartition,Long]={ // 定义一个空的HashMap val maps = mutable.HashMap[TopicAndPartition,Long]() // offset的路径 val offsetRootPath = s"/"+groupName+"/offsets/"+topic+"/partition" // 判断路径是否存在 val stat = client.checkExists().forPath(s"$offsetRootPath") if (stat == null ){ println(stat) // 路径不存在 就将路径打印在控制台,检查路径 }else{ // 获取 offsetRootPath路径下一级的所有子目录 // 我们这里是获取的所有分区 val children = client.getChildren.forPath(s"$offsetRootPath") // 遍历所有的分区 for ( lines <- children ){ // 获取分区的数据 val data = new String(client.getData().forPath(s"$offsetRootPath/"+lines)).toLong // 将 topic partition 和数据赋值给 maps maps(TopicAndPartition(topic,lines.toInt)) = data } } // 按partition排序后 返回map对象 maps.toList.sortBy(_._1.partition).toMap } def main(args: Array[String]) { //定义初始化数据 val off1 = OffsetRange("ruoze_offset_topic",0,0,7) val off2 = OffsetRange("ruoze_offset_topic",1,0,3) val off3 = OffsetRange("ruoze_offset_topic",2,0,5) val arr = Array(off1,off2,off3) //获取到namespace// println(client.getNamespace) // 创建路径// val offsetRootPath = "/G322"// if (client.checkExists().forPath(offsetRootPath) == null) {// client.create().creatingParentsIfNeeded().forPath(offsetRootPath)// } //存储值 storeOffsets(arr,"G322") //获取值 val result = obtainOffsets("ruoze_offset_topic","G322") for (map <- result){ println("topic:"+map._1.topic+"\t" +"partition:"+map._1.partition+"\t"+"offset:"+map._2) } }}
感谢各位的阅读,以上就是“如何将数据按指定格式存入zookeeper”的内容了,经过本文的学习后,相信大家对如何将数据按指定格式存入zookeeper这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是编程网,小编将为大家推送更多相关知识点的文章,欢迎关注!
--结束END--
本文标题: 如何将数据按指定格式存入zookeeper
本文链接: https://lsjlt.com/news/231467.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0