对python操作kafka写入json数据的简单demo分享
作者:Liu-YanLin 时间:2023-05-04 21:24:08
如下所示:
安装kafka支持库pip install kafka-python
from kafka import KafkaProducer
import json
'''
生产者demo
向test_lyl2主题中循环写入10条json数据
注意事项:要写入json数据需加上value_serializer参数,如下代码
'''
producer = KafkaProducer(
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
bootstrap_servers=['192.168.12.101:6667','192.168.12.102:6667','192.168.12.103:6667']
)
for i in range(10):
data={
"name":"李四",
"age":23,
"gender":"男",
"id":i
}
producer.send('test_lyl2', data)
producer.close()
from kafka import KafkaConsumer
import json
'''
消费者demo
消费test_lyl2主题中的数据
注意事项:如需以json格式读取数据需加上value_deserializer参数
'''
consumer = KafkaConsumer('test_lyl2',group_id="lyl-gid1",
bootstrap_servers=['192.168.12.101:6667','192.168.12.102:6667','192.168.12.103:6667'],
auto_offset_reset='earliest',value_deserializer=json.loads
)
for message in consumer:
print(message.value)
来源:https://blog.csdn.net/qq_32502511/article/details/82109933
标签:python,kafka,json
0
投稿
猜你喜欢
Golang之defer 延迟调用操作
2023-08-04 18:21:48
浅谈golang的http cookie用法
2024-02-12 06:14:59
php+mysql实现简单登录注册修改密码网页
2024-04-30 08:49:54
python进行TCP端口扫描的实现
2021-04-18 12:33:32
Python写的服务监控程序实例
2022-09-01 13:12:31
解决Django删除migrations文件夹中的文件后出现的异常问题
2022-03-05 20:47:57
JS实现TITLE悬停长久显示效果完整示例
2024-04-16 09:54:00
Perl eval函数使用实例
2022-12-21 05:25:55
python解析html提取数据,并生成word文档实例解析
2023-10-19 13:50:38
关于mysql left join 查询慢时间长的踩坑总结
2024-01-23 14:20:29
JavaScript之解构赋值的理解
2024-04-10 10:44:07
深度学习TextLSTM的tensorflow1.14实现示例
2022-07-12 06:26:46
使用wxPython获取系统剪贴板中的数据的教程
2023-11-05 18:43:41
浅谈Python中eval的强大与危害
2022-05-03 08:15:13
关于数据库优化问题收集汇总
2024-01-28 19:05:28
如何用Frontpage下载别人的网站模板
2008-03-03 12:58:00
mysql数据库远程访问设置方法
2024-01-14 11:25:34
解析:快速的掌握 MySQL支持的操作系统
2008-12-31 17:18:00
常见JS前端接口校验方式总结
2024-04-17 10:00:00
python写入已存在的excel数据实例
2021-05-17 15:08:17