import subprocess
import logging
import os
DEFAULT_PG_PORT = '5432'
def get_database_list(host: str, username: str, password: str, port: str = DEFAULT_PG_PORT) -> list:
"""Получить список базданных сервера
Args:
host (str): Адрес сервера
username (str): Имя пользователя
password (str): Пароль
port (str): Порт сервера
Raises:
Exception: Ошибки при подключении к PG
Returns:
list: Список баз данных
"""
sql_command_select_databases = 'SELECT datname FROM pg_database;'
result = subprocess.run(
[
'psql',
'-h', host,
'-p', port,
'-U', username,
'--tuples-only',
'-c', sql_command_select_databases
],
encoding='UTF-8',
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env={'PGPASSWORD': password},
)
if result.returncode != 0:
raise Exception(f'Get DB list: {host} {result.stderr}')
else:
dbnames = result.stdout.split('\n')
dbnames = list(map(lambda dbname: dbname.strip(), dbnames))
dbnames = list(dbname for dbname in dbnames if dbname != '')
logging.info(f'{host} dbnames: {",".join(dbnames)}')
return dbnames
def backup_database(host: str, username: str, password: str, dbname: str, filename: str, port: str = DEFAULT_PG_PORT):
"""Создать резервную копию базы данных в custom формате с включением больших объектов
Args:
host (str): Адрес сервера
username (str): Имя пользователя
password (str): Пароль
dbname (str): Имя базы данных
filename (str): Имя файла приемника
port (str): Порт сервера
Raises:
Exception: Ошибки при подключении к PG
"""
logging.info(f'backup {dbname} from {host}')
result = subprocess.run([
'pg_dump', dbname,
'-h', host,
'-p', port,
'-U', username,
'--format=c', '--blobs',
'--file', filename
],
encoding='UTF-8',
stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE,
env={'PGPASSWORD': password},
)
if result.returncode != 0:
raise Exception(f'Backup DB + {result.stderr}')
else:
file_stats = os.stat(filename)
logging.info(
f'Backup DB: end {dbname} size {round(file_stats.st_size / (1024 * 1024))}MB')
def drop_database(host: str, username: str, password: str, dbname: str, port: str = DEFAULT_PG_PORT):
"""Удаляет базу данных с предвартельным отключением соединений
Args:
host (str): Адрес сервера
username (str): Имя пользователя
password (str): Пароль
dbname (str): Имя базы данных
port (str): Порт сервера
Raises:
Exception: Ошибки при подключении к PG
"""
logging.info(f'Drop DB: {dbname} to {host}')
sql_command_drop_database = f" \
UPDATE pg_database SET datallowconn = 'false' WHERE datname = '{dbname}'; \
SELECT pg_terminate_backend(pg_stat_activity.pid) \
FROM pg_stat_activity \
WHERE pg_stat_activity.datname = '{dbname}' AND pid <> pg_backend_pid();"
result = subprocess.run(
['psql',
'-h', host,
'-p', port,
'-U', username,
'-c', sql_command_drop_database,
],
encoding='UTF-8',
stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE,
env={'PGPASSWORD': password},
)
if result.returncode != 0:
raise Exception(f'Drop DB: {result.stderr}')
result = subprocess.run(
['psql',
'-h', host,
'-p', port,
'-U', username,
'-c', f"DROP DATABASE {dbname};",
],
encoding='UTF-8',
stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE,
env={'PGPASSWORD': password},
)
if result.returncode != 0:
raise Exception(f'Drop DB: {result.stderr}')
def create_database(host: str, username: str, password: str, dbname: str, port: str = DEFAULT_PG_PORT):
"""Создает базу данных
Args:
host (str): Адрес сервера
username (str): Имя пользователя
password (str): Пароль
dbname (str): Имя базы данных
port (str): Порт сервера
Raises:
Exception: Ошибки при подключении к PG
"""
logging.info(f'Create DB: {dbname} to {host}')
sql_command_create_database = f'CREATE DATABASE {dbname}'
result = subprocess.run(
['psql',
'-h', host,
'-p', port,
'-U', username,
'-d', 'postgres',
'-c', sql_command_create_database,
],
encoding='UTF-8',
stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE,
env={'PGPASSWORD': password},
)
if result.returncode != 0:
raise Exception(f'Create DB: {result.stderr}')
def restore_database(host: str, username: str, password: str, dbname: str, filename: str, port: str = DEFAULT_PG_PORT):
"""Восстанавливает БД из файла источника
Args:
host (str): Адрес сервера
username (str): Имя пользователя
password (str): Пароль
dbname (str): Имя базы данных
filename (str): Имя файла источника
port (str): Порт сервера
Raises:
Exception: Ошибки при подключении к PG
"""
logging.info(f'Restore DB: {dbname} to {host}')
result = subprocess.run([
'pg_restore', filename,
'-h', host,
'-p', port,
'-U', username,
'--dbname', dbname,
],
encoding='UTF-8',
stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE,
env={'PGPASSWORD': password},
)
if result.returncode != 0:
raise Exception(f'Restore DB: {result.stderr}')
else:
logging.info(f'Restore DB: {dbname} end')