Python封装数据库连接池详解

作者:傲娇的喵酱 时间:2024-01-14 06:26:37 

前言:

线程安全问题:当2个线程同时用到线程池时,会同时创建2个线程池。如果多个线程,错开用到线程池,就只会创建一个线程池,会共用一个线程池。我用的注解方式的单例模式,感觉就是这个注解的单例方式,解决了多线程问题,但是没解决线程安全问题,需要优化这个单例模式。

主要通过 PooledDB 模块实现。

一、数据库封装

1.1数据库基本配置

db_config.py

# -*- coding: UTF-8 -*-
import pymysql

# 数据库信息
DB_TEST_HOST = "127.0.0.1"
DB_TEST_PORT = 3308
DB_TEST_DBNAME = "bt"
DB_TEST_USER = "root"
DB_TEST_PASSWORD = "123456"

# 数据库连接编码
DB_CHARSET = "utf8"
# mincached : 启动时开启的闲置连接数量(缺省值 0 开始时不创建连接)
DB_MIN_CACHED = 5
# maxcached : 连接池中允许的闲置的最多连接数量(缺省值 0 代表不闲置连接池大小)
DB_MAX_CACHED = 0
# maxshared : 共享连接数允许的最大数量(缺省值 0 代表所有连接都是专用的)如果达到了最大数量,被请求为共享的连接将会被共享使用
DB_MAX_SHARED = 5
# maxconnecyions : 创建连接池的最大数量(缺省值 0 代表不限制)
DB_MAX_CONNECYIONS = 300
# blocking : 设置在连接池达到最大数量时的行为(缺省值 0 或 False 代表返回一个错误<toMany......> 其他代表阻塞直到连接数减少,连接被分配)
DB_BLOCKING = True
# maxusage : 单个连接的最大允许复用次数(缺省值 0 或 False 代表不限制的复用).当达到最大数时,连接会自动重新连接(关闭和重新打开)
DB_MAX_USAGE = 0
# setsession : 一个可选的SQL命令列表用于准备每个会话,如["set datestyle to german", ...]
DB_SET_SESSION = None

# creator : 使用连接数据库的模块
DB_CREATOR = pymysql

设置连接池最大最小为5个。则启动连接池时,就会建立5个连接。

1.2 编写单例模式注解

singleton.py

#单例模式函数,用来修饰类
def singleton(cls,*args,**kw):
   instances = {}
   def _singleton():
       if cls not in instances:
           instances[cls] = cls(*args,**kw)
       return instances[cls]
   return _singleton

1.3 构建连接池

db_dbutils_init.py

from dbutils.pooled_db import PooledDB
import db_config as config
# import random

from singleton import singleton
"""
@功能:创建数据库连接池
"""

class MyConnectionPool(object):
   # 私有属性
   # 能通过对象直接访问,但是可以在本类内部访问;
   __pool = None

# def __init__(self):
   #     self.conn = self.__getConn()
   #     self.cursor = self.conn.cursor()

# 创建数据库连接conn和游标cursor
   def __enter__(self):
       self.conn = self.__getconn()
       self.cursor = self.conn.cursor()

# 创建数据库连接池
   def __getconn(self):
       if self.__pool is None:
           # i = random.randint(1, 100)
           # print("创建线程池的数量"+str(i))
           self.__pool = PooledDB(
               creator=config.DB_CREATOR,
               mincached=config.DB_MIN_CACHED,
               maxcached=config.DB_MAX_CACHED,
               maxshared=config.DB_MAX_SHARED,
               maxconnections=config.DB_MAX_CONNECYIONS,
               blocking=config.DB_BLOCKING,
               maxusage=config.DB_MAX_USAGE,
               setsession=config.DB_SET_SESSION,
               host=config.DB_TEST_HOST,
               port=config.DB_TEST_PORT,
               user=config.DB_TEST_USER,
               passwd=config.DB_TEST_PASSWORD,
               db=config.DB_TEST_DBNAME,
               use_unicode=False,
               charset=config.DB_CHARSET
           )
       return self.__pool.connection()

# 释放连接池资源
   def __exit__(self, exc_type, exc_val, exc_tb):
       self.cursor.close()
       self.conn.close()
   # 关闭连接归还给链接池
   # def close(self):
   #     self.cursor.close()
   #     self.conn.close()

# 从连接池中取出一个连接
   def getconn(self):
       conn = self.__getconn()
       cursor = conn.cursor()
       return cursor, conn
# 获取连接池,实例化
@singleton
def get_my_connection():
   return MyConnectionPool()

