Done ! 403WebShell
403Webshell
Server IP : 162.0.217.223  /  Your IP : 216.73.216.153
Web Server : LiteSpeed
System : Linux premium269.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : mypckeys ( 1539)
PHP Version : 8.1.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /usr/share/l.v.e-manager/utils/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /usr/share/l.v.e-manager/utils/cloudlinux_cli_user.py
# coding:utf-8

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

from __future__ import print_function
from __future__ import division
from __future__ import absolute_import

import json
import logging
import subprocess
import os
import sys

from libcloudlinux import (
    CloudlinuxCliBase,
    LVEMANAGER_PLUGIN_NAMES,
    DEFAULT_PLUGIN_NAME,
    PASSENGER_DEPEND_PLUGINS,
    AllLimitStrategy,
    NoLimitStrategy,
    LimitStrategyHeavy,
    LimitStrategyBase,
    ConfigLimitValue,
    BypassStrategy,
    EnterTool,
)
from clselector.clpassenger_detectlib import is_clpassenger_active
from clcommon import ClPwd
from clcommon.utils import is_litespeed_running
from clcommon.lib.cledition import is_cl_solo_edition
from cldetectlib import get_param_from_file
from clcommon.const import Feature
from clcommon.cpapi import is_panel_feature_supported


# Filesystem locations consulted by the user-level CLI.
CONFIG = "/etc/sysconfig/cloudlinux"
SMART_ADVICE_USER_CLI = "/opt/alt/php-xray/cl-smart-advice-user"
PERCENTS_STATS_MODE_FLAG = "/opt/cloudlinux/flags/enabled-flags.d/percentage-user-stats-mode.flag"

# NB: this logger writes to stderr while the resulting JSON goes to stdout.
#     The web side redirects stderr into stdout (2>&1), so any log output
#     breaks the response; the logger therefore MUST stay silent (NOTSET)
#     in normal operation.
#     A file logger cannot be used here either: the script runs inside
#     cagefs with the user's rights.
init_formatter = logging.Formatter(
    fmt="[%(asctime)s] %(funcName)s:%(lineno)s - %(message)s"
)
# Same layout with a "{cagefs}" marker in front, to make log review easier.
cagefs_formatter = logging.Formatter(
    fmt="{cagefs} [%(asctime)s] %(funcName)s:%(lineno)s - %(message)s"
)

h = logging.StreamHandler()
h.setFormatter(init_formatter)

logger = logging.getLogger(__name__)
logger.addHandler(h)
logger.setLevel(logging.NOTSET)


logger.debug("cli start")


