本篇文章为大家展示了Java中怎么利用pulsar-flink-connector读取pulsar catalog元数据,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。简介通过 pulsar-flin
本篇文章为大家展示了Java中怎么利用pulsar-flink-connector读取pulsar catalog元数据,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。
通过 pulsar-flink-connector 读取到 Apache pulsar 中的namespaces、topics的元数据信息。
pulsar-flink-connector 的 GitHub: https://github.com/streamnative/pulsar-flink
<dependency> <groupId>io.streamnative.connectors</groupId> <artifactId>pulsar-flink-connector-2.11-1.12</artifactId> <version>2.7.3</version> </dependency> <!-- jar repositories --> <repositories> <repository> <id>central</id> <layout>default</layout> <url>Https://repo1.maven.org/maven2</url> </repository> <repository> <id>bintray-streamnative-maven</id> <name>bintray</name> <url>https://dl.bintray.com/streamnative/maven</url> </repository> </repositories>
使用PulsarMetadataReader获取元数据
package com.levi.demo;import org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader;import org.apache.pulsar.client.admin.PulsarAdminException;import org.apache.pulsar.client.impl.auth.AuthenticationToken;import org.apache.pulsar.client.impl.conf.ClientConfigurationData;import org.apache.pulsar.common.schema.Schemainfo;import org.apache.pulsar.common.schema.SchemaType;import java.io.IOException;import java.util.HashMap;import java.util.List;import java.util.Map;public class Test { public static void main(String[] args) { final ClientConfigurationData configurationData = new ClientConfigurationData(); configurationData.setServiceUrl("pulsar://127.0.0.1:6650"); //Your Pulsar Token final AuthenticationToken token = new AuthenticationToken( "eyJxxxxxxxxxxx.eyxxxxxxxxxxxxx.xxxxxxxxxxx"); configurationData.setAuthentication(token); try (final PulsarMetadataReader reader = new PulsarMetadataReader("http://127.0.0.1:8443", configurationData, "", new HashMap(), -1, -1)) { //获取namespaces final List<String> namespaces = reader.listNamespaces(); System.out.println("namespaces: " + namespaces.toString()); for (final String namespace : namespaces) { //获取Topics final List<String> topics = reader.getTopics(namespace); System.out.println("topic: " + topics.toString()); for (String topic : topics) { //获取字段SchemaInfo final SchemaInfo schemaInfo = reader.getPulsarSchema(topic); final String name = schemaInfo.getName(); System.out.println("SchemaName:" + name); //topicName final SchemaType type = schemaInfo.getType(); System.out.println("SchemaType:" + type.toString());// "JSON"... final Map<String, String> properties = schemaInfo.getProperties(); System.out.println(properties); final String schemaDefinition = schemaInfo.getSchemaDefinition(); System.out.println(schemaDefinition); // Field info. } } } catch (IOException | PulsarAdminException e) { e.printStackTrace(); } }}
上述内容就是Java中怎么利用pulsar-flink-connector读取pulsar catalog元数据,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注编程网精选频道。
--结束END--
本文标题: Java中怎么利用pulsar-flink-connector读取pulsar catalog元数据
本文链接: https://lsjlt.com/news/299402.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0