1.4 封装Python操作MYSQL的代码

mysqlhelper.py

import time
from db_dbutils_init import get_my_connection
"""执行语句查询有结果返回结果没有返回0;增/删/改返回变更数据条数,没有返回0"""
class MySqLHelper(object):
   def __init__(self):
       self.db = get_my_connection()  # 从数据池中获取连接
   #
   # def __new__(cls, *args, **kwargs):
   #     if not hasattr(cls, 'inst'):  # 单例
   #         cls.inst = super(MySqLHelper, cls).__new__(cls, *args, **kwargs)
   #     return cls.inst

# 封装执行命令
   def execute(self, sql, param=None, autoclose=False):
       """
       【主要判断是否有参数和是否执行完就释放连接】
       :param sql: 字符串类型,sql语句
       :param param: sql语句中要替换的参数"select %s from tab where id=%s" 其中的%s就是参数
       :param autoclose: 是否关闭连接
       :return: 返回连接conn和游标cursor
       """
       cursor, conn = self.db.getconn()  # 从连接池获取连接
       count = 0
       try:
           # count : 为改变的数据条数
           if param:
               count = cursor.execute(sql, param)
           else:
               count = cursor.execute(sql)
           conn.commit()
           if autoclose:
               self.close(cursor, conn)
       except Exception as e:
           pass
       return cursor, conn, count

# 释放连接
   def close(self, cursor, conn):
       """释放连接归还给连接池"""
       cursor.close()
       conn.close()

# 查询所有
   def selectall(self, sql, param=None):
       cursor = None
       conn = None
       count = None
       try:
           cursor, conn, count = self.execute(sql, param)
           res = cursor.fetchall()
           return res
       except Exception as e:
           print(e)
           self.close(cursor, conn)
           return count

# 查询单条
   def selectone(self, sql, param=None):
       cursor = None
       conn = None
       count = None
       try:
           cursor, conn, count = self.execute(sql, param)
           res = cursor.fetchone()
           self.close(cursor, conn)
           return res
       except Exception as e:
           print("error_msg:", e.args)
           self.close(cursor, conn)
           return count

# 增加
   def insertone(self, sql, param):
       cursor = None
       conn = None
       count = None
       try:
           cursor, conn, count = self.execute(sql, param)
           # _id = cursor.lastrowid()  # 获取当前插入数据的主键id,该id应该为自动生成为好
           conn.commit()
           self.close(cursor, conn)
           return count
       except Exception as e:
           print(e)
           conn.rollback()
           self.close(cursor, conn)
           return count

# 增加多行
   def insertmany(self, sql, param):
       """
       :param sql:
       :param param: 必须是元组或列表[(),()]或((),())
       :return:
       """
       cursor, conn, count = self.db.getconn()
       try:
           cursor.executemany(sql, param)
           conn.commit()
           return count
       except Exception as e:
           print(e)
           conn.rollback()
           self.close(cursor, conn)
           return count

# 删除
   def delete(self, sql, param=None):
       cursor = None
       conn = None
       count = None
       try:
           cursor, conn, count = self.execute(sql, param)
           self.close(cursor, conn)
           return count
       except Exception as e:
           print(e)
           conn.rollback()
           self.close(cursor, conn)
           return count

# 更新
   def update(self, sql, param=None):
       cursor = None
       conn = None
       count = None
       try:
           cursor, conn, count = self.execute(sql, param)
           conn.commit()
           self.close(cursor, conn)
           return count
       except Exception as e:
           print(e)
           conn.rollback()
           self.close(cursor, conn)
           return count
# if __name__ == '__main__':
#     db = MySqLHelper()
#     sql = "SELECT SLEEP(10)"
#     db.execute(sql)
#     time.sleep(20)

# TODO 查询单条
   # sql1 = 'select * from userinfo where name=%s'
   # args = 'python'
   # ret = db.selectone(sql=sql1, param=args)
   # print(ret)  # (None, b'python', b'123456', b'0')

# TODO 增加单条
   # sql2 = 'insert into hotel_urls(cname,hname,cid,hid,url) values(%s,%s,%s,%s,%s)'
   # ret = db.insertone(sql2, ('1', '2', '1', '2', '2'))
   # print(ret)

# TODO 增加多条
   # sql3 = 'insert into userinfo (name,password) VALUES (%s,%s)'
   # li = li = [
   #     ('分省', '123'),
   #     ('到达','456')
   # ]
   # ret = db.insertmany(sql3,li)
   # print(ret)