class CloudlinuxCliUser(CloudlinuxCliBase):
    """
    User-level entry point of the CloudLinux CLI.

    Extends ``CloudlinuxCliBase`` with the ``spa-get-*`` and
    ``cloudlinux-snapshots`` commands and, while dropping permissions,
    passes the requested command to a limit strategy chosen from the
    ``web_resource_limit_mode`` value in ``CONFIG``.
    """

    limit_strategy: LimitStrategyBase

    # Commands for which the enter tool is switched to EnterTool.LVE_SUWRAPPER.
    _CAGEFS_INCOMPATIBLE_COMMANDS = frozenset(
        {
            # TODO: https://cloudlinux.atlassian.net/browse/CLOS-3561
            "cloudlinux-statistics",
            "cloudlinux-top",
            "cloudlinux-snapshots",
            "cloudlinux-charts",
            "cloudlinux-statsnotifier",
            # this command cannot run inside cagefs because it needs access to /dev/vdaX
            "cloudlinux-quota",
        }
    )

    def __init__(self):
        """
        Read ``web_resource_limit_mode`` from ``CONFIG`` and register the
        user-specific commands in ``self.command_methods``.
        """
        # Default that stays in effect when the configured value is unusable.
        self.web_resource_limit_mode = ConfigLimitValue.HEAVY
        limit_mode = get_param_from_file(
            CONFIG, "web_resource_limit_mode", "=", ConfigLimitValue.HEAVY.value
        )
        try:
            self.web_resource_limit_mode = ConfigLimitValue(limit_mode)
        except ValueError:
            # Assuming ConfigLimitValue is an Enum, an unknown value raises
            # ValueError. A typo in the config must not break every user
            # request, so keep the HEAVY default assigned above.
            logger.debug(
                f"Unknown web_resource_limit_mode {limit_mode!r}, "
                f"falling back to {self.web_resource_limit_mode}"
            )
        super().__init__()
        self.command_methods.update(
            {
                "spa-get-domains": self.spa_user_domains,
                "spa-get-homedir": self.spa_user_homedir,
                "cloudlinux-snapshots": self.cl_snapshots,
                "spa-get-user-info": self.spa_get_user_info,
            }
        )

    def __init_limit_strategy(self):
        """
        Set default strategy from the `CONFIG` values

        ``BypassStrategy`` is used when the panel does not support LVE;
        otherwise the strategy is looked up by
        ``self.web_resource_limit_mode`` (``LimitStrategyHeavy`` when the
        mode has no mapping). The enter tool is switched to
        ``EnterTool.LVE_SUWRAPPER`` when CageFS is not supported or the
        requested command is not compatible with it.
        """
        if self.skip_cagefs_check:
            # update log format to easier log review
            logger.handlers[0].setFormatter(cagefs_formatter)

        # we cannot use lve when it is not available
        if not is_panel_feature_supported(Feature.LVE):
            self.limit_strategy = BypassStrategy()
        else:
            strategy_class = {
                ConfigLimitValue.ALL: AllLimitStrategy,
                ConfigLimitValue.HEAVY: LimitStrategyHeavy,
                ConfigLimitValue.UNLIMITED: NoLimitStrategy,
            }.get(self.web_resource_limit_mode, LimitStrategyHeavy)
            self.limit_strategy = strategy_class()

        # we cannot use cagefs when it is not available
        if not is_panel_feature_supported(Feature.CAGEFS):
            self.limit_strategy.enter_tool = EnterTool.LVE_SUWRAPPER

        # some commands do not work inside cagefs, but we can still limit them with lve
        if self.__is_cagefs_incompatible_command():
            self.limit_strategy.enter_tool = EnterTool.LVE_SUWRAPPER

        logger.debug(
            f"Limits strategy inited as {self.limit_strategy.__class__}"
            f"\n\tBecause of:"
            f"\n\tself.web_resource_limit_mode: {self.web_resource_limit_mode}"
        )

    def set_limit_strategy(self, strategy: LimitStrategyBase):
        """
        Explicitly replace the current limit strategy.

        :param strategy: strategy instance stored in ``self.limit_strategy``
        """
        logger.debug(f"Limit strategy is explicitly set to {strategy.__class__}")
        self.limit_strategy = strategy

    def __is_cagefs_incompatible_command(self):
        """
        Returns True if command is not compatible with CageFS

        That is the case for requests whose ``params.interpreter`` is
        ``"php"`` and for commands listed in
        ``_CAGEFS_INCOMPATIBLE_COMMANDS``.
        """
        data = self.request_data

        # "params" may be missing or explicitly null in the request
        params = data.get("params") or {}

        # phpselector commands
        if params.get("interpreter") == "php":
            return True

        if data.get("command") in self._CAGEFS_INCOMPATIBLE_COMMANDS:
            logger.debug("Executable command found in the exclusive list")
            return True
        return False

    def drop_permission(self):
        """
        Drop permission to users, if owner of script is user

        Initialises the limit strategy, rejects requests whose ``owner`` is
        not ``"user"``, delegates to the base implementation and then, unless
        ``skip_cagefs_check`` is set, lets the limit strategy execute the
        command. When the strategy returns an exit code the process exits
        with it; otherwise plugin availability is verified (except for
        ``spa-get-user-info``).
        :return:
        """
        logger.debug(
            "drop permissions start"
            f"\n\targv is: {sys.argv}"
            f"\n\trequest data is: {self.request_data}"
        )
        self.__init_limit_strategy()
        data = self.request_data
        # .get(): a request without "owner" is rejected with the same error
        # instead of failing with a KeyError traceback
        if data.get("owner") != "user":
            self.exit_with_error("User not allowed")
        super().drop_permission()

        args = self.prepair_params_for_command()
        logger.debug(f"prepared args is: {args}")

        if data.get("command"):
            if self.skip_cagefs_check:
                logger.debug("cagefs skipped: --skip-cagefs-check arg found")
            else:
                # if rc is None - script won't enter the cagefs
                # otherwise - command is executed in the cagefs
                rc = self.limit_strategy.execute(
                    self.user_info["lve-id"], data["command"], args, self.request_data
                )
                if rc is not None:
                    logger.debug(f"command executed inside of the cagefs with rc: {rc}")
                    sys.exit(rc)
                else:
                    logger.debug(
                        f"cagefs skipped: strategy is {self.limit_strategy.__class__}"
                    )

        # skip checking plugin availability on spa-get-user-info
        if data.get("command") != "spa-get-user-info":
            self.check_plugin_availability()
        logger.debug("drop permissons end")

    def spa_user_domains(self):
        """Print the user's domains as a JSON success response and exit with 0."""
        print(json.dumps({"result": "success", "list": self.get_user_domains()}))
        sys.exit(0)

    def spa_user_homedir(self):
        """Print the user's home directory as a JSON success response and exit with 0."""
        print(json.dumps({"result": "success", "homedir": self.get_user_homedir()}))
        sys.exit(0)

    def spa_get_user_info(self):
        """
        Print a JSON summary of the user's environment and exit with 0.

        Any failure while collecting or serialising the data is reported
        through ``exit_with_error("Module unavailable")``.
        """
        try:
            info = {
                "result": "success",
                "domains": self.get_user_domains(),
                "homedir": self.get_user_homedir(),
                "is_litespeed_running": is_litespeed_running(),
                "is_cl_solo_edition": is_cl_solo_edition(skip_jwt_check=True),
                "smart_advice": os.path.isfile(SMART_ADVICE_USER_CLI),
                "is_lve_supported": is_panel_feature_supported(Feature.LVE),
                "user_stats_mode": self.get_stats_mode(),
                "server_ip": self.get_server_ip(),
            }
            print(json.dumps(info))
        except Exception:
            # Not a bare `except`: SystemExit/KeyboardInterrupt raised inside
            # the helpers above must propagate instead of being swallowed
            # and reported a second time.
            self.exit_with_error("Module unavailable")
        sys.exit(0)

    def get_user_domains(self):
        """
        Return the first element of every entry that ``userdomains`` yields
        for the current user.

        Reports "Module unavailable" when ``clcommon.cpapi.userdomains``
        cannot be imported.
        """
        try:
            from clcommon.cpapi import userdomains
        except Exception:
            self.exit_with_error("Module unavailable")
        return [x[0] for x in userdomains(self.user_info["username"])]

    def get_stats_mode(self):
        """
        Return ``"percent"`` when the ``PERCENTS_STATS_MODE_FLAG`` file
        exists, ``"default"`` otherwise.
        """
        return "percent" if os.path.isfile(PERCENTS_STATS_MODE_FLAG) else "default"

    def get_user_homedir(self):
        """
        Return the current user's home directory with a trailing slash.

        Reports "No such user" when the lookup raises ``KeyError``.
        """
        # NOTE(review): confirm that ClPwd.get_homedir (and the
        # self.user_info["username"] lookup) signal an unknown user with
        # KeyError - other exception types are not handled here.
        try:
            pwdir = ClPwd().get_homedir(self.user_info["username"])
            return pwdir + "/"
        except KeyError:
            self.exit_with_error("No such user")

    def cl_snapshots(self):
        """
        Run ``/usr/sbin/lve-read-snapshot`` with the prepared request
        parameters and answer with the ``data`` field of its JSON output.

        The utility's output is still parsed when it exits with a non-zero
        status; output that is not valid JSON is reported as the error text.
        """
        list_to_request = self.prepair_params_for_command()
        try:
            output = self.run_util("/usr/sbin/lve-read-snapshot", *list_to_request)
        except subprocess.CalledProcessError as process_error:
            # a failed run may still carry a JSON payload in its output
            output = process_error.output
        try:
            result = json.loads(output)
        except Exception:
            # unparsable output: hand it over unchanged as the error message
            self.exit_with_error(output)
            return
        self.exit_with_success({"data": result["data"]})
        sys.exit(0)

    def _exit_plugin_not_available(self, error_id, plugin_name):
        """Report a 503 "plugin is not available" error with the "disabled" icon."""
        self.exit_with_error(
            code=503,
            error_id=error_id,
            context={"pluginName": plugin_name},
            icon="disabled",
        )

    def check_plugin_availability(self):
        """
        Report a 503 error when the currently requested plugin cannot be used.

        Checks are made in this order: the Node.js/Python selector being
        enabled, Passenger being active for plugins listed in
        ``PASSENGER_DEPEND_PLUGINS``, and finally the plugin-specific
        ``_plugin_available_*`` check.
        """
        plugin_names = {
            "nodejs_selector": "Node.js Selector",
            "python_selector": "Python Selector",
        }
        selector_enabled = True
        manager = None
        try:
            if self.current_plugin_name == "nodejs_selector":
                from clselect.clselectnodejs.node_manager import NodeManager

                manager = NodeManager()
            elif self.current_plugin_name == "python_selector":
                from clselect.clselectpython.python_manager import PythonManager

                manager = PythonManager()
            if manager:
                selector_enabled = manager.selector_enabled
        except Exception:
            # a selector that cannot be imported or queried counts as disabled
            selector_enabled = False
        if not selector_enabled:
            self._exit_plugin_not_available(
                "ERROR.not_available_plugin",
                plugin_names.get(self.current_plugin_name, "Plugin"),
            )

        plugin_available_checker = {
            "nodejs_selector": self._plugin_available_nodejs,
            "python_selector": self._plugin_available_python,
            "php_selector": self._plugin_available_php,
            "resource_usage": self._plugin_available_resource_usage,
        }.get(self.current_plugin_name)
        # plugins without a dedicated checker are treated as available
        if plugin_available_checker:
            plugin_available = plugin_available_checker()
        else:
            plugin_available = True

        if (
            not is_clpassenger_active()
            and self.current_plugin_name in PASSENGER_DEPEND_PLUGINS
        ):
            self._exit_plugin_not_available(
                "ERROR.not_available_passenger",
                LVEMANAGER_PLUGIN_NAMES.get(
                    self.current_plugin_name, DEFAULT_PLUGIN_NAME
                ),
            )
        if not plugin_available:
            self._exit_plugin_not_available(
                "ERROR.not_available_plugin",
                LVEMANAGER_PLUGIN_NAMES.get(
                    self.current_plugin_name, DEFAULT_PLUGIN_NAME
                ),
            )

    def _plugin_available_nodejs(self):
        """
        Return True when the Node.js selector is enabled and Passenger is
        active; any error while checking counts as "not available".
        """
        try:
            from clselect.clselectnodejs.node_manager import NodeManager

            manager = NodeManager()
            if not manager.selector_enabled or not is_clpassenger_active():
                return False
        except Exception:
            return False
        return True

    def _plugin_available_python(self):
        """
        Return True when the Python selector is enabled and Passenger is
        active; any error while checking counts as "not available".
        """
        try:
            from clselect.clselectpython.python_manager import PythonManager

            manager = PythonManager()
            if not manager.selector_enabled or not is_clpassenger_active():
                return False
        except Exception:
            return False
        return True

    def _plugin_available_php(self):
        """
        Return True when the PHP selector is enabled; any error while
        checking counts as "not available".
        """
        try:
            from clselect.clselectphp.php_manager import PhpManager

            manager = PhpManager()
            if not manager.selector_enabled:
                return False
        except Exception:
            return False
        return True

    def _plugin_available_resource_usage(self):
        """The resource usage plugin is always reported as available."""
        return True

Youez - 2016 - github.com/yon3zu
LinuXploit