python操作pgsql备份还原
作者:互联网
python操作pgsql备份还原
common.py
#!/usr/bin/env python
# encoding: utf-8
import argparse
dbs = [
"test1",
"test2"
]
# ct: pgsql容器的名称
ct = "pg"
# 环境主机地址
host = {
"dev": "ip地址1",
"test": "ip地址2"
}
# 环境pg端口
port = {
"dev": 5432,
"test": 5432
}
# 数据库账号密码
password = {
"user1": "passwd1",
"user2": "passwd2"
}
def get_parser():
parser = argparse.ArgumentParser(description='Krum')
parser.add_argument('-e', required=True, choices=['dev', 'test'])
return parser
database_back.py
#!/usr/bin/env python
# encoding: utf-8
import os
import time
from datetime import datetime
from common import *
def backup_postgresql(env="test", backup_dir="."):
cmd = f'mkdir -p {backup_dir}'
print(cmd)
os.system(cmd)
for db in dbs:
cmd = f'docker exec -i {ct} bash -c '
cmd += f'"export PGPASSWORD={password[env]} && pg_dump -h {host[env]} -Uroot {db} " > {backup_dir}/{db}.sql;'
print(cmd)
os.system(cmd)
def backup_mysql(env="test", backup_dir="."):
cmd = f'mkdir -p {backup_dir}'
print(cmd)
os.system(cmd)
db = "lts"
ct = "mysql"
cmd = f'docker exec -i {ct} '
cmd += f'mysqldump -h {host[env]} -uroot -p{password[env]} {db} > {backup_dir}/{db}.sql;'
print(cmd)
os.system(cmd)
if __name__ == '__main__':
parser = get_parser()
args = parser.parse_args()
env = args.e
now = datetime.now().strftime("%Y%m%d")
now_more = datetime.now().strftime("%Y%m%d%H%M%S")
backup_dir = f'./backup/{env}/{env}_{now}'
backup_dir_more = f'./backup/{env}/{env}_{now_more}'
backup_postgresql(env=env, backup_dir=backup_dir)
backup_mysql(env=env, backup_dir=backup_dir)
cmd = f'tar -zcvf {backup_dir_more}.tar.gz {backup_dir}'
#cmd = f'cp -r {backup_dir} {backup_dir_more}'
print(cmd)
os.system(cmd)
备份用法:
python3 backup_db.py -e prod
import_db.py
#!/usr/bin/env python
# encoding: utf-8
from email.policy import default
import os
from pydoc import describe
import time
from datetime import datetime
import argparse
# 清理旧数据库并创建新的数据库
def clear_db(db, ct="postgres"):
if not db:
print("db=None")
exit()
cmd = f'docker exec {ct} bash -c '
cmd += f'"export PGPASSWORD={pgpd}; psql -Uroot -w -h {tghost} -p {tgport} -c '
cmd += f"\\\"SELECT pg_terminate_backend(pg_stat_activity.pid) FROM pg_stat_activity WHERE datname='{db}' AND pid<>pg_backend_pid();\\\"\""
os.system(cmd)
print(f"[+] DATABASE {db}")
cmd = f'docker exec {ct} bash -c '
cmd += f'"export PGPASSWORD={pgpd}; psql -Uroot -w -h {tghost} -p {tgport} -c '
cmd += f'\'DROP DATABASE {db};\'"'
os.system(cmd)
cmd = f'docker exec {ct} bash -c '
cmd += f'"export PGPASSWORD={pgpd}; psql -Uroot -w -h {tghost} -p {tgport} -c '
cmd += f'\'CREATE DATABASE {db};\'"'
os.system(cmd)
# 导入数据
def import_pgsql(db, ct='postgres'):
print(f"DBNAME: {db} TARGET: {tghost}")
clear_db(db)
cmd = f'docker exec -i {ct} bash -c "export PGPASSWORD={pgpd} && psql -o /dev/null -q -h {tghost} -p {tgport} -Uroot -w {db}" < {dpath}/{db}.sql'
os.system(cmd)
print()
print("[+] IMPORT DB SUCCESS!")
print()
def import_db_all(dbname):
print(f"TARGET: {tghost} PORT: {tgport}")
confirm = input("please confirm this information[yes/no]:")
if confirm != 'yes' and confirm != 'y':
print("Not confirm and Exit...")
exit()
if not dbname:
print("\033[31m未获取到数据库文件,检查路径:'" + dpath + "' 是否正确\033[0m")
exit()
for db in dbname:
import_pgsql(db)
def get_parser():
parser = argparse.ArgumentParser(description='import database')
parser.add_argument('-d', required=True)
parser.add_argument('-tg', required=True)
parser.add_argument('-tgp', required=False,default=5432)
return parser
if __name__ == "__main__":
parser = get_parser()
args = parser.parse_args()
dpath = args.d
tghost = args.tg
tgport = args.tgp
list_file = os.listdir(dpath)
pgpd = 'dbpasswd'
dbname = []
for filename in list_file:
if 'lts' not in filename and 'sql' in list_file:
dbname.append(filename.split('.')[0])
print('-d:sql文件存放路径;-tg: 导入主机;-tgp:导入主机端口')
import_db_all(dbname)
导入用法:
python3 import_db.py -d:sql文件存放路径;-tg: 导入主机;-tgp:导入主机端口
标签:python,备份,cmd,db,pgsql,env,import,backup,dir 来源: https://www.cnblogs.com/pgy674477230/p/16351172.html