# TODO 删除
   # sql4 = 'delete from  userinfo WHERE name=%s'
   # args = 'xxxx'
   # ret = db.delete(sql4, args)
   # print(ret)

# TODO 更新
   # sql5 = r'update userinfo set password=%s WHERE name LIKE %s'
   # args = ('993333993', '%old%')
   # ret = db.update(sql5, args)
   # print(ret)

二、连接池测试

修改 db_dbutils_init.py 文件,在创建连接池def __getconn(self):方法下,加一个打印随机数,方便将来我们定位是否时单例的线程池。

Python封装数据库连接池详解

 修改后的db_dbutils_init.py 文件:

from dbutils.pooled_db import PooledDB
import db_config as config
import random
from singleton import singleton

"""
@功能:创建数据库连接池
"""
class MyConnectionPool(object):
   # 私有属性
   # 能通过对象直接访问,但是可以在本类内部访问;
   __pool = None

# def __init__(self):
   #     self.conn = self.__getConn()
   #     self.cursor = self.conn.cursor()

# 创建数据库连接conn和游标cursor
   def __enter__(self):
       self.conn = self.__getconn()
       self.cursor = self.conn.cursor()

# 创建数据库连接池
   def __getconn(self):
       if self.__pool is None:
           i = random.randint(1, 100)
           print("线程池的随机数"+str(i))
           self.__pool = PooledDB(
               creator=config.DB_CREATOR,
               mincached=config.DB_MIN_CACHED,
               maxcached=config.DB_MAX_CACHED,
               maxshared=config.DB_MAX_SHARED,
               maxconnections=config.DB_MAX_CONNECYIONS,
               blocking=config.DB_BLOCKING,
               maxusage=config.DB_MAX_USAGE,
               setsession=config.DB_SET_SESSION,
               host=config.DB_TEST_HOST,
               port=config.DB_TEST_PORT,
               user=config.DB_TEST_USER,
               passwd=config.DB_TEST_PASSWORD,
               db=config.DB_TEST_DBNAME,
               use_unicode=False,
               charset=config.DB_CHARSET
           )
       return self.__pool.connection()

# 释放连接池资源
   def __exit__(self, exc_type, exc_val, exc_tb):
       self.cursor.close()
       self.conn.close()

# 关闭连接归还给链接池
   # def close(self):
   #     self.cursor.close()
   #     self.conn.close()

# 从连接池中取出一个连接
   def getconn(self):
       conn = self.__getconn()
       cursor = conn.cursor()
       return cursor, conn
# 获取连接池,实例化
@singleton
def get_my_connection():
   return MyConnectionPool()

开始测试:

场景一:同一个实例,执行2次sql

from mysqlhelper import MySqLHelper
import time

if __name__ == '__main__':
   sql = "SELECT SLEEP(10)"
   sql1 = "SELECT SLEEP(15)"

db = MySqLHelper()
   db.execute(sql)
   db.execute(sql1)
   time.sleep(20)

在数据库中,使用 show processlist;

show processlist;

当执行第一个sql时。数据库连接显示。

Python封装数据库连接池详解

当执行第二个sql时。数据库连接显示:

Python封装数据库连接池详解

 当执行完sql,程序sleep时。数据库连接显示:

Python封装数据库连接池详解

程序打印结果:

线程池的随机数43

由以上可以得出结论:

线程池启动后,生成了5个连接。执行第一个sql时,使用了1个连接。执行完第一个sql后,使用了另外1个连接。 这是一个线性的,线程池中一共5个连接,但是每次执行,只使用了其中一个。

有个疑问,连接池如果不支持并发是不是就毫无意义?

如上,虽然开了线程池5个连接,但是每次执行sql,只用到了一个连接。那为何不设置线程池大小为1呢?设置线程池大小的意义何在呢?(如果在非并发的场景下,是不是设置大小无意义?)

相比于不用线程池的优点:

如果不用线程池,则每次执行一个sql都要创建、断开连接。 像我们这样使用连接池,不用反复创建、断开连接,拿现成的连接直接用就好了。

场景二:依次创建2个实例,各自执行sql

from mysqlhelper import MySqLHelper
import time

if __name__ == '__main__':
   db = MySqLHelper()
   db1 = MySqLHelper()
   sql = "SELECT SLEEP(10)"
   sql1 = "SELECT SLEEP(15)"
   db.execute(sql)
   db1.execute(sql1)
   time.sleep(20)

第一个实例db,执行sql。线程池启动了5个连接

