Файловый менеджер - Редактировать - /opt/imunify360/venv/lib/python3.11/site-packages/imav/malwarelib/scan/mds/restore.py
Назад
""" This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see <https://www.gnu.org/licenses/>. Copyright © 2019 Cloud Linux Software Inc. This software is also available under ImunifyAV commercial license, see <https://www.imunify360.com/legal/eula> """ import hashlib import logging import uuid from typing import cast from defence360agent.contracts.config import Malware, MalwareScanIntensity from defence360agent.internals.the_sink import TheSink from defence360agent.utils import resource_limits from imav.contracts.messages import MalwareDatabaseRestoreTask from imav.malwarelib.cleanup.types import CleanupRevertInitiator from imav.malwarelib.config import ( RESTORE_ORIGINAL_INTENSITY_KEY, MalwareScanResourceType, ) from imav.malwarelib.model import MalwareHit from imav.malwarelib.scan.mds import MDS_PATH from imav.malwarelib.scan.mds.detached import MDSDetachedRestoreDir from imav.malwarelib.utils import hash_path logger = logging.getLogger(__name__) class MalwareDatabaseRestore: def __init__( self, *, app_name: str, path: str, signature_id: str | None = None, restore_id: str | None = None, ): self.app_name = app_name self.path = path self.signature_id = signature_id self.restore_id = restore_id or uuid.uuid4().hex def cmd(self, work_dir: MDSDetachedRestoreDir): command = [ "/opt/ai-bolit/wrapper", MDS_PATH, "--path", self.path, "--app-name", self.app_name, "--report-file", str(work_dir.report_file), "--detached", self.restore_id, "--progress", str(work_dir.progress_file), "--db-timeout", str(Malware.MDS_DB_TIMEOUT), ] if self.signature_id: command += ["--restore-sig-id", self.signature_id] else: command += ["--restore", storage_path(self.path, self.app_name)] return command async def restore(self): with MDSDetachedRestoreDir(self.restore_id) as work_dir: cmd = self.cmd(work_dir) with ( work_dir.log_file.open(mode="w") as l_f, work_dir.err_file.open(mode="w") as e_f, ): logger.info("Running MDS Restore with: %s", cmd) await resource_limits.create_subprocess( cmd, intensity_cpu=MalwareScanIntensity.CPU, intensity_io=MalwareScanIntensity.IO, start_new_session=True, stdout=l_f, stderr=e_f, cwd=str(work_dir), key=RESTORE_ORIGINAL_INTENSITY_KEY, ) def storage_path(path: str, app_name: str): return "/var/imunify360/cleanup_storage/" + hash_path( path + app_name, hash_provider=hashlib.sha384 ) async def restore_hits( hits: list[MalwareHit], sink: TheSink, initiator: CleanupRevertInitiator | None = None, ): db_hits = [ hit for hit in hits if hit.resource_type == MalwareScanResourceType.DB.value ] hit: MalwareHit for hit in db_hits: await sink.process_message( MalwareDatabaseRestoreTask( path=cast(str, hit.orig_file), app_name=cast(str, hit.app_name), signature_id=hit.signature_id if initiator == CleanupRevertInitiator.IMUNIFY else None, ) )
| ver. 1.4 |
Github
|
.
| PHP 8.1.33 | Генерация страницы: 0.05 |
proxy
|
phpinfo
|
Настройка