관리-도구
편집 파일: backup_systems.py
import asyncio import functools import logging from datetime import timezone from typing import Callable, Dict, List, Optional from defence360agent.contracts.config import ( ACRONIS, ANTIVIRUS_MODE, AcronisBackup as AcronisBackupConfig, BackupConfig, BackupRestore, CLOUDLINUX, CLOUDLINUX_ON_PREMISE, CLUSTERLOGICS, CPANEL, Core, DIRECTADMIN, PLESK, R1SOFT, SAMPLE_BACKEND, ) from defence360agent.contracts.license import LicenseCLN from defence360agent.internals.cln import BackupNotFound, RestCLN from defence360agent.subsys.panels.cpanel.panel import cPanel from defence360agent.subsys.panels.directadmin.panel import DirectAdmin from defence360agent.subsys.panels.plesk.panel import Plesk if not ANTIVIRUS_MODE: from restore_infected import backup_backends from restore_infected.backup_backends.acronis import BackupFailed from restore_infected.backup_backends_lib import ( BackendNonApplicableError, BackendNotAuthorizedError, ) logger = logging.getLogger(__name__) def get_backend(name): try: return _get_avalible_backends(include_sample=True)[name]() except (KeyError, BackendNonApplicableError) as e: raise ValueError("Backup system is not available: {}".format(name)) def get_available_backends_names() -> List[str]: names = [] # Don't list the CL Backup as available for selection for name, cls in _get_avalible_backends(include_cl=False).items(): try: cls() except BackendNonApplicableError: pass else: names.append(name) return names def _get_avalible_backends( include_sample=False, include_cl=True, ) -> Dict[str, Callable]: backends = { ACRONIS: Acronis, R1SOFT: R1Soft, # https://cloudlinux.atlassian.net/browse/DEF-8806 # CLUSTERLOGICS: ClusterLogics, } if BackupRestore.CL_BACKUP_ALLOWED and include_cl: backends[CLOUDLINUX] = CloudLinux if BackupRestore.CL_ON_PREMISE_BACKUP_ALLOWED: backends[CLOUDLINUX_ON_PREMISE] = CloudLinuxOnPremise if cPanel.is_installed(): backends[CPANEL] = cPanelBackup elif Plesk.is_installed(): backends[PLESK] = PleskBackup elif DirectAdmin.is_installed(): backends[DIRECTADMIN] = DirectAdminBackup if include_sample: backends[SAMPLE_BACKEND] = Sample return backends def get_current_backend() -> Optional[str]: conf = BackupConfig().config_to_dict().get("BACKUP_SYSTEM", {}) return conf.get("enabled") and conf.get("backup_system") async def get_last_backup_timestamp() -> Optional[int]: backend = get_current_backend() if not backend: return None backend_instance = get_backend(backend) # type: BackupSystem return await backend_instance.get_last_backup_timestamp() def transactional(f): async def wrapper(cls, *args, **kwargs): ok = False try: rv = await f(cls, *args, **kwargs) ok = True finally: cls._update_backups_config(enabled=ok) return rv return wrapper class BackupException(Exception): pass class BackupSystem: def __init__(self, name, log_path=None): self.name = name self.log_path = log_path def _update_backups_config(self, enabled): new_conf = { "BACKUP_SYSTEM": { "enabled": enabled, "backup_system": self.name if enabled else None, } } BackupConfig().dict_to_config(new_conf, overwrite=True, validate=True) async def init(self, *args, **kwargs): self._update_backups_config(enabled=True) async def disable(self, delete_backups=False): self._update_backups_config(enabled=False) async def check(self): return {} async def show(self): return {} async def make_backup(self): pass async def check_state(self) -> bool: conf = BackupConfig().config_to_dict().get("BACKUP_SYSTEM", {}) return conf.get("enabled") and conf.get("backup_system") == self.name async def get_last_backup_timestamp(self) -> Optional[int]: return None class PleskBackup(BackupSystem): def __init__(self): super().__init__(PLESK) class cPanelBackup(BackupSystem): def __init__(self): super().__init__(CPANEL) class DirectAdminBackup(BackupSystem): def __init__(self): super().__init__(DIRECTADMIN) class R1Soft(BackupSystem): def __init__(self): super().__init__(R1SOFT) self.backend = backup_backends.backend("r1soft", async_=True) async def show(self) -> dict: info_data = await self.backend.info() return { k: v for k, v in info_data.items() if k in ("username", "timestamp", "ip") } @transactional async def init(self, ip, username, password, encryption_key, **kwargs): await self.backend.init(ip, username, password, encryption_key) class ClusterLogics(BackupSystem): def __init__(self): super().__init__(CLUSTERLOGICS) self.backend = backup_backends.backend(CLUSTERLOGICS, async_=True) async def show(self) -> dict: info_data = await self.backend.info() return { k: v for k, v in info_data.items() if k in ("username", "url", "apikey") } @transactional async def init(self, **kwargs): # 'force' argument (for arconis only) has default value # also, need to use default value for 'url', # assigned inside backend.init del kwargs["force"] await self.backend.init(**kwargs) class Sample(BackupSystem): def __init__(self): super().__init__(SAMPLE_BACKEND) self.backend = backup_backends.backend(self.name, async_=True) class Acronis(BackupSystem): def __init__(self): super().__init__( ACRONIS, "/var/log/%s/%s" % (Core.PRODUCT, AcronisBackupConfig.LOG_NAME), ) self.backend = backup_backends.backend(self.name, async_=True) async def show(self) -> dict: info_data = await self.backend.info() return { k: v for k, v in info_data.items() if k in ("username", "timestamp") } @transactional async def init(self, username, password, force=False, **kwargs): provision = not await self.backend.is_agent_installed() await self.backend.init( username, password, provision=provision, force=force, tmp_dir=Core.TMPDIR, ) async def _list_backups(self, until=None): return await self.backend.backups(until) async def get_last_backup_timestamp(self) -> Optional[int]: backups = await self._list_backups() if backups: return int( max( backup.created.replace(tzinfo=timezone.utc).timestamp() for backup in backups ) ) return None async def check_state(self) -> bool: """if backup exists, than state OK""" try: return bool(await self._list_backups()) except (asyncio.CancelledError, BackendNotAuthorizedError): raise except Exception: logger.exception("Error during checking state") return False class CloudLinuxBase(Acronis): async def show(self) -> dict: info_data = await self.backend.info() info_data["backup_space_used_bytes"] = info_data.pop("usage") info_data["login_url"] = await self.backend.login_url() return info_data async def make_backup(self): logger.info("Making backup") try: await self.backend.make_initial_backup_strict() except BackupFailed as e: logging.exception("CloudLinux backup failed") raise BackupException( str(e) if len(e.args) and e.args[0] else "BackupFailed" ) async def get_backup_progress(self) -> Optional[int]: return await self.backend.get_backup_progress() async def init(self, username, password, force=False, **kwargs): logger.info("Starting %s init" % self.name) provision = not await self.backend.is_agent_installed() await self.backend.init( username, password, provision=provision, force=force, tmp_dir=Core.TMPDIR, ) class CloudLinux(CloudLinuxBase): PAID, UNPAID = "paid", "unpaid" def __init__(self): super().__init__() self.name = CLOUDLINUX @transactional async def init(self, force=False, **kwargs): credentials = await RestCLN.acronis_credentials( server_id=LicenseCLN.get_server_id() ) await super().init( credentials["login"], credentials["password"], force=force, ) class Decorators: @staticmethod def update_credentials_on_unauthorized_error(f): @functools.wraps(f) async def wrapped(self, *args, **kwargs): try: return await f(self, *args, **kwargs) except BackendNotAuthorizedError: await self.init(force=True) return await f(self, *args, **kwargs) return wrapped @Decorators.update_credentials_on_unauthorized_error async def show(self) -> dict: info_data = await super().show() # FIXME: raise exception when server_id is None response = await RestCLN.acronis_check( server_id=LicenseCLN.get_server_id() ) purchased_backup_gb = response.get("size", 0) resize_url = response.get("url", None) info_data["purchased_backup_gb"] = purchased_backup_gb info_data["resize_url"] = resize_url return info_data @Decorators.update_credentials_on_unauthorized_error async def make_backup(self): await super().make_backup() @Decorators.update_credentials_on_unauthorized_error async def get_backup_progress(self) -> Optional[int]: return await super().get_backup_progress() @Decorators.update_credentials_on_unauthorized_error async def get_last_backup_timestamp(self) -> Optional[int]: return await super().get_last_backup_timestamp() @Decorators.update_credentials_on_unauthorized_error async def check_state(self) -> bool: return await super().check_state() async def check(self) -> dict: try: content = await RestCLN.acronis_check( server_id=LicenseCLN.get_server_id() ) except BackupNotFound as e: return {"status": self.UNPAID, "url": e.add_used_space()} return {"status": self.PAID, "size": content.get("size")} async def disable(self, delete_backups=False): await super().disable() if delete_backups: await RestCLN.acronis_remove(server_id=LicenseCLN.get_server_id()) class CloudLinuxOnPremise(CloudLinuxBase): def __init__(self): super().__init__() self.name = CLOUDLINUX_ON_PREMISE @transactional async def init(self, *args, **kwargs): await super().init(*args, **kwargs)