使用python生成大量数据写入es数据库并查询操作(2)

作者:IT之一小 时间:2024-01-14 02:10:27 

前言 :

上一篇文章:如何使用python生成大量数据写入es数据库并查询操作

模拟学生个人信息写入es数据库,包括姓名、性别、年龄、特点、科目、成绩,创建时间。

方案一

在写入数据时未提前创建索引mapping,而是每插入一条数据都包含了索引的信息。

示例代码:【多线程写入数据】【一次性写入10000*1000条数据】  【本人亲测耗时3266秒】

from elasticsearch import Elasticsearch
from elasticsearch import helpers
from datetime import datetime
from queue import Queue
import random
import time
import threading
es = Elasticsearch(hosts='http://127.0.0.1:9200')
# print(es)

names = ['刘一', '陈二', '张三', '李四', '王五', '赵六', '孙七', '周八', '吴九', '郑十']
sexs = ['男', '女']
age = [25, 28, 29, 32, 31, 26, 27, 30]
character = ['自信但不自负,不以自我为中心',
            '努力、积极、乐观、拼搏是我的人生信条',
            '抗压能力强,能够快速适应周围环境',
            '敢做敢拼,脚踏实地;做事认真负责,责任心强',
            '爱好所学专业,乐于学习新知识;对工作有责任心;踏实,热情,对生活充满激情',
            '主动性强,自学能力强,具有团队合作意识,有一定组织能力',
            '忠实诚信,讲原则,说到做到,决不推卸责任',
            '有自制力,做事情始终坚持有始有终,从不半途而废',
            '肯学习,有问题不逃避,愿意虚心向他人学习',
            '愿意以谦虚态度赞扬接纳优越者,权威者',
            '会用100%的热情和精力投入到工作中;平易近人',
            '为人诚恳,性格开朗,积极进取,适应力强、勤奋好学、脚踏实地',
            '有较强的团队精神,工作积极进取,态度认真']
subjects = ['语文', '数学', '英语', '生物', '地理']
grades = [85, 77, 96, 74, 85, 69, 84, 59, 67, 69, 86, 96, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86]
create_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

def save_to_es(num):
   """
   批量写入数据到es数据库
   :param num:
   :return:
   """
   start = time.time()
   action = [
       {
           "_index": "personal_info_10000000",
           "_type": "doc",
           "_id": i,
           "_source": {
               "id": i,
               "name": random.choice(names),
               "sex": random.choice(sexs),
               "age": random.choice(age),
               "character": random.choice(character),
               "subject": random.choice(subjects),
               "grade": random.choice(grades),
               "create_time": create_time
           }
       } for i in range(10000 * num, 10000 * num + 10000)
   ]
   helpers.bulk(es, action)
   end = time.time()
   print(f"{num}耗时{end - start}s!")

def run():
   global queue
   while queue.qsize() > 0:
       num = queue.get()
       print(num)
       save_to_es(num)

if __name__ == '__main__':
   start = time.time()
   queue = Queue()
   # 序号数据进队列
   for num in range(1000):
       queue.put(num)

# 多线程执行程序
   consumer_lst = []
   for _ in range(10):
       thread = threading.Thread(target=run)
       thread.start()
       consumer_lst.append(thread)
   for consumer in consumer_lst:
       consumer.join()
   end = time.time()
   print('程序执行完毕!花费时间:', end - start)

运行结果:

使用python生成大量数据写入es数据库并查询操作(2)

使用python生成大量数据写入es数据库并查询操作(2)

使用python生成大量数据写入es数据库并查询操作(2)

 自动创建的索引mapping:

GET personal_info_10000000/_mapping
{
 "personal_info_10000000" : {
   "mappings" : {
     "properties" : {
       "age" : {
         "type" : "long"
       },
       "character" : {
         "type" : "text",
         "fields" : {
           "keyword" : {
             "type" : "keyword",
             "ignore_above" : 256
           }
         }
       },
       "create_time" : {
         "type" : "text",
         "fields" : {
           "keyword" : {
             "type" : "keyword",
             "ignore_above" : 256
           }
         }
       },
       "grade" : {
         "type" : "long"
       },
       "id" : {
         "type" : "long"
       },
       "name" : {
         "type" : "text",
         "fields" : {
           "keyword" : {
             "type" : "keyword",
             "ignore_above" : 256
           }
         }
       },
       "sex" : {
         "type" : "text",
         "fields" : {
           "keyword" : {
             "type" : "keyword",
             "ignore_above" : 256
           }
         }
       },
       "subject" : {
         "type" : "text",
         "fields" : {
           "keyword" : {
             "type" : "keyword",
             "ignore_above" : 256
           }
         }
       }
     }
   }
 }
}

