Python 调用 ES、Solr、Phoenix的示例代码
作者:LeoZhanggg 时间:2023-10-03 04:52:57
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# *************************************
# @Time : 2019/8/12
# @Author : Zhang Fan
# @Desc : Library
# @File : MyDatabases.py
# @Update : 2019/8/23
# *************************************
import elasticsearch
import phoenixdb
import pysolr
import pymysql
class MyELS(object):
"""
===================================================================
===================== MyELS =========================
===================================================================
"""
def __init__(self):
self.els_conn = None
def connect_to_els(self, host, port):
"""
连接到ElasticSearch服务器.
"""
self.els_conn = elasticsearch.Elasticsearch([{'host': host, 'port': port}])
print('Executing : Connect To Elastic Search | %s' % self.els_conn)
def get_els_data(self, query, index):
"""
获取ElasticSearch数据
"""
print('Executing : Search | %s' % query)
try:
rst = self.els_conn.search(index=index, q=query)
return rst['hits']
except Exception as e:
print('Elastic Search Error | %s' % e)
raise Exception(e)
class MyPhoenix(object):
"""
===================================================================
===================== MyPhoenix ======================
===================================================================
"""
def __init__(self):
self.phoenix_conn = None
self.phoenix_cursor = None
def connect_to_phoenix(self, host, port=8765):
"""
连接到phoenix服务器
"""
address = 'http://{0}:{1}/'.format(host, port)
print('Executing : Connect To Phoenix | %s' % address)
self.phoenix_conn = phoenixdb.connect(address, autocommit=True)
self.phoenix_cursor = self.phoenix_conn.cursor()
def set_schema(self, sql, schema):
"""
设置schema
"""
pre_sub, sub, fol_sub = sql.upper().partition('FROM')
fol_sub = ' ' + schema + '.' + fol_sub.strip()
new_sql = ''.join([pre_sub, sub, fol_sub])
return new_sql
def execute_phoenix_sql(self, sql):
"""
执行sql语句
"""
# sql = self.set_schema(sql, schema)
print('Executing : Execute | %s' % sql)
self.phoenix_cursor.execute(sql)
def get_from_phoenix(self, sql):
"""
获取phoenix数据
"""
# sql = self.set_schema(sql, schema)
print('Executing : Query | %s' % sql)
try:
self.phoenix_cursor.execute(sql)
except Exception as e:
print('Phoenix Error | %s' % e)
raise Exception(e)
return self.phoenix_cursor.fetchall()
def disconnect_from_phoenix(self):
"""
断开phoenix连接
"""
print('Executing : Disconnect From HBase')
self.phoenix_cursor.close()
self.phoenix_conn.close()
class MySolr(object):
"""
===================================================================
===================== MySolr =========================
===================================================================
"""
def __init__(self):
self.solr_conn = None
self.base_url = None
def connect_to_solr(self, address, selector):
"""连接到solr服务器.
"""
self.base_url = 'http://{0}/solr/{1}/'.format(address, selector)
self.solr_conn = pysolr.Solr(self.base_url)
print('Executing : Connect To Solr | %s' % self.base_url)
def get_solr_data(self, query):
"""
获取solr数据
"""
results = list()
print('Executing : Search | %s' % query)
try:
items = self.solr_conn.search(query)
for item in items:
results.append(item)
except Exception as e:
print('Solr Error | %s' % e)
raise Exception(e)
return results
def add_solr_data(self, data):
"""
添加solr数据
"""
print('Executing : add | %s' % data)
try:
self.solr_conn.add([data])
self.solr_conn.commit()
except Exception as e:
print('Solr Error | %s' % e)
raise Exception(e)
def del_solr_byId(self, data):
"""
删除solr数据
"""
print('Executing : del | %s' % data)
try:
self.solr_conn.delete(id=data)
self.solr_conn.commit()
except Exception as e:
print('Solr Error | %s' % e)
raise Exception(e)
if __name__ == '__main__':
print('This is test.')
ms = MySolr()
me = MyELS()
mp = MyPhoenix()
来源:https://www.cnblogs.com/leozhanggg/p/11401570.html
标签:Python,调用,ES,Solr,Phoenix
0
投稿
猜你喜欢
js实现微信聊天效果
2024-04-16 09:14:33
Python的for和break循环结构中使用else语句的技巧
2022-07-02 16:59:26
Python基础语法(Python基础知识点)
2021-03-04 11:10:34
详解Python给照片换底色(蓝底换红底)
2023-02-03 02:07:45
基于Vue实现页面切换左右滑动效果
2023-07-02 16:55:10
Python使用Requests请求网页方式
2022-10-08 06:01:09
Python网络爬虫信息提取mooc代码实例
2022-01-02 12:18:23
本地windows安装两个mysql服务器,配置主从同步
2024-01-15 13:57:30
vue-router命名路由和编程式路由传参讲解
2024-05-02 17:03:39
mysql如何分组统计并求出百分比
2024-01-22 02:07:51
初学者必读:提高SQL执行效率的几点建议
2009-05-07 13:52:00
一篇文章弄懂Python中的内建函数
2023-01-18 00:36:36
完美解决jupyter由于无法import新包的问题
2021-09-19 01:21:11
Python的MongoDB模块PyMongo操作方法集锦
2021-02-05 17:59:03
教你轻松解决几种常见的SQL疑难问题
2009-01-07 14:25:00
go redis实现滑动窗口限流的方式(redis版)
2024-02-02 18:04:27
MYSQL主从库不同步故障一例解决方法
2010-06-09 19:12:00
解决 myJSFrame 框架中 Ajax 方法一处明显的内存泄露
2008-03-09 19:14:00
pytorch常见的Tensor类型详解
2022-10-25 19:34:01
Logo 设计准则[译]
2009-07-22 21:05:00