Linux ip-172-26-2-223 5.4.0-1018-aws #18-Ubuntu SMP Wed Jun 24 01:15:00 UTC 2020 x86_64
Apache
: 172.26.2.223 | : 18.118.27.44
Cant Read [ /etc/named.conf ]
8.1.13
www
www.github.com/MadExploits
Terminal
AUTO ROOT
Adminer
Backdoor Destroyer
Linux Exploit
Lock Shell
Lock File
Create User
CREATE RDP
PHP Mailer
BACKCONNECT
UNLOCK SHELL
HASH IDENTIFIER
CPANEL RESET
CREATE WP USER
BLACK DEFEND!
README
+ Create Folder
+ Create File
/
www /
server /
panel /
class_v2 /
[ HOME SHELL ]
Name
Size
Permission
Action
__pycache__
[ DIR ]
drwxr-xr-x
btdockerModelV2
[ DIR ]
drwxr-xr-x
crontabModelV2
[ DIR ]
drwxr-xr-x
databaseModelV2
[ DIR ]
drwxr-xr-x
firewallModelV2
[ DIR ]
drwxr-xr-x
logsModelV2
[ DIR ]
drwxr-xr-x
monitorModelV2
[ DIR ]
drwxr-xr-x
panelModelV2
[ DIR ]
drwxr-xr-x
power_mta
[ DIR ]
drwxr-xr-x
projectModelV2
[ DIR ]
drwxr-xr-x
safeModelV2
[ DIR ]
drwxr-xr-x
safe_warning_v2
[ DIR ]
drwxr-xr-x
ssl_domainModelV2
[ DIR ]
drwxr-xr-x
virtualModelV2
[ DIR ]
drwxr-xr-x
wp_toolkit
[ DIR ]
drwxr-xr-x
acme_v3.py
133.98
KB
-rw-r--r--
ajax_v2.py
95.41
KB
-rw-r--r--
apache_v2.py
17.28
KB
-rw-r--r--
backup_bak_v2.py
24.86
KB
-rw-r--r--
breaking_through.py
47.94
KB
-rw-r--r--
cloud_stora_upload_v2.py
19.27
KB
-rw-r--r--
common_v2.py
12.45
KB
-rw-r--r--
config_v2.py
165.36
KB
-rw-r--r--
crontab_ssl_v2.py
1.85
KB
-rw-r--r--
crontab_v2.py
111.93
KB
-rw-r--r--
data_v2.py
36.54
KB
-rw-r--r--
database_v2.py
125.54
KB
-rw-r--r--
datatool_v2.py
5.83
KB
-rw-r--r--
db_mysql_v2.py
11.41
KB
-rw-r--r--
db_v2.py
11.04
KB
-rw-r--r--
dk_db.py
18.34
KB
-rw-r--r--
download_file_v2.py
2.54
KB
-rw-r--r--
fastcgi_client_two_v2.py
12.26
KB
-rw-r--r--
fastcgi_client_v2.py
6.89
KB
-rw-r--r--
file_execute_deny_v2.py
10.34
KB
-rw-r--r--
files_v2.py
149.12
KB
-rw-r--r--
firewall_new_v2.py
22.4
KB
-rw-r--r--
firewalld_v2.py
11.09
KB
-rw-r--r--
firewalls_v2.py
17.44
KB
-rw-r--r--
flask_compress_v2.py
5.12
KB
-rw-r--r--
flask_sockets_v2.py
3.75
KB
-rw-r--r--
ftp_log_v2.py
21.72
KB
-rw-r--r--
ftp_v2.py
16.17
KB
-rw-r--r--
http_requests_v2.py
24.25
KB
-rw-r--r--
jobs_v2.py
36.98
KB
-rw-r--r--
letsencrypt_v2.py
12.85
KB
-rw-r--r--
log_analysis_v2.py
12.23
KB
-rw-r--r--
monitor_v2.py
13.53
KB
-rw-r--r--
one_key_wp_v2.py
75.79
KB
-rw-r--r--
panelControllerV2.py
4.97
KB
-rw-r--r--
panelDatabaseControllerV2.py
5.76
KB
-rw-r--r--
panelDockerControllerV2.py
5.86
KB
-rw-r--r--
panelFireControllerV2.py
4.65
KB
-rw-r--r--
panelModControllerV2.py
5.13
KB
-rw-r--r--
panelProjectControllerV2.py
6.07
KB
-rw-r--r--
panelSafeControllerV2.py
4.65
KB
-rw-r--r--
panel_api_v2.py
10.43
KB
-rw-r--r--
panel_auth_v2.py
33.21
KB
-rw-r--r--
panel_backup_v2.py
102.56
KB
-rw-r--r--
panel_dns_api_v2.py
22.2
KB
-rw-r--r--
panel_http_proxy_v2.py
11.33
KB
-rw-r--r--
panel_lets_v2.py
43.61
KB
-rw-r--r--
panel_mssql_v2.py
4.48
KB
-rw-r--r--
panel_mysql_v2.py
7.55
KB
-rw-r--r--
panel_php_v2.py
24.78
KB
-rw-r--r--
panel_ping_v2.py
2.88
KB
-rw-r--r--
panel_plugin_v2.py
125.11
KB
-rw-r--r--
panel_push_v2.py
23.78
KB
-rw-r--r--
panel_redirect_v2.py
34.02
KB
-rw-r--r--
panel_restore_v2.py
11.04
KB
-rw-r--r--
panel_site_v2.py
343.73
KB
-rw-r--r--
panel_ssl_v2.py
75.34
KB
-rw-r--r--
panel_task_v2.py
28.7
KB
-rw-r--r--
panel_video_V2.py
1.88
KB
-rw-r--r--
panel_warning_v2.py
68.71
KB
-rw-r--r--
password_v2.py
8.09
KB
-rw-r--r--
plugin_auth_v2.py
3.14
KB
-rw-r--r--
plugin_deployment_v2.py
28.85
KB
-rw-r--r--
san_baseline_v2.py
51.13
KB
-rw-r--r--
site_dir_auth_v2.py
17.67
KB
-rw-r--r--
ssh_security_v2.py
45.66
KB
-rw-r--r--
ssh_terminal_v2.py
58.86
KB
-rw-r--r--
system_v2.py
44.77
KB
-rw-r--r--
userRegister_v2.py
6.74
KB
-rw-r--r--
user_login_v2.py
21.2
KB
-rw-r--r--
vilidate_v2.py
4.94
KB
-rw-r--r--
wxapp_v2.py
5.62
KB
-rw-r--r--
Delete
Unzip
Zip
${this.title}
Close
Code Editor : panel_ssl_v2.py
# coding: utf-8 # ------------------------------------------------------------------- # aaPanel # ------------------------------------------------------------------- # Copyright (c) 2015-2016 aaPanel(www.aapanel.com) All rights reserved. # ------------------------------------------------------------------- # Author: hwliang <hwl@aapanel.com> # ------------------------------------------------------------------- # ------------------------------ # SSL接口 # ------------------------------ from panel_auth_v2 import panelAuth as panelAuth import public, os, sys, binascii, urllib, json, time, datetime, re from ssl_manage import SSLManger # 新的ssl管理 from Crypto import Random from Crypto.PublicKey import RSA from Crypto.Cipher import PKCS1_v1_5 as PKCS1_cipher import base64 from public.validate import Param try: from BTPanel import cache, session except: pass class panelSSL: # __APIURL = public.GetConfigValue('home') + '/api/Auth' # __APIURL2 = public.GetConfigValue('home') + '/api/Cert' # __BINDURL = 'https://wafapi.aapanel.com/Auth/GetAuthToken' # 获取token 获取官网token __BINDURL = '{}/api/user'.format(public.OfficialApiBase()) # 获取token 获取官网token # __BINDURL = 'http://dev.aapanel.com/api/user' # 获取token 获取官网token __CODEURL = 'https://wafapi.aapanel.com/Auth/GetBindCode' # 获取绑定验证码 __UPATH = 'data/userInfo.json' # __APIURL = 'http://dev.aapanel.com/api' __APIURL = '{}/api'.format(public.OfficialApiBase()) __PUBKEY = 'data/public.key' # 证书购买 # __APIURL_CERT = '{}/api/cert'.format(public.OfficialApiBase()) __userInfo = None # 用户信息 从文件中读取的 __PDATA = None _check_url = None # 构造方法 def __init__(self): pdata = {} data = {} # 存放调用接口的参数 # 记录了用户信息 if os.path.exists(self.__UPATH): my_tmp = public.readFile(self.__UPATH) if my_tmp: try: self.__userInfo = json.loads(my_tmp) except: self.__userInfo = {} else: self.__userInfo = {} # public.print_log('初始化 !!!!!!!!!!!!!!!!!!!用户信息: {}'.format(self.__userInfo)) try: if self.__userInfo: # 记录里没有这两个key pdata['access_key'] = self.__userInfo['access_key'] 
data['secret_key'] = self.__userInfo['secret_key'] # pdata['access_key'] = 'test' # data['secret_key'] = '123456' except: # self.__userInfo = {} pdata['access_key'] = 'test' data['secret_key'] = '123456' else: pdata['access_key'] = 'test' data['secret_key'] = '123456' pdata['data'] = data self.__PDATA = pdata # public.print_log('初始化------->最后 !!!!!!!!!!!!!!!!!!!用户信息: {}'.format(self.__userInfo)) def en_code_rsa(self, data): pk = public.readFile(self.__PUBKEY) if not pk: return False pub_k = RSA.importKey(pk) cipher = PKCS1_cipher.new(pub_k) rsa_text = base64.b64encode(cipher.encrypt(bytes(data.encode("utf8")))) return str(rsa_text, encoding='utf-8') # 获取Token 最新 # def GetToken(self, get): # rtmp = "" # data = {} # data['username'] = get.username # data['password'] = public.md5(get.password) # data['serverid'] = panelAuth().get_serverid() # pdata = {} # pdata['data'] = self.De_Code(data) # try: # rtmp = public.httpPost(self.__BINDURL, pdata) # result = json.loads(rtmp) # result['data'] = self.En_Code(result['data']) # if result['data']: # result['data']['serverid'] = data['serverid'] # public.writeFile(self.__UPATH, json.dumps(result['data'])) # public.flush_plugin_list() # del (result['data']) # session['focre_cloud'] = True # return result # except Exception as ex: # # bind = 'data/bind.pl' # # if os.path.exists(bind): os.remove(bind) # # return public.returnMsg(False,'连接服务器失败!<br>' + str(ex)) # raise public.error_conn_cloud(str(ex)) # # # 删除Token 最新 # def DelToken(self, get): # if os.path.exists(self.__UPATH): os.remove(self.__UPATH) # session['focre_cloud'] = True # return public.returnMsg(True, public.lang("SSL_BTUSER_UN")) # 获取Token todo 设置绑定账号在用 def GetToken(self, get): rtmp = "" data = {} data['identification'] = self.en_code_rsa(get.username) # data['username'] = self.en_code_rsa(get.username) data['password'] = self.en_code_rsa(get.password) data['from_panel'] = self.en_code_rsa('1') # 1 代表从面板登录 data['environment_info'] = 
json.dumps(public.fetch_env_info()) try: # https://www.aapanel.com isPro = False if hasattr(get, 'isPro') and get.isPro: isPro = True sUrl = '{}/pro/api/user/login'.format(public.OfficialApiBase()) else: sUrl = '{}/api/user/login'.format(public.OfficialApiBase()) rtmp = public.httpPost(sUrl, data) # public.print_log("写入用户信息 @@@@222 {}".format(self.__APIURL + '/user/login')) result = json.loads(rtmp) # public.print_log("登录写入用户信息 @@@@ {}".format(result)) if isPro: if result.get('err_no', None) == 2002: # return public.return_message(-1, 0, public.lang("The email has not been validated")) # return public.return_message(-1, 0, {"validated": False}) return public.return_message(-1, 0, {"result": "The email has not been validated","validated": False}) if result['success']: bind = 'data/bind.pl' if os.path.exists(bind): os.remove(bind) userinfo = result['res']['user_data'] userinfo['token'] = result['res']['access_token'] # 用户信息写入文件 public.writeFile(self.__UPATH, json.dumps(userinfo)) # if bool: # # public.print_log("写入用户信息 成功 {}".format(userinfo)) # else: # public.print_log("写入用户信息 失败") session['focre_cloud'] = True return public.return_message(0, 0, public.lang("Bind successfully")) # return result else: return public.return_message(-1, 0, result.get('res', public.lang("Invalid username or email or password! 
please check and try again!"))) except Exception as ex: # public.print_log(public.get_error_info()) # public.print_log("获取token 失败 {}".format(ex)) bind = 'data/bind.pl' if os.path.exists(bind): os.remove(bind) return public.return_message(-1,0, '%s<br>%s' % ( public.lang("Failed to connect server!"), str(rtmp))) # 删除Token todo def DelToken(self, get): uinfo = public.readFile(self.__UPATH) try: uinfo = json.loads(uinfo) public.writeFile(self.__UPATH, json.dumps({'server_id': uinfo['server_id']})) except: public.ExecShell("rm -f " + self.__UPATH) session['focre_cloud'] = True return public.return_message(0, 0, public.lang("Unbound!")) # 获取用户信息 todo # def GetUserInfo(self, get): # result = {} # # # public.print_log("@@@@@@@@@@获取用户信息 开始----- {}".format(self.__userInfo)) # # if self.__userInfo: # userTmp = {} # userTmp['username'] = self.__userInfo['username'][0:3] + '****' + self.__userInfo['username'][-4:] # result['status'] = True # result['msg'] = public.lang("SSL_GET_SUCCESS") # result['data'] = userTmp # else: # userTmp = {} # userTmp['username'] = public.lang("SSL_NOT_BTUSER") # result['status'] = False # result['msg'] = public.lang("SSL_NOT_BTUSER") # result['data'] = userTmp # return result def GetUserInfo(self, get): # public.print_log("获取用户信息222") return_status = -1 result = {} try: if self.__userInfo: userTmp = {} userTmp['username'] = self.__userInfo['email'][0:3] + '****' + self.__userInfo['email'][-4:] return_status= 0 result['msg'] = public.lang("Got successfully!") result['data'] = userTmp else: userTmp = {} userTmp['username'] = public.lang("Please bind your account!") result['msg'] = public.lang("Please bind your account!") result['data'] = userTmp except: userTmp = {} userTmp['username'] = public.lang("Please bind your account!") result['msg'] = public.lang("Please bind your account!") result['data'] = userTmp return public.return_message(return_status,0,result) # 获取产品列表 todo # def get_product_list(self, get): # p_type = 'dv' # if 'p_type' in get: 
p_type = get.p_type # result = self.request('get_product_list?p_type={}'.format(p_type)) # return result # # 获取产品列表2 todo # def get_product_list_v2(self, get): # p_type = 'dv' # if 'p_type' in get: p_type = get.p_type # # result = self.request('get_product_list_v2?p_type={}'.format(p_type)) # return result # 获取产品列表2 todo 产品列表 def get_product_list_v2(self, get): return_status = 0 result = self.request('cert/product/list') # if "success" in result: # print(f"键 '{key_to_check}' 存在于字典中。") # else: # print(f"键 '{key_to_check}' 不存在于字典中。") return public.return_message(0,0,result) # 获取商业证书订单列表 todo 用户订单列表 def get_order_list(self, get): result = self.request('cert/user/list') # 获取当前登录用户的SSL证书列表 return public.return_message(0,0,result) # 下载证书 todo def download_cert(self, get): self.__PDATA['uc_id'] = get.uc_id result = self.request('cert/user/download') return result # 获指定商业证书订单 def get_order_find(self, get): self.__PDATA['uc_id'] = get.uc_id result = self.request('cert/user/info') return result # 获取证书管理员信息 todo def get_cert_admin(self, get): result = self.request('cert/user/administrator') return result # 完善资料CA(先支付接口) todo 可能是支付后的完善信息接口 def apply_order_ca(self, args): pdata = json.loads(args.pdata) result = self.check_ssl_caa(pdata['domains']) if result: return result self.__PDATA['data'] = pdata result = self.request('cert/user/update_profile') return result # 部署指定商业证书 todo 部署证书 def set_cert(self, get): siteName = get.siteName certInfoall = self.get_order_find(get) if certInfoall["success"] is False: return public.return_message(-1, 0, certInfoall["res"]) certInfo = certInfoall["res"] path = '/www/server/panel/vhost/cert/' + siteName if not os.path.exists(path): public.ExecShell('mkdir -p ' + path) csrpath = path + "/fullchain.pem" keypath = path + "/privkey.pem" pidpath = path + "/certOrderId" other_file = path + '/partnerOrderId' if os.path.exists(other_file): os.remove(other_file) other_file = path + '/README' if os.path.exists(other_file): os.remove(other_file) 
public.writeFile(keypath, certInfo['private_key']) public.writeFile(csrpath, certInfo['certificate'] + "\n" + certInfo['ca_certificate']) # 改记录 uc_id public.writeFile(pidpath, get.uc_id) import panel_site_v2 as panelSite panelSite.panelSite().SetSSLConf(get) public.serviceReload() return public.return_message(0, 0, public.lang("Setup successfully!")) # # 生成商业证书支付订单 暂无 # def apply_order_pay(self, args): # self.__PDATA['data'] = json.loads(args.pdata) # result = self.check_ssl_caa(self.__PDATA['data']['domains']) # if result: return result # result = self.request('apply_cert_order') # return result # 检查CAA记录是否正确 def check_ssl_caa(self, domains, clist=['sectigo.com', 'digicert.com', 'comodoca.com']): ''' @name 检查CAA记录是否正确 @param domains 域名列表 @param clist 正确的记录值关键词 @return bool ''' try: data = {} for domain in domains: root, zone = public.get_root_domain(domain) for d in [domain, root, '_acme-challenge.{}'.format(root), '_acme-challenge.{}'.format(domain)]: ret = public.query_dns(d, 'CAA') if not ret: continue slist = [] for val in ret: if val['value'] in clist: return False slist.append(val) if len(slist) > 0: data[d] = slist if data: result = {} result['status'] = False result[ 'msg'] = 'error: There is a CAA record in the DNS resolution of the domain name. 
Please delete it and apply again ' result['data'] = json.dumps(data) result['caa_list'] = data return result except: pass return False # 提交商业证书订单到CA # def apply_order(self, args): # self.__PDATA['data']['oid'] = args.oid # result = self.request('apply_cert') # if result['status'] == True: # self.__PDATA['data'] = {} # result['verify_info'] = self.get_verify_info(args) # return result # 获取证书域名验证结果 todo 暂未使用 # 用到: 处理验证信息 set_verify_info 完善资料 apply_order_ca 续签证书 renew_cert_order def get_verify_info(self, args): self.__PDATA['uc_id'] = args.uc_id verify_info = self.request('cert/user/validate_domains') if verify_info['success']: return "success" return "error" # is_file_verify = 'fileName' in verify_info # verify_info['paths'] = [] # verify_info['hosts'] = [] # for domain in verify_info['domains']: # if is_file_verify: # siteRunPath = self.get_domain_run_path(domain) # if not siteRunPath: # # if domain[:4] == 'www.': domain = domain[:4] # verify_info['paths'].append(verify_info['path'].replace('example.com', domain)) # continue # verify_path = siteRunPath + '/.well-known/pki-validation' # if not os.path.exists(verify_path): # os.makedirs(verify_path) # verify_file = verify_path + '/' + verify_info['fileName'] # if os.path.exists(verify_file): continue # public.writeFile(verify_file, verify_info['content']) # else: # original_domain = domain # # if domain[:4] == 'www.': domain = domain[:4] # verify_info['hosts'].append(verify_info['host'] + '.' + domain) # if 'auth_to' in args: # root, zone = public.get_root_domain(domain) # res = self.create_dns_record(args['auth_to'], verify_info['host'] + '.' 
+ root, # verify_info['value'], original_domain) # print(res) # return verify_info # 处理验证信息 todo 如果传参 要传uc_id def set_verify_info(self, args): # self.__PDATA['uc_id'] = args.uc_id # 新增 verify_info = self.get_verify_info(args) is_file_verify = 'fileName' in verify_info verify_info['paths'] = [] verify_info['hosts'] = [] for domain in verify_info['domains']: if domain[:2] == '*.': domain = domain[2:] if is_file_verify: siteRunPath = self.get_domain_run_path(domain) if not siteRunPath: # if domain[:4] == 'www.': domain = domain[4:] verify_info['paths'].append(verify_info['path'].replace('example.com', domain)) continue verify_path = siteRunPath + '/.well-known/pki-validation' if not os.path.exists(verify_path): os.makedirs(verify_path) verify_file = verify_path + '/' + verify_info['fileName'] if os.path.exists(verify_file): continue public.writeFile(verify_file, verify_info['content']) else: original_domain = domain # if domain[:4] == 'www.': domain = domain[4:] verify_info['hosts'].append(verify_info['host'] + '.' + domain) if 'auth_to' in args: root, zone = public.get_root_domain(domain) self.create_dns_record(args['auth_to'], verify_info['host'] + '.' 
+ root, verify_info['value'], original_domain) return verify_info # 获取指定域名的PATH def get_domain_run_path(self, domain): pid = public.M('domain').where('name=?', (domain,)).getField('pid') if not pid: return False return self.get_site_run_path(pid) # 获取网站运行目录 def get_site_run_path(self, pid): ''' @name 获取网站运行目录 @author hwliang<2020-08-05> @param pid(int) 网站标识 @return string ''' siteInfo = public.M('sites').where('id=?', (pid,)).find() siteName = siteInfo['name'] sitePath = siteInfo['path'] webserver_type = public.get_webserver() setupPath = '/www/server' path = None if webserver_type == 'nginx': filename = setupPath + '/panel/vhost/nginx/' + siteName + '.conf' if os.path.exists(filename): conf = public.readFile(filename) rep = r'\s*root\s+(.+);' tmp1 = re.search(rep, conf) if tmp1: path = tmp1.groups()[0] elif webserver_type == 'apache': filename = setupPath + '/panel/vhost/apache/' + siteName + '.conf' if os.path.exists(filename): conf = public.readFile(filename) rep = r'\s*DocumentRoot\s*"(.+)"\s*\n' tmp1 = re.search(rep, conf) if tmp1: path = tmp1.groups()[0] else: filename = setupPath + '/panel/vhost/openlitespeed/' + siteName + '.conf' if os.path.exists(filename): conf = public.readFile(filename) rep = r"vhRoot\s*(.*)" path = re.search(rep, conf) if not path: path = None else: path = path.groups()[0] if not path: path = sitePath return path # 验证URL是否匹配 def check_url_txt(self, args, timeout=5): url = args.url content = args.content import http_requests res = http_requests.get(url, s_type='curl', timeout=timeout) result = res.text if not result: return 0 if result.find('11001') != -1 or result.find('curl: (6)') != -1: return -1 if result.find('curl: (7)') != -1 or res.status_code in [403, 401]: return -5 if result.find('Not Found') != -1 or result.find('not found') != -1 or res.status_code in [404]: return -2 if result.find('timed out') != -1: return -3 if result.find('301') != -1 or result.find('302') != -1 or result.find( 'Redirecting...') != -1 or 
res.status_code in [301, 302]: return -4 if result == content: return 1 return 0 # 更换验证方式 # todo? ['data'] def again_verify(self, args): self.__PDATA['uc_id'] = args.uc_id self.__PDATA['dcv_method'] = args.dcv_method result = self.request('cert/user/update_dcv') return result # 获取商业证书验证结果 def get_verify_result(self, args): self.__PDATA['uc_id'] = args.uc_id res = self.request('cert/user/validate') if res['success'] is False: return res verify_info = res['res'] if verify_info['status'] in ['COMPLETE', False]: return verify_info is_file_verify = 'CNAME_CSR_HASH' != verify_info['data']['dcvList'][0]['dcvMethod'] verify_info['paths'] = [] verify_info['hosts'] = [] if verify_info['data']['application']['status'] == 'ongoing': return public.return_message(-1, 0, public.lang("In verification, please contact aaPanel if the audit still fails after 24 hours")) for dinfo in verify_info['data']['dcvList']: is_https = dinfo['dcvMethod'] == 'HTTPS_CSR_HASH' if is_https: is_https = 's' else: is_https = '' domain = dinfo['domainName'] if domain[:2] == '*.': domain = domain[2:] dinfo['domainName'] = domain if is_file_verify: # 判断是否是Springboot 项目 if public.M('sites').where('id=?', ( public.M('domain').where('name=?', (dinfo['domainName'])).getField('pid'),)).getField( 'project_type') == 'Java' or public.M('sites').where('id=?', ( public.M('domain').where('name=?', (dinfo['domainName'])).getField('pid'),)).getField( 'project_type') == 'Go' or public.M('sites').where('id=?', ( public.M('domain').where('name=?', (dinfo['domainName'])).getField('pid'),)).getField( 'project_type') == 'Other': siteRunPath = '/www/wwwroot/java_node_ssl' else: siteRunPath = self.get_domain_run_path(domain) # if domain[:4] == 'www.': domain = domain[4:] status = 0 url = 'http' + is_https + '://' + domain + '/.well-known/pki-validation/' + verify_info['data'][ 'DCVfileName'] get = public.dict_obj() get.url = url get.content = verify_info['data']['DCVfileContent'] status = self.check_url_txt(get) 
verify_info['paths'].append({'url': url, 'status': status}) if not siteRunPath: continue verify_path = siteRunPath + '/.well-known/pki-validation' if not os.path.exists(verify_path): os.makedirs(verify_path) verify_file = verify_path + '/' + verify_info['data']['DCVfileName'] if os.path.exists(verify_file): continue public.writeFile(verify_file, verify_info['data']['DCVfileContent']) else: # if domain[:4] == 'www.': domain = domain[4:] domain, subb = public.get_root_domain(domain) dinfo['domainName'] = domain verify_info['hosts'].append(verify_info['data']['DCVdnsHost'] + '.' + domain) return verify_info # 取消订单 暂无 def cancel_cert_order(self, args): self.__PDATA['data']['oid'] = args.oid result = self.request('cancel_cert_order') return result # 单独购买人工安装服务 def apply_cert_install_pay(self, args): ''' @name 单独购买人工安装服务 @param args<dict_obj>{ 'uc_id'<int> 订单ID } ''' self.__PDATA['uc_id'] = args.uc_id result = self.request('cert/order/deployment_assistance') return result # 生成商业证书支付订单 todo 生成支付订单 下单支付 def apply_cert_order_pay(self, args): pdata = json.loads(args.pdata) self.__PDATA['data'] = pdata result = self.request('cert/order/create') return public.return_message(0,0,result) # 模拟支付 # def pay_test(self, args): # out_trade_no = args.out_trade_no # # /api/common/stripe/{out_trade_no} # # result = self.request_test('order/pay') # result = public.return_msg_gettext(False, '测试用 模拟支付!') # url = "https://dev.aapanel.com/api/common/stripe/" + out_trade_no # response_data = public.httpGet(url) # # # public.print_log("******************** url: {}".format(url)) # # try: # result = json.loads(response_data) # except: # pass # return result # 申请证书 ??? 
# NOTE(review): every definition below takes ``self`` and reads name-mangled
# attributes (``self.__PDATA``), so these are methods of a class whose header is
# outside this chunk. The chunk's original indentation was lost; statement
# nesting was reconstructed from the syntax. Re-indent under the class when
# restoring the file's layout.