方案二

1.顺序插入5000000条数据

先创建索引personal_info_5000000,确定好mapping后,再插入数据。

新建索引并设置mapping信息:

PUT personal_info_5000000
{
 "settings": {
   "number_of_shards": 3,
   "number_of_replicas": 1
 },
 "mappings": {
   "properties": {
     "id": {
       "type": "long"
     },
     "name": {
       "type": "text",
       "fields": {
         "keyword": {
           "type": "keyword",
           "ignore_above": 32
         }
       }
     },
     "sex": {
       "type": "text",
       "fields": {
         "keyword": {
           "type": "keyword",
           "ignore_above": 8
         }
       }
     },
     "age": {
       "type": "long"
     },
     "character": {
       "type": "text",
       "analyzer": "ik_smart",
       "fields": {
         "keyword": {
           "type": "keyword",
           "ignore_above": 256
         }
       }
     },
     "subject": {
       "type": "text",
       "fields": {
         "keyword": {
           "type": "keyword",
           "ignore_above": 256
         }
       }
     },
     "grade": {
       "type": "long"
     },
     "create_time": {
       "type": "date",
       "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
     }
   }
 }
}

查看新建索引信息:

GET personal_info_5000000

{
 "personal_info_5000000" : {
   "aliases" : { },
   "mappings" : {
     "properties" : {
       "age" : {
         "type" : "long"
       },
       "character" : {
         "type" : "text",
         "fields" : {
           "keyword" : {
             "type" : "keyword",
             "ignore_above" : 256
           }
         },
         "analyzer" : "ik_smart"
       },
       "create_time" : {
         "type" : "date",
         "format" : "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
       },
       "grade" : {
         "type" : "long"
       },
       "id" : {
         "type" : "long"
       },
       "name" : {
         "type" : "text",
         "fields" : {
           "keyword" : {
             "type" : "keyword",
             "ignore_above" : 32
           }
         }
       },
       "sex" : {
         "type" : "text",
         "fields" : {
           "keyword" : {
             "type" : "keyword",
             "ignore_above" : 8
           }
         }
       },
       "subject" : {
         "type" : "text",
         "fields" : {
           "keyword" : {
             "type" : "keyword",
             "ignore_above" : 256
           }
         }
       }
     }
   },
   "settings" : {
     "index" : {
       "routing" : {
         "allocation" : {
           "include" : {
             "_tier_preference" : "data_content"
           }
         }
       },
       "number_of_shards" : "3",
       "provided_name" : "personal_info_50000000",
       "creation_date" : "1663471072176",
       "number_of_replicas" : "1",
       "uuid" : "5DfmfUhUTJeGk1k4XnN-lQ",
       "version" : {
         "created" : "7170699"
       }
     }
   }
 }
}

开始插入数据:

示例代码: 【单线程写入数据】【一次性写入10000*500条数据】  【本人亲测耗时7916秒】

from elasticsearch import Elasticsearch
from datetime import datetime
from queue import Queue
import random
import time
import threading
es = Elasticsearch(hosts='http://127.0.0.1:9200')
# print(es)
names = ['刘一', '陈二', '张三', '李四', '王五', '赵六', '孙七', '周八', '吴九', '郑十']
sexs = ['男', '女']
age = [25, 28, 29, 32, 31, 26, 27, 30]
character = ['自信但不自负,不以自我为中心',
            '努力、积极、乐观、拼搏是我的人生信条',
            '抗压能力强,能够快速适应周围环境',
            '敢做敢拼,脚踏实地;做事认真负责,责任心强',
            '爱好所学专业,乐于学习新知识;对工作有责任心;踏实,热情,对生活充满激情',
            '主动性强,自学能力强,具有团队合作意识,有一定组织能力',
            '忠实诚信,讲原则,说到做到,决不推卸责任',
            '有自制力,做事情始终坚持有始有终,从不半途而废',
            '肯学习,有问题不逃避,愿意虚心向他人学习',
            '愿意以谦虚态度赞扬接纳优越者,权威者',
            '会用100%的热情和精力投入到工作中;平易近人',
            '为人诚恳,性格开朗,积极进取,适应力强、勤奋好学、脚踏实地',
            '有较强的团队精神,工作积极进取,态度认真']
