""" This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program.  If not, see . Copyright © 2019 Cloud Linux Software Inc. This software is also available under ImunifyAV commercial license, see """ import logging import pwd import re from pathlib import Path from distutils.version import StrictVersion from defence360agent.utils import ( check_run, CheckRunError, async_lru_cache, ) from imav.wordpress import PLUGIN_PATH, PLUGIN_SLUG from imav.wordpress.utils import ( build_command_for_user, get_php_binary_path, wp_wrapper, ) from defence360agent.sentry import log_message from imav.model.wordpress import WPSite logger = logging.getLogger(__name__) def _validate_semver(version_str: str) -> bool: """Validate if a string is a valid semantic version.""" # Handle None and non-string inputs if not isinstance(version_str, str): return False # Trim the string and return False if empty trimmed_str = version_str.strip() if not trimmed_str: return False try: StrictVersion(trimmed_str) return True except ValueError: return False def _extract_version_from_output(output: str) -> str: """ Extract version from WP CLI output, trying both first and last parts. Args: output: The raw output from WP CLI Returns: The extracted version string or None if no valid version found """ if not output: return None # Split the output into parts parts = output.split() if not parts: return None # Try the first part if len(parts) > 0: first_part = parts[0].strip() if _validate_semver(first_part): return first_part # Try the last part if len(parts) > 1: last_part = parts[-1].strip() if _validate_semver(last_part): return last_part # If neither first nor last part is valid semver, log to sentry log_message( "Failed to extract valid semver version from WP CLI output. Output:" " '{output}'", format_args={"output": output}, level="warning", fingerprint="wp-plugin-version-extraction-failed", ) # Return None when no valid semver is found return None async def _parse_version_from_plugin_file(site: WPSite) -> str: """ Parse the version of imunify-security plugin by reading the main plugin file. Args: site: WordPress site object containing docroot path Returns: str: Plugin version or None if not found or invalid """ content_dir = await get_content_dir(site) plugin_file = ( content_dir / "plugins" / "imunify-security" / "imunify-security.php" ) if not plugin_file.exists(): return None version_pattern = re.compile(r"\* Version:\s*([0-9.]+)") try: with open(plugin_file) as f: for line in f: match = version_pattern.search(line) if match: version = match.group(1) if _validate_semver(version): return version else: return None except Exception as e: logger.error( "Failed to read plugin file to determine version number %s: %s", plugin_file, e, ) return None return None async def plugin_install(site: WPSite): """Install the Imunify Security WordPress plugin on given WordPress site.""" username = pwd.getpwuid(site.uid).pw_name php_path = await get_php_binary_path(site, username) args = [ *wp_wrapper(php_path, site.docroot), "plugin", "install", str(PLUGIN_PATH), "--activate", "--force", ] command = build_command_for_user(username, args) logger.info(f"Installing wp plugin {command}") await check_run(command) async def plugin_update(site: WPSite): """ Update the Imunify Security WordPress plugin on given WordPress site. Currently, this is the same as install, but in the future it may differ. """ username = pwd.getpwuid(site.uid).pw_name php_path = await get_php_binary_path(site, username) args = [ *wp_wrapper(php_path, site.docroot), "plugin", "install", str(PLUGIN_PATH), "--activate", "--force", ] command = build_command_for_user(username, args) logger.info(f"Updating wp plugin {command}") await check_run(command) async def plugin_uninstall(site: WPSite): """Uninstall the imunify-security wp plugin from given wp site.""" username = pwd.getpwuid(site.uid).pw_name php_path = await get_php_binary_path(site, username) args = [ *wp_wrapper(php_path, site.docroot), "plugin", "uninstall", PLUGIN_SLUG, "--deactivate", ] command = build_command_for_user(username, args) logger.info(f"Uninstalling wp plugin {command}") await check_run(command) async def _get_plugin_version(site: WPSite): """ Get the version of the imunify-security wp plugin installed on given WordPress site. Uses WP CLI to get the version. """ username = pwd.getpwuid(site.uid).pw_name php_path = await get_php_binary_path(site, username) args = [ *wp_wrapper(php_path, site.docroot), "plugin", "get", PLUGIN_SLUG, "--field=version", ] command = build_command_for_user(username, args) logger.info(f"Getting wp plugin version {command}") try: result = await check_run(command) output = result.decode("utf-8").strip() return _extract_version_from_output(output) except CheckRunError as e: logger.error( "Failed to get wp plugin version. Return code: %s", e.returncode, ) return None except UnicodeDecodeError as e: logger.error("Failed to decode wp plugin version output: %s", e) return None async def get_plugin_version(site: WPSite): """ Get the version of the imunify-security wp plugin installed on given WordPress site. First tries to parse the version from the plugin file, then falls back to WP CLI. """ # First try to parse version from plugin file try: version = await _parse_version_from_plugin_file(site) if version: return version except Exception as e: logger.warning("Failed to parse version from plugin file: %s", e) # Fall back to WP CLI if file parsing fails logger.info("Plugin version not found in file, trying WP CLI") return await _get_plugin_version(site) async def is_plugin_installed(site: WPSite): """Check if the imunify-security wp plugin is installed on given WordPress site.""" username = pwd.getpwuid(site.uid).pw_name php_path = await get_php_binary_path(site, username) args = [ *wp_wrapper(php_path, site.docroot), "plugin", "is-installed", PLUGIN_SLUG, ] command = build_command_for_user(username, args) logger.info(f"Checking if wp plugin is installed {command}") try: await check_run(command) except CheckRunError: # exit code other than 0 means plugin is not installed or there is an error return False # exit code 0 means plugin is installed return True async def is_wordpress_installed(site: WPSite): """Check if WordPress is installed and given site is accessible using WP CLI.""" username = pwd.getpwuid(site.uid).pw_name php_path = await get_php_binary_path(site, username) args = [ *wp_wrapper(php_path, site.docroot), "core", "is-installed", ] command = build_command_for_user(username, args) logger.info(f"Checking if WordPress is installed {command}") try: # exit code other than 0 means WordPress is not installed or there is an error await check_run(command) except CheckRunError: return False # exit code 0 means WordPress is installed return True async def _get_content_directory(site: WPSite): """ Get the content directory of the WordPress site using WP CLI. This should only be used if the default wp-content directory does not exist. """ username = pwd.getpwuid(site.uid).pw_name php_path = await get_php_binary_path(site, username) args = [ *wp_wrapper(php_path, site.docroot), "eval", "echo WP_CONTENT_DIR;", ] command = build_command_for_user(username, args) logger.info(f"Getting content directory {command}") try: result = await check_run(command) return result.decode("utf-8").strip() except CheckRunError as e: logger.error( "Failed to get content directory. Return code: %s", e.returncode, ) return None except UnicodeDecodeError as e: logger.error("Failed to decode content directory output: %s", e) return None @async_lru_cache(maxsize=100) async def get_content_dir(site: WPSite): """ Get the WordPress content directory for the given WordPress site. This function first checks if the default wp-content directory exists at the site's docroot. If the default path doesn't exist or isn't a directory, it attempts to get the actual content directory using WordPress CLI's WP_CONTENT_DIR constant. Returns: Path: The WordPress content directory path """ content_dir = Path(site.docroot) / "wp-content" # First check if content_dir exists and is a folder if not content_dir.exists() or not content_dir.is_dir(): # If not, try to get the content directory using WP CLI wp_content_dir = await _get_content_directory(site) if wp_content_dir: content_dir = Path(wp_content_dir) return content_dir def clear_get_content_dir_cache(): """Clear the async LRU cache for get_content_dir.""" get_content_dir.cache_clear() async def get_data_dir(site: WPSite): """ Get the Imunify Security data directory for the given WordPress site. """ content_dir = await get_content_dir(site) if not content_dir: content_dir = Path(site.docroot) / "wp-content" return Path(content_dir) / "imunify-security"