Файловый менеджер - Редактировать - /opt/imunify360/venv/lib/python3.11/site-packages/imav/wordpress/cli.py
Назад
"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation, either version 3 of the License, or (at your option) any later
version.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with
this program. If not, see <https://www.gnu.org/licenses/>.

Copyright © 2019 Cloud Linux Software Inc.

This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>
"""

import logging
import pwd
import re
from pathlib import Path
from typing import Optional

try:
    # Kept only so the module namespace still exposes the name; validation
    # below no longer depends on it. distutils is deprecated (PEP 632) and
    # absent from the standard library starting with Python 3.12.
    from distutils.version import StrictVersion  # noqa: F401
except ImportError:
    StrictVersion = None

from defence360agent.utils import (
    check_run,
    CheckRunError,
    async_lru_cache,
)
from imav.wordpress import PLUGIN_PATH, PLUGIN_SLUG
from imav.wordpress.utils import (
    build_command_for_user,
    get_php_binary_path,
    wp_wrapper,
)
from defence360agent.sentry import log_message
from imav.model.wordpress import WPSite

logger = logging.getLogger(__name__)

# Same grammar distutils' StrictVersion accepts: "N.N" or "N.N.N", optionally
# followed by a pre-release tag "aN" / "bN" (ASCII digits only).
_STRICT_VERSION_RE = re.compile(
    r"^(\d+) \. (\d+) (\. (\d+))? ([ab](\d+))?$",
    re.VERBOSE | re.ASCII,
)

# Matches the "* Version: x.y.z" line of the plugin's PHP header comment.
_PLUGIN_VERSION_RE = re.compile(r"\* Version:\s*([0-9.]+)")


def _validate_semver(version_str: str) -> bool:
    """
    Tell whether ``version_str`` is an acceptable version string.

    Surrounding whitespace is ignored. The accepted grammar is the "strict
    version" one (``N.N`` or ``N.N.N`` with an optional ``aN``/``bN``
    suffix), checked with a local regex instead of the deprecated
    ``distutils.version.StrictVersion``.

    Args:
        version_str: Candidate version; non-string values are rejected.

    Returns:
        True if the trimmed string matches the grammar, False otherwise
        (including for ``None``, non-strings and empty/blank strings).
    """
    if not isinstance(version_str, str):
        return False
    return _STRICT_VERSION_RE.match(version_str.strip()) is not None


def _extract_version_from_output(output: str) -> Optional[str]:
    """
    Extract version from WP CLI output, trying both first and last parts.

    The output is split on whitespace; the first token is tried first and,
    when there is more than one token, the last token is tried next.

    Args:
        output: The raw output from WP CLI

    Returns:
        The extracted version string or None if no valid version found
    """
    if not output:
        return None

    parts = output.split()
    if not parts:
        # Whitespace-only output: nothing to inspect and nothing to report.
        return None

    candidates = [parts[0]]
    if len(parts) > 1:
        candidates.append(parts[-1])

    for candidate in candidates:
        if _validate_semver(candidate):
            return candidate

    # Neither the first nor the last token is a valid version: report it.
    log_message(
        "Failed to extract valid semver version from WP CLI output. Output:"
        " '{output}'",
        format_args={"output": output},
        level="warning",
        fingerprint="wp-plugin-version-extraction-failed",
    )
    return None


async def _parse_version_from_plugin_file(site: WPSite) -> Optional[str]:
    """
    Parse the version of imunify-security plugin by reading the main plugin file.

    Only the first line carrying a ``* Version:`` header is considered; if
    that value is not a valid version, no further lines are examined.

    Args:
        site: WordPress site object containing docroot path

    Returns:
        Plugin version or None if not found or invalid
    """
    content_dir = await get_content_dir(site)
    plugin_file = (
        content_dir / "plugins" / "imunify-security" / "imunify-security.php"
    )

    if not plugin_file.exists():
        return None

    try:
        # Decode explicitly as UTF-8 and tolerate stray bytes so that the
        # result does not depend on the process locale and a single bad
        # byte elsewhere in the file cannot hide the version header.
        with open(plugin_file, encoding="utf-8", errors="replace") as f:
            for line in f:
                match = _PLUGIN_VERSION_RE.search(line)
                if match:
                    version = match.group(1)
                    return version if _validate_semver(version) else None
    except Exception as e:
        # Deliberately broad: this is a best-effort lookup and the caller
        # has a WP CLI fallback.
        logger.error(
            "Failed to read plugin file to determine version number %s: %s",
            plugin_file,
            e,
        )
        return None

    return None


async def _build_wp_cli_command(site: WPSite, *wp_args: str):
    """
    Build the command that runs WP CLI with ``wp_args`` for the site's owner.

    Resolves the user name from ``site.uid``, looks up the PHP binary for
    that site/user, prefixes the arguments with ``wp_wrapper(...)`` and
    passes the result through ``build_command_for_user``.

    Args:
        site: WordPress site the command targets.
        *wp_args: WP CLI sub-command and its arguments.

    Returns:
        Whatever ``build_command_for_user`` returns, ready for ``check_run``.
    """
    username = pwd.getpwuid(site.uid).pw_name
    php_path = await get_php_binary_path(site, username)
    args = [*wp_wrapper(php_path, site.docroot), *wp_args]
    return build_command_for_user(username, args)


async def plugin_install(site: WPSite) -> None:
    """Install the Imunify Security WordPress plugin on given WordPress site."""
    command = await _build_wp_cli_command(
        site,
        "plugin",
        "install",
        str(PLUGIN_PATH),
        "--activate",
        "--force",
    )
    logger.info("Installing wp plugin %s", command)
    await check_run(command)


async def plugin_update(site: WPSite) -> None:
    """
    Update the Imunify Security WordPress plugin on given WordPress site.

    Currently, this is the same as install, but in the future it may differ.
    """
    command = await _build_wp_cli_command(
        site,
        "plugin",
        "install",
        str(PLUGIN_PATH),
        "--activate",
        "--force",
    )
    logger.info("Updating wp plugin %s", command)
    await check_run(command)


async def plugin_uninstall(site: WPSite) -> None:
    """Uninstall the imunify-security wp plugin from given wp site."""
    command = await _build_wp_cli_command(
        site,
        "plugin",
        "uninstall",
        PLUGIN_SLUG,
        "--deactivate",
    )
    logger.info("Uninstalling wp plugin %s", command)
    await check_run(command)


async def _get_plugin_version(site: WPSite) -> Optional[str]:
    """
    Get the version of the imunify-security wp plugin installed on given
    WordPress site. Uses WP CLI to get the version.

    Returns:
        The version extracted from the WP CLI output, or None if the command
        fails, its output cannot be decoded, or no valid version is found.
    """
    command = await _build_wp_cli_command(
        site,
        "plugin",
        "get",
        PLUGIN_SLUG,
        "--field=version",
    )
    logger.info("Getting wp plugin version %s", command)

    try:
        result = await check_run(command)
        output = result.decode("utf-8").strip()
        return _extract_version_from_output(output)
    except CheckRunError as e:
        logger.error(
            "Failed to get wp plugin version. Return code: %s",
            e.returncode,
        )
        return None
    except UnicodeDecodeError as e:
        logger.error("Failed to decode wp plugin version output: %s", e)
        return None


async def get_plugin_version(site: WPSite) -> Optional[str]:
    """
    Get the version of the imunify-security wp plugin installed on given
    WordPress site.

    First tries to parse the version from the plugin file, then falls back
    to WP CLI.
    """
    try:
        version = await _parse_version_from_plugin_file(site)
        if version:
            return version
    except Exception as e:
        logger.warning("Failed to parse version from plugin file: %s", e)

    # File parsing produced nothing usable: ask WP CLI instead.
    logger.info("Plugin version not found in file, trying WP CLI")
    return await _get_plugin_version(site)


async def is_plugin_installed(site: WPSite) -> bool:
    """Check if the imunify-security wp plugin is installed on given WordPress site."""
    command = await _build_wp_cli_command(
        site,
        "plugin",
        "is-installed",
        PLUGIN_SLUG,
    )
    logger.info("Checking if wp plugin is installed %s", command)

    try:
        await check_run(command)
    except CheckRunError:
        # exit code other than 0 means plugin is not installed or there is
        # an error
        return False

    # exit code 0 means plugin is installed
    return True


async def is_wordpress_installed(site: WPSite) -> bool:
    """Check if WordPress is installed and given site is accessible using WP CLI."""
    command = await _build_wp_cli_command(
        site,
        "core",
        "is-installed",
    )
    logger.info("Checking if WordPress is installed %s", command)

    try:
        # exit code other than 0 means WordPress is not installed or there
        # is an error
        await check_run(command)
    except CheckRunError:
        return False

    # exit code 0 means WordPress is installed
    return True


async def _get_content_directory(site: WPSite) -> Optional[str]:
    """
    Get the content directory of the WordPress site using WP CLI.

    This should only be used if the default wp-content directory does not
    exist.

    Returns:
        The stripped output of ``echo WP_CONTENT_DIR;`` or None if the
        command fails or its output cannot be decoded.
    """
    command = await _build_wp_cli_command(
        site,
        "eval",
        "echo WP_CONTENT_DIR;",
    )
    logger.info("Getting content directory %s", command)

    try:
        result = await check_run(command)
        return result.decode("utf-8").strip()
    except CheckRunError as e:
        logger.error(
            "Failed to get content directory. Return code: %s",
            e.returncode,
        )
        return None
    except UnicodeDecodeError as e:
        logger.error("Failed to decode content directory output: %s", e)
        return None


@async_lru_cache(maxsize=100)
async def get_content_dir(site: WPSite) -> Path:
    """
    Get the WordPress content directory for the given WordPress site.

    This function first checks if the default wp-content directory exists at
    the site's docroot. If the default path doesn't exist or isn't a
    directory, it attempts to get the actual content directory using
    WordPress CLI's WP_CONTENT_DIR constant. If that lookup yields nothing,
    the default path is returned anyway.

    Returns:
        Path: The WordPress content directory path
    """
    content_dir = Path(site.docroot) / "wp-content"

    # is_dir() is already False for a missing path, so one check covers
    # both "does not exist" and "exists but is not a directory".
    if not content_dir.is_dir():
        wp_content_dir = await _get_content_directory(site)
        if wp_content_dir:
            content_dir = Path(wp_content_dir)

    return content_dir


def clear_get_content_dir_cache() -> None:
    """Clear the async LRU cache for get_content_dir."""
    get_content_dir.cache_clear()


async def get_data_dir(site: WPSite) -> Path:
    """
    Get the Imunify Security data directory for the given WordPress site.

    Returns:
        Path: ``<content dir>/imunify-security``
    """
    content_dir = await get_content_dir(site)
    if not content_dir:
        # Defensive fallback in case the lookup ever yields a falsy value.
        content_dir = Path(site.docroot) / "wp-content"

    return Path(content_dir) / "imunify-security"
| ver. 1.4 |
Github
|
.
| PHP 8.1.33 | Генерация страницы: 0.03 |
proxy
|
phpinfo
|
Настройка