Java使用pulsar-flink-connector读取pulsar catalog元数据代码剖析
作者:CNBLOG 时间:2023-11-05 17:25:41
简介
通过 pulsar-flink-connector 读取到 Apache pulsar 中的namespaces、topics的元数据信息。
pulsar-flink-connector 的 github: https://github.com/streamnative/pulsar-flink
Maven
<dependency>
<groupId>io.streamnative.connectors</groupId>
<artifactId>pulsar-flink-connector-2.11-1.12</artifactId>
<version>2.7.3</version>
</dependency>
<!-- JAR repositories -->
<repositories>
<repository>
<id>central</id>
<layout>default</layout>
<url>https://repo1.maven.org/maven2</url>
</repository>
<repository>
<id>bintray-streamnative-maven</id>
<name>bintray</name>
<url>https://dl.bintray.com/streamnative/maven</url>
</repository>
</repositories>
CODE
使用PulsarMetadataReader获取元数据
package com.levi.demo;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Test.
*
* @author levi
* @version 1.0
**/
public class Test {
public static void main(String[] args) {
final ClientConfigurationData configurationData = new ClientConfigurationData();
configurationData.setServiceUrl("pulsar://127.0.0.1:6650");
//Your Pulsar Token
final AuthenticationToken token =
new AuthenticationToken(
"eyJxxxxxxxxxxx.eyxxxxxxxxxxxxx.xxxxxxxxxxx");
configurationData.setAuthentication(token);
try (final PulsarMetadataReader reader =
new PulsarMetadataReader("http://127.0.0.1:8443",
configurationData,
"",
new HashMap(),
-1,
-1)) {
//获取namespaces
final List<String> namespaces = reader.listNamespaces();
System.out.println("namespaces: " + namespaces.toString());
for (final String namespace : namespaces) {
//获取Topics
final List<String> topics = reader.getTopics(namespace);
System.out.println("topic: " + topics.toString());
for (String topic : topics) {
//获取字段SchemaInfo
final SchemaInfo schemaInfo = reader.getPulsarSchema(topic);
final String name = schemaInfo.getName();
System.out.println("SchemaName:" + name); //topicName
final SchemaType type = schemaInfo.getType();
System.out.println("SchemaType:" + type.toString());// "JSON"...
final Map<String, String> properties = schemaInfo.getProperties();
System.out.println(properties);
final String schemaDefinition = schemaInfo.getSchemaDefinition();
System.out.println(schemaDefinition); // Field info.
}
}
} catch (IOException | PulsarAdminException e) {
e.printStackTrace();
}
}
}
来源:https://www.cnblogs.com/levi125/p/14500436.html
标签:Java,pulsar,catalog,元数据
![](/images/zang.png)
![](/images/jiucuo.png)
猜你喜欢
Seata AT模式TransactionHook被删除探究
2022-01-12 14:56:49
![](https://img.aspxhome.com/file/2023/2/63492_0s.png)
C#实现启动,关闭与查找进程的方法
2023-01-12 23:27:35
![](https://img.aspxhome.com/file/2023/0/108810_0s.jpg)
springboot如何重定向外部网页
2022-11-12 05:19:19
java编写Http服务器下载工具
2021-11-08 08:07:38
android通过usb读取U盘的方法
2023-03-14 07:43:25
Mybatis Generator最完美配置文件详解(完整版)
2021-06-13 05:11:06
C#对DataTable中的某列进行分组
2021-12-10 23:06:45
![](https://img.aspxhome.com/file/2023/5/104375_0s.jpg)
Android通过记住密码功能学习数据存储类SharedPreferences详解及实例
2023-05-21 21:37:49
![](https://img.aspxhome.com/file/2023/1/95011_0s.png)
Android 仿QQ头像自定义截取功能
2023-05-14 00:13:21
![](https://img.aspxhome.com/file/2023/9/128979_0s.gif)
服务器端C#实现的CSS解析器
2022-01-25 12:26:20
C#网站生成静态页面的实例讲解
2021-11-01 16:33:08
java实现马踏棋盘算法(骑士周游问题)
2022-03-17 20:29:46
![](https://img.aspxhome.com/file/2023/1/94361_0s.jpg)
一文了解Spring中拦截器的原理与使用
2023-06-30 13:28:05
![](https://img.aspxhome.com/file/2023/5/65735_0s.png)
API处理Android安全距离详情
2023-12-24 05:16:19
Android实现调用震动的方法
2021-10-03 19:33:24
Java使用OpenCV3.2实现视频读取与播放
2023-11-23 06:14:02
![](https://img.aspxhome.com/file/2023/5/82425_0s.jpg)
Android使用GPS获取用户地理位置并监听位置变化的方法
2022-03-29 14:24:17
![](https://img.aspxhome.com/file/2023/8/92628_0s.png)
java内存泄漏与内存溢出关系解析
2023-08-07 01:58:59
C#实现农历日历的方法
2022-08-17 21:27:29
导入项目出现Java多个工程相互引用异常A cycle was detected in the build path of project的解决办法
2023-06-26 16:27:17
![](https://img.aspxhome.com/file/2023/5/101935_0s.png)