dbutils封裝文件傳送門
DBUtils是一套Python數(shù)據(jù)庫(kù)連接池包,并允許對(duì)非線程安全的數(shù)據(jù)庫(kù)接口進(jìn)行線程安全包裝。DBUtils來自Webware for Python。
DBUtils提供兩種外部接口:
- PersistentDB :提供線程專用的數(shù)據(jù)庫(kù)連接,并自動(dòng)管理連接。
- PooledDB :提供線程間可共享的數(shù)據(jù)庫(kù)連接,并自動(dòng)管理連接。
需要庫(kù)
1、DBUtils pip install DBUtils
2、pymysql pip install pymysql/MySQLdb
創(chuàng)建DButils組件
db_config.py 配置文件
# -*- coding: UTF-8 -*-
import pymysql
# 數(shù)據(jù)庫(kù)信息
DB_TEST_HOST = "127.0.0.1"
DB_TEST_PORT = 3306
DB_TEST_DBNAME = "ball"
DB_TEST_USER = "root"
DB_TEST_PASSWORD = "123456"
# 數(shù)據(jù)庫(kù)連接編碼
DB_CHARSET = "utf8"
# mincached : 啟動(dòng)時(shí)開啟的閑置連接數(shù)量(缺省值 0 開始時(shí)不創(chuàng)建連接)
DB_MIN_CACHED = 10
# maxcached : 連接池中允許的閑置的最多連接數(shù)量(缺省值 0 代表不閑置連接池大小)
DB_MAX_CACHED = 10
# maxshared : 共享連接數(shù)允許的最大數(shù)量(缺省值 0 代表所有連接都是專用的)如果達(dá)到了最大數(shù)量,被請(qǐng)求為共享的連接將會(huì)被共享使用
DB_MAX_SHARED = 20
# maxconnecyions : 創(chuàng)建連接池的最大數(shù)量(缺省值 0 代表不限制)
DB_MAX_CONNECYIONS = 100
# blocking : 設(shè)置在連接池達(dá)到最大數(shù)量時(shí)的行為(缺省值 0 或 False 代表返回一個(gè)錯(cuò)誤toMany......> 其他代表阻塞直到連接數(shù)減少,連接被分配)
DB_BLOCKING = True
# maxusage : 單個(gè)連接的最大允許復(fù)用次數(shù)(缺省值 0 或 False 代表不限制的復(fù)用).當(dāng)達(dá)到最大數(shù)時(shí),連接會(huì)自動(dòng)重新連接(關(guān)閉和重新打開)
DB_MAX_USAGE = 0
# setsession : 一個(gè)可選的SQL命令列表用于準(zhǔn)備每個(gè)會(huì)話,如["set datestyle to german", ...]
DB_SET_SESSION = None
# creator : 使用連接數(shù)據(jù)庫(kù)的模塊
DB_CREATOR = pymysql
db_dbutils_init.py 創(chuàng)建數(shù)據(jù)池初始化
from DBUtils.PooledDB import PooledDB
import db_config as config
"""
@功能:創(chuàng)建數(shù)據(jù)庫(kù)連接池
"""
class MyConnectionPool(object):
__pool = None
# def __init__(self):
# self.conn = self.__getConn()
# self.cursor = self.conn.cursor()
# 創(chuàng)建數(shù)據(jù)庫(kù)連接conn和游標(biāo)cursor
def __enter__(self):
self.conn = self.__getconn()
self.cursor = self.conn.cursor()
# 創(chuàng)建數(shù)據(jù)庫(kù)連接池
def __getconn(self):
if self.__pool is None:
self.__pool = PooledDB(
creator=config.DB_CREATOR,
mincached=config.DB_MIN_CACHED,
maxcached=config.DB_MAX_CACHED,
maxshared=config.DB_MAX_SHARED,
maxconnections=config.DB_MAX_CONNECYIONS,
blocking=config.DB_BLOCKING,
maxusage=config.DB_MAX_USAGE,
setsession=config.DB_SET_SESSION,
host=config.DB_TEST_HOST,
port=config.DB_TEST_PORT,
user=config.DB_TEST_USER,
passwd=config.DB_TEST_PASSWORD,
db=config.DB_TEST_DBNAME,
use_unicode=False,
charset=config.DB_CHARSET
)
return self.__pool.connection()
# 釋放連接池資源
def __exit__(self, exc_type, exc_val, exc_tb):
self.cursor.close()
self.conn.close()
# 關(guān)閉連接歸還給鏈接池
# def close(self):
# self.cursor.close()
# self.conn.close()
# 從連接池中取出一個(gè)連接
def getconn(self):
conn = self.__getconn()
cursor = conn.cursor()
return cursor, conn
# 獲取連接池,實(shí)例化
def get_my_connection():
return MyConnectionPool()
制作mysqlhelper.py
from db_dbutils_init import get_my_connection
"""執(zhí)行語(yǔ)句查詢有結(jié)果返回結(jié)果沒有返回0;增/刪/改返回變更數(shù)據(jù)條數(shù),沒有返回0"""
class MySqLHelper(object):
def __init__(self):
self.db = get_my_connection() # 從數(shù)據(jù)池中獲取連接
def __new__(cls, *args, **kwargs):
if not hasattr(cls, 'inst'): # 單例
cls.inst = super(MySqLHelper, cls).__new__(cls, *args, **kwargs)
return cls.inst
# 封裝執(zhí)行命令
def execute(self, sql, param=None, autoclose=False):
"""
【主要判斷是否有參數(shù)和是否執(zhí)行完就釋放連接】
:param sql: 字符串類型,sql語(yǔ)句
:param param: sql語(yǔ)句中要替換的參數(shù)"select %s from tab where id=%s" 其中的%s就是參數(shù)
:param autoclose: 是否關(guān)閉連接
:return: 返回連接conn和游標(biāo)cursor
"""
cursor, conn = self.db.getconn() # 從連接池獲取連接
count = 0
try:
# count : 為改變的數(shù)據(jù)條數(shù)
if param:
count = cursor.execute(sql, param)
else:
count = cursor.execute(sql)
conn.commit()
if autoclose:
self.close(cursor, conn)
except Exception as e:
pass
return cursor, conn, count
# 執(zhí)行多條命令
# def executemany(self, lis):
# """
# :param lis: 是一個(gè)列表,里面放的是每個(gè)sql的字典'[{"sql":"xxx","param":"xx"}....]'
# :return:
# """
# cursor, conn = self.db.getconn()
# try:
# for order in lis:
# sql = order['sql']
# param = order['param']
# if param:
# cursor.execute(sql, param)
# else:
# cursor.execute(sql)
# conn.commit()
# self.close(cursor, conn)
# return True
# except Exception as e:
# print(e)
# conn.rollback()
# self.close(cursor, conn)
# return False
# 釋放連接
def close(self, cursor, conn):
"""釋放連接歸還給連接池"""
cursor.close()
conn.close()
# 查詢所有
def selectall(self, sql, param=None):
try:
cursor, conn, count = self.execute(sql, param)
res = cursor.fetchall()
return res
except Exception as e:
print(e)
self.close(cursor, conn)
return count
# 查詢單條
def selectone(self, sql, param=None):
try:
cursor, conn, count = self.execute(sql, param)
res = cursor.fetchone()
self.close(cursor, conn)
return res
except Exception as e:
print("error_msg:", e.args)
self.close(cursor, conn)
return count
# 增加
def insertone(self, sql, param):
try:
cursor, conn, count = self.execute(sql, param)
# _id = cursor.lastrowid() # 獲取當(dāng)前插入數(shù)據(jù)的主鍵id,該id應(yīng)該為自動(dòng)生成為好
conn.commit()
self.close(cursor, conn)
return count
# 防止表中沒有id返回0
# if _id == 0:
# return True
# return _id
except Exception as e:
print(e)
conn.rollback()
self.close(cursor, conn)
return count
# 增加多行
def insertmany(self, sql, param):
"""
:param sql:
:param param: 必須是元組或列表[(),()]或((),())
:return:
"""
cursor, conn, count = self.db.getconn()
try:
cursor.executemany(sql, param)
conn.commit()
return count
except Exception as e:
print(e)
conn.rollback()
self.close(cursor, conn)
return count
# 刪除
def delete(self, sql, param=None):
try:
cursor, conn, count = self.execute(sql, param)
self.close(cursor, conn)
return count
except Exception as e:
print(e)
conn.rollback()
self.close(cursor, conn)
return count
# 更新
def update(self, sql, param=None):
try:
cursor, conn, count = self.execute(sql, param)
conn.commit()
self.close(cursor, conn)
return count
except Exception as e:
print(e)
conn.rollback()
self.close(cursor, conn)
return count
if __name__ == '__main__':
db = MySqLHelper()
# # 查詢單條
# sql1 = 'select * from userinfo where name=%s'
# args = 'python'
# ret = db.selectone(sql=sql1, param=args)
# print(ret) # (None, b'python', b'123456', b'0')
# 增加單條
# sql2 = 'insert into userinfo (name,password) VALUES (%s,%s)'
# ret = db.insertone(sql2, ('old2','22222'))
# print(ret)
# 增加多條
# sql3 = 'insert into userinfo (name,password) VALUES (%s,%s)'
# li = li = [
# ('分省', '123'),
# ('到達(dá)','456')
# ]
# ret = db.insertmany(sql3,li)
# print(ret)
# 刪除
# sql4 = 'delete from userinfo WHERE name=%s'
# args = 'xxxx'
# ret = db.delete(sql4, args)
# print(ret)
# 更新
# sql5 = r'update userinfo set password=%s WHERE name LIKE %s'
# args = ('993333993', '%old%')
# ret = db.update(sql5, args)
# print(ret)
python3 實(shí)現(xiàn)mysql數(shù)據(jù)庫(kù)連接池
原理
python編程中可以使用MySQLdb進(jìn)行數(shù)據(jù)庫(kù)的連接及諸如查詢/插入/更新等操作,但是每次連接mysql數(shù)據(jù)庫(kù)請(qǐng)求時(shí),都是獨(dú)立的去請(qǐng)求訪問,相當(dāng)浪費(fèi)資源,
而且訪問數(shù)量達(dá)到一定數(shù)量時(shí),對(duì)mysql的性能會(huì)產(chǎn)生較大的影響。
因此,實(shí)際使用中,通常會(huì)使用數(shù)據(jù)庫(kù)的連接池技術(shù),來訪問數(shù)據(jù)庫(kù)達(dá)到資源復(fù)用的目的。
安裝數(shù)據(jù)庫(kù)連接池模塊DBUtils
DBUtils是一套Python數(shù)據(jù)庫(kù)連接池包,并允許對(duì)非線程安全的數(shù)據(jù)庫(kù)接口進(jìn)行線程安全包裝。DBUtils來自Webware for Python。
DBUtils提供兩種外部接口:
- * PersistentDB :提供線程專用的數(shù)據(jù)庫(kù)連接,并自動(dòng)管理連接。
- * PooledDB :提供線程間可共享的數(shù)據(jù)庫(kù)連接,并自動(dòng)管理連接。
下載地址:DBUtils 下載解壓后,使用python setup.py install 命令進(jìn)行安裝
下面利用MySQLdb和DBUtils建立自己的mysql數(shù)據(jù)庫(kù)連接池工具包
在工程目錄下新建package命名為:dbConnecttion,并新建module命名為MySqlConn,下面是MySqlConn.py,該模塊創(chuàng)建Mysql的連接池對(duì)象,并創(chuàng)建了如查詢/插入等通用的操作方法。該部分代碼實(shí)現(xiàn)如下:
還有很多其他參數(shù)可以配置:
dbapi :數(shù)據(jù)庫(kù)接口
mincached :?jiǎn)?dòng)時(shí)開啟的空連接數(shù)量
maxcached :連接池最大可用連接數(shù)量
maxshared :連接池最大可共享連接數(shù)量
maxconnections :最大允許連接數(shù)量
blocking :達(dá)到最大數(shù)量時(shí)是否阻塞
maxusage :?jiǎn)蝹€(gè)連接最大復(fù)用次數(shù)
根據(jù)自己的需要合理配置上述的資源參數(shù),以滿足自己的實(shí)際需要。
代碼:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pymysql, os, configparser
from pymysql.cursors import DictCursor
from DBUtils.PooledDB import PooledDB
class Config(object):
"""
# Config().get_content("user_information")
配置文件里面的參數(shù)
[notdbMysql]
host = 192.168.1.101
port = 3306
user = root
password = python123
"""
def __init__(self, config_filename="myProjectConfig.cnf"):
file_path = os.path.join(os.path.dirname(__file__), config_filename)
self.cf = configparser.ConfigParser()
self.cf.read(file_path)
def get_sections(self):
return self.cf.sections()
def get_options(self, section):
return self.cf.options(section)
def get_content(self, section):
result = {}
for option in self.get_options(section):
value = self.cf.get(section, option)
result[option] = int(value) if value.isdigit() else value
return result
class BasePymysqlPool(object):
def __init__(self, host, port, user, password, db_name=None):
self.db_host = host
self.db_port = int(port)
self.user = user
self.password = str(password)
self.db = db_name
self.conn = None
self.cursor = None
class MyPymysqlPool(BasePymysqlPool):
"""
MYSQL數(shù)據(jù)庫(kù)對(duì)象,負(fù)責(zé)產(chǎn)生數(shù)據(jù)庫(kù)連接 , 此類中的連接采用連接池實(shí)現(xiàn)獲取連接對(duì)象:conn = Mysql.getConn()
釋放連接對(duì)象;conn.close()或del conn
"""
# 連接池對(duì)象
__pool = None
def __init__(self, conf_name=None):
self.conf = Config().get_content(conf_name)
super(MyPymysqlPool, self).__init__(**self.conf)
# 數(shù)據(jù)庫(kù)構(gòu)造函數(shù),從連接池中取出連接,并生成操作游標(biāo)
self._conn = self.__getConn()
self._cursor = self._conn.cursor()
def __getConn(self):
"""
@summary: 靜態(tài)方法,從連接池中取出連接
@return MySQLdb.connection
"""
if MyPymysqlPool.__pool is None:
__pool = PooledDB(creator=pymysql,
mincached=1,
maxcached=20,
host=self.db_host,
port=self.db_port,
user=self.user,
passwd=self.password,
db=self.db,
use_unicode=False,
charset="utf8",
cursorclass=DictCursor)
return __pool.connection()
def getAll(self, sql, param=None):
"""
@summary: 執(zhí)行查詢,并取出所有結(jié)果集
@param sql:查詢SQL,如果有查詢條件,請(qǐng)只指定條件列表,并將條件值使用參數(shù)[param]傳遞進(jìn)來
@param param: 可選參數(shù),條件列表值(元組/列表)
@return: result list(字典對(duì)象)/boolean 查詢到的結(jié)果集
"""
if param is None:
count = self._cursor.execute(sql)
else:
count = self._cursor.execute(sql, param)
if count > 0:
result = self._cursor.fetchall()
else:
result = False
return result
def getOne(self, sql, param=None):
"""
@summary: 執(zhí)行查詢,并取出第一條
@param sql:查詢SQL,如果有查詢條件,請(qǐng)只指定條件列表,并將條件值使用參數(shù)[param]傳遞進(jìn)來
@param param: 可選參數(shù),條件列表值(元組/列表)
@return: result list/boolean 查詢到的結(jié)果集
"""
if param is None:
count = self._cursor.execute(sql)
else:
count = self._cursor.execute(sql, param)
if count > 0:
result = self._cursor.fetchone()
else:
result = False
return result
def getMany(self, sql, num, param=None):
"""
@summary: 執(zhí)行查詢,并取出num條結(jié)果
@param sql:查詢SQL,如果有查詢條件,請(qǐng)只指定條件列表,并將條件值使用參數(shù)[param]傳遞進(jìn)來
@param num:取得的結(jié)果條數(shù)
@param param: 可選參數(shù),條件列表值(元組/列表)
@return: result list/boolean 查詢到的結(jié)果集
"""
if param is None:
count = self._cursor.execute(sql)
else:
count = self._cursor.execute(sql, param)
if count > 0:
result = self._cursor.fetchmany(num)
else:
result = False
return result
def insertMany(self, sql, values):
"""
@summary: 向數(shù)據(jù)表插入多條記錄
@param sql:要插入的SQL格式
@param values:要插入的記錄數(shù)據(jù)tuple(tuple)/list[list]
@return: count 受影響的行數(shù)
"""
count = self._cursor.executemany(sql, values)
return count
def __query(self, sql, param=None):
if param is None:
count = self._cursor.execute(sql)
else:
count = self._cursor.execute(sql, param)
return count
def update(self, sql, param=None):
"""
@summary: 更新數(shù)據(jù)表記錄
@param sql: SQL格式及條件,使用(%s,%s)
@param param: 要更新的 值 tuple/list
@return: count 受影響的行數(shù)
"""
return self.__query(sql, param)
def insert(self, sql, param=None):
"""
@summary: 更新數(shù)據(jù)表記錄
@param sql: SQL格式及條件,使用(%s,%s)
@param param: 要更新的 值 tuple/list
@return: count 受影響的行數(shù)
"""
return self.__query(sql, param)
def delete(self, sql, param=None):
"""
@summary: 刪除數(shù)據(jù)表記錄
@param sql: SQL格式及條件,使用(%s,%s)
@param param: 要?jiǎng)h除的條件 值 tuple/list
@return: count 受影響的行數(shù)
"""
return self.__query(sql, param)
def begin(self):
"""
@summary: 開啟事務(wù)
"""
self._conn.autocommit(0)
def end(self, option='commit'):
"""
@summary: 結(jié)束事務(wù)
"""
if option == 'commit':
self._conn.commit()
else:
self._conn.rollback()
def dispose(self, isEnd=1):
"""
@summary: 釋放連接池資源
"""
if isEnd == 1:
self.end('commit')
else:
self.end('rollback')
self._cursor.close()
self._conn.close()
if __name__ == '__main__':
mysql = MyPymysqlPool("notdbMysql")
sqlAll = "select * from myTest.aa;"
result = mysql.getAll(sqlAll)
print(result)
sqlAll = "select * from myTest.aa;"
result = mysql.getMany(sqlAll, 2)
print(result)
result = mysql.getOne(sqlAll)
print(result)
# mysql.insert("insert into myTest.aa set a=%s", (1))
# 釋放資源
mysql.dispose()
參考博客:https://www.cnblogs.com/renfanzi/p/7656142.html
到此這篇關(guān)于python3 實(shí)現(xiàn)mysql數(shù)據(jù)庫(kù)連接池的示例代碼的文章就介紹到這了,更多相關(guān)python3 mysql連接池內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
您可能感興趣的文章:- java數(shù)據(jù)庫(kù)連接池新手入門一篇就夠了,太簡(jiǎn)單了!
- 如何在SpringBoot 中使用 Druid 數(shù)據(jù)庫(kù)連接池
- Java數(shù)據(jù)庫(kù)連接池技術(shù)的入門教程
- Java 模擬數(shù)據(jù)庫(kù)連接池的實(shí)現(xiàn)代碼
- 一篇文章搞定數(shù)據(jù)庫(kù)連接池