subjects = ['语文', '数学', '英语', '生物', '地理']
grades = [85, 77, 96, 74, 85, 69, 84, 59, 67, 69, 86, 96, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86]
create_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

# 添加程序耗时的功能
def timer(func):
   def wrapper(*args, **kwargs):
       start = time.time()
       res = func(*args, **kwargs)
       end = time.time()
       print('id{}共耗时约 {:.2f} 秒'.format(*args, end - start))
       return res

return wrapper

@timer
def save_to_es(num):
   """
   顺序写入数据到es数据库
   :param num:
   :return:
   """
   body = {
       "id": num,
       "name": random.choice(names),
       "sex": random.choice(sexs),
       "age": random.choice(age),
       "character": random.choice(character),
       "subject": random.choice(subjects),
       "grade": random.choice(grades),
       "create_time": create_time
   }
   # 此时若索引不存在时会新建
   es.index(index="personal_info_5000000", id=num, doc_type="_doc", document=body)

def run():
   global queue
   while queue.qsize() > 0:
       num = queue.get()
       print(num)
       save_to_es(num)

if __name__ == '__main__':
   start = time.time()
   queue = Queue()
   # 序号数据进队列
   for num in range(5000000):
       queue.put(num)

# 多线程执行程序
   consumer_lst = []
   for _ in range(10):
       thread = threading.Thread(target=run)
       thread.start()
       consumer_lst.append(thread)
   for consumer in consumer_lst:
       consumer.join()
   end = time.time()
   print('程序执行完毕!花费时间:', end - start)

运行结果:

使用python生成大量数据写入es数据库并查询操作(2)

2.批量插入5000000条数据

先创建索引personal_info_5000000_v2,确定好mapping后,再插入数据。

新建索引并设置mapping信息:

PUT personal_info_5000000_v2
{
 "settings": {
   "number_of_shards": 3,
   "number_of_replicas": 1
 },
 "mappings": {
   "properties": {
     "id": {
       "type": "long"
     },
     "name": {
       "type": "text",
       "fields": {
         "keyword": {
           "type": "keyword",
           "ignore_above": 32
         }
       }
     },
     "sex": {
       "type": "text",
       "fields": {
         "keyword": {
           "type": "keyword",
           "ignore_above": 8
         }
       }
     },
     "age": {
       "type": "long"
     },
     "character": {
       "type": "text",
       "analyzer": "ik_smart",
       "fields": {
         "keyword": {
           "type": "keyword",
           "ignore_above": 256
         }
       }
     },
     "subject": {
       "type": "text",
       "fields": {
         "keyword": {
           "type": "keyword",
           "ignore_above": 256
         }
       }
     },
     "grade": {
       "type": "long"
     },
     "create_time": {
       "type": "date",
       "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
     }
   }
 }
}

查看新建索引信息:

GET personal_info_5000000_v2

{
 "personal_info_5000000_v2" : {
   "aliases" : { },
   "mappings" : {
     "properties" : {
       "age" : {
         "type" : "long"
       },
       "character" : {
         "type" : "text",
         "fields" : {
           "keyword" : {
             "type" : "keyword",
             "ignore_above" : 256
           }
         },
         "analyzer" : "ik_smart"
       },
       "create_time" : {
         "type" : "date",
         "format" : "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
       },
       "grade" : {
         "type" : "long"
       },
       "id" : {
         "type" : "long"
       },
       "name" : {
         "type" : "text",
         "fields" : {
           "keyword" : {
             "type" : "keyword",
             "ignore_above" : 32
           }
         }
       },
       "sex" : {
         "type" : "text",
         "fields" : {
           "keyword" : {
             "type" : "keyword",
             "ignore_above" : 8
           }
         }
       },
       "subject" : {
         "type" : "text",
         "fields" : {
           "keyword" : {
             "type" : "keyword",
             "ignore_above" : 256
           }
         }
       }
     }
   },
   "settings" : {
     "index" : {
       "routing" : {
         "allocation" : {
           "include" : {
             "_tier_preference" : "data_content"
           }
         }
       },
       "number_of_shards" : "3",
       "provided_name" : "personal_info_5000000_v2",
       "creation_date" : "1663485323617",
       "number_of_replicas" : "1",
       "uuid" : "XBPaDn_gREmAoJmdRyBMAA",
       "version" : {
         "created" : "7170699"
       }
     }
   }
 }
}

批量插入数据:

