每天将MySQL历史数据迁移到历史库的Python脚本

作者:oeleven123456789 时间:2024-01-27 20:43:00 

本文实例为大家分享了用Python每天将MySQL历史数据迁移到历史库的具体代码，供大家参考，具体内容如下：


#!/usr/bin/env python
# coding:utf-8
__author__ = 'John'

import MySQLdb
import sys
import datetime
import time

class ClassMigrate(object):
    """Move one day of rows from a source MySQL table into an archive table.

    The day handled is ``today - time_interval`` days, matched on ``date_col``
    from 00:00:00 to 23:59:59.  Rows are copied in primary-key ranges of
    ``step_size``, the copy is verified with a "count_min_max" checksum, and
    the source rows are then removed either range by range (``delete``) or by
    dropping the partition that holds them (``drop``).

    Progress messages end in ``<br>``, presumably so the log can be rendered
    as HTML (TODO confirm).  Exit statuses: 88 success, 77 verification or
    partition-checksum failure, 254 delete/drop did not complete, 255 the
    destination table could not be created, 1 bad command line, 0 for
    ``--help``; ``do`` returns normally when there is nothing to migrate.
    """

    # Every option below must appear on the command line.
    _REQUIRED_OPTIONS = ('--source', '--dest', '--delete_strategy',
                         '--primary_key', '--date_col', '--time_interval')

    def _get_argv(self):
        """Parse ``sys.argv`` into connection settings and migration options.

        ``--source`` / ``--dest`` take ``host:port/db:table/user/password``.
        Prints the usage text and exits with status 1 on a missing, unknown or
        malformed option; ``-h`` / ``--help`` prints it and exits with status 0.
        """
        self.usage = """
     usage():
     python daily_migration.py --source=192.168.1.4:3306/db_name:tab_name/proxy/password \\
                   --dest=192.168.1.150:13301/db_name_archive:tab_name_201601/proxy/password \\
                   --delete_strategy=delete --primary_key=auto_id --date_col=ut --time_interval=180
     """
        args = sys.argv[1:]
        if not args:
            print(self.usage)
            sys.exit(1)
        if args[0] in ('--help', '-h'):
            print(self.usage)
            sys.exit()

        seen = set()
        for arg in args:
            # partition() splits on the first '=' only, so values may contain '='.
            name, _, value = arg.partition('=')
            try:
                if name == '--source':
                    (self.source_host, self.source_port, self.source_db,
                     self.source_tab, self.source_user,
                     self.source_password) = self._parse_endpoint(value)
                elif name == '--dest':
                    (self.dest_host, self.dest_port, self.dest_db,
                     self.dest_tab, self.dest_user,
                     self.dest_password) = self._parse_endpoint(value)
                elif name == '--delete_strategy':
                    if value not in ('delete', 'drop'):
                        raise ValueError('invalid --delete_strategy: %s' % value)
                    self.deleteStrategy = value
                elif name == '--primary_key':
                    self.pk = value
                elif name == '--date_col':
                    self.date_col = value
                elif name == '--time_interval':
                    # Interpolated into SQL as a day count, so insist on an integer.
                    self.interval = int(value)
                    if self.interval < 0:
                        raise ValueError('--time_interval must be >= 0')
                else:
                    raise ValueError('unknown option: %s' % name)
            except ValueError as e:
                print(str(e))
                print(self.usage)
                sys.exit(1)
            seen.add(name)

        missing = [opt for opt in self._REQUIRED_OPTIONS if opt not in seen]
        if missing:
            print('missing option(s): %s' % ', '.join(missing))
            print(self.usage)
            sys.exit(1)

    @staticmethod
    def _parse_endpoint(spec):
        """Split ``host:port/db:table/user/password`` into a 6-tuple.

        Returns ``(host, port, db, table, user, password)`` with ``port`` as an
        int.  Only the first three '/' separate fields, so the password may
        itself contain '/'.  Raises ValueError when ``spec`` is malformed.
        """
        try:
            hostport, dbtab, user, password = spec.split('/', 3)
            host, port = hostport.split(':', 1)
            db, tab = dbtab.split(':', 1)
            return host, int(port), db, tab, user, password
        except ValueError:
            # Deliberately does not echo `spec`: it contains the password.
            raise ValueError('malformed endpoint, expected '
                             'host:port/db:table/user/password')

    def __init__(self):
        """Parse the command line, open both connections and reset run state."""
        self._get_argv()
        # Both connections autocommit, so each chunked INSERT/DELETE commits on its own.
        self.sourcedb_conn_str = MySQLdb.connect(
            host=self.source_host, port=self.source_port, user=self.source_user,
            passwd=self.source_password, db=self.source_db, charset='utf8')
        self.sourcedb_conn_str.autocommit(True)
        self.destdb_conn_str = MySQLdb.connect(
            host=self.dest_host, port=self.dest_port, user=self.dest_user,
            passwd=self.dest_password, db=self.dest_db, charset='utf8')
        self.destdb_conn_str.autocommit(True)
        # Table cloned with CREATE TABLE ... LIKE (on the destination connection)
        # to create the destination table.
        self.template_tab = self.source_tab + '_template'
        # Width of the primary-key range copied or deleted per statement.
        self.step_size = 20000
        # Set once the copy is verified / once the source rows have been removed.
        self._migCompleteState = False
        self._deleteCompleteState = False
        # Filled in by get_min_max() and verify_total_cnt().
        self.source_cnt = ''
        self.source_min_id = ''
        self.source_max_id = ''
        self.source_checksum = ''
        self.dest_cnt = ''
        # Reference date; the migrated day is `interval` days before it.
        self.today = time.strftime("%Y-%m-%d")

    def _date_window_condition(self):
        """Return the SQL predicate selecting rows of the day being migrated.

        The day is ``self.today`` minus ``self.interval`` days; the predicate
        bounds ``self.date_col`` to 00:00:00 .. 23:59:59 of that day.
        """
        day = ("DATE_FORMAT(DATE_ADD('%s', INTERVAL -%s day),'%%Y-%%m-%%d')"
               % (self.today, self.interval))
        return ("%s >= CONCAT(%s, ' 00:00:00') and %s <= CONCAT(%s, ' 23:59:59')"
                % (self.date_col, day, self.date_col, day))

    @staticmethod
    def _checksum(row):
        """Join the first three values of ``row`` (count, min id, max id) with '_'."""
        return '_'.join(str(v) for v in row[:3])

    @staticmethod
    def _format_elapsed(delta):
        """Format a ``timedelta`` as seconds with microsecond precision, e.g. '1.005000'."""
        return '%.6f' % delta.total_seconds()

    def sourcedb_query(self, sql, sql_type):
        """Run ``sql`` on the source connection.

        Returns all rows for ``sql_type == 'select'``, the affected-row count
        for ``'dml'`` and True for anything else.  Errors are printed and False
        is returned, so callers must use ``is False`` to tell a failure apart
        from a legitimate 0-row result.
        """
        cr = None
        try:
            cr = self.sourcedb_conn_str.cursor()
            cr.execute(sql)
            if sql_type == 'select':
                return cr.fetchall()
            if sql_type == 'dml':
                return self.sourcedb_conn_str.affected_rows()
            return True
        except Exception as e:
            print(str(e) + "<br>")
            return False
        finally:
            # cursor() itself may have failed, leaving nothing to close.
            if cr is not None:
                cr.close()

    def destdb_query(self, sql, sql_type, values=''):
        """Run ``sql`` on the destination connection.

        ``'select'`` returns all rows; ``'insertmany'`` runs ``executemany``
        with ``values`` and returns the affected-row count (0 when ``values``
        is empty); any other type executes ``sql`` and returns True.  Errors
        are printed and False is returned, so compare with ``is False``.
        """
        cr = None
        try:
            cr = self.destdb_conn_str.cursor()
            if sql_type == 'select':
                cr.execute(sql)
                return cr.fetchall()
            if sql_type == 'insertmany':
                if not values:
                    # MySQLdb's executemany() does nothing for an empty sequence,
                    # so affected_rows() would report the previous statement's count.
                    return 0
                cr.executemany(sql, values)
                return self.destdb_conn_str.affected_rows()
            cr.execute(sql)
            return True
        except Exception as e:
            print(str(e) + "<br>")
            return False
        finally:
            if cr is not None:
                cr.close()

    def create_table_from_source(self):
        """Create a destination table from the source table's DDL (not used by ``do``).

        Author's note: the tab_name data must go into an ARCHIVE-engine table,
        so cloning the source DDL is unsuitable there; kept for other uses.
        NOTE(review): SHOW CREATE TABLE output names the source table, so this
        presumably creates ``source_tab`` (not ``dest_tab``) in the destination
        database — confirm before relying on it.

        Returns True only if the CREATE statement succeeded.
        """
        try:
            sql = "show create table %s;" % self.source_tab
            create_str = self.sourcedb_query(sql, 'select')[0][1]
            create_str = create_str.replace('CREATE TABLE', 'CREATE TABLE IF NOT EXISTS', 1)
            return self.destdb_query(create_str, 'ddl') is True
        except Exception as e:
            print(str(e) + "<br>")
            return False

    def create_table_from_template(self):
        """Create ``dest_tab`` LIKE ``template_tab`` if it does not exist; return success."""
        try:
            sql = 'CREATE TABLE IF NOT EXISTS %s like %s;' % (self.dest_tab, self.template_tab)
            return self.destdb_query(sql, 'ddl') is True
        except Exception as e:
            print(str(e) + "<br>")
            return False

    def get_min_max(self):
        """Record the count, min and max primary key of the source rows for the day.

        Sets ``source_cnt``, ``source_min_id``, ``source_max_id`` and
        ``source_checksum``.  Returns False when no rows match or the query
        fails, True otherwise.
        """
        try:
            print("\nStarting Migrate at -- %s <br>" % datetime.datetime.now())
            sql = ("select count(*),IFNULL(min(%s),-1),IFNULL(max(%s),-1) from %s where %s"
                   % (self.pk, self.pk, self.source_tab, self._date_window_condition()))
            q = self.sourcedb_query(sql, 'select')
            if q is False:
                return False
            self.source_cnt, self.source_min_id, self.source_max_id = q[0][:3]
            self.source_checksum = self._checksum(q[0])
            if self.source_cnt == 0 or self.source_min_id == -1 or self.source_max_id == -1:
                print("There is 0 record in source table been matched! <br>")
                return False
            return True
        except Exception as e:
            print(str(e) + "<br>")
            return False

    def migrate_2_destdb(self):
        """Copy the day's source rows into ``dest_tab`` in primary-key ranges.

        Returns False when there is nothing to migrate or setup fails, True once
        every range has been attempted.  A failed range is printed and left for
        ``verify_total_cnt`` to detect via the checksum.
        """
        try:
            if not self.get_min_max():
                return False
            cols = self.sourcedb_query("desc %s;" % self.source_tab, 'select')
            # One placeholder per source column: rows come from `select *` and
            # are inserted positionally.
            placeholders = ",".join(["%s"] * len(cols))
            insert_sql = "insert into %s values (%s)" % (self.dest_tab, placeholders)
            condition = self._date_window_condition()
            k = self.source_min_id
            while k <= self.source_max_id:
                sql = ("select * from %s where %s >= %d and %s < %d and %s"
                       % (self.source_tab, self.pk, k, self.pk, k + self.step_size, condition))
                print("\n%s <br>" % sql)
                starttime = datetime.datetime.now()
                results = self.sourcedb_query(sql, 'select')
                if results is False:
                    print("Select failed!! <br>")
                else:
                    rows = self.destdb_query(insert_sql, 'insertmany', results)
                    if rows is False:
                        print("Insert failed!! <br>")
                    else:
                        print("Inserted %s rows. <br>" % rows)
                elapsed = self._format_elapsed(datetime.datetime.now() - starttime)
                print("Elapsed :" + elapsed + " seconds <br>")
                k += self.step_size
            print("\nInsert complete at -- %s <br>" % datetime.datetime.now())
            return True
        except Exception as e:
            print(str(e) + "<br>")
            return False

    def verify_total_cnt(self):
        """Compare the destination's count/min/max for the day with the source's.

        Sets ``dest_cnt``; on a matching, non-zero checksum sets
        ``_migCompleteState``.  Exits with status 77 when the checksums differ or
        the destination cannot be queried.
        """
        try:
            sql = ("select count(*),IFNULL(min(%s),-1),IFNULL(max(%s),-1) from %s where %s"
                   % (self.pk, self.pk, self.dest_tab, self._date_window_condition()))
            dest_result = self.destdb_query(sql, 'select')
            if dest_result is False:
                print("Verify failed !!<br>")
                sys.exit(77)
            self.dest_cnt = dest_result[0][0]
            dest_checksum = self._checksum(dest_result[0])
            print("source_checksum: %s, dest_checksum: %s <br>" % (self.source_checksum, dest_checksum))
            if (self.source_cnt == self.dest_cnt and self.dest_cnt != 0
                    and self.source_checksum == dest_checksum):
                self._migCompleteState = True
                print("Verify successfully !!<br>")
            else:
                print("Verify failed !!<br>")
                sys.exit(77)
        except Exception as e:
            # SystemExit is not an Exception subclass, so the exits above propagate.
            print(str(e) + "<br>")

    def drop_daily_partition(self):
        """Drop the source partition(s) holding the migrated day, after re-checking them.

        Only runs after a verified copy.  The partitions' count/min/max must
        equal the day's source checksum (i.e. they hold nothing else); a
        mismatch exits with status 77.  Sets ``_deleteCompleteState`` when the
        DROP succeeds.
        """
        try:
            if not self._migCompleteState:
                return
            # NOTE(review): EXPLAIN PARTITIONS is rejected by MySQL 8.0, where plain
            # EXPLAIN already reports partitions — confirm the server version.
            sql = ("explain partitions select * from %s where %s"
                   % (self.source_tab, self._date_window_condition()))
            plan = self.sourcedb_query(sql, 'select')
            # Column 3 of the plan is expected to be `partitions` (comma-separated names).
            partition_name = plan[0][3]
            if not partition_name:
                print("No partition matched in source table! <br>")
                return

            sql = ("select count(*),IFNULL(min(%s),-1),IFNULL(max(%s),-1) from %s partition (%s)"
                   % (self.pk, self.pk, self.source_tab, partition_name))
            q = self.sourcedb_query(sql, 'select')
            source_cnt, source_min_id, source_max_id = q[0][:3]
            checksum = self._checksum(q[0])
            if source_cnt == 0 or source_min_id == -1 or source_max_id == -1:
                print("There is 0 record in source PARTITION been matched! <br>")
                return
            if checksum != self.source_checksum:
                print("The partition %s checksum failed !! Drop failed !!" % partition_name)
                sys.exit(77)

            drop_par_sql = "alter table %s drop partition %s;" % (self.source_tab, partition_name)
            droped = self.sourcedb_query(drop_par_sql, 'ddl')
            print(drop_par_sql + " <br>")
            if droped:
                print("\nDrop partition complete at -- %s <br>" % datetime.datetime.now())
                self._deleteCompleteState = True
            else:
                print("Drop partition failed.. <br>")
        except Exception as e:
            print(str(e) + "<br>")

    def delete_data(self):
        """Delete the migrated day's source rows in primary-key ranges.

        Only runs after a verified copy.  Sets ``_deleteCompleteState`` only if
        every range's DELETE succeeded.
        """
        try:
            if not self._migCompleteState:
                return
            condition = self._date_window_condition()
            all_ok = True
            k = self.source_min_id
            while k <= self.source_max_id:
                sql = ("delete from %s where %s >= %d and %s < %d and %s"
                       % (self.source_tab, self.pk, k, self.pk, k + self.step_size, condition))
                print("\n%s <br>" % sql)
                starttime = datetime.datetime.now()
                rows = self.sourcedb_query(sql, 'dml')
                if rows is False:
                    all_ok = False
                    print("Delete failed!! <br>")
                else:
                    print("Deleted %s rows. <br>" % rows)
                elapsed = self._format_elapsed(datetime.datetime.now() - starttime)
                print("Elapsed :" + elapsed + " seconds <br>")
                # Pause between ranges, presumably to limit load on the source.
                time.sleep(1)
                k += self.step_size
            print("\nDelete complete at -- %s <br>" % datetime.datetime.now())
            self._deleteCompleteState = all_ok
        except Exception as e:
            print(str(e) + "<br>")

    def do(self):
        """Run the job: create table, copy, verify, remove source rows, report.

        Ends the process via ``sys.exit`` (statuses in the class docstring),
        except when there is nothing to migrate or verification did not
        complete, in which case it simply returns.
        """
        if not self.create_table_from_template():
            print("Create table failed ! Exiting. . .")
            sys.exit(255)
        if not self.migrate_2_destdb():
            return
        self.verify_total_cnt()
        if not self._migCompleteState:
            return
        if self.deleteStrategy == 'drop':
            self.drop_daily_partition()
        else:
            self.delete_data()
        print("\n<br>")
        print("=====" * 5 + '<br>')
        print("source_total_cnt: %s <br>" % self.source_cnt)
        print("dest_total_cnt: %s <br>" % self.dest_cnt)
        print("=====" * 5 + '<br>')
        if self._deleteCompleteState:
            print("\nFinal result: Successfully !! <br>")
            sys.exit(88)
        print("\nFinal result: Failed !! <br>")
        sys.exit(254)

