Linux ip-172-26-2-223 5.4.0-1018-aws #18-Ubuntu SMP Wed Jun 24 01:15:00 UTC 2020 x86_64
Apache
: 172.26.2.223 | : 18.222.164.159
Cant Read [ /etc/named.conf ]
8.1.13
www
www.github.com/MadExploits
Terminal
AUTO ROOT
Adminer
Backdoor Destroyer
Linux Exploit
Lock Shell
Lock File
Create User
CREATE RDP
PHP Mailer
BACKCONNECT
UNLOCK SHELL
HASH IDENTIFIER
CPANEL RESET
CREATE WP USER
BLACK DEFEND!
README
+ Create Folder
+ Create File
/
www /
server /
panel /
class_v2 /
[ HOME SHELL ]
Name
Size
Permission
Action
__pycache__
[ DIR ]
drwxr-xr-x
btdockerModelV2
[ DIR ]
drwxr-xr-x
crontabModelV2
[ DIR ]
drwxr-xr-x
databaseModelV2
[ DIR ]
drwxr-xr-x
firewallModelV2
[ DIR ]
drwxr-xr-x
logsModelV2
[ DIR ]
drwxr-xr-x
monitorModelV2
[ DIR ]
drwxr-xr-x
panelModelV2
[ DIR ]
drwxr-xr-x
power_mta
[ DIR ]
drwxr-xr-x
projectModelV2
[ DIR ]
drwxr-xr-x
safeModelV2
[ DIR ]
drwxr-xr-x
safe_warning_v2
[ DIR ]
drwxr-xr-x
ssl_domainModelV2
[ DIR ]
drwxr-xr-x
virtualModelV2
[ DIR ]
drwxr-xr-x
wp_toolkit
[ DIR ]
drwxr-xr-x
acme_v3.py
133.98
KB
-rw-r--r--
ajax_v2.py
95.41
KB
-rw-r--r--
apache_v2.py
17.28
KB
-rw-r--r--
backup_bak_v2.py
24.86
KB
-rw-r--r--
breaking_through.py
47.94
KB
-rw-r--r--
cloud_stora_upload_v2.py
19.27
KB
-rw-r--r--
common_v2.py
12.45
KB
-rw-r--r--
config_v2.py
165.36
KB
-rw-r--r--
crontab_ssl_v2.py
1.85
KB
-rw-r--r--
crontab_v2.py
111.93
KB
-rw-r--r--
data_v2.py
36.54
KB
-rw-r--r--
database_v2.py
125.54
KB
-rw-r--r--
datatool_v2.py
5.83
KB
-rw-r--r--
db_mysql_v2.py
11.41
KB
-rw-r--r--
db_v2.py
11.04
KB
-rw-r--r--
dk_db.py
18.34
KB
-rw-r--r--
download_file_v2.py
2.54
KB
-rw-r--r--
fastcgi_client_two_v2.py
12.26
KB
-rw-r--r--
fastcgi_client_v2.py
6.89
KB
-rw-r--r--
file_execute_deny_v2.py
10.34
KB
-rw-r--r--
files_v2.py
149.12
KB
-rw-r--r--
firewall_new_v2.py
22.4
KB
-rw-r--r--
firewalld_v2.py
11.09
KB
-rw-r--r--
firewalls_v2.py
17.44
KB
-rw-r--r--
flask_compress_v2.py
5.12
KB
-rw-r--r--
flask_sockets_v2.py
3.75
KB
-rw-r--r--
ftp_log_v2.py
21.72
KB
-rw-r--r--
ftp_v2.py
16.17
KB
-rw-r--r--
http_requests_v2.py
24.25
KB
-rw-r--r--
jobs_v2.py
36.98
KB
-rw-r--r--
letsencrypt_v2.py
12.85
KB
-rw-r--r--
log_analysis_v2.py
12.23
KB
-rw-r--r--
monitor_v2.py
13.53
KB
-rw-r--r--
one_key_wp_v2.py
75.79
KB
-rw-r--r--
panelControllerV2.py
4.97
KB
-rw-r--r--
panelDatabaseControllerV2.py
5.76
KB
-rw-r--r--
panelDockerControllerV2.py
5.86
KB
-rw-r--r--
panelFireControllerV2.py
4.65
KB
-rw-r--r--
panelModControllerV2.py
5.13
KB
-rw-r--r--
panelProjectControllerV2.py
6.07
KB
-rw-r--r--
panelSafeControllerV2.py
4.65
KB
-rw-r--r--
panel_api_v2.py
10.43
KB
-rw-r--r--
panel_auth_v2.py
33.21
KB
-rw-r--r--
panel_backup_v2.py
102.56
KB
-rw-r--r--
panel_dns_api_v2.py
22.2
KB
-rw-r--r--
panel_http_proxy_v2.py
11.33
KB
-rw-r--r--
panel_lets_v2.py
43.61
KB
-rw-r--r--
panel_mssql_v2.py
4.48
KB
-rw-r--r--
panel_mysql_v2.py
7.55
KB
-rw-r--r--
panel_php_v2.py
24.78
KB
-rw-r--r--
panel_ping_v2.py
2.88
KB
-rw-r--r--
panel_plugin_v2.py
125.11
KB
-rw-r--r--
panel_push_v2.py
23.78
KB
-rw-r--r--
panel_redirect_v2.py
34.02
KB
-rw-r--r--
panel_restore_v2.py
11.04
KB
-rw-r--r--
panel_site_v2.py
343.73
KB
-rw-r--r--
panel_ssl_v2.py
75.34
KB
-rw-r--r--
panel_task_v2.py
28.7
KB
-rw-r--r--
panel_video_V2.py
1.88
KB
-rw-r--r--
panel_warning_v2.py
68.71
KB
-rw-r--r--
password_v2.py
8.09
KB
-rw-r--r--
plugin_auth_v2.py
3.14
KB
-rw-r--r--
plugin_deployment_v2.py
28.85
KB
-rw-r--r--
san_baseline_v2.py
51.13
KB
-rw-r--r--
site_dir_auth_v2.py
17.67
KB
-rw-r--r--
ssh_security_v2.py
45.66
KB
-rw-r--r--
ssh_terminal_v2.py
58.86
KB
-rw-r--r--
system_v2.py
44.77
KB
-rw-r--r--
userRegister_v2.py
6.74
KB
-rw-r--r--
user_login_v2.py
21.2
KB
-rw-r--r--
vilidate_v2.py
4.94
KB
-rw-r--r--
wxapp_v2.py
5.62
KB
-rw-r--r--
Delete
Unzip
Zip
${this.title}
Close
Code Editor : one_key_wp_v2.py
# coding: utf-8 # +------------------------------------------------------------------- # | aaPanel # +------------------------------------------------------------------- # | Copyright (c) 2015-2016 aaPanel(www.aapanel.com) All rights reserved. # +------------------------------------------------------------------- # | Author: zhwen <zhwen@aapanel.com> # +------------------------------------------------------------------- import os import time import json import sys import public import re import requests from bs4 import BeautifulSoup import panel_mysql_v2 as panelMysql from public.validate import Param # import wp-toolkit core # from wp_toolkit import wp_version, wpmgr, wpfastcgi_cache def get_mem(): import psutil mem = psutil.virtual_memory() memInfo = {'memTotal': int(mem.total / 1024 / 1024), 'memFree': int(mem.free / 1024 / 1024), 'memBuffers': int(mem.buffers / 1024 / 1024), 'memCached': int(mem.cached / 1024 / 1024)} return memInfo['memTotal'] class one_key_wp: wp_package_url = 'https://wordpress.org/latest.zip' # wp_package_url = 'http://download.bt.cn/install/package/wordpress-5.9.1.zip' base_path = "/www/server/panel/" wp_session_path = '{}/data/wp_session'.format(base_path) package_zip = '{}/package/wp.zip'.format(base_path) md5_file = '{}/package/md5'.format(base_path) log_path = '/tmp/schedule.log' __php_tmp_file = '/tmp/wp_tmp'.format(base_path) panel_db = "/www/server/panel/data/default.db" timeout_count = 0 old_time = 0 __wp_session = None __session_resp = None __ajax_nonce = None __domain = None __plugin_page_content = None wp_user = None wp_passwd = None def __init__(self): self.create_wp_table() if not self.__wp_session: self.__wp_session = requests.Session() # import PluginLoader # self.__IS_PRO_MEMBER = PluginLoader.get_auth_state() > 0 def get_headers(self): return { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.99 Safari/537.36', 'Host': self.__domain } def get_plugin_page(self): _get_ajax_nonce_url = "http://{}/wp-admin/plugins.php".format(self.__domain) self.__plugin_page_content = self.__wp_session.get(_get_ajax_nonce_url, headers=self.get_headers(), verify=False).text def get_wp_nonce(self, content, rep): rex = re.search(rep, content) if rex: return rex.group(1) def get_nonce(self): resp = self.__session_resp if resp != '1': resp = resp.text else: self.get_plugin_page() resp = self.__plugin_page_content _ajax_nonce_rep = '"ajax_nonce\\\"\\:\\\"(\\w+)' rex = re.search(_ajax_nonce_rep, resp) if rex: self.__ajax_nonce = rex.group(1) def __load_cookies(self, s_id): self.__domain = self.get_wp_auth(s_id) f = '{}/{}'.format(self.wp_session_path, self.__domain) c = public.readFile(f) if c: mtime = os.path.getmtime(f) expried = 86400 if time.time() - mtime > expried: print('cookies expired, re-login wordpress') self.__login_wp() else: print('use local session') self.__wp_session.cookies.update(json.loads(c)) self.__session_resp = '1' else: print('No local cookies, login wordpress') self.__login_wp() # self.__login_wp() # print('login success!') try: self.get_nonce() except: pass if not self.__ajax_nonce: self.__login_wp() self.get_nonce() self.get_plugin_page() def __save_cookies(self): if not os.path.exists(self.wp_session_path): os.mkdir(self.wp_session_path) f = '{}/{}'.format(self.wp_session_path, self.__domain) return public.writeFile(f, json.dumps(self.__wp_session.cookies.get_dict())) def __login_wp(self): wp_login = 'http://{}/wp-login.php'.format(self.__domain) headers1 = {'Cookie': 'wordpress_test_cookie=WP Cookie check', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.99 Safari/537.36', # 'referer':"https://{}/wp-login.php?loggedout=true&wp_lang=en_US".format(self.__domain), 'Host': self.__domain } datas = { 'log': self.wp_user, 'pwd': self.wp_passwd, 'wp-submit': 'Log In', # 'redirect_to': 'https://{}/wp-admin'.format(self.__domain), 'testcookie': '1' } # hosts = public.readFile('/etc/hosts') # if not hosts: # return False # if not re.search(r'127.0.0.1\s+{}'.format(self.__domain),hosts): # public.writeFile('/etc/hosts','\n127.0.0.1 {}'.format(self.__domain),'a+') self.__session_resp = retry( lambda: self.__wp_session.post(wp_login, headers=headers1, data=datas, verify=False)) # cookies持久化 self.__save_cookies() # 写输出日志 def write_logs(self, log_msg, clean=None): if clean: fp = open(self.log_path, 'w') fp.write(log_msg) fp.close() fp = open(self.log_path, 'a+') fp.write(log_msg + '\n') fp.close() # 下载文件 def download_file(self): try: path = os.path.dirname(self.package_zip) if not os.path.exists(path): os.makedirs(path) import urllib, socket, ssl ssl._create_default_https_context = ssl._create_unverified_context socket.setdefaulttimeout(10) self.pre = 0 self.old_time = time.time() print("Download the installation package: {} --> {}".format(self.wp_package_url, self.package_zip)) self.write_logs( "|-Download the installation package: {} --> {}".format(self.wp_package_url, self.package_zip)) if sys.version_info[0] == 2: urllib.urlretrieve(self.wp_package_url, filename=self.package_zip, reporthook=self.download_hook) else: urllib.request.urlretrieve(self.wp_package_url, filename=self.package_zip, reporthook=self.download_hook) md5str = public.FileMd5(self.package_zip) public.writeFile(self.md5_file, str(md5str)) except: print("Download error: {}".format(public.get_error_info())) if self.timeout_count > 5: return; self.timeout_count += 1 time.sleep(5) self.download_file() # 下载文件进度回调 def download_hook(self, count, blockSize, totalSize): used = count * blockSize pre1 = int((100.0 * used / totalSize)) if self.pre != pre1: dspeed = used / (time.time() - self.old_time) self.pre = pre1 def check_package(self): # 检查本地包 download = False md5str = None if os.path.exists(self.package_zip): md5str = public.FileMd5(self.package_zip) if os.path.exists(self.md5_file) and md5str: if md5str != public.readFile(self.md5_file): download = True else: download = True return download def download_latest_package(self): print("Start downloading the installation package...") self.write_logs("|-Start downloading the installation package...") if os.path.exists(self.package_zip): # 获取文件的最后修改时间 modified_time = os.path.getmtime(self.package_zip) # 计算当前时间与最后修改时间的差值 time_diff = time.time() - modified_time time2 = 30 * 24 * 60 * 60 if time_diff > time2: # 如果超过30天,则删除文件 os.remove(self.package_zip) os.remove(self.md5_file) self.write_logs("|-Del package...") if not self.check_package(): print("|-MD5 consistent, no need to download...") return public.return_msg_gettext(True, public.lang("MD5 consistent!")) self.download_file() if not os.path.exists(self.package_zip): self.write_logs("|-Download failed...") print("Download failed...") return public.return_msg_gettext(False, public.lang("File download failed!")) return public.return_msg_gettext(True, public.lang("Download successfully")) def unzip_package(self, site_path): print("Start unzipping the installation package...") self.write_logs("|-Start unzipping the installation package...") public.ExecShell('unzip -o {} -d {}/'.format(self.package_zip, site_path)) public.ExecShell('mv {}/wordpress/* {}'.format(site_path, site_path)) os.removedirs("{}/wordpress/".format(site_path)) print("Start setting up site permissions...") self.write_logs("|-Start setting up site permissions...") self.set_permission(site_path) def set_permission(self, site_path): os.system('chmod -R 755 ' + site_path) os.system('chown -R www.www ' + site_path) def set_urlrewrite(self, site_name, site_path): webserver = public.get_webserver() if webserver == 'openlitespeed': webserver = 'apache' swfile = '/www/server/panel/rewrite/{}/wordpress.conf'.format(webserver) if os.path.exists(swfile): rewriteConf = public.readFile(swfile) if webserver == 'nginx': dwfile = '{}/vhost/rewrite/{}.conf'.format(self.base_path, site_name) else: dwfile = '{}/.htaccess'.format(site_path) public.writeFile(dwfile, rewriteConf) def write_db(self, s_id, d_id, prefix, user_name, admin_password): print("Inserting data...") pdata = {"s_id": s_id, "d_id": d_id, "prefix": prefix, "user": user_name, "pass": admin_password} public.M('wordpress_onekey').where('s_id=?', (s_id,)).field('s_id').find() if public.M('wordpress_onekey').where('s_id=?', (s_id,)).field('s_id').find(): print("Data already exists, update data...") public.M('wordpress_onekey').where('s_id=?', (s_id,)).update(pdata) return print("Insert data...") print("Insert data:{}".format(pdata)) print("Result:{}".format(public.M('wordpress_onekey').insert(pdata))) def create_wp_table(self): if not public.M('sqlite_master').where('type=? AND name=?', ('table', 'wordpress_onekey')).count(): public.M('').execute('''CREATE TABLE "wordpress_onekey" ( "id" INTEGER PRIMARY KEY AUTOINCREMENT, "s_id" INTEGER DEFAULT '', "d_id" INTEGER DEFAULT '', "prefix" TEXT DEFAULT '', "user" TEXT DEFAULT '', "pass" TEXT DEFAULT '');''') def init_wp(self, values): hosts = public.readFile('/etc/hosts') if not hosts: return False if not re.search(r'127.0.0.1\s+{}'.format(values['site_name']), hosts): public.writeFile('/etc/hosts', '\n127.0.0.1 {}'.format(values['site_name']), 'a+') self.write_logs("|-Start initializing Wordpress...") from wp_toolkit import wpmgr # 初始化WP管理类 wpmgr_obj = wpmgr(values['s_id']) # 初始化WP网站配置 self.write_logs("|-Start setup configurations...") ok, msg = wpmgr_obj.setup_config(values['dbname'], values['db_user'], values['db_pwd'], 'localhost', values['prefix']) self.write_logs('|-Setup config >>> {} {}'.format('OK' if ok else 'FAIL', msg)) if not ok: raise Exception(msg) # 初始化WP网站信息 self.write_logs("|-Start installations...") ok, msg = wpmgr_obj.wp_install(values['weblog_title'], values['user_name'], values['admin_email'], values['admin_password'], values['language']) self.write_logs('|-Installation {} {}'.format('OK' if ok else 'FAIL', msg)) if not ok: raise Exception(msg) # if self.__IS_PRO_MEMBER: # from wp_toolkit import wpmgr # # # 初始化WP管理类 # wpmgr_obj = wpmgr(values['s_id']) # # # 初始化WP网站配置 # self.write_logs("|-Start setup configurations...") # ok, msg = wpmgr_obj.setup_config(values['dbname'], values['db_user'], values['db_pwd'], 'localhost', values['prefix']) # self.write_logs('|-Setup config >>> {} {}'.format('OK' if ok else 'FAIL', msg)) # # if not ok: # raise Exception(msg) # # # 初始化WP网站信息 # self.write_logs("|-Start installations...") # ok, msg = wpmgr_obj.wp_install(values['weblog_title'], values['user_name'], values['admin_email'], values['admin_password'], values['language']) # self.write_logs('|-Installation {} {}'.format('OK' if ok else 'FAIL', msg)) # # if not ok: # raise Exception(msg) # else: # self.request_setup_0(values) # time.sleep(1) # if not self.request_setup_2(values): # return public.return_msg_gettext(False, # "The database connection is abnormal. Please check whether the root user authority or database configuration parameters are correct.") # time.sleep(1) # self.request_setup_3(values) # time.sleep(1) # self.request_setup_4(values) # time.sleep(1) # 配置伪静态规则 self.set_urlrewrite(values['site_name'], values['site_path']) def request_setup_0(self, values): self.write_logs("|-Start initializing Wordpress language...") url = "http://{}/wp-admin/setup-config.php?step=0".format(values['site_name']) param = { "url": url, "data": {"language": values['language']}, "headers": {"Host": values['domain']} } # result = self.request_wp_api(param) response = retry(lambda: self.request_wp_api(param)) # public.print_log("开始检查--request_setup_0_result:{}".format(response)) def request_setup_2(self, values): self.write_logs("|-Start initializing Wordpress config...") url = "http://{}/wp-admin/setup-config.php?step=2".format(values['site_name']) param = { "url": url, "headers": {"Host": values['domain']}, "data": { "dbname": values['dbname'], "uname": values['db_user'], "pwd": values['db_pwd'], "dbhost": "localhost", "prefix": values['prefix'], "language": values['language'], "submit": "Submit", } } # result = self.request_wp_api(param) response = retry(lambda: self.request_wp_api(param)) if "install.php?language=ja": return True def request_setup_3(self, values): self.write_logs("|-Start initializing Wordpress install language...") url = "http://{}/wp-admin/install.php?language={}".format(values['site_name'], values['language']) param = { "url": url, "headers": {"Host": values['domain']}, "data": { "language": values['language'] } } # self.request_wp_api(param) response = retry(lambda: self.request_wp_api(param)) def request_setup_4(self, values): self.write_logs("|-Start installing the wordpress program...") url = "http://{}/wp-admin/install.php?step=2".format(values['site_name']) param = { "url": url, "headers": {"Host": values['domain']}, "data": { "weblog_title": values['weblog_title'], "user_name": values['user_name'], "admin_password": values['admin_password'], "admin_password2": values['admin_password'], # "pw_weak": values['pw_weak'], # on/off "admin_email": values['admin_email'], # "Submit": "Install WordPress", "language": values['language'] } } # self.request_wp_api(param) response = retry(lambda: self.request_wp_api(param)) def request_wp_api(self, param): """需要指定域名得host为本机IP""" resp = requests.post(param['url'], data=param['data'], headers=param['headers']) # public.print_log("开始检查--request_wp_api:{}".format(resp)) # public.print_log("开始检查--request_wp_api_resp.text:{}".format(resp.text)) return resp.text def get_update_wp_nonce(self): url = "http://{}/wp-admin/update-core.php".format(self.__domain) # res = self.action_plugin_get(url) _wp_nonce_rep = '"_wpnonce\\\"\\svalue=\\\"(\\w+)' # rex = re.search(_wp_nonce_rep,res) # if rex: # self.__wp_nonce = rex.group(1) # return self.get_wp_nonce(self.action_plugin_get(url),_wp_nonce_rep) response = retry(lambda: self.get_wp_nonce(self.action_plugin_get(url), _wp_nonce_rep)) return response def action_plugin_post(self, param): headers = self.get_headers() if 'upgrade' in param['data']: param['data']['_wpnonce'] = self.get_update_wp_nonce() headers['referer'] = 'http://{}/wp-admin/update-core.php'.format(param['domain']) response = retry( lambda: self.__wp_session.post(param['url'], data=param['data'], headers=headers, verify=False).text) return response # return self.__wp_session.post(param['url'],data=param['data'], headers=headers, verify=False).text def action_plugin_get(self, url, headers=None): if not headers: headers = self.get_headers() # headers['Referer'] = "http://{}/wp-admin".format(self.__domain) response = retry(lambda: self.__wp_session.get(url, headers=headers, verify=False).text) return response # return self.__wp_session.get(url, headers=headers, verify=False).text def install_plugin(self, values): self.write_logs("|-Start installing the [nginx-helper] plugin...") self.__load_cookies(values['s_id']) param = { "url": "http://{}/wp-admin/admin-ajax.php".format(self.__domain), "domain": values['domain'], "data": { "slug": values['slug'], # 插件名称 nginx-helper "action": values['wp_action'], # action已经被面板占用 install-plugin "_ajax_nonce": self.__ajax_nonce, "_fs_nonce": "", "username": "", "password": "", "connection_type": "", "public_key": "", "private_key": "" } } res = self.action_plugin_post(param) return res def get_smart_http_expire_form_nonce(self, url): public.writeFile('/tmp/2', str(self.action_plugin_get(url))) smart_http_expire_form_nonce = '"smart_http_expire_form_nonce\\\"\\svalue=\\\"(\\w+)' return self.get_wp_nonce(self.action_plugin_get(url), smart_http_expire_form_nonce) def set_nginx_helper(self, values): self.write_logs("|-Setting up [nginx-helper] plugin cache rules...") self.__load_cookies(values['s_id']) url = "http://{}/wp-admin/options-general.php?page=nginx".format(self.__domain) param = { "url": url, "domain": values['domain'], "data": { "enable_purge": "1", "is_submit": "1", "cache_method": "enable_fastcgi", "purge_method": "unlink_files", "redis_hostname": "127.0.0.1", "redis_port": "6379", "redis_prefix": "nginx-cache", "purge_homepage_on_edit": "1", "purge_homepage_on_del": "1", "purge_page_on_mod": "1", "purge_page_on_new_comment": "1", "purge_page_on_deleted_comment": "1", "purge_archive_on_edit": "1", "purge_archive_on_del": "1", "purge_archive_on_new_comment": "1", "purge_archive_on_deleted_comment": "1", "purge_url": "", "log_level": "INFO", "log_filesize": "5", "smart_http_expire_form_nonce": self.get_smart_http_expire_form_nonce(url), "smart_http_expire_save": "Save All Changes" } } return self.action_plugin_post(param) def act_nginx_helper_active(self, values): self.write_logs("|-activating [nginx-helper] plugin...") self.__load_cookies(values['s_id']) values['active_id'] = "activate-nginx-helper" self.get_plugin_page() res = self.get_plugin_url(values['active_id'], self.__plugin_page_content) url = 'http://{}/wp-admin/{data}'.format(self.__domain, data=str(str(res[0]).split('"')[5])) url = url.replace('amp;', '') return self.action_plugin_get(url) def get_plugin_url(self, active_id, content): # soup = BeautifulSoup(content) soup = BeautifulSoup(content, features="html.parser") res = soup.find_all(id=active_id) return res def generate_wp_passwd(self, site_path, new_pass): hash_password_code = public.readFile("{}/wp-includes/class-phpass.php".format(site_path)) extra_code = """ $passwordValue = "%s"; $wp_hasher = new PasswordHash(8, TRUE); $sigPassword = $wp_hasher->HashPassword($passwordValue); $data = $wp_hasher->CheckPassword($passwordValue,$sigPassword); if($data){ echo 'True|'.$sigPassword;; }else{ echo 'False|'.$sigPassword;; } """ % new_pass php_code = hash_password_code + extra_code public.writeFile(self.__php_tmp_file, php_code) a, e = public.ExecShell("php -f {}".format(self.__php_tmp_file)) os.remove(self.__php_tmp_file) res = a.split('|') if res[0] == "True": return public.return_message(0, 0, res[1]) return public.return_message(-1, 0, public.lang("Generated password detection failed!")) def get_cache_status(self, s_id): """ s_id 网站id """ import data site_info = public.M('sites').where('id=?', (s_id,)).field('name').find() if not isinstance(site_info, dict): return False site_name = site_info['name'] # 获取WP站点绑定的PHP可执行文件 from public import websitemgr php_v = websitemgr.get_site_php_version(site_name).replace('.', '') from wp_toolkit import wpfastcgi_cache return wpfastcgi_cache().get_fast_cgi_status(site_name, php_v) # return fast_cgi().get_fast_cgi_status(site_name, php_v) ##############################对外接口—BEGIN############################## def get_wp_username(self, get): """ s_id 网站ID """ # 校验参数 try: get.validate([ Param('s_id').Integer(), ], [ public.validate.trim_filter(), ]) except Exception as ex: public.print_log("error info: {}".format(ex)) return public.return_message(-1, 0, str(ex)) values = self.check_param(get) if values['status'] == -1: return values values = values['message'] db_info = public.M('wordpress_onekey').where('s_id=?', (values['s_id'],)).find() db_name = public.M('databases').where('id=?', (db_info['d_id'],)).field('name').find() if "name" not in db_name: return public.return_message(-1, 0, public.lang("The database of this wordpress was not found, this may be caused by the fact that you have manually deleted the database")) db_name = db_name['name'] mysql_obj = panelMysql.panelMysql() res = mysql_obj.query('select * from {}.{}users'.format( db_name, db_info['prefix'])) if hasattr(res, '__iter__'): return public.return_message(0, 0, [i[1] for i in res]) else: return public.return_message(-1, 0, "Site database [{}] failed to query users, try setting the database for this site. Error: {}".format( db_name, res)) def reset_wp_password(self, get): """ 重置wordpress用户密码 s_id 网站ID user 要重置的用户名 new_pass """ # 校验参数 try: get.validate([ Param('user').String(), Param('new_pass').String(), Param('s_id').Integer(), ], [ public.validate.trim_filter(), ]) except Exception as ex: public.print_log("error info: {}".format(ex)) return public.return_message(-1, 0, str(ex)) values = self.check_param(get) if values['status'] == -1: return values values = values['message'] s_id = values['s_id'] db_info = public.M('wordpress_onekey').where('s_id=?', (s_id,)).find() db_name = public.M('databases').where('id=?', (db_info['d_id'],)).field('name').find()['name'] path = public.M('sites').where('id=?', (s_id,)).field('path').find()['path'] new_pass = values['new_pass'] passwd = self.generate_wp_passwd(path, new_pass) if passwd['status'] == -1: return passwd passwd = passwd['message'] if isinstance(passwd, dict): passwd = passwd.get('result', None) if passwd is None: return public.fail_v2('Reset password failed') mysql_obj = panelMysql.panelMysql() sql = 'update {}.{}users set user_pass = "{}" where user_login = "{}"'.format( db_name, db_info['prefix'], passwd, values['user']) mysql_obj.execute(sql) return public.return_message(0, 0, public.lang("Password reset successful")) # 获取WP可用版本列表 def get_wp_available_versions(self, args: public.dict_obj): # 校验参数 try: args.validate([ Param('php_version_short').Integer(), ], [ public.validate.trim_filter(), ]) except Exception as ex: public.print_log("error info: {}".format(ex)) return public.return_message(-1, 0, str(ex)) from wp_toolkit import wp_version versions = list(map(lambda x: { 'locale': x['locale'], 'version': x['version'], 'php_version': x['php_version'], 'mysql_version': x['mysql_version'], }, wp_version().latest_versions())) if 'php_version_short' in args: versions = list( filter(lambda x: int(args.php_version_short) >= int(''.join(str(x['php_version']).split('.')[:2])), versions)) return public.success_v2(versions) # 获取WP网站本地版本号 def get_wp_version(self, s_id): """获取wordpress本地版本 s_id 网站id """ path = public.M('sites').where('id=?', (s_id,)).field('path').find()['path'] conf_file = "{}/wp-includes/version.php".format(path) conf = public.readFile(conf_file) try: version = re.search('\\$wp_version\\s*=\\s*[\'\"]{1}([\\d\\.]*)[\'\"]{1}', conf).groups(1)[0] except: version = "00" return version # 获取WP已发布的最新版本号 def get_wp_version_online(self): """获取wordpress线上版本""" url = "http://wordpress.org/download/" headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.99 Safari/537.36' } result = requests.get(url, headers=headers) result = re.search(r'Download\s+WordPress\s+([\d\.]+)', result.text) if result: return result.group(1) return "00" # 检查WP是否有新版本可以更新 def is_update(self, get): """ s_id 网站ID """ values = self.check_param(get) if values['status'] == -1: return values values = values['message'] online_v = self.get_wp_version_online() local_v = self.get_wp_version(values['s_id']) update = False if str(online_v) != str(local_v): update = True data = { "online_v": online_v, "local_v": local_v, "update": update } return_message = public.return_msg_gettext(True, data) del return_message['status'] return public.return_message(0, 0, return_message['msg']) def purge_all_cache(self, get): """ 清理所有缓存 """ # 校验参数 try: get.validate([ Param('s_id').Integer(), ], [ public.validate.trim_filter(), ]) except Exception as ex: public.print_log("error info: {}".format(ex)) return public.return_message(-1, 0, str(ex)) if public.get_webserver() != "nginx": return public.return_message(-1, 0, public.lang("This feature currently only supports Nginx")) from wp_toolkit import wpmgr wpmgr(get.s_id).purge_cache_with_nginx_helper() # if self.__IS_PRO_MEMBER: # from wp_toolkit import wpmgr # wpmgr(get.s_id).purge_cache_with_nginx_helper() # else: # cache_dir = "/dev/shm/nginx-cache/wp" # public.ExecShell("rm -rf {}/*".format(cache_dir)) return public.return_message(0, 0, public.lang("Cleaned up successfully!")) def set_fastcgi_cache(self, get): """ 设置缓存 version php版本 sitename 完整名 act disable/enable 开启关闭缓存 """ # 校验参数 try: get.validate([ Param('version').String(), Param('sitename').String(), Param('act').String(), ], [ public.validate.trim_filter(), ]) except Exception as ex: public.print_log("error info: {}".format(ex)) return public.return_message(-1, 0, str(ex)) if public.get_webserver() != "nginx": return public.return_message(-1, 0, public.lang("This feature currently only supports Nginx")) values = self.check_param(get) if values['status'] == -1: return values values = values['message'] if '.' in values['version']: values['version'] = values['version'].replace('.', '') from wp_toolkit import wpfastcgi_cache ok, msg = wpfastcgi_cache().set_website_conf(values['version'], values['sitename'], values['act'], immediate=True) if not ok: return public.fail_v2(msg) return public.success_v2(msg) # if self.__IS_PRO_MEMBER: # from wp_toolkit import wpfastcgi_cache # # ok, msg = wpfastcgi_cache().set_website_conf(values['version'], values['sitename'], values['act'], immediate=True) # # if not ok: # return public.fail_v2(msg) # # return public.success_v2(msg) # else: # return fast_cgi().set_website_conf(values['version'], values['sitename'], values['act']) # 使用网站ID查询WP站点域名 def get_wp_auth(self, s_id): domain = public.M('domain').where("pid=?", (s_id,)).field('name').find()['name'] info = public.M('wordpress_onekey').where("s_id=?", (s_id,)).find() self.wp_user = info['user'] self.wp_passwd = info['pass'] return domain # 更新WP版本 def update_wp(self, args): """更新wordpress版本 s_id 网站ID version 需要更新的版本 """ from wp_toolkit import wpmgr # 校验参数 try: args.validate([ Param('s_id').Require().Integer('>', 0), Param('version').Require().Regexp(r'^\d+(?:\.\d+)+$'), ], [ public.validate.trim_filter(), ]) except Exception as ex: public.print_log("error info: {}".format(ex)) return public.return_message(-1, 0, str(ex)) ok, msg = wpmgr(args.s_id).update_version(args.version) if not ok: return public.fail_v2(msg) return public.success_v2('Upgrade to {}'.format(msg)) # if self.__IS_PRO_MEMBER: # from wp_toolkit import wpmgr # # # 校验参数 # try: # args.validate([ # Param('s_id').Require().Integer('>', 0), # Param('version').Require().Regexp(r'^\d+(?:\.\d+)+$'), # ], [ # public.validate.trim_filter(), # ]) # except Exception as ex: # public.print_log("error info: {}".format(ex)) # return public.return_message(-1, 0, str(ex)) # # ok, msg = wpmgr(args.s_id).update_version(args.version) # # if not ok: # return public.fail_v2(msg) # # return public.success_v2('Upgrade to {}'.format(msg)) # else: # self.__load_cookies(args.s_id) # param = { # "url": "http://{}/wp-admin/update-core.php?action=do-core-upgrade".format(self.__domain), # "domain": self.__domain, # "data": { # # "_wpnonce":get._wpnonce, # "_wp_http_referer": '/wp-admin/update-core.php', # "version": args.version, # "locale": "en_US", # "upgrade": "Update to version {}".format(args.version) # } # } # self.action_plugin_post(param) # return_message=public.return_msg_gettext(True, 'Updated successfully!') # del return_message['status'] # return public.return_message(0,0, return_message['msg']) # 获取可用的语言列表 def get_language(self, get=None): language = { "en": "English (United States)", "af": "Afrikaans", "am": "አማርኛ", "ar": "العربية", "ary": "العربية المغربية", "as": "অসমীয়া", "az": "Azərbaycan dili", "azb": "گؤنئی آذربایجان", "bel": "Беларуская мова", "bg_BG": "Български", "bn_BD": "বাংলা", "bo": "བོད་ཡིག", "bs_BA": "Bosanski", "ca": "Català", "ceb": "Cebuano", "cs_CZ": "Čeština", "cy": "Cymraeg", "da_DK": "Dansk", "de_CH_informal": "Deutsch (Schweiz, Du)", "de_DE": "Deutsch", "de_DE_formal": "Deutsch (Sie)", "de_CH": "Deutsch (Schweiz)", "de_AT": "Deutsch (Österreich)", "dsb": "Dolnoserbšćina", "dzo": "རྫོང་ཁ", "el": "Ελληνικά", "en_US": "English (United States)", "en_NZ": "English (New Zealand)", "en_AU": "English (Australia)", "en_CA": "English (Canada)", "en_GB": "English (UK)", "en_ZA": "English (South Africa)", "eo": "Esperanto", "es_ES": "Español", "es_EC": "Español de Ecuador", "es_CO": "Español de Colombia", "es_AR": "Español de Argentina", "es_DO": "Español de República Dominicana", "es_PE": "Español de Perú", "es_CR": "Español de Costa Rica", "es_UY": "Español de Uruguay", "es_CL": "Español de Chile", "es_PR": "Español de Puerto Rico", "es_VE": "Español de Venezuela", "es_GT": "Español de Guatemala", "es_MX": "Español de México", "et": "Eesti", "eu": "Euskara", "fa_IR": "فارسی", "fa_AF": "(فارسی (افغانستان", "fi": "Suomi", "fr_FR": "Français", "fr_BE": "Français de Belgique", "fr_CA": "Français du Canada", "fur": "Friulian", "gd": "Gàidhlig", "gl_ES": "Galego", "gu": "ગુજરાતી", "haz": "هزاره گی", "he_IL": "עִבְרִית", "hi_IN": "हिन्दी", "hr": "Hrvatski", "hsb": "Hornjoserbšćina", "hu_HU": "Magyar", "hy": "Հայերեն", "id_ID": "Bahasa Indonesia", "is_IS": "Íslenska", "it_IT": "Italiano", "ja": "日本語", "jv_ID": "Basa Jawa", "ka_GE": "ქართული", "kab": "Taqbaylit", "kk": "Қазақ тілі", "km": "ភាសាខ្មែរ", "kn": "ಕನ್ನಡ", "ko_KR": "한국어", "ckb": "كوردی", "lo": "ພາສາລາວ", "lt_LT": "Lietuvių kalba", "lv": "Latviešu valoda", "mk_MK": "Македонски јазик", "ml_IN": "മലയാളം", "mn": "Монгол", "mr": "मराठी", "ms_MY": "Bahasa Melayu", "my_MM": "ဗမာစာ", "nb_NO": "Norsk bokmål", "ne_NP": "नेपाली", "nl_NL_formal": "Nederlands (Formeel)", "nl_NL": "Nederlands", "nl_BE": "Nederlands (België)", "nn_NO": "Norsk nynorsk", "oci": "Occitan", "pa_IN": "ਪੰਜਾਬੀ", "pl_PL": "Polski", "ps": "پښتو", "pt_BR": "Português do Brasil", "pt_AO": "Português de Angola", "pt_PT_ao90": "Português (AO90)", "pt_PT": "Português", "rhg": "Ruáinga", "ro_RO": "Română", "ru_RU": "Русский", "sah": "Сахалыы", "snd": "سنڌي", "si_LK": "සිංහල", "sk_SK": "Slovenčina", "skr": "سرائیکی", "sl_SI": "Slovenščina", "sq": "Shqip", "sr_RS": "Српски језик", "sv_SE": "Svenska", "sw": "Kiswahili", "szl": "Ślōnskŏ gŏdka", "ta_IN": "தமிழ்", "ta_LK": "தமிழ்", "te": "తెలుగు", "th": "ไทย", "tl": "Tagalog", "tr_TR": "Türkçe", "tt_RU": "Татар теле", "tah": "Reo Tahiti", "ug_CN": "ئۇيغۇرچە", "uk": "Українська", "ur": "اردو", "uz_UZ": "O‘zbekcha", "vi": "Tiếng Việt", "zh_TW": "繁體中文", "zh_HK": "香港中文版 ", "zh_CN": "简体中文", } return_message = public.return_msg_gettext(True, language) del return_message['status'] return public.return_message(0, 0, return_message['msg']) # 请求参数校验 def check_param(self, args): """ @name 检测传入参数 @author zhwen<2022-03-10> """ # 检查email格式 rep_email = r"[\w!#$%&'*+/=?^_`{|}~-]+(?:\.[\w!#$%&'*+/=?^_`{|}~-]+)*@(?:[\w](?:[\w-]*[\w])?\.)+[\w](?:[\w-]*[\w])?" # 检查域名格式 rep_domain = r"^(?=^.{3,255}$)[a-zA-Z0-9\_\-][a-zA-Z0-9\_\-]{0,62}(\.[a-zA-Z0-9\_\-][a-zA-Z0-9\_\-]{0,62})+$" values = {} if hasattr(args, 'd_id'): if isinstance(args.d_id, int) or re.search(r'\d+', args.d_id): values["d_id"] = args.d_id else: return public.return_message(-1, 0, public.lang("Please check if the [{}] format is correct For example: {}", "d_id", "99")) if hasattr(args, 's_id'): if isinstance(args.s_id, int) or re.search(r'\d+', args.s_id): values["s_id"] = args.s_id else: return public.return_message(-1, 0, public.lang("Please check if the [{}] format is correct For example: {}", "s_id", "99")) if hasattr(args, 'language'): if args.language in self.get_language()['message']: values['language'] = args.language else: return public.return_message(-1, 0, public.lang("Please check if the [{}] format is correct For example: {}", "language", "en")) if hasattr(args, 'domain'): if re.search(rep_domain, args.domain): values['domain'] = public.xssencode2(args.domain) else: return public.return_message(-1, 0, public.lang("Please check if the [{}] format is correct For example: {}", "domain", "aapanel.com")) if hasattr(args, 'weblog_title'): values['weblog_title'] = public.xssencode2(args.weblog_title) if hasattr(args, 'user_name'): values['user_name'] = public.xssencode2(args.user_name) if hasattr(args, 'admin_password'): values['admin_password'] = public.xssencode2(args.admin_password) if hasattr(args, 'pw_weak'): if args.pw_weak in ['on', 'off']: values['pw_weak'] = args.pw_weak else: return public.return_message(-1, 0, public.lang("Please check if the [{}] format is correct For example: {}", "pw_weak", "on/off")) if hasattr(args, 'admin_email'): if re.search(rep_email, args.admin_email): values['admin_email'] = public.xssencode2(args.admin_email) else: return public.return_message(-1, 0, public.lang("Please check if the [{}] format is correct For example: {}", "admin_email", "adimn@aapanel.com")) if hasattr(args, 'prefix'): values['prefix'] = public.xssencode2(args.prefix) if hasattr(args, 'php_version'): values['php_version'] = public.xssencode2(args.php_version) if hasattr(args, 'enable_cache'): values['enable_cache'] = public.xssencode2(args.enable_cache) if hasattr(args, 'wp_action'): values['slug'] = public.xssencode2(args.slug) if hasattr(args, 'slug'): values['wp_action'] = public.xssencode2(args.wp_action) if hasattr(args, 'active_id'): values['active_id'] = public.xssencode2(args.active_id) if hasattr(args, 'new_pass'): values['new_pass'] = public.xssencode2(args.new_pass) if hasattr(args, 'user'): values['user'] = public.xssencode2(args.user) if hasattr(args, 'sitename'): values['sitename'] = public.xssencode2(args.sitename) if hasattr(args, 'act'): values['act'] = public.xssencode2(args.act) if hasattr(args, 'version'): values['version'] = public.xssencode2(args.version) return public.return_message(0, 0, values) # 删除网站 def del_site(self, get): import panelSite p = panelSite.panelSite() site_info = public.M('sites').where('id=?', (get.s_id,)).find() get.id = get.s_id get.webname = site_info['name'] get.ftp = "1" get.database = "1" get.path = "1" p.DeleteSite(get) # 安装WP def deploy_wp(self, get): """ d_id 数据据库ID s_id 网站ID language 部署wordpress后的语言 weblog_title wordpress博客名 user_name wordpress后台用户名 admin_password 管理员密码 admin_password2 pw_weak 允许弱密码 admin_email 管理员邮箱 prefix wordpress数据表前缀 php_version php版本 enable_cache 开启缓存 package_version Wordpress版本 @name 部署wordpress @author zhwen<2022-03-10> """ # 校验参数 try: get.validate([ Param('domain').String().Host(), Param('weblog_title').String(), Param('language').String(), Param('php_version').String(), Param('user_name').String(), Param('admin_email').String().Email(), Param('prefix').String(), Param('pw_weak').String(), Param('admin_password').String(), Param('enable_cache').Integer(), Param('d_id').Integer(), Param('s_id').Integer(), Param('package_version').String(), ], [ public.validate.trim_filter(), ]) except Exception as ex: public.print_log("error info: {}".format(ex)) return public.return_message(-1, 0, str(ex)) try: self.write_logs('', clean=True) values = self.check_param(get) if values['status'] == -1: # 删除自动创建的空白网站 self.del_site(get) return values values = values['message'] s_id = values['s_id'] # 网站ID d_id = values['d_id'] # 数据库ID prefix = values['prefix'] # 前缀 site_info = public.M('sites').where('id=?', (s_id,)).find() print("Get site info:\n ID: {}\npath: {}\n".format(s_id, site_info['path'])) self.write_logs(""" |-Get website information: ID: {} Path: {} """.format(s_id, site_info['path'])) db_info = public.M('databases').where('id=?', (d_id,)).find() print("Get database information: \nID: {}\nDBName: {}\nUser: {}\nPassWD: {}".format( d_id, db_info["name"], db_info["username"], db_info["password"])) self.write_logs(""" |-Get database information: ID: {} DBName: {} User: {} PassWD: {} """.format(d_id, db_info["name"], db_info["username"], db_info["password"])) values['dbname'] = db_info["name"] self.wp_user = values['user_name'] self.wp_passwd = values['admin_password'] values['db_user'] = db_info["username"] values['db_pwd'] = db_info["password"] values['site_path'] = site_info['path'] values['site_name'] = site_info['name'] # 开始下载安装包 from wp_toolkit import wp_version wp_version_obj = wp_version() if 'package_version' not in get or get.package_version is None: get.package_version = wp_version_obj.latest_version()['version'] self.write_logs('|-Package version: {}'.format(get.package_version)) # 下载特定版本的安装包 self.package_zip = wp_version_obj.download_package(get.package_version) self.unzip_package(site_info['path']) # 优化PHP res = optimize_php().optimize_php(get) if not res['status']: return public.return_message(-1, 0, res) # 初始化wp self.init_wp(values) self.write_db(s_id, d_id, prefix, get.user_name, get.admin_password) # 优化mysql optimize_db().self_db_cache(get) # 设置fastcgi缓存 if int(get.enable_cache) == 1 and public.get_webserver() == 'nginx': from wp_toolkit import wpmgr, wpfastcgi_cache # 配置Nginx-fastcgi-cache wpfastcgi_cache().set_fastcgi(values['site_path'], values['site_name'], values['php_version']) self.write_logs('|-WP Plugin nginx-helper installing...') wpmgr(s_id).init_plugin_nginx_helper() self.write_logs('|-WP Plugin nginx-helper installation succeeded') # if self.__IS_PRO_MEMBER: # from wp_toolkit import wpmgr, wpfastcgi_cache # # # 配置Nginx-fastcgi-cache # wpfastcgi_cache().set_fastcgi(values['site_path'], values['site_name'], values['php_version']) # # self.write_logs('|-WP Plugin nginx-helper installing...') # wpmgr(s_id).init_plugin_nginx_helper() # self.write_logs('|-WP Plugin nginx-helper installation succeeded') # else: # # 安装nginxHelper # # slug nginx-helper # values['slug'] = "nginx-helper" # values['wp_action'] = "install-plugin" # self.install_plugin(values) # self.act_nginx_helper_active(values) # # # 安装并启用nginx-helper插件 # self.set_nginx_helper(get) # 设置登录入口保护 if int(get.get('enable_whl', 0)) == 1: from wp_toolkit import wpmgr # 安装并启用wps-hide-login插件 self.write_logs('|-WP Plugin wps-hide-login installing...') wpmgr(s_id).init_plugin_wps_hide_login(get.get('whl_page', 'login'), get.get('whl_redirect_admin', '404')) self.write_logs('|-WP Plugin wps-hide-login installation succeeded') public.ServiceReload() self.write_logs("\n\n\n|-Deployment was successful!") return public.return_message(0, 0, public.lang("Deployment was successful!")) except: self.del_site(get) from traceback import format_exc public.print_log(format_exc()) return public.return_message(-1, 0, public.lang("Deployment failed!")) # 重新关联WP网站数据库 def reset_wp_db(self, args): """ :param args db_name 数据库名 :param args site_id 网站ID """ db_name = public.xssencode2(args.db_name) try: site_id = int(args.site_id) except: return public.return_message(-1, 0, public.lang("Site ID must be numeric")) db_info = public.M("databases").where("name=?", (db_name,)).field('id').find() if 'id' not in db_info: return public.return_message(-1, 0, public.lang("This database was not found!")) pdata = { "d_id": db_info['id'] } public.M('wordpress_onekey').where("s_id=?", (site_id,)).update(pdata) return public.return_message(0, 0, public.lang("Setup successfully!")) # 获取WP Toolkit配置信息 def get_wp_configurations(self, args: public.dict_obj): # 校验参数 try: args.validate([ Param('s_id').Require().Integer('>', 0), ], [ public.validate.trim_filter(), ]) except Exception as ex: public.print_log("error info: {}".format(ex)) return public.return_message(-1, 0, str(ex)) from wp_toolkit import wpmgr wpmgr_obj = wpmgr(args.s_id) wp_local_version = wpmgr_obj.get_local_version() wp_latest_version = wpmgr_obj.get_latest_version(available=True).get('version', '') can_upgrade = wp_latest_version > wp_local_version wp_toolkit_config_data = wpmgr_obj.get_wp_toolkit_config_data() return public.success_v2({ 'local_version': wp_local_version, 'latest_version': wp_latest_version, 'can_upgrade': can_upgrade, 'language': wp_toolkit_config_data['locale'], 'login_url': wp_toolkit_config_data['login_url'], 'home_url': wp_toolkit_config_data['site_url'], 'cache_enabled': self.get_cache_status(args.s_id), 'admin_user': wp_toolkit_config_data['admin_info']['user_login'], 'admin_email': wp_toolkit_config_data['admin_info']['user_email'], 'whl_enabled': wp_toolkit_config_data['whl_config'].get('activated', False), 'whl_page': wp_toolkit_config_data['whl_config'].get('whl_page', 'login'), 'whl_redirect_admin': wp_toolkit_config_data['whl_config'].get('whl_redirect_admin', '404'), }) # 保存WP Toolkit配置 def save_wp_configurations(self, args: public.dict_obj): # 校验参数 try: args.validate([ Param('s_id').Require().Integer('>', 0), Param('language').String('in', list(self.get_language(args)['message'].keys())), Param('admin_password').String('>=', 8), Param('admin_email').Email(), Param('whl_enabled').Integer('in', [0, 1]), Param('whl_page').SafePath(), Param('whl_redirect_admin').SafePath(), ], [ public.validate.trim_filter(), ]) except Exception as ex: public.print_log("error info: {}".format(ex)) return public.return_message(-1, 0, str(ex)) from wp_toolkit import wpmgr wpmgr_obj = wpmgr(args.s_id) # 更新语言 if 'language' in args: locale = wpmgr_obj.get_local_language() if args.language != locale: wpmgr_obj.update_language(args.language) # 写操作日志 wpmgr.log_opt('Change language from [{}] to [{}] successfully', (locale, args.language)) # 更新管理员密码 if 'admin_password' in args: wpmgr_obj.set_admin_password(args.admin_password) # 写操作日志 wpmgr.log_opt('Reset admin password successfully') # 更新管理员邮箱 if 'admin_email' in args: wpmgr_obj.set_admin_email(args.admin_email) # 写操作日志 wpmgr.log_opt('Reset admin email to [{}] successfully', (args.admin_email,)) # 更新WPS-Hide-Login插件配置 if 'whl_enabled' in args: # 停用插件 if int(args.whl_enabled) == 0: wpmgr_obj.deactivate_plugins('wps-hide-login/wps-hide-login.php') # 写操作日志 wpmgr.log_opt('Deactivate plugin [{}] successfully', ('WPS Hide Login',)) # 启用插件 else: wpmgr_obj.config_plugin_wps_hide_login(args.get('whl_page', 'login'), args.get('whl_redirect_admin', '404')) # 写操作日志 wpmgr.log_opt('Activate plugin [{}] successfully', ('WPS Hide Login',)) return public.success_v2('Update successfully') ##############################对外接口-END############################## class optimize_php: def optimize_php(self, get): get.version = get.php_version # 安装缓存插件 # self.install_ext(get) # 根据主机性能调整参数 return self.php_fpm(get) def install_ext(self, get): exts = ['opcache'] import files mfile = files.files() for ext in exts: # print("开始安装php扩展 [{}]...".format(ext)) if ext == 'pathinfo': import config con = config.config() get.version = get.php_version get.type = 'on' con.setPathInfo(get) else: get.name = ext get.version = get.php_version get.type = '1' mfile.InstallSoft(get) # 优化phpfpm def php_fpm(self, get): one_key_wp().write_logs("|-Start tuning PHP FPM parameters...") mem_total = int(get_mem()) if mem_total <= 1024: get.max_children = '30' get.start_servers = '5' get.min_spare_servers = '5' get.max_spare_servers = '20' elif 1024 < mem_total <= 2048: get.max_children = '50' get.start_servers = '5' get.min_spare_servers = '5' get.max_spare_servers = '30' elif 2048 < mem_total <= 4098: get.max_children = '80' get.start_servers = '10' get.min_spare_servers = '10' get.max_spare_servers = '30' elif 4098 < mem_total <= 8096: get.max_children = '120' get.start_servers = '10' get.min_spare_servers = '10' get.max_spare_servers = '30' elif 8096 < mem_total <= 16192: get.max_children = '200' get.start_servers = '15' get.min_spare_servers = '15' get.max_spare_servers = '50' elif 16192 < mem_total <= 32384: get.max_children = '300' get.start_servers = '20' get.min_spare_servers = '20' get.max_spare_servers = '50' elif 32384 < mem_total: get.max_children = '500' get.start_servers = '20' get.min_spare_servers = '20' get.max_spare_servers = '50' # get.version = self.version import config current_conf = config.config().getFpmConfig(get) get.pm = current_conf['pm'] get.listen = current_conf['unix'] one_key_wp().write_logs(""" ===================PHP FPM parameters======================= max_children: {} start_servers: {} min_spare_servers: {} max_spare_servers: {} Running mode: {} Connection: {} ===================PHP FPM parameters======================= """.format(get.max_children, get.start_servers, get.min_spare_servers, get.max_spare_servers, get.pm, get.listen)) # self.backup_conf('/www/server/php/{}/etc/php-fpm.conf'.format(self.version)) result = config.config().setFpmConfig(get) if not result['status']: one_key_wp().write_logs("|-PHP FPM Optimization failed: {}".format(result)) return public.return_msg_gettext(False, "PHP FPM Optimization failed: {}", (result,)) one_key_wp().write_logs("|-PHP FPM optimization succeeded") return public.return_msg_gettext(True, public.lang("PHP FPM optimization succeeded")) class optimize_db: def self_db_cache(self, get): one_key_wp().write_logs("|-Start optimizing Mysql") mem_total = int(get_mem()) if mem_total <= 2048: get.key_buffer_size = '128' get.tmp_table_size = '64' get.innodb_buffer_pool_size = '256' get.innodb_log_buffer_size = '16' get.sort_buffer_size = '768' get.read_buffer_size = '768' get.read_rnd_buffer_size = '512' get.join_buffer_size = '1024' get.thread_stack = '256' get.binlog_cache_size = '64' get.thread_cache_size = '64' get.table_open_cache = '128' get.max_connections = '100' get.max_heap_table_size = '64' elif 2048 < mem_total <= 4096: get.key_buffer_size = '256' get.tmp_table_size = '384' get.innodb_buffer_pool_size = '384' get.innodb_log_buffer_size = '16' get.sort_buffer_size = '768' get.read_buffer_size = '768' get.read_rnd_buffer_size = '512' get.join_buffer_size = '2048' get.thread_stack = '256' get.binlog_cache_size = '64' get.thread_cache_size = '96' get.table_open_cache = '192' get.max_connections = '200' get.max_heap_table_size = '384' elif 4096 < mem_total <= 8192: get.key_buffer_size = '384' get.tmp_table_size = '512' get.innodb_buffer_pool_size = '512' get.innodb_log_buffer_size = '16' get.sort_buffer_size = '1024' get.read_buffer_size = '1024' get.read_rnd_buffer_size = '768' get.join_buffer_size = '2048' get.thread_stack = '256' get.binlog_cache_size = '128' get.thread_cache_size = '128' get.table_open_cache = '384' get.max_connections = '300' get.max_heap_table_size = '512' elif 8192 < mem_total <= 16384: get.key_buffer_size = '512' get.tmp_table_size = '1024' get.innodb_buffer_pool_size = '1024' get.innodb_log_buffer_size = '16' get.sort_buffer_size = '2048' get.read_buffer_size = '2048' get.read_rnd_buffer_size = '1024' get.join_buffer_size = '4096' get.thread_stack = '384' get.binlog_cache_size = '192' get.thread_cache_size = '192' get.table_open_cache = '1024' get.max_connections = '400' get.max_heap_table_size = '1024' elif 16384 < mem_total <= 32768: get.key_buffer_size = '1024' get.tmp_table_size = '2048' get.innodb_buffer_pool_size = '4096' get.innodb_log_buffer_size = '16' get.sort_buffer_size = '4096' get.read_buffer_size = '4096' get.read_rnd_buffer_size = '2048' get.join_buffer_size = '8192' get.thread_stack = '512' get.binlog_cache_size = '256' get.thread_cache_size = '256' get.table_open_cache = '2048' get.max_connections = '500' get.max_heap_table_size = '2048' elif 32768 < mem_total: get.key_buffer_size = '2048' get.tmp_table_size = '4096' get.innodb_buffer_pool_size = '8192' get.innodb_log_buffer_size = '16' get.sort_buffer_size = '8192' get.read_buffer_size = '8192' get.read_rnd_buffer_size = '4096' get.join_buffer_size = '16384' get.thread_stack = '1024' get.binlog_cache_size = '512' get.thread_cache_size = '512' get.table_open_cache = '2048' get.max_connections = '1000' get.max_heap_table_size = '4096' one_key_wp().write_logs(""" =====================Mysql parameters======================= key_buffer_size: {} tmp_table_size: {} innodb_buffer_pool_size: {} innodb_log_buffer_size: {} sort_buffer_size: {} read_buffer_size: {} read_rnd_buffer_size: {} join_buffer_size: {} thread_stack: {} binlog_cache_size: {} thread_cache_size: {} table_open_cache: {} max_connections: {} max_heap_table_size: {} =====================Mysql parameters======================= """.format(get.key_buffer_size, get.tmp_table_size, get.innodb_buffer_pool_size, get.innodb_log_buffer_size, get.sort_buffer_size, get.read_buffer_size, get.read_rnd_buffer_size, get.join_buffer_size, get.thread_stack, get.binlog_cache_size, get.thread_cache_size, get.table_open_cache, get.max_connections, get.max_heap_table_size)) import database result = database.database().SetDbConf(get) if not result['status']: one_key_wp().write_logs("|-Mysql optimization failed {}".format(result)) return public.return_msg_gettext(False, "Mysql optimization failed {}", (result,)) public.ExecShell("/etc/init.d/mysqld restart") one_key_wp().write_logs("|-Mysql optimization succeeded") return public.return_msg_gettext(True, public.lang("Mysql optimization succeeded")) # Nginx缓存加速WP站点 class fast_cgi: def get_fastcgi_conf(self, version): conf = r""" set $skip_cache 0; if ($request_method = POST) { set $skip_cache 1; } if ($query_string != "") { set $skip_cache 1; } if ($request_uri ~* "/wp-admin/|/xmlrpc.php|wp-.*.php|/feed/|index.php|sitemap(_index)?.xml") { set $skip_cache 1; } if ($http_cookie ~* "comment_author|wordpress_[a-f0-9]+|wp-postpass|wordpress_no_cache|wordpress_logged_in") { set $skip_cache 1; } location ~ (/[^/]+\.php)(/|$) { if ( !-f $document_root$1 ) { return 404; } # try_files $uri =404; fastcgi_pass unix:/tmp/php-cgi-%s.sock; fastcgi_index index.php; include fastcgi.conf; add_header Strict-Transport-Security "max-age=63072000; includeSubdomains; preload"; fastcgi_cache_bypass $skip_cache; fastcgi_no_cache $skip_cache; add_header X-Cache "$upstream_cache_status From $host"; fastcgi_cache WORDPRESS; add_header Cache-Control max-age=0; add_header Nginx-Cache "$upstream_cache_status"; add_header Last-Modified $date_gmt; add_header X-Frame-Options SAMEORIGIN; add_header X-Content-Type-Options nosniff; add_header X-XSS-Protection "1; mode=block"; etag on; fastcgi_cache_valid 200 301 302 1d; } location ~ /purge(/.*) { allow 127.0.0.1; deny all; fastcgi_cache_purge WORDPRESS "$scheme$request_method$host$1"; } """ % version return conf def get_fast_cgi_status(self, sitename, php_v): if public.get_webserver() != "nginx": return False conf_path = "/www/server/panel/vhost/nginx/{}.conf".format(sitename) if not os.path.exists(conf_path): return False content = public.readFile(conf_path) fastcgi_conf = "include enable-php-{}-wpfastcgi.conf;".format(php_v) # return conf_path,fastcgi_conf if fastcgi_conf in content: return True return False def set_nginx_conf(self): if not os.path.exists("/dev/shm/nginx-cache/wp"): os.makedirs("/dev/shm/nginx-cache/wp") one_key_wp().set_permission("/dev/shm/nginx-cache") conf = """ #AAPANEL_FASTCGI_CONF_BEGIN fastcgi_cache_key "$scheme$request_method$host$request_uri"; fastcgi_cache_path /dev/shm/nginx-cache/wp levels=1:2 keys_zone=WORDPRESS:100m inactive=60m max_size=1g; fastcgi_cache_use_stale error timeout invalid_header http_500; fastcgi_ignore_headers Cache-Control Expires Set-Cookie; #AAPANEL_FASTCGI_CONF_END """ conf_path = "/www/server/nginx/conf/nginx.conf" public.back_file(conf_path) content = public.readFile(conf_path) if not content: return if "#AAPANEL_FASTCGI_CONF_BEGIN" in content: one_key_wp().write_logs("|-Nginx FastCgi cache configuration already exists") print("Nginx FastCgi cache configuration already exists") return public.return_msg_gettext(True, public.lang("Nginx FastCgi cache configuration already exists")) rep = "http\\s*\n\\s*{" content = re.sub(rep, "http\n\t{" + conf, content) public.writeFile(conf_path, content) # 如果配置出错恢复 conf_pass = public.checkWebConfig() if conf_pass != True: public.restore_file(conf_path) one_key_wp().write_logs("|-Nginx FastCgi configuration error! {}".format(conf_pass)) print("Nginx FastCgi configuration error! {}".format(conf_pass)) return public.return_msg_gettext(False, public.lang("Nginx FastCgi configuration error!")) one_key_wp().write_logs("|-Nginx FastCgi cache configuration complete...") print("Nginx FastCgi cache configuration complete") return public.return_msg_gettext(True, public.lang("Nginx FastCgi cache configuration complete")) def set_nginx_init(self): # if not os.path.exists("/dev/shm/nginx-cache/wp"): # os.makedirs("/dev/shm/nginx-cache/wp") # one_key_wp().set_permission("/dev/shm/nginx-cache") conf2 = """ #AAPANEL_FASTCGI_CONF_BEGIN mkdir -p /dev/shm/nginx-cache/wp #AAPANEL_FASTCGI_CONF_END """ init_path = "/etc/init.d/nginx" public.back_file(init_path) content_init = public.readFile(init_path) if not content_init: return if "#AAPANEL_FASTCGI_CONF_BEGIN" in content_init: one_key_wp().write_logs("|-Nginx init FastCgi cache configuration already exists") print("Nginx init FastCgi cache configuration already exists") return public.return_msg_gettext(True, public.lang("Nginx init FastCgi cache configuration already exists")) # content_init = re.sub(r"\$NGINX_BIN -c \$CONFIGFILE", + conf2, content_init) rep2 = r"\$NGINX_BIN -c \$CONFIGFILE" content_init = re.sub(rep2, conf2 + " $NGINX_BIN -c $CONFIGFILE", content_init) public.writeFile(init_path, content_init) # 如果配置出错恢复 public.ExecShell("/etc/init.d/nginx restart") conf_pass = public.is_nginx_process_exists() if conf_pass == False: public.restore_file(init_path) one_key_wp().write_logs("|-Nginx init FastCgi configuration error! {}".format(conf_pass)) print("Nginx init FastCgi configuration error! {}".format(conf_pass)) return public.return_msg_gettext(False, public.lang("Nginx init FastCgi configuration error!")) one_key_wp().write_logs("|-Nginx init FastCgi cache configuration complete...") print("Nginx init FastCgi cache configuration complete") return public.return_msg_gettext(True, public.lang("Nginx init FastCgi cache configuration complete")) def set_fastcgi_php_conf(self, version): conf_path = "/www/server/nginx/conf/enable-php-{}-wpfastcgi.conf".format(version) if os.path.exists(conf_path): one_key_wp().write_logs("|-Nginx FastCgi PHP configuration already exists...") print("Nginx FastCgi PHP configuration already exists") return True public.writeFile(conf_path, self.get_fastcgi_conf(version)) def set_website_conf(self, version, sitename, act=None): conf_path = "/www/server/panel/vhost/nginx/{}.conf".format(sitename) public.back_file(conf_path) conf = public.readFile(conf_path) if not conf: print("Website configuration file does not exist {}".format(conf_path)) one_key_wp().write_logs("|-Website configuration file does not exist: {}".format(conf_path)) return public.return_message(-1, 0, False) if act == 'disable': fastcgi_conf = "include enable-php-{}-wpfastcgi.conf;".format(version) if fastcgi_conf not in conf: print("FastCgi configuration does not exist in website configuration") one_key_wp().write_logs("|-FastCgi configuration does not exist in website configuration, skip") return public.return_message(-1, 0, public.lang("FastCgi configuration does not exist in website configuration")) rep = r"include\s+enable-php-{}-wpfastcgi.conf;".format(version) conf = re.sub(rep, "include enable-php-{}.conf;".format(version), conf) else: fastcgi_conf = "include enable-php-{}-wpfastcgi.conf;".format(version) if fastcgi_conf in conf: one_key_wp().write_logs( "|-The FastCgi configuration already exists in the website configuration, skip it") return public.return_message(0, 0, public.lang("The FastCgi configuration already exists in the website configuration")) rep = r"include\s+enable-php-{}.conf;".format(version) one_key_wp().write_logs("|-Current configuration: {}".format(conf)) one_key_wp().write_logs("|-Regular expression: {}".format(rep)) conf = re.sub(rep, fastcgi_conf, conf) one_key_wp().write_logs("|-Modified configuration: {}".format(conf)) public.writeFile(conf_path, conf) conf_pass = public.checkWebConfig() if conf_pass != True: public.restore_file(conf_path) print("Website FastCgi configuration error {}".format(conf_pass)) one_key_wp().write_logs("|-Website FastCgi configuration error: {}", (conf_pass,)) return public.return_message(-1, 0, public.lang("Website FastCgi configuration error!")) print("Website FastCgi configuration complete") one_key_wp().write_logs("|-Website FastCgi configuration complete...") return public.return_message(0, 0, public.lang("Website FastCgi configuration complete")) def set_fastcgi(self, values): """ get.version get.name """ # 设置nginx启动文件 self.set_nginx_init() # 设置nginx全局配置 self.set_nginx_conf() # 设置fastcgi location self.set_fastcgi_php_conf(values['php_version']) # 设置网站配置文件 self.set_website_conf(values['php_version'], values['site_name']) # 设置wp的变量用于nginxhelper插件清理缓存 self.set_wp_nginx_helper(values['site_path']) # 设置userini允许访问 /dev/shm/nginx-cache/wp 目录 self.set_userini(values['site_path']) def set_wp_nginx_helper(self, site_path): cache_conf = """ #AAPANEL_FASTCGICACHE_BEGIN define('RT_WP_NGINX_HELPER_CACHE_PATH','/dev/shm/nginx-cache/wp'); #AAPANEL_FASTCGICACHE_END """ conf_file = "{}/wp-config.php".format(site_path) conf = public.readFile(conf_file) if not conf: print("Wordpress configuration file does not exist: {}".format(conf_file)) one_key_wp().write_logs("|-Wordpress configuration file does not exist: {}".format(conf_file)) return public.return_msg_gettext(False, "Wordpress configuration file does not exist: {}", (conf_file,)) if re.search(r'''define\(\s*'RT_WP_NGINX_HELPER_CACHE_PATH'\s*,''', conf): # if "RT_WP_NGINX_HELPER_CACHE_PATH" in conf: one_key_wp().write_logs("|-Cache cleaning configuration already exists, skip") print("Cache cleaning configuration already exists") return conf = conf.replace("<?php", "<?php" + cache_conf) public.writeFile(conf_file, conf) def set_userini(self, site_path): conf_file = "{}/.user.ini".format(site_path) conf = public.readFile(conf_file) if not conf: print("Anti-cross-site configuration file does not exist: {}".format(conf_file)) one_key_wp().write_logs("|-Anti-cross-site configuration file does not exist: {}".format(conf_file)) return public.return_msg_gettext(False, "Anti-cross-site configuration file does not exist: {}", (conf_file,)) if "/dev/shm/nginx-cache/wp" in conf: print("Anti-cross-site configuration is successful") one_key_wp().write_logs("|-Anti-cross-site configuration is successful...") return public.ExecShell('chattr -i {}'.format(conf_file)) conf += ":/dev/shm/nginx-cache/wp" public.writeFile(conf_file, conf) public.ExecShell('chattr +i {}'.format(conf_file)) one_key_wp().write_logs("|-Anti-cross-site configuration is successful...") # one_key_wp请求帮助函数 def retry(func, max_retries=10): retry_count = 0 while retry_count < max_retries: try: return func() # 调用传入的函数并返回结果 except Exception as e: retry_count += 1 if retry_count < max_retries: public.print_log("Access failed {} times:{}".format(retry_count, e)) time.sleep(2) # 添加适当的延迟 else: public.print_log("The maximum number of retries has been reached.") return False return None
Close