通过elasticsearch模块导入helper,通过helper.bulk来批量处理大量的数据。首先将所有的数据定义成字典形式,各字段含义如下:

  • _index对应索引名称,并且该索引必须存在。

  • _type对应类型名称。

  • _source对应的字典内,每一篇文档的字段和值,可有有多个字段。

示例代码:  【程序中途异常,写入4714000条数据】

from elasticsearch import Elasticsearch
from elasticsearch import helpers
from datetime import datetime
from queue import Queue
import random
import time
import threading
es = Elasticsearch(hosts='http://127.0.0.1:9200')
# print(es)
names = ['刘一', '陈二', '张三', '李四', '王五', '赵六', '孙七', '周八', '吴九', '郑十']
sexs = ['男', '女']
age = [25, 28, 29, 32, 31, 26, 27, 30]
character = ['自信但不自负,不以自我为中心',
            '努力、积极、乐观、拼搏是我的人生信条',
            '抗压能力强,能够快速适应周围环境',
            '敢做敢拼,脚踏实地;做事认真负责,责任心强',
            '爱好所学专业,乐于学习新知识;对工作有责任心;踏实,热情,对生活充满激情',
            '主动性强,自学能力强,具有团队合作意识,有一定组织能力',
            '忠实诚信,讲原则,说到做到,决不推卸责任',
            '有自制力,做事情始终坚持有始有终,从不半途而废',
            '肯学习,有问题不逃避,愿意虚心向他人学习',
            '愿意以谦虚态度赞扬接纳优越者,权威者',
            '会用100%的热情和精力投入到工作中;平易近人',
            '为人诚恳,性格开朗,积极进取,适应力强、勤奋好学、脚踏实地',
            '有较强的团队精神,工作积极进取,态度认真']
subjects = ['语文', '数学', '英语', '生物', '地理']
grades = [85, 77, 96, 74, 85, 69, 84, 59, 67, 69, 86, 96, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86]
create_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# 添加程序耗时的功能
def timer(func):
   def wrapper(*args, **kwargs):
       start = time.time()
       res = func(*args, **kwargs)
       end = time.time()
       print('id{}共耗时约 {:.2f} 秒'.format(*args, end - start))
       return res

return wrapper

@timer
def save_to_es(num):
   """
   批量写入数据到es数据库
   :param num:
   :return:
   """
   action = [
       {
           "_index": "personal_info_5000000_v2",
           "_type": "_doc",
           "_id": i,
           "_source": {
               "id": i,
               "name": random.choice(names),
               "sex": random.choice(sexs),
               "age": random.choice(age),
               "character": random.choice(character),
               "subject": random.choice(subjects),
               "grade": random.choice(grades),
               "create_time": create_time
           }
       } for i in range(10000 * num, 10000 * num + 10000)
   ]
   helpers.bulk(es, action)
def run():
   global queue
   while queue.qsize() > 0:
       num = queue.get()
       print(num)
       save_to_es(num)
if __name__ == '__main__':
   start = time.time()
   queue = Queue()
   # 序号数据进队列
   for num in range(500):
       queue.put(num)

# 多线程执行程序
   consumer_lst = []
   for _ in range(10):
       thread = threading.Thread(target=run)
       thread.start()
       consumer_lst.append(thread)
   for consumer in consumer_lst:
       consumer.join()
   end = time.time()
   print('程序执行完毕!花费时间:', end - start)

运行结果:

使用python生成大量数据写入es数据库并查询操作(2)

使用python生成大量数据写入es数据库并查询操作(2)

3.批量插入50000000条数据

先创建索引personal_info_5000000_v2,确定好mapping后,再插入数据。

此过程是在上面批量插入的前提下进行优化,采用python生成器。

建立索引和mapping同上,直接上代码:

示例代码: 【程序中途异常,写入3688000条数据】

from elasticsearch import Elasticsearch
from elasticsearch import helpers
from datetime import datetime
from queue import Queue
import random
import time
import threading
es = Elasticsearch(hosts='http://127.0.0.1:9200')
# print(es)

