"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
Copyright © 2019 Cloud Linux Software Inc.
This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>
"""
import logging
import pwd
import re
from pathlib import Path
from distutils.version import StrictVersion
from defence360agent.utils import (
check_run,
CheckRunError,
async_lru_cache,
)
from imav.wordpress import PLUGIN_PATH, PLUGIN_SLUG
from imav.wordpress.utils import (
build_command_for_user,
get_php_binary_path,
wp_wrapper,
)
from defence360agent.sentry import log_message
from imav.model.wordpress import WPSite
# Module-level logger named after this module; used by the functions below.
logger = logging.getLogger(__name__)
def _validate_semver(version_str: str) -> bool:
"""Validate if a string is a valid semantic version."""
# Handle None and non-string inputs
if not isinstance(version_str, str):
return False
# Trim the string and return False if empty
trimmed_str = version_str.strip()
if not trimmed_str:
return False
try:
StrictVersion(trimmed_str)
return True
except ValueError:
return False
def _extract_version_from_output(output: str) -> str:
"""
Extract version from WP CLI output, trying both first and last parts.
Args:
output: The raw output from WP CLI
Returns:
The extracted version string or None if no valid version found
"""
if not output:
return None
# Split the output into parts
parts = output.split()
if not parts:
return None
# Try the first part
if len(parts) > 0:
first_part = parts[0].strip()
if _validate_semver(first_part):
return first_part
# Try the last part
if len(parts) > 1:
last_part = parts[-1].strip()
if _validate_semver(last_part):
return last_part
# If neither first nor last part is valid semver, log to sentry
log_message(
"Failed to extract valid semver version from WP CLI output. Output:"
" '{output}'",
format_args={"output": output},
level="warning",
fingerprint="wp-plugin-version-extraction-failed",
)
# Return None when no valid semver is found
return None
# Matches the "Version:" entry inside the plugin's header comment block.
_PLUGIN_HEADER_VERSION_RE = re.compile(r"\* Version:\s*([0-9.]+)")


async def _parse_version_from_plugin_file(site: WPSite) -> "str | None":
    """
    Parse the version of imunify-security plugin by reading the main plugin file.

    The file is looked up at
    ``<content dir>/plugins/imunify-security/imunify-security.php`` and scanned
    line by line for the first ``* Version: X`` header line.

    Args:
        site: WordPress site object; its content directory is resolved via
            ``get_content_dir``

    Returns:
        The plugin version, or None if the file does not exist, cannot be
        read, has no version header, or the first header found does not pass
        ``_validate_semver``.
    """
    content_dir = await get_content_dir(site)
    plugin_file = (
        content_dir / "plugins" / "imunify-security" / "imunify-security.php"
    )
    if not plugin_file.exists():
        return None

    try:
        # Decode explicitly instead of relying on the locale's default
        # encoding: only the ASCII version header matters here, so stray
        # non-UTF-8 bytes elsewhere in the file must not abort the scan.
        with open(plugin_file, encoding="utf-8", errors="replace") as f:
            for line in f:
                match = _PLUGIN_HEADER_VERSION_RE.search(line)
                if match:
                    # Only the first header line counts; an invalid value
                    # there means "unknown", not "keep searching".
                    version = match.group(1)
                    return version if _validate_semver(version) else None
    except Exception as e:
        # Best effort: any read failure is logged and reported as "unknown".
        logger.error(
            "Failed to read plugin file to determine version number %s: %s",
            plugin_file,
            e,
        )
        return None

    # File was read completely without finding a version header.
    return None
async def plugin_install(site: WPSite):
    """
    Install and activate the Imunify Security plugin on a WordPress site.

    Builds a WP CLI ``plugin install <PLUGIN_PATH> --activate --force``
    command for the user owning ``site.uid`` and runs it with ``check_run``.
    Errors raised by ``check_run`` are not handled here and propagate to the
    caller.
    """
    owner = pwd.getpwuid(site.uid).pw_name
    php_binary = await get_php_binary_path(site, owner)

    wp_cli_args = list(wp_wrapper(php_binary, site.docroot))
    wp_cli_args += [
        "plugin",
        "install",
        str(PLUGIN_PATH),
        "--activate",
        "--force",
    ]

    command = build_command_for_user(owner, wp_cli_args)
    logger.info(f"Installing wp plugin {command}")
    await check_run(command)
async def plugin_update(site: WPSite):
    """
    Update the Imunify Security plugin on a WordPress site.

    For now this issues exactly the same WP CLI command as installation
    (``plugin install <PLUGIN_PATH> --activate --force``); it is kept as a
    separate entry point because the two may diverge later. Errors raised by
    ``check_run`` propagate to the caller.
    """
    owner = pwd.getpwuid(site.uid).pw_name
    php_binary = await get_php_binary_path(site, owner)

    wp_cli_args = list(wp_wrapper(php_binary, site.docroot))
    wp_cli_args += [
        "plugin",
        "install",
        str(PLUGIN_PATH),
        "--activate",
        "--force",
    ]

    command = build_command_for_user(owner, wp_cli_args)
    logger.info(f"Updating wp plugin {command}")
    await check_run(command)
async def plugin_uninstall(site: WPSite):
    """
    Remove the imunify-security plugin from a WordPress site.

    Runs WP CLI ``plugin uninstall <PLUGIN_SLUG> --deactivate`` through a
    command built for the user owning ``site.uid``. Errors raised by
    ``check_run`` propagate to the caller.
    """
    owner = pwd.getpwuid(site.uid).pw_name
    php_binary = await get_php_binary_path(site, owner)

    wp_cli_args = list(wp_wrapper(php_binary, site.docroot))
    wp_cli_args += ["plugin", "uninstall", PLUGIN_SLUG, "--deactivate"]

    command = build_command_for_user(owner, wp_cli_args)
    logger.info(f"Uninstalling wp plugin {command}")
    await check_run(command)
async def _get_plugin_version(site: WPSite):
    """
    Ask WP CLI for the installed imunify-security plugin version.

    Runs ``plugin get <PLUGIN_SLUG> --field=version`` and passes the decoded,
    trimmed output to ``_extract_version_from_output``.

    Returns the extracted version, or None when the command fails
    (``CheckRunError``), its output is not valid UTF-8, or no version could
    be extracted from it.
    """
    owner = pwd.getpwuid(site.uid).pw_name
    php_binary = await get_php_binary_path(site, owner)

    wp_cli_args = list(wp_wrapper(php_binary, site.docroot))
    wp_cli_args += ["plugin", "get", PLUGIN_SLUG, "--field=version"]

    command = build_command_for_user(owner, wp_cli_args)
    logger.info(f"Getting wp plugin version {command}")

    # Stays None unless the command ran and its output could be decoded.
    version = None
    try:
        stdout = await check_run(command)
        text = stdout.decode("utf-8").strip()
        version = _extract_version_from_output(text)
    except CheckRunError as exc:
        logger.error(
            "Failed to get wp plugin version. Return code: %s",
            exc.returncode,
        )
    except UnicodeDecodeError as exc:
        logger.error("Failed to decode wp plugin version output: %s", exc)
    return version
async def get_plugin_version(site: WPSite):
    """
    Determine the installed imunify-security plugin version for a site.

    The plugin's main PHP file is consulted first; if that yields nothing
    (or raises), the version is requested from WP CLI instead. The result of
    the WP CLI lookup is returned as-is, so it may be None.
    """
    try:
        file_version = await _parse_version_from_plugin_file(site)
    except Exception as exc:
        # A failure to read the file is not fatal, WP CLI is tried next.
        logger.warning("Failed to parse version from plugin file: %s", exc)
    else:
        if file_version:
            return file_version

    # No usable version from the file: fall back to WP CLI
    logger.info("Plugin version not found in file, trying WP CLI")
    return await _get_plugin_version(site)
async def is_plugin_installed(site: WPSite):
    """
    Report whether the imunify-security plugin is installed on a site.

    Runs WP CLI ``plugin is-installed <PLUGIN_SLUG>``. Returns True when the
    command succeeds and False when ``check_run`` raises ``CheckRunError``,
    which covers both "not installed" and any other command failure.
    """
    owner = pwd.getpwuid(site.uid).pw_name
    php_binary = await get_php_binary_path(site, owner)

    wp_cli_args = list(wp_wrapper(php_binary, site.docroot))
    wp_cli_args += ["plugin", "is-installed", PLUGIN_SLUG]

    command = build_command_for_user(owner, wp_cli_args)
    logger.info(f"Checking if wp plugin is installed {command}")

    try:
        await check_run(command)
    except CheckRunError:
        # non-zero exit code: plugin is missing or the check itself failed
        return False
    else:
        # zero exit code: plugin is installed
        return True
async def is_wordpress_installed(site: WPSite):
    """
    Report whether WordPress is installed and reachable through WP CLI.

    Runs WP CLI ``core is-installed`` for the site. Returns True when the
    command succeeds and False when ``check_run`` raises ``CheckRunError``,
    i.e. WordPress is not installed or the command failed for another reason.
    """
    owner = pwd.getpwuid(site.uid).pw_name
    php_binary = await get_php_binary_path(site, owner)

    wp_cli_args = list(wp_wrapper(php_binary, site.docroot))
    wp_cli_args += ["core", "is-installed"]

    command = build_command_for_user(owner, wp_cli_args)
    logger.info(f"Checking if WordPress is installed {command}")

    try:
        await check_run(command)
    except CheckRunError:
        # non-zero exit code: WordPress is missing or the check itself failed
        return False
    else:
        # zero exit code: WordPress is installed
        return True
async def _get_content_directory(site: WPSite):
    """
    Query WordPress itself for the site's content directory.

    Evaluates ``echo WP_CONTENT_DIR;`` through WP CLI and returns the decoded,
    trimmed output as a string. Returns None when the command fails
    (``CheckRunError``) or its output is not valid UTF-8.

    Intended only as a fallback for sites whose default wp-content directory
    does not exist.
    """
    owner = pwd.getpwuid(site.uid).pw_name
    php_binary = await get_php_binary_path(site, owner)

    wp_cli_args = list(wp_wrapper(php_binary, site.docroot))
    wp_cli_args += ["eval", "echo WP_CONTENT_DIR;"]

    command = build_command_for_user(owner, wp_cli_args)
    logger.info(f"Getting content directory {command}")

    # Stays None unless the command ran and its output could be decoded.
    reported_dir = None
    try:
        stdout = await check_run(command)
        reported_dir = stdout.decode("utf-8").strip()
    except CheckRunError as exc:
        logger.error(
            "Failed to get content directory. Return code: %s",
            exc.returncode,
        )
    except UnicodeDecodeError as exc:
        logger.error("Failed to decode content directory output: %s", exc)
    return reported_dir
@async_lru_cache(maxsize=100)
async def get_content_dir(site: WPSite):
    """
    Resolve the WordPress content directory of the given site.

    ``<docroot>/wp-content`` is used whenever it exists and is a directory.
    Otherwise WP CLI is asked for the ``WP_CONTENT_DIR`` constant; if that
    lookup yields nothing, the default path is returned anyway.

    Returns:
        Path: The WordPress content directory path
    """
    default_dir = Path(site.docroot) / "wp-content"

    # Common case: the conventional location is present, no WP CLI needed.
    if default_dir.exists() and default_dir.is_dir():
        return default_dir

    # Non-standard layout: let WordPress report where its content lives.
    reported_dir = await _get_content_directory(site)
    return Path(reported_dir) if reported_dir else default_dir
def clear_get_content_dir_cache():
    """Drop every cached ``get_content_dir`` result."""
    get_content_dir.cache_clear()
async def get_data_dir(site: WPSite):
    """
    Return the Imunify Security data directory of the given WordPress site.

    This is the ``imunify-security`` folder inside the site's content
    directory; ``<docroot>/wp-content`` is used as the base if
    ``get_content_dir`` returns a falsy value.
    """
    base_dir = (await get_content_dir(site)) or (
        Path(site.docroot) / "wp-content"
    )
    return Path(base_dir) / "imunify-security"