Python操作Mysql及创建连接池

Black Python 27 次浏览 , Python操作Mysql及创建连接池已关闭评论

作为一个合格的crud工程师,我真的就只记得crud,最近由于忘记了mysql账号密码(包括root账号),在找回账号的过程中顺便就梳理了一些常用的mysql用法。

忘记root密码

第一步: 关闭mysql

$ service mysql stop

第二步: 修改my.cnf文件,加上红框中两行内容

$ vim /etc/mysql/my.cnf

第三步: 重启mysql服务,并进入mysql修改密码

$ service mysql restart
$ mysql
mysql> use mysql;
mysql> update user set authentication_string=PASSWORD("new_pass") where user='root';

第四步: 撤回对my.cnf文件的修改

Python操作MySQL

python操作MySQL的常用方法是使用pymysql模块。在单次使用时,只需要直接调用connect()方法即可。

import pymysql
#链接数据库
db = pymysql.connect(host='yourhost',user='yourname',passwd='yourpasswd',db='yourdb')
cursor = db.cursor()  
cursor.execute("select * from yourdb")
result = cursor.fetchone()

但是为了达到复用的效果,可以选择对一些常用的操作进行封装。例如,新建一个操作mysql的工具类,命名文件为lib_mysql.py,类中封装exec_sql()方法用于查询。

import pymysql
from urllib.parse import urlparse

class Client:
      def __init__(self, url, **kwargs):
          self.url = url
          if not url:
              return
          url = urlparse(url)
          settings = {
              'host': url.hostname,
              'port': url.port or 3306,
              'charset': 'utf8',
              'db': url.path.strip().strip('/'),
              'user': url.username,
              'passwd': url.password,
              'cursorclass': pymysql.cursors.DictCursor
          }
          self.settings = settings
          if kwargs:
              self.settings.update(kwargs)
          self.conn = pymysql.connect(**self.settings)
          self.cursor = self.conn.cursor()
        
      def exec_sql(self, sql, fetch_rows=False, commit=True, raise_exc=False):
          try:
              self.cursor.execute(sql)
              if commit:
                  self.conn.commit()
          except Exception as exc:
              logger.exception('exe_sql got exception: {}'.format(str(exc)))
              if raise_exc:
                  raise
              if commit:
                  self.conn.rollback()

          if fetch_rows:
              rows = self.cursor.fetchall()
              return rows

这样之后想要查询mysql时,只需要import lib_mysql即可。例如:

>>> import lib_mysql
>>> client = lib_mysql.Client(url='mysql://test:1qaz@WSX@localhost:3306/test?charset=utf8')
>>> sql = '''select * from conn_pool_test'''
>>> record = client.exec_sql(sql,fetch_rows=True)
>>> type(record)
<class 'list'>
>>> record
[{'id': 1, 'app_id': '001', 'app_name': 'android_test'}, 
 {'id': 2, 'app_id': '002', 'app_name': 'ios_test'}, 
 {'id': 3, 'app_id': '003', 'app_name': 'web_test'}, 
 {'id': 4, 'app_id': '004', 'app_name': 'mac_test'}, 
 {'id': 5, 'app_i

同样地,可以在工具类中封装插入方法,例如:

def upsert_to_mysql(self, items, table, fields):
        try:
            fields_str = ','.join(fields)
            replace_str = ','.join(['%s' for _ in range(len(fields))])
            fields_update_str = ','.join(['{}=values({})'.format(field, field) for field in fields])
            sql = '''
            insert ignore into {table} ( {fields} )
            values ({replace_str})
            on duplicate key update {fields_update}
            '''.format(table=table, replace_str=replace_str, fields=fields_str, fields_update=fields_update_str)
            records = []
            for item in items:
                row = tuple([item[field] for field in fields])
                records.append(row)
            self.cursor.executemany(sql, records)
            self.conn.commit()
        except Exception as e:
            print(e)
            traceback.print_exc()
            self.conn.rollback()

调用插入方法:

>>> fields = ['id', 'app_id' ,'app_name']
>>> data = [{'id': 6, 'app_id': '006', 'app_name': 'Solor'}, {'id': 7, 'app_id': '007', 'app_name': 'Symbian'}]
>>> client.upsert_to_mysql(table='conn_pool_test', fields=fields, records=data)
total 2 records
processed 2 records

MySQL连接池

如果每一次连接数据库只是做了简单操作,然后反复连接断连。如果短时间内连接次数过多,会给数据库带来压力。因此,可以采用连接池的方式,让连接重复使用,降低数据库资源消耗。

这里介绍采用pymysql和DBUtils实现mysql连接池的方式。

import pymysql
from dbutils.pooled_db import PooledDB, SharedDBConnection
from urllib.parse import urlparse
class MysqlPool(object):

    def __init__(self, url):
        self.url = url
        if not url:
            return
        url = urlparse(url)
        self.POOL = PooledDB(
            creator=pymysql, 
            maxconnections=10, # 连接池的最大连接数 
            maxcached=10, 
            maxshared=10,
            blocking=True,  
            setsession=[],  
            host=url.hostname,
            port=url.port or 3306,
            user=url.username,
            password=url.password,
            database=url.path.strip().strip('/'),
            charset='utf8',
        )
    def __new__(cls, *args, **kw):
        if not hasattr(cls, '_instance'):
            cls._instance = object.__new__(cls)
        return cls._instance

    def connect(self):
        conn = self.POOL.connection()
        cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
        return conn, cursor

    def connect_close(self,conn, cursor):
        cursor.close()
        conn.close()

    def fetch_all(self,sql, args):
        conn, cursor = self.connect()
        if args is None:
            cursor.execute(sql)
        else:
            cursor.execute(sql, args)
        record_list = cursor.fetchall()
        return record_list

    def fetch_one(self,sql, args):
        conn, cursor = self.connect()
        cursor.execute(sql, args)
        result = cursor.fetchone()
        return result

    def insert(self,sql, args):
        conn, cursor = self.connect()
        row = cursor.execute(sql, args)
        conn.commit()
        self.connect_close(conn, cursor)
        return row

https://zhuanlan.zhihu.com/p/344642744

Go