Java使用pulsar-flink-connector读取pulsar catalog元数据代码剖析

作者:CNBLOG 时间:2023-11-05 17:25:41 

简介

通过 pulsar-flink-connector 读取到 Apache pulsar 中的namespaces、topics的元数据信息。
pulsar-flink-connector 的 github: https://github.com/streamnative/pulsar-flink

Maven


<dependency>
  <groupId>io.streamnative.connectors</groupId>
  <artifactId>pulsar-flink-connector-2.11-1.12</artifactId>
  <version>2.7.3</version>
</dependency>

<!-- JAR repositories -->
  <repositories>
       <repository>
           <id>central</id>
           <layout>default</layout>
           <url>https://repo1.maven.org/maven2</url>
       </repository>
       <repository>
           <id>bintray-streamnative-maven</id>
           <name>bintray</name>
           <url>https://dl.bintray.com/streamnative/maven</url>
       </repository>
   </repositories>

CODE

使用PulsarMetadataReader获取元数据


package com.levi.demo;

import org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Test.
*
* @author levi
* @version 1.0
**/
public class Test {

public static void main(String[] args)  {
       final ClientConfigurationData configurationData = new ClientConfigurationData();
       configurationData.setServiceUrl("pulsar://127.0.0.1:6650");
       //Your Pulsar Token
       final AuthenticationToken token =
               new AuthenticationToken(
                       "eyJxxxxxxxxxxx.eyxxxxxxxxxxxxx.xxxxxxxxxxx");
       configurationData.setAuthentication(token);

try (final PulsarMetadataReader reader =
                    new PulsarMetadataReader("http://127.0.0.1:8443",
                            configurationData,
                            "",
                            new HashMap(),
                            -1,
                            -1)) {
           //获取namespaces
           final List<String> namespaces = reader.listNamespaces();
           System.out.println("namespaces: " + namespaces.toString());

for (final String namespace : namespaces) {
               //获取Topics
               final List<String> topics = reader.getTopics(namespace);
               System.out.println("topic: " + topics.toString());

for (String topic : topics) {
                   //获取字段SchemaInfo
                   final SchemaInfo schemaInfo = reader.getPulsarSchema(topic);
                   final String name = schemaInfo.getName();
                   System.out.println("SchemaName:" + name); //topicName
                   final SchemaType type = schemaInfo.getType();
                   System.out.println("SchemaType:" + type.toString());// "JSON"...
                   final Map<String, String> properties = schemaInfo.getProperties();
                   System.out.println(properties);
                   final String schemaDefinition = schemaInfo.getSchemaDefinition();
                   System.out.println(schemaDefinition); // Field info.
               }
           }

} catch (IOException | PulsarAdminException e) {
           e.printStackTrace();
       }

}

}

来源:https://www.cnblogs.com/levi125/p/14500436.html

标签:Java,pulsar,catalog,元数据
0
投稿

猜你喜欢

  • Seata AT模式TransactionHook被删除探究

    2022-01-12 14:56:49
  • C#实现启动,关闭与查找进程的方法

    2023-01-12 23:27:35
  • springboot如何重定向外部网页

    2022-11-12 05:19:19
  • java编写Http服务器下载工具

    2021-11-08 08:07:38
  • android通过usb读取U盘的方法

    2023-03-14 07:43:25
  • Mybatis Generator最完美配置文件详解(完整版)

    2021-06-13 05:11:06
  • C#对DataTable中的某列进行分组

    2021-12-10 23:06:45
  • Android通过记住密码功能学习数据存储类SharedPreferences详解及实例

    2023-05-21 21:37:49
  • Android 仿QQ头像自定义截取功能

    2023-05-14 00:13:21
  • 服务器端C#实现的CSS解析器

    2022-01-25 12:26:20
  • C#网站生成静态页面的实例讲解

    2021-11-01 16:33:08
  • java实现马踏棋盘算法(骑士周游问题)

    2022-03-17 20:29:46
  • 一文了解Spring中拦截器的原理与使用

    2023-06-30 13:28:05
  • API处理Android安全距离详情

    2023-12-24 05:16:19
  • Android实现调用震动的方法

    2021-10-03 19:33:24
  • Java使用OpenCV3.2实现视频读取与播放

    2023-11-23 06:14:02
  • Android使用GPS获取用户地理位置并监听位置变化的方法

    2022-03-29 14:24:17
  • java内存泄漏与内存溢出关系解析

    2023-08-07 01:58:59
  • C#实现农历日历的方法

    2022-08-17 21:27:29
  • 导入项目出现Java多个工程相互引用异常A cycle was detected in the build path of project的解决办法

    2023-06-26 16:27:17
  • asp之家 软件编程 m.aspxhome.com