Python定时从Mysql提取数据存入Redis的实现

作者:wzqnls 时间:2024-01-22 01:31:39 

设计思路:

1.程序一旦run起来,python会把mysql中最近一段时间的数据全部提取出来

2.然后实例化redis类,将数据简单解析后逐条传入redis队列

3.定时器设计每天凌晨12点开始跑

ps:redis是个内存数据库,做后台消息队列的缓存时有很大的用处,有兴趣的小伙伴可以去查看相关的文档。


# -*- coding:utf-8 -*-

import MySQLdb
import schedule
import time
import datetime
import random
import string
import redis

# get the data from mysql
class FromSql(object):
 def __init__(self, conn):
   self.conn = conn

def acquire(self):
   cursor = self.conn.cursor()
   try:
     sql = "SELECT * FROM test WHERE TO_DAYS(NOW()) - TO_DAYS(t) <= 1"

cursor.execute(sql)
     rs = cursor.fetchall()
     #print (rs)
     for eve in rs:

print('%s, %s, %s, %s' % eve)
     copy_rs = rs
     cursor.close()

return copy_rs

except Exception as e:
     print("The error: %s" % e)

class RedisQueue(object):

def __init__(self, name, namespace='queue', **redis_kwargs):
   """The default connection parameters are: host='localhost', port=6379, db=0"""
   self.__db= redis.Redis(**redis_kwargs)
   self.key = '%s:%s' %(namespace, name)

def qsize(self):
   return self.__db.llen(self.key)

def put(self, item):
   self.__db.rpush(self.key, item)

def get(self, block=True, timeout=None):

if block:
     item = self.__db.blpop(self.key, timeout=timeout)
   else:
     item = self.__db.lpop(self.key)

if item:
     item = item[1]
   return item

def get_nowait(self):
   return self.get(False)

if __name__ == "__main__":
 # connect mysqldb
 conn_sql = MySQLdb.connect(
           host = '127.0.0.1',
           port = 3306,
           user = 'root',
           passwd = '',
           db = 'test',
           charset = 'utf8'
           )

def job_for_redis():
   get_data = FromSql(conn_sql)
   data = get_data.acquire()

q = RedisQueue('test',host='localhost', port=6379, db=0)
   for single_data in data:
     for meta_data in single_data:
       q.put(meta_data)
       print(meta_data)
   print("All data had been inserted.")

"""
 try:
   schedule.every().day.at("00:00").do(job_for_redis)
 except Exception as e:
   print('Error: %s'% e)
#  finally:
#    conn.close()

while True:
   schedule.run_pending()
   time.sleep(1)
"""

补充知识:python定时获取汇率存入数据库

python定时任务:

我们可以使用 轻量级的第三方模块schedule。首先先安装:pip install schedule

定时任务的的小测试:


import schedule
import time

def job():
 print("I'm working...")

schedule.every(10).minutes.do(job)       # 每隔10分钟执行一次任务
schedule.every().hour.do(job)          # 每隔一小时执行一次任务
schedule.every().day.at("10:30").do(job)    # 每天10:30执行一次任务
schedule.every(5).to(10).days.do(job)      # 每5-10天执行一次任务
schedule.every().monday.do(job)         # 每周一的这个时候执行一次任务
schedule.every().wednesday.at("13:15").do(job) # 每周三13:15执行一次任务

while True:
 schedule.run_pending()

获取数据存入数据库:(格式可能不太对,还有一些符号。自己修改一下即可)


import pymysql
import schedule
import time
import requests
import pandas
from sqlalchemy import create_engine

#获取美元的所有外汇
def job():
 content = '美元'
 url = 'http://www.boc.cn/sourcedb/whpj/index.html' #外汇数据地址
 html = requests.get(url).content.decode('utf-8')

index = html.index('<td>' + content + '</td>')
 str = html[index:index+300]
 result = re.findall('<td>(.*?)</td>',str)

print("币种:" + result[0])
 print("现汇买入价:" + result[1])
 print("现钞买入价:" + result[2])
 print("现汇卖出价:" + result[3])
 print("现钞卖出价:" + result[4])
 print("中行结算价:" + result[5])
 print("发布时间:" + result[6] + ' ' + result[7])

#本地地址 数据库账号 密码  数据库名
 db = pymysql.connect('localhost','root','root','pinyougoudb')
 cursor = db.cursor()

#sql语句
 sql = "update tb_money set huiBuy = %s,chaoBuy = %s,huiSale = %s,chaoSale = %s,centerResult= %s,publishTime = '%s' where typeId = '%s'" % (result[1], result[2], result[3], result[4], result[5], result[6] + ' ' + result[7], result[0])

cursor.execute(sql)
 db.commit()
 print('success')

# 查询语句,将存入的数据查出来
 # sqlalchemy 进行数据库初始化
 engine = create_engine('mysql+pymysql://root:root@localhost:3306/pinyougoudb')
 sql = '''select * from tb_money'''

# pandas 进行数据库读写
 df = pandas.read_sql_query(sql,engine)
 print(df)

db.commit()

# 每隔几分中刷新一次
#schedule.every(0.1).minutes.do(job)

#每天什么时候刷新
schedule.every().day.at("09:29").do(job)
schedule.every().day.at("09:30").do(job)

#一直循环 知道满足条件执行
while True:
 schedule.run_pending()

来源:https://blog.csdn.net/wzqnls/article/details/51992693

标签:Python,Mysql,Redis
0
投稿

猜你喜欢

  • ASP项目中的asp分页 翻页模块函数

    2010-04-03 21:03:00
  • Python 实现自动完成A4标签排版打印功能

    2021-07-14 22:55:15
  • VUE预渲染及遇到的坑

    2023-07-02 17:08:34
  • Python+Tkinter制作在线个性签名工具

    2023-12-25 15:21:23
  • Vue实现无限加载瀑布流

    2024-05-09 15:23:16
  • SQL 截取字符串应用代码

    2024-01-21 08:53:14
  • php防止SQL注入详解及防范

    2023-07-22 19:25:53
  • MySQL Workbench安装及使用详解

    2024-01-27 10:52:15
  • 使用Dreamweaver MX表格排序功能

    2010-07-13 12:08:00
  • 对pandas中两种数据类型Series和DataFrame的区别详解

    2021-08-05 04:09:38
  • Python转码问题的解决方法

    2023-06-30 07:48:52
  • asp下实现代码的“运行代码”“复制代码”“保存代码”功能源码

    2011-04-14 10:39:00
  • Facebook:产品设计评价体系解密

    2011-05-24 17:13:00
  • Git 教程之远程仓库详解

    2023-10-18 20:34:16
  • python使用tkinter模块实现文件选择功能

    2022-06-18 18:35:56
  • Python连接es之查询方式示例汇总

    2023-06-07 11:11:03
  • 一个基于flask的web应用诞生 flask和mysql相连(4)

    2024-01-18 20:37:16
  • python 多进程和多线程使用详解

    2021-08-12 19:50:43
  • python turtle绘图命令及案例

    2022-04-29 10:26:58
  • TensorFlow 2.0之后动态分配显存方式

    2023-12-20 05:25:17
  • asp之家 网络编程 m.aspxhome.com