Python封装数据库连接池详解

第二个实例db1,执行sql:

Python封装数据库连接池详解

 程序睡眠时,一共5个线程池:

Python封装数据库连接池详解

 打印结果:

Python封装数据库连接池详解

结果证明:

虽然我们依次创建了2个实例,但是(1)创建线程池的打印结果,只打印1次,且从始至终,线程池一共只启动了5个连接,且连接的id没有发生改变,说明一直是这5个连接。

证明,我们虽然创建了2个实例,但是这2个实例其实是一个实例。(单例模式是生效的)

场景三:启动2个线程,但是线程在创建连接池实例时,有时间间隔

import threading
from mysqlhelper import MySqLHelper
import time
def sl1():
   time.sleep(2)
   db = MySqLHelper()
   sql = "SELECT SLEEP(6)"
   db.execute(sql)

def sl2():
   time.sleep(4)
   db = MySqLHelper()
   sql = "SELECT SLEEP(15)"
   db.execute(sql)
if __name__ == '__main__':
   threads = []
   t1 = threading.Thread(target=sl1)
   threads.append(t1)
   t2 = threading.Thread(target=sl2)
   threads.append(t2)

for t in threads:
       t.setDaemon(True)
       t.start()
   time.sleep(20)

2个线程间隔了2秒。

观察数据库的连接数量:

Python封装数据库连接池详解

打印结果:

Python封装数据库连接池详解

在并发执行2个sql时,共用了这5个连接,且打印结果只打印了一次,说明虽然并发创建了2次实例,但真正只创建了一个连接池。

场景四:启动2个线程,线程在创建连接池实例时,没有时间间隔

import threading
from mysqlhelper import MySqLHelper
import time

if __name__ == '__main__':
   db = MySqLHelper()
   sql = "SELECT SLEEP(6)"
   sql1 = "SELECT SLEEP(15)"
   threads = []
   t1 = threading.Thread(target=db.execute, args=(sql,))
   threads.append(t1)
   t2 = threading.Thread(target=db.execute, args=(sql1,))
   threads.append(t2)

for t in threads:
       t.setDaemon(True)
       t.start()
   time.sleep(20)

观察数据库连接 :

Python封装数据库连接池详解

 打印结果:

Python封装数据库连接池详解

结果表明:

终端打印了2次,数据库建立了10个连接,说明创建了2个线程池。这样的单例模式,存在线程安全问题。

来源:https://blog.csdn.net/qq_39208536/article/details/125354741

标签:Python,封装,数据库,连接池
0
投稿

猜你喜欢

  • Python与人工神经网络:使用神经网络识别手写图像介绍

    2022-05-23 03:11:36
  • python3实现语音转文字(语音识别)和文字转语音(语音合成)

    2022-10-02 03:40:33
  • python paramiko实现ssh远程访问的方法

    2021-07-17 23:03:55
  • Python内建函数之raw_input()与input()代码解析

    2021-01-12 01:37:36
  • Python科学计算环境推荐——Anaconda

    2022-12-17 15:07:30
  • Python调用高德API实现批量地址转经纬度并写入表格的功能

    2023-12-26 03:22:20
  • Docker同时安装MySQL和MariaDB的方法步骤

    2024-01-27 06:51:32
  • python使用webbrowser浏览指定url的方法

    2023-10-24 03:33:31
  • Pygame游戏开发之太空射击实战入门篇

    2023-07-17 23:12:42
  • DataFrame 数据合并实现(merge,join,concat)

    2022-03-28 04:24:02
  • php通过pecl方式安装扩展的实例讲解

    2023-07-23 07:06:12
  • Python读取properties配置文件操作示例

    2021-06-10 04:20:55
  • python实现scrapy爬虫每天定时抓取数据的示例代码

    2022-05-04 22:33:18
  • python 如何将带小数的浮点型字符串转换为整数

    2021-04-24 01:35:51
  • pycharm部署、配置anaconda环境的教程

    2022-08-23 11:50:54
  • python银行系统实现源码

    2022-08-09 16:05:20
  • Python 按字典dict的键排序,并取出相应的键值放于list中的实例

    2022-01-26 16:48:46
  • Python面向对象的三大特性封装、继承、多态

    2023-12-11 05:15:52
  • NumPy 基本切片和索引的具体使用方法

    2023-02-11 20:47:50
  • 10分钟用Python快速搭建全文搜索引擎详解流程

    2023-11-06 16:13:41
  • asp之家 网络编程 m.aspxhome.com