names = ['刘一', '陈二', '张三', '李四', '王五', '赵六', '孙七', '周八', '吴九', '郑十']
sexs = ['男', '女']
age = [25, 28, 29, 32, 31, 26, 27, 30]
character = ['自信但不自负,不以自我为中心',
            '努力、积极、乐观、拼搏是我的人生信条',
            '抗压能力强,能够快速适应周围环境',
            '敢做敢拼,脚踏实地;做事认真负责,责任心强',
            '爱好所学专业,乐于学习新知识;对工作有责任心;踏实,热情,对生活充满激情',
            '主动性强,自学能力强,具有团队合作意识,有一定组织能力',
            '忠实诚信,讲原则,说到做到,决不推卸责任',
            '有自制力,做事情始终坚持有始有终,从不半途而废',
            '肯学习,有问题不逃避,愿意虚心向他人学习',
            '愿意以谦虚态度赞扬接纳优越者,权威者',
            '会用100%的热情和精力投入到工作中;平易近人',
            '为人诚恳,性格开朗,积极进取,适应力强、勤奋好学、脚踏实地',
            '有较强的团队精神,工作积极进取,态度认真']
subjects = ['语文', '数学', '英语', '生物', '地理']
grades = [85, 77, 96, 74, 85, 69, 84, 59, 67, 69, 86, 96, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86]
create_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

# 添加程序耗时的功能
def timer(func):
   def wrapper(*args, **kwargs):
       start = time.time()
       res = func(*args, **kwargs)
       end = time.time()
       print('id{}共耗时约 {:.2f} 秒'.format(*args, end - start))
       return res

return wrapper
@timer
def save_to_es(num):
   """
   使用生成器批量写入数据到es数据库
   :param num:
   :return:
   """
   action = (
       {
           "_index": "personal_info_5000000_v3",
           "_type": "_doc",
           "_id": i,
           "_source": {
               "id": i,
               "name": random.choice(names),
               "sex": random.choice(sexs),
               "age": random.choice(age),
               "character": random.choice(character),
               "subject": random.choice(subjects),
               "grade": random.choice(grades),
               "create_time": create_time
           }
       } for i in range(10000 * num, 10000 * num + 10000)
   )
   helpers.bulk(es, action)

def run():
   global queue
   while queue.qsize() > 0:
       num = queue.get()
       print(num)
       save_to_es(num)

if __name__ == '__main__':
   start = time.time()
   queue = Queue()
   # 序号数据进队列
   for num in range(500):
       queue.put(num)

# 多线程执行程序
   consumer_lst = []
   for _ in range(10):
       thread = threading.Thread(target=run)
       thread.start()
       consumer_lst.append(thread)
   for consumer in consumer_lst:
       consumer.join()
   end = time.time()
   print('程序执行完毕!花费时间:', end - start)

运行结果:

使用python生成大量数据写入es数据库并查询操作(2)

使用python生成大量数据写入es数据库并查询操作(2)

来源:https://blog.csdn.net/weixin_44799217/article/details/126911481

标签:python,生成,数据,es,数据库
0
投稿

猜你喜欢

  • XHTML中用途相似的标签

    2008-03-24 19:33:00
  • Mysql的服务无法启动的1067错误解决

    2012-01-05 19:31:56
  • php json_encode与json_decode详解及实例

    2023-07-04 22:46:27
  • pycharm中import导入包呈现灰色的问题及解决

    2023-10-03 23:12:10
  • MYSQL教程:my.cnf缓存优化

    2009-07-30 08:58:00
  • python神经网络tf.name_scope和tf.variable_scope函数区别

    2021-01-24 13:10:48
  • Python简单几步画个钻石戒指

    2023-04-26 13:59:27
  • Python利用pythonping处理ping的示例详解

    2023-08-12 00:28:45
  • 全面了解CSS内置颜色(color)值

    2008-11-19 12:26:00
  • Django 返回json数据的实现示例

    2021-03-06 21:33:17
  • 初瞥 Google Chrome Frame

    2009-10-06 14:41:00
  • 一文详解Python中logging模块的用法

    2022-03-27 23:09:38
  • pip/anaconda修改镜像源,加快python模块安装速度的操作

    2022-06-01 10:42:26
  • Spring Data JPA的Audit功能审计数据库的变更

    2024-01-21 18:30:29
  • python 生成图形验证码的方法示例

    2021-10-01 23:31:03
  • JS数组去重的九种高阶方法(亲测有效)

    2024-04-19 10:57:45
  • javascript发表评论或者留言时的展开效果

    2024-05-02 17:29:19
  • Python制作一个仿QQ办公版的图形登录界面

    2021-06-23 20:08:49
  • asp如何防止计数器刷新计数?

    2009-11-22 19:19:00
  • Python 爬虫学习笔记之多线程爬虫

    2022-10-03 15:10:37
  • asp之家 网络编程 m.aspxhome.com