if __name__ == '__main__':
    # Parse arguments, connect, and run the migration.  do() usually ends the
    # process via sys.exit(...) with a status code describing the outcome.
    # Guarded so importing this module does not touch any database.
    f = ClassMigrate()
    f.do()

来源:https://blog.csdn.net/oEleven123456789/article/details/51850169

标签:Python,迁移,数据
0
投稿

猜你喜欢

  • sql server常用命令行操作(启动、停止、暂停)

    2012-01-05 19:02:48
  • mysql出现10061错误解决办法

    2010-07-04 13:36:00
  • 五个Python迷你版小程序附代码

    2023-09-28 02:55:26
  • Python ORM框架SQLAlchemy学习笔记之关系映射实例

    2022-06-22 23:39:04
  • 详解Python list 与 NumPy.ndarry 切片之间的对比

    2023-02-16 10:25:23
  • ASP实现长文章自动分页的函数代码

    2008-10-10 17:09:00
  • python 用正则表达式筛选文本信息的实例

    2023-04-29 14:12:26
  • Django中的ajax请求

    2022-10-19 10:28:14
  • opencv-python 开发环境的安装、配置教程详解

    2022-04-25 22:14:58
  • 将Django项目部署到CentOs服务器中

    2021-07-30 20:11:11
  • Dreamweaver MX 2004表格设计

    2008-02-03 11:36:00
  • numpy实现合并多维矩阵、list的扩展方法

    2022-01-14 22:59:52
  • python中关于数据类型的学习笔记

    2023-10-30 09:31:29
  • 使用jQuery简化Ajax开发

    2010-04-11 21:09:00
  • ORACLE时间函数(SYSDATE)深入理解

    2024-01-18 05:15:14
  • python中import学习备忘笔记

    2021-05-15 02:27:14
  • javascript获取select值的方法完整实例

    2024-04-22 12:49:18
  • 构建可视化 web的 Python 神器streamlit

    2021-05-03 16:56:05
  • Python中Numpy ndarray的使用详解

    2022-08-08 19:03:05
  • Python实现查找数组中任意第k大的数字算法示例

    2022-04-26 22:10:46
  • asp之家 网络编程 m.aspxhome.com