def _site_project_type(self, domain_name):
    """Return the ``project_type`` of the site that owns ``domain_name``."""
    pid = public.M('domain').where('name=?', (domain_name,)).getField('pid')
    return public.M('sites').where('id=?', (pid,)).getField('project_type')


def _purge_old_cert_files(self, site_name, cert_dir):
    """Delete the previous certificate chain of ``site_name`` without a shell.

    Replaces the former ``rm -f`` / ``rm -rf`` command strings, which were
    built from the request-supplied site name.

    Raises:
        ValueError: if ``site_name`` is empty or could escape its directory.
    """
    import glob
    import shutil

    # An empty name or a path separator would make the archive/renewal paths
    # point at a parent directory and delete far more than one site's files.
    if not site_name or '/' in site_name or site_name in ('.', '..'):
        raise ValueError('invalid site name: {!r}'.format(site_name))

    archive = '/etc/letsencrypt/archive/' + site_name
    renewal = '/etc/letsencrypt/renewal/' + site_name

    files = [
        cert_dir + '/privkey.pem',
        cert_dir + '/fullchain.pem',
        renewal + '.conf',
        cert_dir + '/README',
        cert_dir + '/certOrderId',
    ]
    files.extend(glob.glob(glob.escape(renewal) + '-00*.conf'))

    trees = [archive]
    trees.extend(glob.glob(glob.escape(cert_dir) + '-00*'))
    trees.extend(glob.glob(glob.escape(archive) + '-00*'))

    for target in files:
        try:
            os.remove(target)
        except OSError:
            pass  # same best-effort semantics as ``rm -f``
    for target in trees:
        if os.path.isdir(target) and not os.path.islink(target):
            shutil.rmtree(target, ignore_errors=True)
        else:
            try:
                os.remove(target)
            except OSError:
                pass


