我孤身走在路上, 石子在雾中发亮,夜很安静,荒原面对太空,星星互诉衷肠
用Python定时备份Mysql
用Python定时备份Mysql

用Python定时备份Mysql

我们经常会涉及备份mysql,下面有几种用python定时备份的几份代码,代码分成3种,分别是单机指定单库的,单机多库的,多机多库的,弄好以后cron里添加下自动任务就可以了

  • 单机指定数据库的
#!/usr/bin/python
# -*- coding: UTF-8 -*-

# 导入所需模块
import warnings
import pymysql
import os
import time
import logging

# 数据库连接信息

mysql_host = "数据库地址"
mysql_user = "数据库用户"
mysql_pwd = "密码"
mysql_port = 3306
mysql_charset = "utf8"

# 备份时间和路径
new_date = time.strftime("%Y%m%d%H%M%S")
back_path = "/home/dataBasebak/wordpress/"  # 指定备份路径

# 日志路径
log_path = "/var/log/mysql_backup/"
if not os.path.exists(log_path):
    os.makedirs(log_path)

# 设置日志记录
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# 设置日志文件路径
handler_success = logging.FileHandler(log_path + 'success.log')
handler_success.setLevel(logging.INFO)

handler_error = logging.FileHandler(log_path + 'error.log')
handler_error.setLevel(logging.ERROR)

# 设置日志格式
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler_success.setFormatter(formatter)
handler_error.setFormatter(formatter)

#添加日志处理器
logger.addHandler(handler_success)
logger.addHandler(handler_error)

try:
    # 如果备份路径不存在,则创建
    if not os.path.exists(back_path):
        os.makedirs(back_path)

    # 连接数据库
    conn = pymysql.connect(
        host=mysql_host,
        user=mysql_user,
        passwd=mysql_pwd,
        db='mysql',
        port=mysql_port,
        charset=mysql_charset
    )
    cur = conn.cursor()

    # 指定备份的数据库名
    db_name = "wordpress"
    logger.info(f'当前的数据库名是:{db_name}')

    # 指定备份文件的路径和名称
    path = back_path + db_name + new_date + ".sql"
    logger.info(f'备份文件的路径和名称是:{path}')

    # 执行备份命令
    os.system("mysqldump -h%s -u%s -p%s %s > %s" % (mysql_host, mysql_user, mysql_pwd, db_name, path))
    logger.info('备份成功')

    cur.close()  # 关闭游标
    conn.close()  # 释放数据库资源

except Exception as e:
    logger.error("备份失败", exc_info=True)
  • 单机多库的
#!/usr/bin/python
# -*- coding: UTF-8 -*-

# 导入所需模块
import warnings
import pymysql
import os
import time
import logging

# 数据库连接信息
mysql_host = "数据库地址"
mysql_user = "数据库用户"
mysql_pwd = "密码"
mysql_port = 3306
mysql_charset = "utf8"

# 备份时间和路径
new_date = time.strftime("%Y%m%d%H%M%S")
back_path = "/home/dataBasebak/"  

# 日志路径
log_path = "/var/log/mysql_backup/"
if not os.path.exists(log_path):
    os.makedirs(log_path)

# 设置日志记录
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# 设置日志文件路径
handler_success = logging.FileHandler(log_path + 'success.log')
handler_success.setLevel(logging.INFO)

handler_error = logging.FileHandler(log_path + 'error.log')
handler_error.setLevel(logging.ERROR)

# 设置日志格式
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler_success.setFormatter(formatter)
handler_error.setFormatter(formatter)

# 添加日志处理器
logger.addHandler(handler_success)
logger.addHandler(handler_error)

try:
    # 如果备份路径不存在,则创建
    if not os.path.exists(back_path):
        os.makedirs(back_path)

    # 连接数据库
    conn = pymysql.connect(
        host=mysql_host,
        user=mysql_user,
        passwd=mysql_pwd,
        db='mysql',
        port=mysql_port,
        charset=mysql_charset
    )
    cur = conn.cursor()
    cur.execute('show databases')  # 查询出所有数据库
    data = cur.fetchall()  # 查询出来,并赋值 data

    for db_names in data:
        for db_name in db_names:
            if db_name in ['information_schema', 'performance_schema', 'mysql']:
                continue
            if not os.path.exists(back_path + db_name):
                os.makedirs(back_path + db_name)
            path = back_path + db_name + "/" + new_date + ".sql"  # 数据库备份路径
            logger.info(f'备份文件的路径和名称是:{path}')
            os.system("mysqldump -h%s -u%s -p%s %s > %s" % (mysql_host, mysql_user, mysql_pwd, db_name, path))
            logger.info('备份成功')

    cur.close()  # 关闭游标
    conn.close()  # 释放数据库资源

