我们经常会涉及备份mysql,下面有几种用python定时备份的几份代码,代码分成3种,分别是单机指定单库的,单机多库的,多机多库的,弄好以后cron里添加下自动任务就可以了
- 单机指定数据库的
#!/usr/bin/python
# -*- coding: UTF-8 -*-
# 导入所需模块
import warnings
import pymysql
import os
import time
import logging
# 数据库连接信息
mysql_host = "数据库地址"
mysql_user = "数据库用户"
mysql_pwd = "密码"
mysql_port = 3306
mysql_charset = "utf8"
# 备份时间和路径
new_date = time.strftime("%Y%m%d%H%M%S")
back_path = "/home/dataBasebak/wordpress/" # 指定备份路径
# 日志路径
log_path = "/var/log/mysql_backup/"
if not os.path.exists(log_path):
os.makedirs(log_path)
# 设置日志记录
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# 设置日志文件路径
handler_success = logging.FileHandler(log_path + 'success.log')
handler_success.setLevel(logging.INFO)
handler_error = logging.FileHandler(log_path + 'error.log')
handler_error.setLevel(logging.ERROR)
# 设置日志格式
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler_success.setFormatter(formatter)
handler_error.setFormatter(formatter)
#添加日志处理器
logger.addHandler(handler_success)
logger.addHandler(handler_error)
try:
# 如果备份路径不存在,则创建
if not os.path.exists(back_path):
os.makedirs(back_path)
# 连接数据库
conn = pymysql.connect(
host=mysql_host,
user=mysql_user,
passwd=mysql_pwd,
db='mysql',
port=mysql_port,
charset=mysql_charset
)
cur = conn.cursor()
# 指定备份的数据库名
db_name = "wordpress"
logger.info(f'当前的数据库名是:{db_name}')
# 指定备份文件的路径和名称
path = back_path + db_name + new_date + ".sql"
logger.info(f'备份文件的路径和名称是:{path}')
# 执行备份命令
os.system("mysqldump -h%s -u%s -p%s %s > %s" % (mysql_host, mysql_user, mysql_pwd, db_name, path))
logger.info('备份成功')
cur.close() # 关闭游标
conn.close() # 释放数据库资源
except Exception as e:
logger.error("备份失败", exc_info=True)
- 单机多库的
#!/usr/bin/python
# -*- coding: UTF-8 -*-
# 导入所需模块
import warnings
import pymysql
import os
import time
import logging
# 数据库连接信息
mysql_host = "数据库地址"
mysql_user = "数据库用户"
mysql_pwd = "密码"
mysql_port = 3306
mysql_charset = "utf8"
# 备份时间和路径
new_date = time.strftime("%Y%m%d%H%M%S")
back_path = "/home/dataBasebak/"
# 日志路径
log_path = "/var/log/mysql_backup/"
if not os.path.exists(log_path):
os.makedirs(log_path)
# 设置日志记录
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# 设置日志文件路径
handler_success = logging.FileHandler(log_path + 'success.log')
handler_success.setLevel(logging.INFO)
handler_error = logging.FileHandler(log_path + 'error.log')
handler_error.setLevel(logging.ERROR)
# 设置日志格式
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler_success.setFormatter(formatter)
handler_error.setFormatter(formatter)
# 添加日志处理器
logger.addHandler(handler_success)
logger.addHandler(handler_error)
try:
# 如果备份路径不存在,则创建
if not os.path.exists(back_path):
os.makedirs(back_path)
# 连接数据库
conn = pymysql.connect(
host=mysql_host,
user=mysql_user,
passwd=mysql_pwd,
db='mysql',
port=mysql_port,
charset=mysql_charset
)
cur = conn.cursor()
cur.execute('show databases') # 查询出所有数据库
data = cur.fetchall() # 查询出来,并赋值 data
for db_names in data:
for db_name in db_names:
if db_name in ['information_schema', 'performance_schema', 'mysql']:
continue
if not os.path.exists(back_path + db_name):
os.makedirs(back_path + db_name)
path = back_path + db_name + "/" + new_date + ".sql" # 数据库备份路径
logger.info(f'备份文件的路径和名称是:{path}')
os.system("mysqldump -h%s -u%s -p%s %s > %s" % (mysql_host, mysql_user, mysql_pwd, db_name, path))
logger.info('备份成功')
cur.close() # 关闭游标
conn.close() # 释放数据库资源
except Exception as e:
logger.error("备份失败", exc_info=True)
- 多机多库就用这个
# coding=utf-8
"""
@Time : 2022/10/25 17:16
@Author : Taering
@File :back_mysql_many_servers.py
@IDE :PyCharm
@DESC :
"""
import pymysql
import os
import time
import logging
# 数据库连接信息
mysql_servers = [
{
"host": "数据库A的地址",
"user": "数据库A的用户名",
"pwd": "数据库A的密码",
"port": 3306,
"charset": "utf8"
},
{
"host": "数据库B的地址",
"user": "数据库B的用户名",
"pwd": "数据库B的密码",
"port": 3306,
"charset": "utf8"
},
]
# 备份时间和路径
new_date = time.strftime("%Y%m%d%H%M%S")
back_path = "/SqlData/dataBasebak/"
# 日志路径
log_path = "/var/log/mysql_backup/"
if not os.path.exists(log_path):
os.makedirs(log_path)
# 设置日志记录
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# 设置日志文件路径
handler_success = logging.FileHandler(log_path + 'success.log')
handler_success.setLevel(logging.INFO)
handler_error = logging.FileHandler(log_path + 'error.log')
handler_error.setLevel(logging.ERROR)
# 设置日志格式
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler_success.setFormatter(formatter)
handler_error.setFormatter(formatter)
# 添加日志处理器
logger.addHandler(handler_success)
logger.addHandler(handler_error)
for server in mysql_servers:
try:
# 如果备份路径不存在,则创建
if not os.path.exists(back_path + server['host']):
os.makedirs(back_path + server['host'])
# 连接数据库
conn = pymysql.connect(
host=server['host'],
user=server['user'],
passwd=server['pwd'],
db='mysql',
port=server['port'],
charset=server['charset']
)
cur = conn.cursor()
cur.execute('show databases') # 查询出所有数据库
data = cur.fetchall() # 查询出来,并赋值 data
for db_names in data:
for db_name in db_names:
logger.info(f'当前的数据库名是:{db_name}')
if db_name in ['information_schema', 'performance_schema', 'mysql', 'sys']:
continue
if not os.path.exists(back_path + server['host'] + "/" + db_name):
os.makedirs(back_path + server['host'] + "/" + db_name)
path = back_path + server['host'] + "/" + db_name + "/" + new_date + ".sql" # 数据库备份路径
print(path)
logger.info(f'备份文件的路径和名称是:{path}')
os.system(
"mysqldump -h%s -u%s -p%s %s > %s" % (server['host'], server['user'], server['pwd'], db_name, path))
logger.info('备份成功')
cur.close() # 关闭游标
conn.close() # 释放数据库资源
except Exception as e:
logger.error("备份失败", exc_info=True)
# 删除7天前的备份,自己看着改删除时间,或者不删除也可以
for server in mysql_servers:
try:
# 获取备份路径
path = back_path + server['host']
# 获取所有数据库目录
db_dirs = os.listdir(path)
for db_dir in db_dirs:
# 获取数据库备份目录
db_path = os.path.join(path, db_dir)
# 获取所有备份文件
files = os.listdir(db_path)
for file in files:
# 获取备份文件完整路径
file_path = os.path.join(db_path, file)
# 获取备份文件创建时间
file_ctime = os.path.getctime(file_path)
# 获取当前时间
now_time = time.time()
# 如果备份文件创建时间超过7天,删除备份文件
if now_time - file_ctime > 7 * 24 * 60 * 60:
os.remove(file_path)
logger.info(f'删除7天前的备份文件:{file_path}')
except Exception as e:
logger.error("删除7天前的备份失败", exc_info=True)
远程调用,或者在mysql本地都可以,最好的方法是开一个只读的账号只允许特定IP链接,然后每天进行备份,这样安全一些,即便是主站挂了也是损失有限,如果你用主从的话是实时的,那样更好一些
你要是觉得没有时效性,就去看看mysql主从同步和半同步
Pingback:Mysql主从同步 - 无双的个人博客