def ApplyDVSSL(self, get):
    """Apply for (or re-issue) a DV certificate for ``get.domain``.

    Requires the ``org*`` fields on ``get``. Runs a local file-validation
    pre-check, posts the order to the cloud API and writes the returned
    validation file under the site's run path.

    Returns:
        The decoded API response, the raw response text when it is not JSON,
        or a ``public.return_message`` error dict.

    Raises:
        public.error_conn_cloud: when the HTTP request itself fails.
    """
    for required in ('orgName', 'orgPhone', 'orgPostalCode', 'orgRegion',
                     'orgCity', 'orgAddress', 'orgDivision'):
        if required not in get:
            return public.return_message(
                -1, 0, public.lang("missing parameter: {}".format(required)))

    get.id = public.M('domain').where('name=?', (get.domain,)).getField('pid')
    if hasattr(get, 'siteName'):
        get.path = public.M('sites').where('id=?', (get.id,)).getField('path')
    else:
        get.siteName = public.M('sites').where('id=?', (get.id,)).getField('name')

    # When applying for a "www." name, the bare domain must be bound to the same site.
    if get.domain[:4] == 'www.':
        if not public.M('domain').where('name=? AND pid=?', (get.domain[4:], get.id)).count():
            return public.return_message(-1, 0, "Request for [%s] certificate requires verification [%s] Please bind and resolve [%s] to the site!" % (
                get.domain, get.domain[4:], get.domain[4:]))

    # One lookup instead of one per comparison.
    project_type = public.M('sites').where('id=?', (get.id,)).getField('project_type')
    if project_type in ('Java', 'Go', 'Other'):
        # These project types are validated through a shared directory.
        get.path = '/www/wwwroot/java_node_ssl/'
        runPath = ''
    elif project_type in ('Node', 'Python'):
        get.path = public.M('sites').where('id=?', (get.id,)).getField('path')
        runPath = ''
    else:
        runPath = self.GetRunPath(get)
    if runPath not in (False, '/'):
        get.path += runPath

    authfile = get.path + '/.well-known/pki-validation/fileauth.txt'
    if not self.CheckDomain(get):
        if not os.path.exists(authfile):
            return public.return_message(-1, 0, public.lang("Unable to write validation file: {}", authfile))
        msg = '''can't correct access validation file
<br><a class="btlink" href="{c_url}" target="_blank">{c_url}</a>
<br><br>
<p></b>Possible cause:</b></p>
1、the resolution is not correct, or the resolution does not work [please resolve the domain correctly, or wait for the resolution to work and try again]<br>
2、 check whether the 301/302 redirection is set [please temporarily turn off the redirection related configuration]<br>
3、 Check whether the site has HTTPS deployed and set mandatory HTTPS [Please temporarily turn off mandatory HTTPS feature]<br>'''.format(
            c_url=self._check_url)
        return public.return_message(-1, 0, msg)

    action = 'ApplyDVSSL'
    if hasattr(get, 'partnerOrderId'):
        self.__PDATA['data']['partnerOrderId'] = get.partnerOrderId
        action = 'ReDVSSL'

    # Insertion order is kept as in the original: it determines the encoded payload.
    self.__PDATA['data']['domain'] = get.domain
    self.__PDATA['data']['orgPhone'] = get.orgPhone
    self.__PDATA['data']['orgPostalCode'] = get.orgPostalCode
    self.__PDATA['data']['orgRegion'] = get.orgRegion
    self.__PDATA['data']['orgCity'] = get.orgCity
    self.__PDATA['data']['orgAddress'] = get.orgAddress
    self.__PDATA['data']['orgDivision'] = get.orgDivision
    self.__PDATA['data']['orgName'] = get.orgName
    self.__PDATA['data'] = self.De_Code(self.__PDATA['data'])
    try:
        result = public.httpPost(self.__APIURL + 'user/' + action, self.__PDATA)
    except Exception as ex:
        raise public.error_conn_cloud(str(ex))
    try:
        result = json.loads(result)
    except Exception:
        return result  # not JSON: hand the raw response back to the caller
    if 'status' in result:
        if not result['status']:
            return result
    result['data'] = self.En_Code(result['data'])
    try:
        if 'authPath' not in result['data']:
            result['data']['authPath'] = '/.well-known/pki-validation/'
        authfile = get.path + result['data']['authPath'] + result['data']['authKey']
    except Exception:
        if 'authKey' in result['data']:
            authfile = get.path + '/.well-known/pki-validation/' + result['data']['authKey']
        else:
            return public.return_message(-1, 0, public.lang(" Failed to get the validation file!"))

    if 'authValue' in result['data']:
        public.writeFile(authfile, result['data']['authValue'])
    return result


