python消费kafka数据批量插入到es的方法

作者:亮亮爱吃虾 时间:2023-04-30 18:10:47 

1、es的批量插入

这是为了方便后期配置的更改,把配置信息放在logging.conf中

用elasticsearch来实现批量操作,先安装依赖包,sudo pip install Elasticsearch2


from elasticsearch import Elasticsearch
class ImportEsData:

logging.config.fileConfig("logging.conf")
 logger = logging.getLogger("msg")

def __init__(self,hosts,index,type):
   self.es = Elasticsearch(hosts=hosts.strip(',').split(','), timeout=5000)
   self.index = index
   self.type = type

def set_date(self,data):
   # 批量处理
   # es.index(index="test-index",doc_type="test-type",id=42,body={"any":"data","timestamp":datetime.now()})
   self.es.index(index=self.index,doc_type=self.index,body=data)

2、使用pykafka消费kafka

1.因为kafka是0.8,pykafka不支持zk,只能用get_simple_consumer来实现

2.为了实现多个应用同时消费而且不重消费,所以一个应用消费一个partition

3. 为是确保消费数据量在不满足10000这个批量值,能在一个时间范围内插入到es中,这里设置consumer_timeout_ms一个超时等待时间,退出等待消费阻塞。

4.退出等待消费阻塞后导致无法再消费数据,因此在获取self.consumer 的外层加入了while True 一个死循环


#!/usr/bin/python
# -*- coding: UTF-8 -*-
from pykafka import KafkaClient
import logging
import logging.config
from ConfigUtil import ConfigUtil
import datetime

class KafkaPython:
 logging.config.fileConfig("logging.conf")
 logger = logging.getLogger("msg")
 logger_data = logging.getLogger("data")

def __init__(self):
   self.server = ConfigUtil().get("kafka","kafka_server")
   self.topic = ConfigUtil().get("kafka","topic")
   self.group = ConfigUtil().get("kafka","group")
   self.partition_id = int(ConfigUtil().get("kafka","partition"))
   self.consumer_timeout_ms = int(ConfigUtil().get("kafka","consumer_timeout_ms"))
   self.consumer = None
   self.hosts = ConfigUtil().get("es","hosts")
   self.index_name = ConfigUtil().get("es","index_name")
   self.type_name = ConfigUtil().get("es","type_name")

def getConnect(self):
   client = KafkaClient(self.server)
   topic = client.topics[self.topic]
   p = topic.partitions
   ps={p.get(self.partition_id)}

self.consumer = topic.get_simple_consumer(
     consumer_group=self.group,
     auto_commit_enable=True,
     consumer_timeout_ms=self.consumer_timeout_ms,
     # num_consumer_fetchers=1,
     # consumer_id='test1',
     partitions=ps
     )
   self.starttime = datetime.datetime.now()

def beginConsumer(self):
   print("beginConsumer kafka-python")
   imprtEsData = ImportEsData(self.hosts,self.index_name,self.type_name)
   #创建ACTIONS
   count = 0
   ACTIONS = []

while True:
     endtime = datetime.datetime.now()
     print (endtime - self.starttime).seconds
     for message in self.consumer:
       if message is not None:
         try:
           count = count + 1
           # print(str(message.partition.id)+","+str(message.offset)+","+str(count))
           # self.logger.info(str(message.partition.id)+","+str(message.offset)+","+str(count))
           action = {
             "_index": self.index_name,
             "_type": self.type_name,
             "_source": message.value
           }
           ACTIONS.append(action)
           if len(ACTIONS) >= 10000:
             imprtEsData.set_date(ACTIONS)
             ACTIONS = []
             self.consumer.commit_offsets()
             endtime = datetime.datetime.now()
             print (endtime - self.starttime).seconds
             #break
         except (Exception) as e:
           # self.consumer.commit_offsets()
           print(e)
           self.logger.error(e)
           self.logger.error(str(message.partition.id)+","+str(message.offset)+","+message.value+"\n")
           # self.logger_data.error(message.value+"\n")
         # self.consumer.commit_offsets()

if len(ACTIONS) > 0:
       self.logger.info("等待时间超过,consumer_timeout_ms,把集合数据插入es")
       imprtEsData.set_date(ACTIONS)
       ACTIONS = []
       self.consumer.commit_offsets()

def disConnect(self):
   self.consumer.close()

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
class ImportEsData:

logging.config.fileConfig("logging.conf")
 logger = logging.getLogger("msg")

def __init__(self,hosts,index,type):
   self.es = Elasticsearch(hosts=hosts.strip(',').split(','), timeout=5000)
   self.index = index
   self.type = type

def set_date(self,data):
   # 批量处理
   success = bulk(self.es, data, index=self.index, raise_on_error=True)
   self.logger.info(success)

3、运行


if __name__ == '__main__':
 kp = KafkaPython()
 kp.getConnect()
 kp.beginConsumer()
 # kp.disConnect()

注:简单的写了一个从kafka中读取数据到一个list里,当数据达到一个阈值时,在批量插入到 es的插件

现在还在批量的压测中。。。

来源:https://blog.csdn.net/liagliang/article/details/78712475

标签:python,kafka,es
0
投稿

猜你喜欢

  • 如何使用SublimeText3配置 PHP IDE环境

    2024-04-30 09:58:51
  • js实现屏蔽默认快捷键调用自定义事件示例

    2023-09-05 09:28:31
  • 详解Python编程中time模块的使用

    2023-08-29 16:57:08
  • python中pdb模块实例用法

    2023-10-14 19:04:48
  • JavaScript监听触摸事件代码实例

    2023-08-20 19:12:54
  • 详解Java使用sqlite 数据库如何生成db文件

    2024-01-17 17:07:47
  • golang获取网卡信息操作

    2024-02-22 01:57:17
  • python的sys.path模块路径添加方式

    2021-04-06 05:52:30
  • Python2和Python3中@abstractmethod使用方法

    2021-12-29 08:55:36
  • sql 2000清空后让表的id从1开始等数据库操作

    2024-01-18 11:30:15
  • python报错TypeError: ‘NoneType‘ object is not subscriptable的解决方法

    2023-01-11 08:11:07
  • laravel入门知识点整理

    2023-05-31 13:42:33
  • 深入理解Python虚拟机中的Code obejct

    2023-05-05 02:39:31
  • vue pages 多入口项目 + chainWebpack 全局引用缩写说明

    2024-05-21 10:30:37
  • python 多个参数不为空校验方法

    2022-12-15 10:48:56
  • Vue.js中v-bind指令的用法介绍

    2024-04-30 10:18:30
  • python dataframe astype 字段类型转换方法

    2022-02-19 07:58:50
  • python框架Django实战商城项目之工程搭建过程图文详解

    2022-12-16 16:25:57
  • SQL Server 2008 安装和配置图解教程(附官方下载地址)

    2024-01-12 20:28:17
  • python实现不同数据库间数据同步功能

    2024-01-18 15:58:52
  • asp之家 网络编程 m.aspxhome.com