Server IP : 162.0.217.223 / Your IP : 216.73.216.153 Web Server : LiteSpeed System : Linux premium269.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64 User : mypckeys ( 1539) PHP Version : 8.1.33 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : OFF | Pkexec : OFF Directory : /usr/share/l.v.e-manager/utils/ |
Upload File : |
# coding:utf-8 # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT from __future__ import print_function from __future__ import division from __future__ import absolute_import import json import logging import subprocess import os import sys from libcloudlinux import ( CloudlinuxCliBase, LVEMANAGER_PLUGIN_NAMES, DEFAULT_PLUGIN_NAME, PASSENGER_DEPEND_PLUGINS, AllLimitStrategy, NoLimitStrategy, LimitStrategyHeavy, LimitStrategyBase, ConfigLimitValue, BypassStrategy, EnterTool, ) from clselector.clpassenger_detectlib import is_clpassenger_active from clcommon import ClPwd from clcommon.utils import is_litespeed_running from clcommon.lib.cledition import is_cl_solo_edition from cldetectlib import get_param_from_file from clcommon.const import Feature from clcommon.cpapi import is_panel_feature_supported CONFIG = "/etc/sysconfig/cloudlinux" SMART_ADVICE_USER_CLI = "/opt/alt/php-xray/cl-smart-advice-user" PERCENTS_STATS_MODE_FLAG = ( "/opt/cloudlinux/flags/enabled-flags.d/percentage-user-stats-mode.flag" ) # NB: this logger's out is stderr, result JSON out is stdout - so with active logger web will not work properly # because of stderr redirection 2>&1 # so it is MUST be silent(NOTSET) in normal situation # also it is not possible to use file logger here - script works inside the cagefs with user's rights logger = logging.getLogger(__name__) logger.setLevel(logging.NOTSET) init_formatter = logging.Formatter( "[%(asctime)s] %(funcName)s:%(lineno)s - %(message)s" ) cagefs_formatter = logging.Formatter( "{cagefs} [%(asctime)s] %(funcName)s:%(lineno)s - %(message)s" ) h = logging.StreamHandler() h.setFormatter(init_formatter) logger.addHandler(h) logger.debug("cli start") class CloudlinuxCliUser(CloudlinuxCliBase): limit_strategy: LimitStrategyBase def __init__(self): self.web_resource_limit_mode = ConfigLimitValue.HEAVY limit_mode = get_param_from_file( CONFIG, "web_resource_limit_mode", "=", ConfigLimitValue.HEAVY.value ) self.web_resource_limit_mode = ConfigLimitValue(limit_mode) super(CloudlinuxCliUser, self).__init__() self.command_methods.update( { "spa-get-domains": self.spa_user_domains, "spa-get-homedir": self.spa_user_homedir, "cloudlinux-snapshots": self.cl_snapshots, "spa-get-user-info": self.spa_get_user_info, } ) def __init_limit_strategy(self): """ Set default strategy from the `CONFIG` values """ if self.skip_cagefs_check: # update log format to easier log review logger.handlers[0].setFormatter(cagefs_formatter) # we cannot use lve when it is not available if not is_panel_feature_supported(Feature.LVE): self.limit_strategy = BypassStrategy() else: self.limit_strategy = { ConfigLimitValue.ALL: AllLimitStrategy, ConfigLimitValue.HEAVY: LimitStrategyHeavy, ConfigLimitValue.UNLIMITED: NoLimitStrategy, }.get(self.web_resource_limit_mode, LimitStrategyHeavy)() # we cannot use cagefs when it is not available if not is_panel_feature_supported(Feature.CAGEFS): self.limit_strategy.enter_tool = EnterTool.LVE_SUWRAPPER # some commands do not work inside cagefs, but we can still limit them with lve if self.__is_cagefs_incompatible_command(): self.limit_strategy.enter_tool = EnterTool.LVE_SUWRAPPER logger.debug( f"Limits strategy inited as {self.limit_strategy.__class__}" f"\n\tBecause of:" f"\n\tself.web_resource_limit_mode: {self.web_resource_limit_mode}" ) def set_limit_strategy(self, strategy: LimitStrategyBase): logger.debug(f"Limit strategy is explicitly set to {strategy.__class__}") self.limit_strategy = strategy def __is_cagefs_incompatible_command(self): """ Returns True if command is not compatible with CageFS """ data = self.request_data # phpselector commands if data.get("params", {}).get("interpreter") == "php": return True if data.get("command") in { # TODO: https://cloudlinux.atlassian.net/browse/CLOS-3561 "cloudlinux-statistics", "cloudlinux-top", "cloudlinux-snapshots", "cloudlinux-charts", "cloudlinux-statsnotifier", # this command cannot run inside cagefs because it needs access to /dev/vdaX "cloudlinux-quota", }: logger.debug("Executable command found in the exclusive list") return True return False def drop_permission(self): """ Drop permission to users, if owner of script is user :return: """ logger.debug( "drop permissions start" f"\n\targv is: {sys.argv}" f"\n\trequest data is: {self.request_data}" ) self.__init_limit_strategy() data = self.request_data if data["owner"] != "user": self.exit_with_error("User not allowed") super(CloudlinuxCliUser, self).drop_permission() args = self.prepair_params_for_command() logger.debug(f"prepared args is: {args}") if data.get("command"): if self.skip_cagefs_check: logger.debug("cagefs skipped: --skip-cagefs-check arg found") else: # if rc is None - script won't enter the cagefs # otherwise - command is executed in the cagefs rc = self.limit_strategy.execute( self.user_info["lve-id"], data["command"], args, self.request_data ) if rc is not None: logger.debug(f"command executed inside of the cagefs with rc: {rc}") sys.exit(rc) else: logger.debug( f"cagefs skipped: strategy is {self.limit_strategy.__class__}" ) # skip checking plugin availability on spa-get-user-info if data.get("command") != "spa-get-user-info": self.check_plugin_availability() logger.debug("drop permissons end") def spa_user_domains(self): print(json.dumps({"result": "success", "list": self.get_user_domains()})) sys.exit(0) def spa_user_homedir(self): print(json.dumps({"result": "success", "homedir": self.get_user_homedir()})) sys.exit(0) def spa_get_user_info(self): try: print( json.dumps( { "result": "success", "domains": self.get_user_domains(), "homedir": self.get_user_homedir(), "is_litespeed_running": is_litespeed_running(), "is_cl_solo_edition": is_cl_solo_edition(skip_jwt_check=True), "smart_advice": os.path.isfile(SMART_ADVICE_USER_CLI), "is_lve_supported": is_panel_feature_supported(Feature.LVE), "user_stats_mode": self.get_stats_mode(), "server_ip": self.get_server_ip(), } ) ) except: self.exit_with_error("Module unavailable") sys.exit(0) def get_user_domains(self): try: from clcommon.cpapi import userdomains except: self.exit_with_error("Module unavailable") return [x[0] for x in userdomains(self.user_info["username"])] def get_stats_mode(self): if os.path.isfile(PERCENTS_STATS_MODE_FLAG): return "percent" return "default" def get_user_homedir(self): try: pwdir = ClPwd().get_homedir(self.user_info["username"]) return pwdir + "/" except KeyError: self.exit_with_error("No such user") def cl_snapshots(self): list_to_request = self.prepair_params_for_command() try: output = self.run_util("/usr/sbin/lve-read-snapshot", *list_to_request) except subprocess.CalledProcessError as processError: output = processError.output try: result = json.loads(output) except: self.exit_with_error(output) return self.exit_with_success({"data": result["data"]}) sys.exit(0) def check_plugin_availability(self): plugin_names = { "nodejs_selector": "Node.js Selector", "python_selector": "Python Selector", } selector_enabled = True manager = None try: if self.current_plugin_name == "nodejs_selector": from clselect.clselectnodejs.node_manager import NodeManager manager = NodeManager() if self.current_plugin_name == "python_selector": from clselect.clselectpython.python_manager import PythonManager manager = PythonManager() if manager: selector_enabled = manager.selector_enabled except: selector_enabled = False if not selector_enabled: self.exit_with_error( code=503, error_id="ERROR.not_available_plugin", context={ "pluginName": plugin_names.get(self.current_plugin_name, "Plugin") }, icon="disabled", ) plugin_available_checker = { "nodejs_selector": self._plugin_available_nodejs, "python_selector": self._plugin_available_python, "php_selector": self._plugin_available_php, "resource_usage": self._plugin_available_resource_usage, }.get(self.current_plugin_name) if plugin_available_checker: plugin_available = plugin_available_checker() else: plugin_available = True if ( not is_clpassenger_active() and self.current_plugin_name in PASSENGER_DEPEND_PLUGINS ): self.exit_with_error( code=503, error_id="ERROR.not_available_passenger", context={ "pluginName": LVEMANAGER_PLUGIN_NAMES.get( self.current_plugin_name, DEFAULT_PLUGIN_NAME ) }, icon="disabled", ) if not plugin_available: self.exit_with_error( code=503, error_id="ERROR.not_available_plugin", context={ "pluginName": LVEMANAGER_PLUGIN_NAMES.get( self.current_plugin_name, DEFAULT_PLUGIN_NAME ) }, icon="disabled", ) def _plugin_available_nodejs(self): try: from clselect.clselectnodejs.node_manager import NodeManager manager = NodeManager() if not manager.selector_enabled or not is_clpassenger_active(): return False except: return False return True def _plugin_available_python(self): try: from clselect.clselectpython.python_manager import PythonManager manager = PythonManager() if not manager.selector_enabled or not is_clpassenger_active(): return False except: return False return True def _plugin_available_php(self): try: from clselect.clselectphp.php_manager import PhpManager manager = PhpManager() if not manager.selector_enabled: return False except: return False return True def _plugin_available_resource_usage(self): return True