# Send a request
def request(self, dname):
    """POST the pending payload to ``<APIURL>/<dname>`` and return the JSON reply.

    ``self.__PDATA['data']`` is replaced by its JSON encoding before sending.
    If the reply is not valid JSON a generic "failed to connect" message is
    returned instead.

    Raises:
        public.error_conn_cloud: when the HTTP request itself fails.
    """
    self.__PDATA['data'] = json.dumps(self.__PDATA['data'])
    url_headers = {
        "authorization": "bt {}".format(self.__userInfo['token'])
    }
    result = public.return_msg_gettext(False, 'Failed to connect to the official website, please try again later!')
    try:
        response_data = public.httpPost(self.__APIURL + '/' + dname, data=self.__PDATA, headers=url_headers)
    except Exception as ex:
        raise public.error_conn_cloud(str(ex))
    try:
        result = json.loads(response_data)
    except Exception:
        pass  # keep the fallback message built above
    return result


# Get the order list
def GetOrderList(self, get):
    """Return the account's certificate orders, each with a computed ``endtime``.

    When ``get.siteName`` is given, the ``partnerOrderId`` stored for that
    site (if any) is added to the request.

    Raises:
        public.error_conn_cloud: when the HTTP request itself fails.
    """
    if hasattr(get, 'siteName'):
        path = '/etc/letsencrypt/live/' + get.siteName + '/partnerOrderId'
        if os.path.exists(path):
            self.__PDATA['data']['partnerOrderId'] = public.readFile(path)
        else:
            path = '/www/server/panel/vhost/cert/' + get.siteName + '/partnerOrderId'
            if os.path.exists(path):
                self.__PDATA['data']['partnerOrderId'] = public.readFile(path)

    self.__PDATA['data'] = self.De_Code(self.__PDATA['data'])
    try:
        rs = public.httpPost(self.__APIURL + 'user/GetSSLList', self.__PDATA)
    except Exception as ex:
        raise public.error_conn_cloud(str(ex))
    try:
        result = json.loads(rs)
    except Exception:
        return public.return_message(-1, 0, public.lang("Failed to get, please try again later!"))

    result['data'] = self.En_Code(result['data'])
    for order in result['data']:
        order['endtime'] = self.add_months(order['createTime'], order['validityPeriod'])
    return result


# Add months to a date
def add_months(self, dt, months):
    """Add ``months`` to a millisecond timestamp.

    The day is clamped to the length of the target month and one extra day
    (86400 s) is added. Returns a millisecond timestamp (float).
    """
    import calendar
    dt = datetime.datetime.fromtimestamp(dt / 1000)
    month = dt.month - 1 + months
    year = dt.year + month // 12
    month = month % 12 + 1
    day = min(dt.day, calendar.monthrange(year, month)[1])
    return (time.mktime(dt.replace(year=year, month=month, day=day).timetuple()) + 86400) * 1000


# Apply for a certificate
def GetDVSSL(self, get):
    """Apply for (or re-issue) a DV certificate for ``get.domain``.

    Checks domain binding, forced HTTPS and local file validation before
    posting the order, then writes the returned validation value.

    Returns:
        The decoded API response, the raw response text when it is not JSON,
        or a ``public.return_message`` error dict.
    """
    get.id = public.M('domain').where('name=?', (get.domain,)).getField('pid')
    if hasattr(get, 'siteName'):
        get.path = public.M('sites').where('id=?', (get.id,)).getField('path')
    else:
        get.siteName = public.M('sites').where('id=?', (get.id,)).getField('name')

    # When applying for a "www." name, the bare domain must be bound to the same site.
    if get.domain[:4] == 'www.':
        if not public.M('domain').where('name=? AND pid=?', (get.domain[4:], get.id)).count():
            return public.return_message(-1, 0, "Apply for [{}] certificate to verify [{}] Please bind [{}] and resolve to the site!".format(
                get.domain, get.domain[4:], get.domain[4:]))

    # Forced HTTPS would redirect the plain-HTTP validation request.
    if not self.CheckForceHTTPS(get.siteName):
        return public.return_message(-1, 0, public.lang("[Force HTTPS] is enabled on the current website, please turn off this function before applying for an SSL certificate!"))

    # Resolve the real run directory of the site.
    runPath = self.GetRunPath(get)
    if runPath not in (False, '/'):
        get.path += runPath

    # Simulate the validation request up front.
    authfile = get.path + '/.well-known/pki-validation/fileauth.txt'
    if not self.CheckDomain(get):
        if not os.path.exists(authfile):
            # NOTE(review): four positional arguments, unlike every other
            # return_message call here - confirm against its signature.
            return public.return_message(-1, 0, 'Cannot create [{}]', (authfile,))
        msg = ''''Unable to access the verification file<br><a class="btlink" href="{c_url}" target="_blank">{c_url}</a>
<br><br>
<p></b>Possible reasons:</b></p>
1. Incorrect or ineffective DNS resolution [Please ensure correct domain name resolution or wait for the resolution to take effect and try again]<br>
2. Check if there are any 301/302 redirects set [Temporarily disable redirect-related configurations]<br>
3. Check if the website has enforced HTTPS [Temporarily disable the enforced HTTPS feature]<br>'''.format(
            c_url=self._check_url)
        return public.return_message(-1, 0, msg)

    action = 'GetDVSSL'
    if hasattr(get, 'partnerOrderId'):
        self.__PDATA['data']['partnerOrderId'] = get.partnerOrderId
        action = 'ReDVSSL'

    self.__PDATA['data']['domain'] = get.domain
    self.__PDATA['data'] = self.De_Code(self.__PDATA['data'])
    result = public.httpPost(self.__APIURL + 'user/' + action, self.__PDATA)
    try:
        result = json.loads(result)
    except Exception:
        return result
    result['data'] = self.En_Code(result['data'])
    try:
        if 'authValue' in result['data']:
            public.writeFile(authfile, result['data']['authValue'])
    except Exception:
        pass  # best effort: the response is returned either way
    return result


