Python 调用 ES、Solr、Phoenix的示例代码

作者:LeoZhanggg 时间:2023-10-03 04:52:57 


#!/usr/bin/env python
# -*- coding:utf-8 -*-
# *************************************
# @Time  : 2019/8/12
# @Author : Zhang Fan
# @Desc  : Library
# @File  : MyDatabases.py
# @Update : 2019/8/23
# *************************************
import elasticsearch
import phoenixdb
import pysolr
import pymysql

class MyELS(object):
 """
 ===================================================================
 =====================    MyELS    =========================
 ===================================================================
 """
 def __init__(self):
   self.els_conn = None

def connect_to_els(self, host, port):
   """
   连接到ElasticSearch服务器.
   """
   self.els_conn = elasticsearch.Elasticsearch([{'host': host, 'port': port}])
   print('Executing : Connect To Elastic Search | %s' % self.els_conn)

def get_els_data(self, query, index):
   """
   获取ElasticSearch数据
   """
   print('Executing : Search | %s' % query)
   try:
     rst = self.els_conn.search(index=index, q=query)
     return rst['hits']
   except Exception as e:
     print('Elastic Search Error | %s' % e)
     raise Exception(e)

class MyPhoenix(object):
 """
 ===================================================================
 =====================    MyPhoenix    ======================
 ===================================================================
 """
 def __init__(self):
   self.phoenix_conn = None
   self.phoenix_cursor = None

def connect_to_phoenix(self, host, port=8765):
   """
   连接到phoenix服务器
   """
   address = 'http://{0}:{1}/'.format(host, port)
   print('Executing : Connect To Phoenix | %s' % address)
   self.phoenix_conn = phoenixdb.connect(address, autocommit=True)
   self.phoenix_cursor = self.phoenix_conn.cursor()

def set_schema(self, sql, schema):
   """
   设置schema
   """
   pre_sub, sub, fol_sub = sql.upper().partition('FROM')
   fol_sub = ' ' + schema + '.' + fol_sub.strip()
   new_sql = ''.join([pre_sub, sub, fol_sub])
   return new_sql

def execute_phoenix_sql(self, sql):
   """
   执行sql语句
   """
   # sql = self.set_schema(sql, schema)
   print('Executing : Execute | %s' % sql)
   self.phoenix_cursor.execute(sql)

def get_from_phoenix(self, sql):
   """
   获取phoenix数据
   """
   # sql = self.set_schema(sql, schema)
   print('Executing : Query | %s' % sql)
   try:
     self.phoenix_cursor.execute(sql)
   except Exception as e:
     print('Phoenix Error | %s' % e)
     raise Exception(e)
   return self.phoenix_cursor.fetchall()

def disconnect_from_phoenix(self):
   """
   断开phoenix连接
   """
   print('Executing : Disconnect From HBase')
   self.phoenix_cursor.close()
   self.phoenix_conn.close()

class MySolr(object):
 """
 ===================================================================
 =====================    MySolr    =========================
 ===================================================================
 """
 def __init__(self):
   self.solr_conn = None
   self.base_url = None

def connect_to_solr(self, address, selector):
   """连接到solr服务器.
   """
   self.base_url = 'http://{0}/solr/{1}/'.format(address, selector)
   self.solr_conn = pysolr.Solr(self.base_url)
   print('Executing : Connect To Solr | %s' % self.base_url)

def get_solr_data(self, query):
   """
   获取solr数据
   """
   results = list()
   print('Executing : Search | %s' % query)
   try:
     items = self.solr_conn.search(query)
     for item in items:
       results.append(item)
   except Exception as e:
     print('Solr Error | %s' % e)
     raise Exception(e)
   return results

def add_solr_data(self, data):
   """
   添加solr数据
   """
   print('Executing : add | %s' % data)
   try:
     self.solr_conn.add([data])
     self.solr_conn.commit()
   except Exception as e:
     print('Solr Error | %s' % e)
     raise Exception(e)

def del_solr_byId(self, data):
   """
   删除solr数据
   """
   print('Executing : del | %s' % data)
   try:
     self.solr_conn.delete(id=data)
     self.solr_conn.commit()
   except Exception as e:
     print('Solr Error | %s' % e)
     raise Exception(e)

if __name__ == '__main__':
 print('This is test.')
 ms = MySolr()
 me = MyELS()
 mp = MyPhoenix()

来源:https://www.cnblogs.com/leozhanggg/p/11401570.html

标签:Python,调用,ES,Solr,Phoenix
0
投稿

猜你喜欢

  • js实现微信聊天效果

    2024-04-16 09:14:33
  • Python的for和break循环结构中使用else语句的技巧

    2022-07-02 16:59:26
  • Python基础语法(Python基础知识点)

    2021-03-04 11:10:34
  • 详解Python给照片换底色(蓝底换红底)

    2023-02-03 02:07:45
  • 基于Vue实现页面切换左右滑动效果

    2023-07-02 16:55:10
  • Python使用Requests请求网页方式

    2022-10-08 06:01:09
  • Python网络爬虫信息提取mooc代码实例

    2022-01-02 12:18:23
  • 本地windows安装两个mysql服务器,配置主从同步

    2024-01-15 13:57:30
  • vue-router命名路由和编程式路由传参讲解

    2024-05-02 17:03:39
  • mysql如何分组统计并求出百分比

    2024-01-22 02:07:51
  • 初学者必读:提高SQL执行效率的几点建议

    2009-05-07 13:52:00
  • 一篇文章弄懂Python中的内建函数

    2023-01-18 00:36:36
  • 完美解决jupyter由于无法import新包的问题

    2021-09-19 01:21:11
  • Python的MongoDB模块PyMongo操作方法集锦

    2021-02-05 17:59:03
  • 教你轻松解决几种常见的SQL疑难问题

    2009-01-07 14:25:00
  • go redis实现滑动窗口限流的方式(redis版)

    2024-02-02 18:04:27
  • MYSQL主从库不同步故障一例解决方法

    2010-06-09 19:12:00
  • 解决 myJSFrame 框架中 Ajax 方法一处明显的内存泄露

    2008-03-09 19:14:00
  • pytorch常见的Tensor类型详解

    2022-10-25 19:34:01
  • Logo 设计准则[译]

    2009-07-22 21:05:00
  • asp之家 网络编程 m.aspxhome.com