except Exception as e:
    logger.error("备份失败", exc_info=True)
  • 多机多库就用这个
# coding=utf-8
"""
@Time : 2022/10/25 17:16
@Author : Taering
@File :back_mysql_many_servers.py
@IDE :PyCharm
@DESC :
"""


import pymysql
import os
import time
import logging

# 数据库连接信息
mysql_servers = [
    {
        "host": "数据库A的地址",
        "user": "数据库A的用户名",
        "pwd": "数据库A的密码",
        "port": 3306,
        "charset": "utf8"
    },
    {
        "host": "数据库B的地址",
        "user": "数据库B的用户名",
        "pwd": "数据库B的密码",
        "port": 3306,
        "charset": "utf8"
    },
]

# 备份时间和路径
new_date = time.strftime("%Y%m%d%H%M%S")
back_path = "/SqlData/dataBasebak/"

# 日志路径
log_path = "/var/log/mysql_backup/"
if not os.path.exists(log_path):
    os.makedirs(log_path)

# 设置日志记录
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# 设置日志文件路径
handler_success = logging.FileHandler(log_path + 'success.log')
handler_success.setLevel(logging.INFO)

handler_error = logging.FileHandler(log_path + 'error.log')
handler_error.setLevel(logging.ERROR)

# 设置日志格式
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler_success.setFormatter(formatter)
handler_error.setFormatter(formatter)

# 添加日志处理器
logger.addHandler(handler_success)
logger.addHandler(handler_error)

for server in mysql_servers:
    try:
        # 如果备份路径不存在,则创建
        if not os.path.exists(back_path + server['host']):
            os.makedirs(back_path + server['host'])

        # 连接数据库
        conn = pymysql.connect(
            host=server['host'],
            user=server['user'],
            passwd=server['pwd'],
            db='mysql',
            port=server['port'],
            charset=server['charset']
        )
        cur = conn.cursor()
        cur.execute('show databases')  # 查询出所有数据库
        data = cur.fetchall()  # 查询出来,并赋值 data

        for db_names in data:
            for db_name in db_names:
                logger.info(f'当前的数据库名是:{db_name}')
                if db_name in ['information_schema', 'performance_schema', 'mysql', 'sys']:
                    continue
                if not os.path.exists(back_path + server['host'] + "/" + db_name):
                    os.makedirs(back_path + server['host'] + "/" + db_name)
                path = back_path + server['host'] + "/" + db_name + "/" + new_date + ".sql"  # 数据库备份路径
                print(path)

                logger.info(f'备份文件的路径和名称是:{path}')
                os.system(
                    "mysqldump -h%s -u%s -p%s %s > %s" % (server['host'], server['user'], server['pwd'], db_name, path))
                logger.info('备份成功')

        cur.close()  # 关闭游标
        conn.close()  # 释放数据库资源

    except Exception as e:
        logger.error("备份失败", exc_info=True)

# 删除7天前的备份,自己看着改删除时间,或者不删除也可以
for server in mysql_servers:
    try:
        # 获取备份路径
        path = back_path + server['host']
        # 获取所有数据库目录
        db_dirs = os.listdir(path)
        for db_dir in db_dirs:
            # 获取数据库备份目录
            db_path = os.path.join(path, db_dir)
            # 获取所有备份文件
            files = os.listdir(db_path)
            for file in files:
                # 获取备份文件完整路径
                file_path = os.path.join(db_path, file)
                # 获取备份文件创建时间
                file_ctime = os.path.getctime(file_path)
                # 获取当前时间
                now_time = time.time()
                # 如果备份文件创建时间超过7天,删除备份文件
                if now_time - file_ctime > 7 * 24 * 60 * 60:
                    os.remove(file_path)
                    logger.info(f'删除7天前的备份文件:{file_path}')
    except Exception as e:
        logger.error("删除7天前的备份失败", exc_info=True)

远程调用,或者在mysql本地都可以,最好的方法是开一个只读的账号只允许特定IP链接,然后每天进行备份,这样安全一些,即便是主站挂了也是损失有限,如果你用主从的话是实时的,那样更好一些

你要是觉得没有时效性,就去看看mysql主从同步和半同步

一条评论

  1. Pingback:Mysql主从同步 - 无双的个人博客

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

− 1 = 2