# Check whether HTTPS is forced
def CheckForceHTTPS(self, siteName):
    """Return ``False`` when the site's nginx config has the HTTP-to-HTTPS marker."""
    conf_file = '/www/server/panel/vhost/nginx/{}.conf'.format(siteName)
    if not os.path.exists(conf_file):
        return True
    conf_body = public.readFile(conf_file)
    if not conf_body:
        return True
    return conf_body.find('HTTP_TO_HTTPS_START') == -1


# Get the run directory
def GetRunPath(self, get):
    """Return the site's run path, or ``False`` when the site is not found.

    Sets ``get.id`` from ``get.siteName`` when present, else from ``get.path``.
    """
    if hasattr(get, 'siteName'):
        get.id = public.M('sites').where('name=?', (get.siteName,)).getField('id')
    else:
        get.id = public.M('sites').where('path=?', (get.path,)).getField('id')
    if not get.id:
        return False
    import panelSite
    result = panelSite.panelSite().GetSiteRunPath(get)
    return result['runPath']


# Check whether the domain resolves to this server
def CheckDomain(self, get):
    """Write a random token to the validation path and fetch it over HTTP.

    The request goes to 127.0.0.1 with a ``host`` header of ``get.domain``.
    A leading ``www.`` is stripped from ``get.domain`` in place. On failure
    ``self._check_url`` holds the public URL for use in error messages.

    Returns:
        ``True`` when the fetched body equals the token, else ``False``.
    """
    # Assigned before the try so the except handler can always read it;
    # previously an early failure raised AttributeError inside the handler.
    self._check_url = 'http://127.0.0.1/.well-known/pki-validation/fileauth.txt'
    try:
        # Create the directory
        spath = get.path + '/.well-known/pki-validation'
        if not os.path.exists(spath):
            os.makedirs(spath, 0o755, True)

        # Generate and write the probe content
        epass = public.GetRandomString(32)
        public.writeFile(spath + '/fileauth.txt', epass)

        # For a "www." name, probe the bare domain
        if get.domain[:4] == 'www.':
            get.domain = get.domain[4:]

        import http_requests
        result = http_requests.get(self._check_url, s_type='curl', timeout=6,
                                   headers={"host": get.domain}).text
        self.__test = result
        if result == epass:
            return True
        self._check_url = self._check_url.replace('127.0.0.1', get.domain)
        return False
    except Exception:
        self._check_url = self._check_url.replace('127.0.0.1', get.domain)
        return False


# Confirm the domain
def Completed(self, get):
    """Write the order's validation file, ask the API to complete the order and
    poll ``SyncOrder`` (6 tries, 5 s apart) until it reports ``COMPLETED``.

    Returns:
        The last ``SyncOrder`` reply on success, otherwise the ``Completed``
        reply (or a "Checking..." message), or an error dict.
    """
    self.__PDATA['data']['partnerOrderId'] = get.partnerOrderId
    self.__PDATA['data'] = self.De_Code(self.__PDATA['data'])
    # NOTE(review): the sync/write step is assumed to sit inside this branch
    # (it depends on get.path, which is only set here) - confirm against the
    # original indentation.
    if hasattr(get, 'siteName'):
        get.path = public.M('sites').where('name=?', (get.siteName,)).getField('path')
        # Looked up once with a proper one-element tuple (was ``(get.siteName)``).
        shared_root = self._site_project_type(get.siteName) in ('Java', 'Go', 'Other')
        if shared_root:
            runPath = '/www/wwwroot/java_node_ssl'
        else:
            runPath = self.GetRunPath(get)
        if runPath not in (False, '/'):
            get.path += runPath

        tmp = public.httpPost(self.__APIURL + 'user/SyncOrder', self.__PDATA)
        try:
            sslInfo = json.loads(tmp)
        except Exception:
            return public.return_message(-1, 0, tmp)
        sslInfo['data'] = self.En_Code(sslInfo['data'])
        try:
            if shared_root:
                spath = '/www/wwwroot/java_node_ssl/.well-known/pki-validation'
            else:
                spath = get.path + '/.well-known/pki-validation'
            if not os.path.exists(spath):
                os.makedirs(spath, exist_ok=True)  # was a shell "mkdir -p"
            public.writeFile(spath + '/' + sslInfo['data']['authKey'], sslInfo['data']['authValue'])
        except Exception:
            return public.return_message(-1, 0, public.lang("Verification error!"))

    try:
        result = json.loads(public.httpPost(self.__APIURL + 'user/Completed', self.__PDATA))
        if 'data' in result:
            result['data'] = self.En_Code(result['data'])
    except Exception:
        result = public.return_msg_gettext(True, 'Checking...')

    for _ in range(6):
        time.sleep(5)
        rRet = json.loads(public.httpPost(self.__APIURL + 'user/SyncOrder', self.__PDATA))
        rRet['data'] = self.En_Code(rRet['data'])
        try:
            if rRet['data']['stateCode'] == 'COMPLETED':
                return rRet
        except Exception:
            return public.get_error_info()
    return result


# Sync the given order
def SyncOrder(self, get):
    """Fetch the current state of order ``get.partnerOrderId``."""
    self.__PDATA['data']['partnerOrderId'] = get.partnerOrderId
    self.__PDATA['data'] = self.De_Code(self.__PDATA['data'])
    result = json.loads(public.httpPost(self.__APIURL + 'user/SyncOrder', self.__PDATA))
    result['data'] = self.En_Code(result['data'])
    return result


# Get the certificate
def GetSSLInfo(self, get):
    """Download the certificate of order ``get.partnerOrderId`` and, when
    ``get.siteName`` is set, install it for that site.

    Returns:
        A success/failure message after installing, or the decoded API reply
        when nothing was installed.
    """
    # Validate parameters
    try:
        get.validate([
            Param('siteName').String(),
        ], [
            public.validate.trim_filter(),
        ])
    except Exception as ex:
        public.print_log("error info: {}".format(ex))
        return public.return_message(-1, 0, str(ex))

    self.__PDATA['data']['partnerOrderId'] = get.partnerOrderId
    self.__PDATA['data'] = self.De_Code(self.__PDATA['data'])
    time.sleep(3)
    result = json.loads(public.httpPost(self.__APIURL + 'user/GetSSLInfo', self.__PDATA))
    result['data'] = self.En_Code(result['data'])
    if 'privateKey' not in result['data']:
        return result

    # Write the configuration to the site
    if hasattr(get, 'siteName'):
        try:
            path = '/www/server/panel/vhost/cert/' + get.siteName
            if not os.path.exists(path):
                os.makedirs(path, exist_ok=True)
            csrpath = path + "/fullchain.pem"
            keypath = path + "/privkey.pem"
            pidpath = path + "/partnerOrderId"
            # Clean up the old certificate chain (no shell involved)
            self._purge_old_cert_files(get.siteName, path)
            public.writeFile(keypath, result['data']['privateKey'])
            public.writeFile(csrpath, result['data']['cert'] + result['data']['certCa'])
            public.writeFile(pidpath, get.partnerOrderId)
            import panel_site_v2 as panelSite
            panelSite.panelSite().SetSSLConf(get)
            public.serviceReload()
            return public.return_message(0, 0, public.lang("Setup successfully!"))
        except Exception:
            return public.return_message(-1, 0, public.lang("Failed to set"))

    # ``data`` is already decoded above; decoding it again raised TypeError.
    return result


def GetSiteDomain(self, get):
    """Map certificate domains to the sites they are bound to.

    ``get.cert_list`` is a JSON list of certificate domains; a wildcard entry
    ``*.example.com`` matches bound domains exactly one label deeper.

    Returns:
        ``{'all': [every site name], 'site': [matching site names]}``.
    """
    try:
        cert_list = json.loads(get.cert_list)
    except Exception:
        cert_list = []

    all_site = []    # every site name
    all_domain = []  # every bound domain name
    for site in public.M('sites').field('name').select():
        all_site.append(site['name'])
        if not cert_list:
            continue
        pid = public.M('sites').where("name=?", (site['name'],)).getField('id')
        for domain in public.M('domain').where("pid=?", (pid,)).field('name').select():
            all_domain.append(domain['name'])

    # Bound domains covered by the certificate
    site_domain = []
    if cert_list and all_domain:
        for cert in cert_list:
            wildcard_base = cert.replace('*.', '') if cert.startswith('*.') else ''
            for domain in all_domain:
                if cert == domain:
                    site_domain.append(domain)
                # Strip only the first label. The old str.replace() removed
                # every occurrence, so "a.a.com" never matched "*.a.com".
                elif wildcard_base and wildcard_base == domain.partition('.')[2]:
                    site_domain.append(domain)

    # Site names for those domains, de-duplicated in first-seen order
    site_list = []
    for domain in site_domain:
        site_id = public.M('domain').where("name=?", (domain,)).getField('pid')
        site_name = public.M('sites').where("id=?", (site_id,)).getField('name')
        if site_name not in site_list:
            site_list.append(site_name)

    return {'all': all_site, 'site': site_list}


def SetBatchCertToSite(self, get):
    """Deploy several stored certificates in one call.

    ``get.BatchInfo`` is a JSON list of ``{'certName', 'siteName'[, 'ssl_hash']}``.
    Port 443 is opened and the web server reloaded afterwards.

    Returns:
        A summary dict with totals and per-item success/failure lists, or an
        error dict for bad input.
    """
    if not hasattr(get, 'BatchInfo') or not get.BatchInfo:
        return public.return_message(-1, 0, public.lang("parameter error"))
    ssl_list = json.loads(get.BatchInfo)
    if not isinstance(ssl_list, list):
        return public.return_message(-1, 0, public.lang("Parameter type error"))

    total_num = len(ssl_list)
    successList = []
    faildList = []
    for Info in ssl_list:
        set_result = {}
        set_result['status'] = True
        get.certName = set_result['certName'] = Info['certName']
        get.siteName = set_result['siteName'] = str(Info['siteName'])  # site name is always a string
        get.isBatch = True
        # NOTE(review): an ssl_hash set by an earlier item stays on ``get``
        # for later items that have none - confirm whether that is intended.
        if "ssl_hash" in Info:
            get.ssl_hash = Info['ssl_hash']
        result = self.SetCertToSite(get)
        if not result or result.get("status") == -1 or result.get("status") is False:
            set_result['status'] = False
            faildList.append(set_result)
        else:
            successList.append(set_result)
        public.writeSpeed('setssl', len(successList) + len(faildList), total_num)

    import firewalls
    get.port = '443'
    get.ps = 'HTTPS'
    firewalls.firewalls().AddAcceptPort(get)
    public.serviceReload()

    resultinfo = {
        "total": total_num,
        "success": len(successList),
        "faild": len(faildList),
        "successList": successList,
        "faildList": faildList,
    }

    if hasattr(get, "set_https_mode") and get.set_https_mode.strip() in (True, 1, "1", "true"):
        import panelSite
        sites_obj = panelSite.panelSite()
        if not sites_obj.get_https_mode():
            sites_obj.set_https_mode()
    return resultinfo


# Deploy a certificate from the certificate folder
def SetCertToSite(self, get):
    """Install stored certificate ``get.certName`` (or ``get.ssl_hash``) on
    site ``get.siteName``; also used by batch deployment.

    Returns:
        A success message, the ``GetCert`` error when the certificate is
        missing, or a failure message (empty when ``isBatch`` is set).
    """
    # Validate parameters
    try:
        get.validate([
            Param('siteName').String(),
            Param('certName').String(),
        ], [
            public.validate.trim_filter(),
        ])
    except Exception as ex:
        public.print_log("error info: {}".format(ex))
        return public.return_message(-1, 0, str(ex))

    try:
        result = self.GetCert(get)
        if 'privkey' not in result:
            return result
        path = '/www/server/panel/vhost/cert/' + get.siteName
        if not os.path.exists(path):
            os.makedirs(path, exist_ok=True)
        csrpath = path + "/fullchain.pem"
        keypath = path + "/privkey.pem"

        # Clean up the old certificate chain (no shell involved)
        self._purge_old_cert_files(get.siteName, path)

        public.writeFile(keypath, result['privkey'])
        public.writeFile(csrpath, result['fullchain'])
        import panel_site_v2 as panelSite
        panelSite.panelSite().SetSSLConf(get)
        public.serviceReload()
        return public.return_message(0, 0, public.lang("Setup successfully!"))
    except Exception as ex:
        import traceback
        public.print_log(traceback.format_exc())
        public.print_log(f"error: {ex}")
        if 'isBatch' in get:
            return public.return_message(-1, 0, "")
        return public.return_message(-1, 0, 'SET_ERROR,' + public.get_error_info())


# Get the certificate list
def GetCertList(self, get):
    """Return the parsed ``info.json`` of every stored certificate."""
    try:
        vpath = '/www/server/panel/vhost/ssl'
        if not os.path.exists(vpath):
            os.makedirs(vpath, exist_ok=True)
        data = []
        for entry in os.listdir(vpath):
            mpath = vpath + '/' + entry + '/info.json'
            if not os.path.exists(mpath):
                continue
            body = public.readFile(mpath)
            if not body:
                continue
            data.append(json.loads(body))
        return public.return_message(0, 0, data)
    except Exception:
        return public.return_message(-1, 0, [])


# Delete a certificate
def RemoveCert(self, get):
    """Delete stored certificate ``get.certName`` from the certificate folder."""
    # Validate parameters
    try:
        get.validate([
            Param('certName').String(),
        ], [
            public.validate.trim_filter(),
        ])
    except Exception as ex:
        public.print_log("error info: {}".format(ex))
        return public.return_message(-1, 0, str(ex))

    try:
        import shutil
        base = '/www/server/panel/vhost/ssl'
        vpath = base + '/' + get.certName.replace("*.", '')
        if not os.path.exists(vpath):
            return public.return_message(-1, 0, public.lang("Certificate does NOT exist!"))
        # The name comes from the request: refuse anything that is not strictly
        # inside the store. An empty name or "../" previously reached "rm -rf".
        if not os.path.normpath(vpath).startswith(base + '/'):
            return public.return_message(-1, 0, public.lang("Certificate does NOT exist!"))
        if os.path.isdir(vpath) and not os.path.islink(vpath):
            shutil.rmtree(vpath)
        else:
            os.remove(vpath)
        return public.return_message(0, 0, public.lang("Certificate deleted!"))
    except Exception:
        return public.return_message(-1, 0, public.lang("Failed to delete!"))


# Save a certificate
def SaveCert(self, get):
    """Store the certificate/key pair at ``get.certPath`` / ``get.keyPath`` in
    the certificate folder, keyed by the certificate subject."""
    try:
        certInfo = self.GetCertName(get)
        if not certInfo:
            failed = public.return_msg_gettext(False, public.lang('Certificate parsing failed'))
            return public.return_message(-1, 0, failed['msg'])
        SSLManger().save_by_file(get.certPath, get.keyPath)
        vpath = '/www/server/panel/vhost/ssl/' + certInfo['subject']
        vpath = vpath.replace("*.", '')
        if not os.path.exists(vpath):
            os.makedirs(vpath, exist_ok=True)
        public.writeFile(vpath + '/privkey.pem', public.readFile(get.keyPath))
        public.writeFile(vpath + '/fullchain.pem', public.readFile(get.certPath))
        public.writeFile(vpath + '/info.json', json.dumps(certInfo))
        saved = public.return_msg_gettext(True, public.lang('Successfully saved certificate!'))
        return public.return_message(0, 0, saved['msg'])
    except Exception:
        failed = public.return_msg_gettext(False, public.lang('Failed to save certificate!'))
        return public.return_message(-1, 0, failed['msg'])


# Read a certificate
def GetCert(self, get):
    """Return ``{'privkey', 'fullchain'}`` for a stored certificate.

    Looked up by ``get.ssl_hash`` when present, otherwise by ``get.certName``.
    Returns an error dict when the named certificate does not exist.
    """
    if "ssl_hash" in get:
        path = f"/www/server/panel/vhost/ssl_saved/{get.ssl_hash}"
        return {
            "privkey": public.readFile(f"{path}/privkey.pem"),
            "fullchain": public.readFile(f"{path}/fullchain.pem"),
        }

    vpath = os.path.join('/www/server/panel/vhost/ssl', get.certName.replace("*.", ''))
    if not os.path.exists(vpath):
        return public.return_message(-1, 0, public.lang("Certificate does NOT exist!"))
    return {
        'privkey': public.readFile(vpath + '/privkey.pem'),
        'fullchain': public.readFile(vpath + '/fullchain.pem'),
    }


# Get the certificate name
def GetCertName(self, get):
    """Return the parsed information of the certificate at ``get.certPath``."""
    return self.get_cert_init(get.certPath)


def get_unixtime(self, data, format="%Y-%m-%d %H:%M:%S"):
    """Parse ``data`` with ``format`` and return a local-time Unix timestamp."""
    import time
    return int(time.mktime(time.strptime(data, format)))


# Get basic information of the given certificate
def get_cert_init_o(self, pem_file):
    """Parse a PEM certificate with pyOpenSSL.

    Returns:
        A dict with ``issuer``, ``notAfter``, ``notBefore``, ``dns``,
        ``subject`` and ``endtime`` (whole days left), or ``None`` when the
        file is missing or cannot be parsed.
    """
    if not os.path.exists(pem_file):
        return None
    try:
        import OpenSSL
        result = {}
        x509 = OpenSSL.crypto.load_certificate(
            OpenSSL.crypto.FILETYPE_PEM, public.readFile(pem_file))

        # Issuer name
        issuer = x509.get_issuer()
        result['issuer'] = ''
        if hasattr(issuer, 'CN'):
            result['issuer'] = issuer.CN
        if not result['issuer']:
            # NOTE(review): the key is the digit zero; possibly meant 'O'
            # (organization) - left unchanged, confirm intent.
            is_key = [b'0', '0']
            issue_comp = issuer.get_components()
            if len(issue_comp) == 1:
                is_key = [b'CN', 'CN']
            for iss in issue_comp:
                if iss[0] in is_key:
                    result['issuer'] = iss[1].decode()
                    break
        if not result['issuer']:
            if hasattr(issuer, 'O'):
                result['issuer'] = issuer.O

        # Expiry date ([:-1] drops the last character of the timestamp)
        result['notAfter'] = self.strf_date(
            bytes.decode(x509.get_notAfter())[:-1])
        # Issue date
        result['notBefore'] = self.strf_date(
            bytes.decode(x509.get_notBefore())[:-1])

        # Subject alternative names
        result['dns'] = []
        for i in range(x509.get_extension_count()):
            s_name = x509.get_extension(i)
            if s_name.get_short_name() in [b'subjectAltName', 'subjectAltName']:
                for entry in str(s_name).split(','):
                    result['dns'].append(entry.split(':')[1])

        # Primary subject name
        subject = x509.get_subject().get_components()
        if len(subject) == 1:
            result['subject'] = subject[0][1].decode()
        else:
            if not result['dns']:
                for sub in subject:
                    if sub[0] == b'CN':
                        result['subject'] = sub[1].decode()
                        break
                if 'subject' in result:
                    result['dns'].append(result['subject'])
            else:
                result['subject'] = result['dns'][0]

        result['endtime'] = int(
            int(time.mktime(time.strptime(result['notAfter'], "%Y-%m-%d")) - time.time()) / 86400)
        return result
    except Exception:
        return None


# Get basic information of the given certificate
def get_cert_init(self, pem_file):
    """Parse ``pem_file`` through the panel's ``ssl_info`` module."""
    if "/www/server/panel/class" not in sys.path:
        sys.path.insert(0, "/www/server/panel/class")
    import ssl_info
    return ssl_info.ssl_info().load_ssl_info(pem_file)


# Convert a time string
def strf_date(self, sdate):
    """Convert ``YYYYmmddHHMMSS`` to ``YYYY-mm-dd``."""
    return time.strftime('%Y-%m-%d', time.strptime(sdate, '%Y%m%d%H%M%S'))


# Convert a time string
def strfToTime(self, sdate):
    """Convert an openssl-style date (``Jan 31 12:00:00 2024 GMT``) to ``YYYY-mm-dd``."""
    import time
    return time.strftime('%Y-%m-%d', time.strptime(sdate, '%b %d %H:%M:%S %Y %Z'))


# Get the product list
def GetSSLProduct(self, get):
    """Return the list of purchasable certificate products."""
    self.__PDATA['data'] = self.De_Code(self.__PDATA['data'])
    result = json.loads(public.httpPost(self.__APIURL + 'user/GetSSLProduct', self.__PDATA))
    result['data'] = self.En_Code(result['data'])
    return result


# Encode data
def De_Code(self, data):
    """URL-encode ``data`` and return it as a hex string.

    The Python 2 branch was removed: this code already uses f-strings, so it
    cannot run there.
    """
    import urllib.parse
    pdata = urllib.parse.urlencode(data)
    if isinstance(pdata, str):
        pdata = pdata.encode('utf-8')
    return binascii.hexlify(pdata).decode()


# Decode data
def En_Code(self, data):
    """Decode a hex string into URL-unquoted text and parse it as JSON."""
    import urllib.parse
    if isinstance(data, str):
        data = data.encode('utf-8')
    text = binascii.unhexlify(data).decode('utf-8')
    return json.loads(urllib.parse.unquote(text))


# Manual one-click renewal
def renew_lets_ssl(self, get):
    """Renew every certificate listed in ``vhost/cert/crontab.json``, or only
    the one of ``get.siteName`` when given.

    Returns:
        ``{'status': True, 'sucess_list': [...], 'err_list': [...]}`` or an
        error dict when there is nothing to renew.
    """
    if not os.path.exists('vhost/cert/crontab.json'):
        return public.return_message(-1, 0, public.lang("There are currently no certificates to renew!"))

    old_list = json.loads(public.ReadFile("vhost/cert/crontab.json"))
    cron_list = old_list
    if hasattr(get, 'siteName'):
        if get.siteName not in old_list:
            return public.return_message(-1, 0, public.lang("There is no certificate that can be renewed on the current website.."))
        cron_list = {get.siteName: old_list[get.siteName]}

    import panelLets
    lets = panelLets.panelLets()

    result = {'status': True, 'sucess_list': [], 'err_list': []}
    for siteName, data in cron_list.items():
        ret = lets.renew_lest_cert(data)
        if ret['status']:
            result['sucess_list'].append(siteName)
        else:
            result['err_list'].append({"siteName": siteName, "msg": ret['msg']})
    return result

# todo?
def renew_cert_order(self, args):
    """
    Renew a commercial certificate order.

    @name renew commercial certificate
    @author cjx
    @version 1.0

    `args.pdata` must be a JSON string; it is decoded and sent as the
    request payload of the 'renew_cert_order' call. When the call reports
    success, the returned order id is stored in `args['oid']` and the
    verification info for that order is attached to the result under
    'verify_info'.

    Returns the (possibly augmented) response of `self.request`, or a
    `public.return_message` error when `pdata` is missing.
    """
    if 'pdata' not in args:
        return public.return_message(-1, 0, public.lang("The pdata parameter cannot be empty!"))

    self.__PDATA['data'] = json.loads(args.pdata)
    result = self.request('renew_cert_order')
    # Explicit comparison with True is kept on purpose: the original only
    # treats a value equal to True as success.
    if result['status'] == True:  # noqa: E712
        self.__PDATA['data'] = {}
        args['oid'] = result['oid']
        result['verify_info'] = self.get_verify_info(args)
    return result


def GetAuthToken(self, get):
    """
    Log in to the official site and persist the returned account data.

    @get.username RSA-encrypted account name (decrypted with public.rsa_decrypt)
    @get.password RSA-encrypted account password (decrypted, then md5-hashed)
    @get.code     optional, forwarded as-is when present
    @get.token    optional, forwarded as-is when present

    On success the decoded user data is written to `self.__UPATH`, the
    server id file 'data/sid.pl' is refreshed when the server id changed,
    'data/bind_path.pl' is removed if it exists and the plugin list cache
    is flushed. The 'data' key is stripped from the returned result.

    Any exception raised while talking to the server or decoding its
    answer yields `public.return_message(-1, 0, 6)`.
    """
    data = {
        'username': public.rsa_decrypt(get.username),
        'password': public.md5(public.rsa_decrypt(get.password)),
        'serverid': panelAuth().get_serverid(),
    }
    if 'code' in get:
        data['code'] = get.code
    if 'token' in get:
        data['token'] = get.token

    pdata = {'data': self.De_Code(data)}
    try:
        rtmp = public.httpPost(self.__BINDURL, pdata)
        result = json.loads(rtmp)
        result['data'] = self.En_Code(result['data'])
        if not result['status']:
            return result

        user_data = result['data']
        if user_data:
            if user_data['serverid'] != data['serverid']:
                # Save the new server id handed out by the server.
                public.writeFile('data/sid.pl', user_data['serverid'])
            public.writeFile(self.__UPATH, json.dumps(user_data))
            if os.path.exists('data/bind_path.pl'):
                os.remove('data/bind_path.pl')
            public.flush_plugin_list()

        del result['data']
        session['focre_cloud'] = True
        return result
    except Exception:
        # Both the JSON-related and the generic failure paths of the
        # original returned this same message; the unused error text that
        # was built for the JSON case has been dropped.
        return public.return_message(-1, 0, 6)


def GetBindCode(self, get):
    """
    Request a verification code for the given account.

    @get.username account name, sent as-is
    @get.token    token, sent as-is

    Returns the JSON-decoded server response. Any failure is re-raised as
    `public.error_conn_cloud` carrying the original error text.
    """
    data = {
        'username': get.username,
        'token': get.token,
    }
    pdata = {'data': self.De_Code(data)}
    try:
        rtmp = public.httpPost(self.__CODEURL, pdata)
        return json.loads(rtmp)
    except Exception as ex:
        raise public.error_conn_cloud(str(ex))


# Parse DNS API information
def get_dnsapi(self, auth_to):
    """
    Split an `auth_to` string into (dns_name, key, secret).

    `auth_to` is '|'-separated. With three or more parts the key and the
    secret are taken from parts 1 and 2. With fewer parts they are looked
    up in '<panel path>/config/dns_api.json' by provider name; when no
    usable entry is found both stay the literal string "None".
    """
    parts = auth_to.split('|')
    dns_name = parts[0]

    if len(parts) >= 3:
        return dns_name, parts[1], parts[2]

    key = "None"
    secret = "None"
    config_file = '{}/config/dns_api.json'.format(public.get_panel_path())
    for entry in json.loads(public.readFile(config_file)):
        if entry['name'] != dns_name or not entry['data']:
            continue
        # No early exit: as in the original, the last matching entry wins.
        key = entry['data'][0]['value']
        secret = entry['data'][1]['value']
    return dns_name, key, secret


# Get the DNS API object
def get_dns_class(self, auth_to):
    """
    Instantiate the `panelDnsapi` class named in `auth_to`.

    Returns the instance with `_type` set to 1, or None when the module
    cannot be imported, the class does not exist or construction fails.
    """
    try:
        import panelDnsapi
        dns_name, key, secret = self.get_dnsapi(auth_to)
        dns_class = getattr(panelDnsapi, dns_name)(key, secret)
        dns_class._type = 1
        return dns_class
    except Exception:
        return None


# Create the DNS record for a domain
def create_dns_record(self, auth_to, domain, dns_value, original_domain=None):
    """
    Create the validation DNS record for `domain` through the DNS API.

    Returns None when `auth_to` is 'dns' (manual resolution), otherwise a
    `public.return_message` describing success or failure.
    """
    # Manual resolution: nothing to do here.
    if auth_to == 'dns':
        return None

    from panelDnsapi import DnsMager
    dns_class = DnsMager().get_dns_obj_by_domain(original_domain)
    # Validate the lookup result BEFORE touching it; the original assigned
    # `_type` first, which raised AttributeError when no object was found.
    if not dns_class:
        return public.return_message(-1, 0, public.lang("The operation failed. Please check that the key is correct"))
    dns_class._type = 1

    # Remove any CAA record before applying (best effort, failures ignored).
    root, zone = public.get_root_domain(domain)
    try:
        dns_class.remove_record(public.de_punycode(root), '@', 'CAA')
    except Exception:
        pass

    try:
        dns_class.create_dns_record(public.de_punycode(domain), dns_value)
        return public.return_message(0, 0, public.lang("Added successfully"))
    except Exception:
        return public.return_message(-1, 0, public.get_error_info())


# Check which SSL validation methods are usable
def check_ssl_method(self, get):
    """
    @name check SSL validation methods
    @domain string domain name

    Writes a random probe file below '<site run path>/.well-known/
    pki-validation', fetches it over http and https, and checks the CAA
    records of the domain.

    Returns a dict with the keys 'HTTP_CSR_HASH', 'HTTPS_CSR_HASH' (1 when
    the probe was readable, otherwise a list of hint messages) and
    'CNAME_CSR_HASH' (1 when the CAA check returned nothing, otherwise the
    decoded 'data' of that check). Returns a `public.return_message` error
    when the site path or the probe file cannot be obtained.
    """
    domain = get.domain

    site_id = public.M('domain').where('name=?', (domain,)).getField('pid')
    project_type = public.M('sites').where('id=?', (site_id,)).getField('project_type')
    if project_type == 'Java':
        siteRunPath = '{}/java_node_ssl'.format(public.M("config").getField("sites_path"))
    else:
        siteRunPath = self.get_domain_run_path(domain)

    if not siteRunPath:
        return public.return_message(-1, 0, public.lang("Failed to get the website path. Please check if the website exists"))

    verify_path = siteRunPath + '/.well-known/pki-validation'
    if not os.path.exists(verify_path):
        os.makedirs(verify_path)

    # Create the temporary probe file.
    check_val = public.GetRandomString(16)
    verify_file = '{}/{}.txt'.format(verify_path, check_val)
    public.writeFile(verify_file, check_val)
    if not os.path.exists(verify_file):
        return public.return_message(-1, 0, public.lang("Failed to create the validation file. Check if the write was blocked"))

    try:
        msg = [
            ' domain name [{}] validation file cannot be accessed correctly'.format(domain),
            'Probable cause',
            '1、the resolution was not correct, or the resolution did not work [Please resolve the domain correctly, or wait for the resolution to work and try again]',
            '2、check whether 301/302 redirects are set [please temporarily turn off redirects related configuration]',
            '3、check whether the site has enabled reverse proxy [please temporarily turn off reverse proxy configuration]',
        ]
        # Both entries start out as the failure hints and are replaced by 1
        # once the corresponding probe succeeds.
        res = {
            'HTTP_CSR_HASH': msg,
            'HTTPS_CSR_HASH': msg,
        }

        # Probe HTTP / HTTPS access.
        args = public.dict_obj()
        for stype in ['http', 'https']:
            args.url = '{}://{}/.well-known/pki-validation/{}.txt'.format(stype, domain, check_val)
            args.content = check_val
            if self.check_url_txt(args, 2) == 1:
                res['{}_CSR_HASH'.format(stype).upper()] = 1

        # Check CAA records.
        result = self.check_ssl_caa([domain])
        if not result:
            res['CNAME_CSR_HASH'] = 1
        else:
            res['CNAME_CSR_HASH'] = json.loads(result['data'])
        return res
    finally:
        # Always remove the probe file, even when a probe raised.
        if os.path.exists(verify_file):
            os.remove(verify_file)


@staticmethod
def upload_cert_to_cloud(get):
    """
    Upload a certificate, selected by `get.ssl_id` and/or `get.ssl_hash`,
    via `SSLManger.upload_cert`.

    Returns the manager's result, or a `public.return_message` error when
    the parameters are malformed or the upload raises.
    """
    ssl_id = None
    ssl_hash = None
    try:
        if "ssl_id" in get:
            ssl_id = int(get.ssl_id)
        if "ssl_hash" in get:
            ssl_hash = get.ssl_hash.strip()
    except (ValueError, AttributeError, KeyError):
        return public.return_message(-1, 0, "parameter error")

    from ssl_manage import SSLManger
    try:
        return SSLManger().upload_cert(ssl_id, ssl_hash)
    except ValueError as e:
        return public.return_message(-1, 0, str(e))
    except Exception as e:
        return public.return_message(-1, 0, "operation mistake:" + str(e))


@staticmethod
def remove_cloud_cert(get):
    """
    Remove a certificate, selected by `get.ssl_id` and/or `get.ssl_hash`,
    via `SSLManger.remove_cert`.

    `get.local` set to "1" or "true" is passed on as `local=True`.

    Returns the manager's result, or a `public.return_message` error when
    the parameters are malformed or the removal raises.
    """
    ssl_id = None
    ssl_hash = None
    local = False
    try:
        if "ssl_id" in get:
            ssl_id = int(get.ssl_id)
        if "ssl_hash" in get:
            ssl_hash = get.ssl_hash.strip()
        if "local" in get and get.local.strip() in ("1", 1, True, "true"):
            local = True
    except (ValueError, AttributeError, KeyError):
        return public.return_message(-1, 0, "parameter error")

    from ssl_manage import SSLManger
    try:
        return SSLManger().remove_cert(ssl_id, ssl_hash, local=local)
    except ValueError as e:
        return public.return_message(-1, 0, str(e))
    except Exception as e:
        return public.return_message(-1, 0, "operation mistake:" + str(e))


# Unused
@staticmethod
def refresh_cert_list(get=None):
    """
    Force a refresh of the certificate list and return it wrapped in a
    `public.return_message`; errors are reported the same way.
    """
    from ssl_manage import SSLManger
    try:
        return public.return_message(0, 0, SSLManger().get_cert_list(force_refresh=True))
    except ValueError as e:
        return public.return_message(-1, 0, str(e))
    except Exception as e:
        return public.return_message(-1, 0, "operation mistake:" + str(e))


@staticmethod
def get_cert_info(get):
    """
    Look up one certificate by `get.ssl_id` and/or `get.ssl_hash` and merge
    in the data returned by `SSLManger.get_cert_for_deploy`.

    Returns the merged certificate record itself (not wrapped), or a
    `public.return_message` error when the parameters are malformed, the
    certificate is not found or the lookup raises.
    """
    ssl_id = None
    ssl_hash = None
    try:
        if "ssl_id" in get:
            ssl_id = int(get.ssl_id)
        if "ssl_hash" in get:
            ssl_hash = get.ssl_hash.strip()
    except (ValueError, AttributeError, KeyError):
        return public.return_message(-1, 0, "parameter error")

    from ssl_manage import SSLManger
    try:
        ssl_mager = SSLManger()
        target = ssl_mager.find_ssl_info(ssl_id, ssl_hash)
        if target is None:
            return public.return_message(-1, 0, public.lang("No certificate information was obtained"))
        target.update(ssl_mager.get_cert_for_deploy(target["hash"]))
        return target
    except ValueError as e:
        return public.return_message(-1, 0, str(e))
    except Exception as e:
        return public.return_message(-1, 0, "operation mistake:" + str(e))


@staticmethod
def get_cert_list(get):
    """
    List certificates, optionally filtered by subject and expiry.

    search_limit 0 -> all certificates
    search_limit 1 -> certificates that have not expired
    search_limit 2 -> certificates expiring within 15 days but not yet expired
    search_limit 3 -> expired certificates
    search_limit 4 -> certificates expiring more than one year from now

    `get.search_name` adds a `subject LIKE %name%` condition and
    `get.force_refresh` ("1" / "True") is passed on to the manager. Expiry
    is compared on the 'not_after' field as a 'YYYY-MM-DD' string.
    """
    search_name = None
    search_limit = 0
    force_refresh = False
    try:
        if "search_name" in get:
            search_name = get.search_name.strip()
        if "search_limit" in get:
            search_limit = int(get.search_limit.strip())
        if "force_refresh" in get and get.force_refresh.strip() in ("1", 1, "True", True):
            force_refresh = True
    except (ValueError, AttributeError, KeyError):
        return public.return_message(-1, 0, "parameter error")

    param = None
    if search_name is not None:
        param = ['subject LIKE ?', ["%{}%".format(search_name)]]

    now = datetime.datetime.now()
    today = now.strftime("%Y-%m-%d")
    if search_limit == 1:
        filter_func = lambda x: x["not_after"] >= today
    elif search_limit == 2:
        soon = (now + datetime.timedelta(days=15)).strftime("%Y-%m-%d")
        filter_func = lambda x: today <= x["not_after"] <= soon
    elif search_limit == 3:
        filter_func = lambda x: x["not_after"] < today
    elif search_limit == 4:
        next_year = (now + datetime.timedelta(days=366)).strftime("%Y-%m-%d")
        filter_func = lambda x: x["not_after"] > next_year
    else:
        filter_func = lambda x: True

    from ssl_manage import SSLManger
    try:
        res_list = SSLManger().get_cert_list(param=param, force_refresh=force_refresh)
        return public.return_message(0, 0, list(filter(filter_func, res_list)))
    except ValueError as e:
        return public.return_message(-1, 0, str(e))
    except Exception as e:
        return public.return_message(-1, 0, "operation mistake:" + str(e))


def verify_mail_any(self, args):
    """
    Ask the official API to send a verification mail to `args.email`.

    Returns a `public.return_message` whose status follows the 'success'
    flag of the API response and whose payload is its 'res' field; any
    exception is reported as an "err:" message.
    """
    if "email" not in args:
        return public.return_message(-1, 0, "Missing email parameter")

    data = {'email': args.email}
    try:
        sUrl = '{}/api/user/sendVerifyMailAny'.format(public.OfficialApiBase())
        rtmp = public.httpPost(sUrl, data)
        result = json.loads(rtmp)
        if result['success']:
            return public.return_message(0, 0, result['res'])
        return public.return_message(-1, 0, result['res'])
    except Exception as e:
        return public.return_message(-1, 0, "err:" + str(e))
Close