Linux ip-172-26-2-223 5.4.0-1018-aws #18-Ubuntu SMP Wed Jun 24 01:15:00 UTC 2020 x86_64
Apache
: 172.26.2.223 | : 13.59.1.209
Cant Read [ /etc/named.conf ]
8.1.13
www
www.github.com/MadExploits
Terminal
AUTO ROOT
Adminer
Backdoor Destroyer
Linux Exploit
Lock Shell
Lock File
Create User
CREATE RDP
PHP Mailer
BACKCONNECT
UNLOCK SHELL
HASH IDENTIFIER
CPANEL RESET
CREATE WP USER
BLACK DEFEND!
README
+ Create Folder
+ Create File
/
www /
server /
panel /
class_v2 /
[ HOME SHELL ]
Name
Size
Permission
Action
__pycache__
[ DIR ]
drwxr-xr-x
btdockerModelV2
[ DIR ]
drwxr-xr-x
crontabModelV2
[ DIR ]
drwxr-xr-x
databaseModelV2
[ DIR ]
drwxr-xr-x
firewallModelV2
[ DIR ]
drwxr-xr-x
logsModelV2
[ DIR ]
drwxr-xr-x
monitorModelV2
[ DIR ]
drwxr-xr-x
panelModelV2
[ DIR ]
drwxr-xr-x
power_mta
[ DIR ]
drwxr-xr-x
projectModelV2
[ DIR ]
drwxr-xr-x
safeModelV2
[ DIR ]
drwxr-xr-x
safe_warning_v2
[ DIR ]
drwxr-xr-x
ssl_domainModelV2
[ DIR ]
drwxr-xr-x
virtualModelV2
[ DIR ]
drwxr-xr-x
wp_toolkit
[ DIR ]
drwxr-xr-x
acme_v3.py
133.98
KB
-rw-r--r--
ajax_v2.py
95.41
KB
-rw-r--r--
apache_v2.py
17.28
KB
-rw-r--r--
backup_bak_v2.py
24.86
KB
-rw-r--r--
breaking_through.py
47.94
KB
-rw-r--r--
cloud_stora_upload_v2.py
19.27
KB
-rw-r--r--
common_v2.py
12.45
KB
-rw-r--r--
config_v2.py
165.36
KB
-rw-r--r--
crontab_ssl_v2.py
1.85
KB
-rw-r--r--
crontab_v2.py
111.93
KB
-rw-r--r--
data_v2.py
36.54
KB
-rw-r--r--
database_v2.py
125.54
KB
-rw-r--r--
datatool_v2.py
5.83
KB
-rw-r--r--
db_mysql_v2.py
11.41
KB
-rw-r--r--
db_v2.py
11.04
KB
-rw-r--r--
dk_db.py
18.34
KB
-rw-r--r--
download_file_v2.py
2.54
KB
-rw-r--r--
fastcgi_client_two_v2.py
12.26
KB
-rw-r--r--
fastcgi_client_v2.py
6.89
KB
-rw-r--r--
file_execute_deny_v2.py
10.34
KB
-rw-r--r--
files_v2.py
149.12
KB
-rw-r--r--
firewall_new_v2.py
22.4
KB
-rw-r--r--
firewalld_v2.py
11.09
KB
-rw-r--r--
firewalls_v2.py
17.44
KB
-rw-r--r--
flask_compress_v2.py
5.12
KB
-rw-r--r--
flask_sockets_v2.py
3.75
KB
-rw-r--r--
ftp_log_v2.py
21.72
KB
-rw-r--r--
ftp_v2.py
16.17
KB
-rw-r--r--
http_requests_v2.py
24.25
KB
-rw-r--r--
jobs_v2.py
36.98
KB
-rw-r--r--
letsencrypt_v2.py
12.85
KB
-rw-r--r--
log_analysis_v2.py
12.23
KB
-rw-r--r--
monitor_v2.py
13.53
KB
-rw-r--r--
one_key_wp_v2.py
75.79
KB
-rw-r--r--
panelControllerV2.py
4.97
KB
-rw-r--r--
panelDatabaseControllerV2.py
5.76
KB
-rw-r--r--
panelDockerControllerV2.py
5.86
KB
-rw-r--r--
panelFireControllerV2.py
4.65
KB
-rw-r--r--
panelModControllerV2.py
5.13
KB
-rw-r--r--
panelProjectControllerV2.py
6.07
KB
-rw-r--r--
panelSafeControllerV2.py
4.65
KB
-rw-r--r--
panel_api_v2.py
10.43
KB
-rw-r--r--
panel_auth_v2.py
33.21
KB
-rw-r--r--
panel_backup_v2.py
102.56
KB
-rw-r--r--
panel_dns_api_v2.py
22.2
KB
-rw-r--r--
panel_http_proxy_v2.py
11.33
KB
-rw-r--r--
panel_lets_v2.py
43.61
KB
-rw-r--r--
panel_mssql_v2.py
4.48
KB
-rw-r--r--
panel_mysql_v2.py
7.55
KB
-rw-r--r--
panel_php_v2.py
24.78
KB
-rw-r--r--
panel_ping_v2.py
2.88
KB
-rw-r--r--
panel_plugin_v2.py
125.11
KB
-rw-r--r--
panel_push_v2.py
23.78
KB
-rw-r--r--
panel_redirect_v2.py
34.02
KB
-rw-r--r--
panel_restore_v2.py
11.04
KB
-rw-r--r--
panel_site_v2.py
343.73
KB
-rw-r--r--
panel_ssl_v2.py
75.34
KB
-rw-r--r--
panel_task_v2.py
28.7
KB
-rw-r--r--
panel_video_V2.py
1.88
KB
-rw-r--r--
panel_warning_v2.py
68.71
KB
-rw-r--r--
password_v2.py
8.09
KB
-rw-r--r--
plugin_auth_v2.py
3.14
KB
-rw-r--r--
plugin_deployment_v2.py
28.85
KB
-rw-r--r--
san_baseline_v2.py
51.13
KB
-rw-r--r--
site_dir_auth_v2.py
17.67
KB
-rw-r--r--
ssh_security_v2.py
45.66
KB
-rw-r--r--
ssh_terminal_v2.py
58.86
KB
-rw-r--r--
system_v2.py
44.77
KB
-rw-r--r--
userRegister_v2.py
6.74
KB
-rw-r--r--
user_login_v2.py
21.2
KB
-rw-r--r--
vilidate_v2.py
4.94
KB
-rw-r--r--
wxapp_v2.py
5.62
KB
-rw-r--r--
Delete
Unzip
Zip
${this.title}
Close
Code Editor : acme_v3.py
#!/usr/bin/python # coding: utf-8 # ------------------------------------------------------------------- # aapanel # ------------------------------------------------------------------- # Copyright (c) 2015-2099 aapanel(http://www.aapanel.com) All rights reserved. # ------------------------------------------------------------------- # Author: hwliang <hwl@bt.cn> a # ------------------------------------------------------------------- # ------------------------------------------------------------------- # ACME v2客户端 # ------------------------------------------------------------------- import warnings warnings.filterwarnings("ignore", message=r".*doesn't\s+match\s+a\s+supported\s+version", module="requests") import re import fcntl import datetime import binascii import hashlib import base64 import json import shutil import time import os import sys import uuid os.chdir('/www/server/panel') if not 'class/' in sys.path: sys.path.insert(0, 'class/') import http_requests as requests import public try: import OpenSSL except: public.ExecShell("btpip install pyopenssl") import OpenSSL try: import dns.resolver except: public.ExecShell("btpip install dnspython") import dns.resolver import ssl_info #### # auth to 格式说明 # 旧版 auth to 格式: # 文件验证:/www/server/xxxx/xxxx # DNS->手动:dns DNS->api: CloudFlareDns|XXXXXXXX|XXXXXXXXX # # 待重构 # 新版 auth to 格式: # DNS->手动:dns DNS->api: dns#@api class acme_v2: _url = None _apis = None _config = {} _dns_domains = [] _bits = 2048 _acme_timeout = 30 _dns_class = None _user_agent = "BaoTa/1.0 (+https://www.aapanel.com)" _replay_nonce = None _verify = False _digest = "sha256" _max_check_num = 15 _wait_time = 5 _mod_index = {True: "Staging", False: "Production"} _debug = False _auto_wildcard = False _dnsapi_file = 'config/dns_api.json' _save_path = 'vhost/letsencrypt' _conf_file = 'config/letsencrypt.json' _conf_file_v2 = 'config/letsencrypt_v2.json' _request_type = 'curl' def __init__(self): if not os.path.exists(self._conf_file_v2) and 
os.path.exists(self._conf_file): shutil.copyfile(self._conf_file, self._conf_file_v2) if self._debug: self._url = 'https://acme-staging-v02.api.letsencrypt.org/directory' else: self._url = 'https://acme-v02.api.letsencrypt.org/directory' self.err = "" self._config = self.read_config() self._nginx_cache_file_auth = {} self._can_use_lua = None self._well_known_check_cache = {} def can_use_lua_module(self): if self._can_use_lua is None: # 查询lua_module 不为空 self._can_use_lua = public.ExecShell("nginx -V 2>&1 |grep lua_nginx_module")[0].strip() != '' return self._can_use_lua # 返回是否能通过lua 做了文件验证处理, 如果返回True,则表示可以处理了验证文件, 不再走之前的 if 验证方式 def can_use_lua_for_site(self, site_name: str, site_type: str): if self._can_use_lua is None: # 查询lua_module 不为空 self._can_use_lua = public.ExecShell("nginx -V 2>&1 |grep lua_nginx_module")[0].strip() != '' if not self._can_use_lua: return False if site_type.lower() in ("php", "proxy"): prefix = "" else: prefix = site_type.lower() + "_" ng_file = "{}/nginx/{}{}.conf".format(public.get_vhost_path(), prefix, site_name) ng_data = public.readFile(ng_file) if not ng_data: return False rep_well_known = re.compile( r"(#.*\n)?\s*include\s+/www/server/panel/vhost/nginx/well-known/.*\.conf;.*\n(#.*\n)?" 
r"(.*\n)*?\s*#error_page 404/404\.html;" ) # 匹配一下引入的外部配置文件,同时保证这个配置在SSL配置之前, 这样避免路由匹配问题 if rep_well_known.search(ng_data): lua_file = "{}/nginx/well-known/{}.conf".format(public.get_vhost_path(), site_name) lua_data = public.readFile(lua_file) if not lua_data or "set_by_lua_block $well_known" not in lua_data: return False else: return True else: return False # 返回是否能通过if 判断方式做了文件验证处理, 如果返回True,则表示可以 @staticmethod def can_use_if_for_file_check(site_name: str, site_type: str): if site_type.lower() in ("php", "proxy"): prefix = "" else: prefix = site_type.lower() + "_" # if 方式的文件验证必须是可重载的情况 if public.checkWebConfig() is not True: return False ng_file = "{}/nginx/{}{}.conf".format(public.get_vhost_path(), prefix, site_name) ng_data = public.readFile(ng_file) if not ng_data: return False rep_well_known = re.compile( r"(#.*\n)?\s*include\s+/www/server/panel/vhost/nginx/well-known/.*\.conf;.*\n(#.*\n)?" r"(.*\n)*?\s*#error_page 404/404\.html;" ) # 匹配一下引入的外部配置文件,同时保证这个配置在SSL配置之前, 这样避免路由匹配问题 if rep_well_known.search(ng_data): return True else: return False # 返回配置文件是否支持使用普通的文件验证 @staticmethod def can_use_base_file_check(site_name: str, site_type: str): if site_type.lower() in ("php", "proxy", "wp", "wp2"): prefix = "" else: prefix = site_type.lower() + "_" webserver = public.get_webserver() if webserver == "nginx": ng_file = "{}/nginx/{}{}.conf".format(public.get_vhost_path(), prefix, site_name) ng_data = public.readFile(ng_file) if not ng_data: return False rep_well_known = re.compile(r"location\s+([=~^]*\s*)?/?\\?\.well-known/?\s*{") if rep_well_known.search(ng_data): return True else: return False elif webserver == "apache" and prefix: # PHP 不用检查 ap_file = "{}/apache/{}{}.conf".format(public.get_vhost_path(), prefix, site_name) ap_data = public.readFile(ap_file) if not ap_data: return False rep_well_known_list = [ re.compile(r"<IfModule\s+alias_module>\s+Alias\s+/\.well-known/\s+\S+\s+</IfModule>"), re.compile(r"\s*ProxyPass\s+/\.well-known/\s+!", re.M), ] for 
rep_well_known in rep_well_known_list: if rep_well_known.search(ap_data): return True return False return True # 取接口目录 def get_apis(self): if not self._apis: # 尝试从配置文件中获取 api_index = self._mod_index[self._debug] if not 'apis' in self._config: self._config['apis'] = {} if api_index in self._config['apis']: if 'expires' in self._config['apis'][api_index] and 'directory' in self._config['apis'][api_index]: if time.time() < self._config['apis'][api_index]['expires']: self._apis = self._config['apis'][api_index]['directory'] return self._apis # 尝试从云端获取 res = requests.get(self._url, s_type=self._request_type) if not res.status_code in [200, 201]: result = res.json() if "type" in result: if result['type'] == 'urn:acme:error:serverInternal': raise Exception(public.lang( 'Service shutdown or internal error due to maintenance, ' 'check [ https://letsencrypt.status.io ] see for more details.') ) raise Exception(res.content) s_body = res.json() self._apis = {} self._apis['newAccount'] = s_body['newAccount'] self._apis['newNonce'] = s_body['newNonce'] self._apis['newOrder'] = s_body['newOrder'] self._apis['revokeCert'] = s_body['revokeCert'] self._apis['keyChange'] = s_body['keyChange'] # 保存到配置文件 self._config['apis'][api_index] = {} self._config['apis'][api_index]['directory'] = self._apis self._config['apis'][api_index]['expires'] = time.time() + \ 86400 # 24小时后过期 self.save_config() return self._apis # 获取帐户信息 def get_account_info(self, args): try: if not 'account' in self._config: return {} k = self._mod_index[self._debug] if not k in self._config['account']: self.get_apis() self.get_kid() account = self._config['account'][k] account['email'] = self._config['email'] self.set_crond() return account except Exception as ex: return public.return_msg_gettext(False, str(ex)) # 设置帐户信息 def set_account_info(self, args): if not 'account' in self._config: return public.return_msg_gettext(False, public.lang("The specified account does not exist")) account = json.loads(args.account) if 
'email' in account: self._config['email'] = account['email'] del (account['email']) self._config['account'][self._mod_index[self._debug]] = account self.save_config() return public.return_msg_gettext(True, public.lang("Setup successfully!")) # 获取订单列表 def get_orders(self, args): if not 'orders' in self._config: return [] s_orders = [] for index in self._config['orders'].keys(): tmp_order = self._config['orders'][index] tmp_order['index'] = index s_orders.append(tmp_order) return s_orders # 删除订单 def remove_order(self, args): if not 'orders' in self._config: return public.return_msg_gettext(False, public.lang("The specified order does not exist!")) if not args.index in self._config['orders']: return public.return_msg_gettext(False, public.lang("The specified order does not exist!")) del (self._config['orders'][args.index]) self.save_config() return public.return_msg_gettext(True, public.lang("Order deleted successfully!")) # 取指定订单数据 def get_order_find(self, args): if not 'orders' in self._config: return public.return_msg_gettext(False, public.lang("The specified order does not exist!")) if not args.index in self._config['orders']: return public.return_msg_gettext(False, public.lang("The specified order does not exist!")) result = self._config['orders'][args.index] result['cert'] = self.get_cert_info(args.index) return result # 获取证书信息 def get_cert_info(self, index): cert = {} path = self._config['orders'][index]['save_path'] if not os.path.exists(path): self.download_cert(index) cert['private_key'] = public.readFile(path + "/privkey.pem") cert['fullchain'] = public.readFile(path + "/fullchain.pem") return cert # 更新证书压缩包 def update_zip(self, args): path = self._config['orders'][args.index]['save_path'] if not os.path.exists(path): # 尝试重新下载证书 self.download_cert(args.index) if not os.path.exists(path): return public.return_msg_gettext(False, public.lang("Certificate read failed, directory does not exist!")) import panelTask bt_task = panelTask.bt_task() zip_file = path + 
'/cert.zip' result = bt_task._zip(path, '.', path + '/cert.zip', '/dev/null', 'zip') if not os.path.exists(zip_file): return result return public.return_msg_gettext(True, zip_file) # 吊销证书 def revoke_order(self, index): if type(index) != str: index = index.index if not index in self._config['orders']: raise Exception(public.lang("The specified order does not exist!")) cert_path = self._config['orders'][index]['save_path'] if not os.path.exists(cert_path): raise Exception(public.lang("No certificate found for the specified order!")) cert = self.dump_der(cert_path) if not cert: raise Exception(public.lang("Certificate read failed!")) payload = { "certificate": self.calculate_safe_base64(cert), "reason": 4 } res = self.acme_request(self._apis['revokeCert'], payload) if res.status_code in [200, 201]: if os.path.exists(cert_path): public.ExecShell("rm -rf {}".format(cert_path)) del (self._config['orders'][index]) self.save_config() return public.return_msg_gettext(True, public.lang("Certificate revoked!")) return res.json() # 取根域名和记录值 def extract_zone(self, domain_name): top_domain_list = public.readFile('{}/config/domain_root.txt'.format(public.get_panel_path())) if top_domain_list: top_domain_list = top_domain_list.strip().split('\n') else: top_domain_list = [] old_domain_name = domain_name top_domain = "." + ".".join(domain_name.rsplit('.')[-2:]) new_top_domain = "." 
+ top_domain.replace(".", "") is_tow_top = False if top_domain in top_domain_list: is_tow_top = True domain_name = domain_name[:-len(top_domain)] + new_top_domain if domain_name.count(".") > 1: zone, middle, last = domain_name.rsplit(".", 2) if is_tow_top: last = top_domain[1:] root = ".".join([middle, last]) else: zone = "" root = old_domain_name return root, zone # 自动构造通配符 def auto_wildcard(self, domains): if not domains: return domains domain_list = [] for domain in domains: root, zone = self.extract_zone(domain) tmp_list = zone.rsplit(".", 1) if len(tmp_list) == 1: if root not in domain_list: domain_list.append(root) if not "*." + root in domain_list: domain_list.append("*." + root) else: new_root = "{}.{}".format(tmp_list[1], root) if new_root not in domain_list: domain_list.append(new_root) if not "*." + new_root in domain_list: domain_list.append("*." + new_root) return domain_list # 构造域名列表 def format_domains(self, domains): if type(domains) != list: return [] # 是否自动构造通配符 if self._auto_wildcard: domains = self.auto_wildcard(domains) wildcard = [] tmp_domains = [] for domain in domains: domain = domain.strip() if domain in tmp_domains: continue # 将通配符域名转为验证正则表达式 f_index = domain.find("*.") if f_index not in [-1, 0]: continue if f_index == 0: wildcard.append(domain.replace( "*", r"^[\w-]+").replace(".", r"\.")) # 添加到申请列表 tmp_domains.append(domain) # 处理通配符包含 apply_domains = tmp_domains[:] for domain in tmp_domains: for w in wildcard: if re.match(w, domain): apply_domains.remove(domain) return apply_domains # 创建订单 def create_order(self, domains, auth_type, auth_to, index=None): domains = self.format_domains(domains) if not domains: raise Exception(public.lang("Need at least a domain name!")) # 构造标识 identifiers = [] for domain_name in domains: identifiers.append({"type": 'dns', "value": domain_name}) payload = {"identifiers": identifiers} # 请求创建订单 res = self.acme_request(self._apis['newOrder'], payload) if not res.status_code in [201, 200]: # 如果创建失败 e_body = 
res.json() if 'type' in e_body: # 如果随机数失效 if e_body['type'].find('error:badNonce') != -1: self.get_nonce(force=True) res = self.acme_request(self._apis['newOrder'], payload) # 如果帐户失效 if e_body['detail'].find('KeyID header contained an invalid account URL') != -1: k = self._mod_index[self._debug] del (self._config['account'][k]) self.get_kid() self.get_nonce(force=True) res = self.acme_request(self._apis['newOrder'], payload) if not res.status_code in [201, 200]: a_auth = res.json() ret_title = self.get_error(str(a_auth)) raise StopIteration( "{0} >>>> {1}".format( ret_title, json.dumps(a_auth) ) ) # 返回验证地址和验证 s_json = res.json() s_json['auth_type'] = auth_type s_json['domains'] = domains s_json['auth_to'] = auth_to index = self.save_order(s_json, index) return index def get_site_run_path_byid(self, site_id): ''' @name 通过site_id获取网站运行目录 @author hwliang @param site_id<int> 网站标识 @return None or string ''' if public.M('sites').where('id=? and project_type=?', (site_id, 'PHP')).count() >= 1: site_path = public.M('sites').where('id=?', site_id).getField('path') if not site_path: return None if not os.path.exists(site_path): return None args = public.dict_obj() args.id = site_id import panelSite run_path = panelSite.panelSite().GetRunPath(args) if run_path in ['/']: run_path = '' if run_path: if run_path[0] == '/': run_path = run_path[1:] site_run_path = os.path.join(site_path, run_path) if not os.path.exists(site_run_path): return site_path return site_run_path else: find = public.M('sites').where('id=?', site_id).find() if not find: return False count = public.M('sites').where( 'id=? 
and project_type in (?,?,?,?)', (site_id, 'Java', 'Go', 'Other', "Python") ).count() if count: try: project_info = json.loads(find['project_config']) if 'ssl_path' not in project_info: ssl_path = '/www/wwwroot/java_node_ssl' else: ssl_path = project_info['ssl_path'] if not os.path.exists(ssl_path): os.makedirs(ssl_path) return ssl_path except: return False else: import panelSite auth_to = find['path'] + '/' + panelSite.panelSite().GetRunPath(public.to_dict_obj({"id": site_id})) auth_to = auth_to.replace("//", "/") if auth_to[-1] == '/': auth_to = auth_to[:-1] if not os.path.exists(auth_to): return False return auth_to def get_site_run_path(self, domains): ''' @name 通过域名列表获取网站运行目录 @author hwliang @param domains<list> 域名列表 @return None or string ''' site_id = 0 for domain in domains: site_id = public.M('domain').where("name=?", domain).getField('pid') if site_id: break if not site_id: return None return self.get_site_run_path_byid(site_id) # 获取验证信息 def get_auths(self, index): if not index in self._config['orders']: raise Exception(public.lang("The specified order does not exist!")) # 检查是否已经获取过授权信息 if 'auths' in self._config['orders'][index]: # 检查授权信息是否过期 if time.time() < self._config['orders'][index]['auths'][0]['expires']: return self._config['orders'][index]['auths'] if self._config['orders'][index]['auth_type'] != 'dns': site_run_path = self.get_site_run_path(self._config['orders'][index]['domains']) if site_run_path: self._config['orders'][index]['auth_to'] = site_run_path # 清理旧验证 self.clear_auth_file(index) auths = [] for auth_url in self._config['orders'][index]['authorizations']: res = self.acme_request(auth_url, "") if res.status_code not in [200, 201]: raise Exception("ACME_AUTH_ERR: {}".format(res.json())) s_body = res.json() if 'status' in s_body: if s_body['status'] in ['invalid']: raise Exception('ACME_INVALID_ORDER') if s_body['status'] in ['valid']: # 跳过无需验证的域名 continue s_body['expires'] = self.utc_to_time(s_body['expires']) identifier_auth = 
self.get_identifier_auth(index, auth_url, s_body) if not identifier_auth: raise Exception('ACME_V_INFO_ERR') acme_keyauthorization, auth_value = self.get_keyauthorization(identifier_auth['token']) identifier_auth['acme_keyauthorization'] = acme_keyauthorization identifier_auth['auth_value'] = auth_value identifier_auth['expires'] = s_body['expires'] identifier_auth['auth_to'] = self._config['orders'][index]['auth_to'] identifier_auth['type'] = self._config['orders'][index]['auth_type'] # 设置验证信息 self.set_auth_info(identifier_auth, index=index) auths.append(identifier_auth) self._config['orders'][index]['auths'] = auths self.save_config() if not self.check_config(index, "auths", auths): self.save_config() return auths # 更新随机数 def update_replay_nonce(self, res): replay_nonce = res.headers.get('Replay-Nonce') if replay_nonce: self._replay_nonce = replay_nonce # 设置验证信息 def set_auth_info(self, identifier_auth, index=None): # 从云端验证, 暂无用 if not self.cloud_check_domain(identifier_auth['domain']): self.err = "Cloud verification failed!" 
# 是否手动验证DNS if identifier_auth['auth_to'] == 'dns': return None # 是否文件验证 if identifier_auth['type'] in ['http', 'tls']: self.write_auth_file( identifier_auth['auth_to'], identifier_auth['token'], identifier_auth['acme_keyauthorization'], index ) else: # auth_to=dns-api self.create_dns_record( identifier_auth['auth_to'], identifier_auth['domain'], identifier_auth['auth_value'] ) # 从云端验证域名是否可访问 def cloud_check_domain(self, domain): try: result = requests.post('{}/api/panel/checkDomain'.format(public.OfficialApiBase()), {"domain": domain, "ssl": 1}, s_type=self._request_type).json() return result['status'] except: return False # 清理验证文件 def clear_auth_file(self, index): if not self._config['orders'][index]['auth_type'] in ['http', 'tls']: return True acme_path = '{}/.well-known/acme-challenge'.format(self._config['orders'][index]['auth_to']) acme_path = acme_path.replace("//", '/') write_log('|-Verify the dir:{}'.format(acme_path)) if os.path.exists(acme_path): public.ExecShell("rm -f {}/*".format(acme_path)) acme_path = '/www/server/stop/.well-known/acme-challenge' if os.path.exists(acme_path): public.ExecShell("rm -f {}/*".format(acme_path)) def change_well_known_mod(self, path_dir: str): path_dir = path_dir.rstrip("/") if not os.path.isdir(path_dir): return False if path_dir in self._well_known_check_cache: return True else: self._well_known_check_cache[path_dir] = True import stat try: import pwd uid_data = pwd.getpwnam("www") uid = uid_data.pw_uid gid = uid_data.pw_gid except: return # 逐级给最低访问权限 while path_dir != "/": path_dir_stat = os.stat(path_dir) if path_dir_stat.st_uid == 0 and uid != 0: old_mod = stat.S_IMODE(path_dir_stat.st_mode) if not old_mod & (1 << 3): os.chmod(path_dir, old_mod + (1 << 3)) # chmod g+x if path_dir_stat.st_uid == uid: old_mod = stat.S_IMODE(path_dir_stat.st_mode) if not old_mod & (1 << 6): os.chmod(path_dir, old_mod + (1 << 6)) # chmod u+x elif path_dir_stat.st_gid == gid: old_mod = stat.S_IMODE(path_dir_stat.st_mode) if not old_mod & 
(1 << 3): os.chmod(path_dir, old_mod + (1 << 6)) # chmod g+x elif path_dir_stat.st_uid != uid or path_dir_stat.st_gid != gid: old_mod = stat.S_IMODE(path_dir_stat.st_mode) if not old_mod & 1: os.chmod(path_dir, old_mod + 1) # chmod o+x path_dir = os.path.dirname(path_dir) # 写验证文件 def write_auth_file(self, auth_to, token, acme_keyauthorization, index): if public.get_webserver() == "nginx": # 如果是nginx尝试使用配置文件进行验证 self.write_ngin_authx_file(auth_to, token, acme_keyauthorization, index) # 尝试写文件进行验证 try: acme_path = '{}/.well-known/acme-challenge'.format(auth_to) acme_path = acme_path.replace("//", '/') if not os.path.exists(acme_path): os.makedirs(acme_path) public.set_own(acme_path, 'www') self.change_well_known_mod(acme_path) wellknown_path = '{}/{}'.format(acme_path, token) public.writeFile(wellknown_path, acme_keyauthorization) public.set_own(wellknown_path, 'www') acme_path = '/www/server/stop/.well-known/acme-challenge' if not os.path.exists(acme_path): os.makedirs(acme_path) public.set_own(acme_path, 'www') self.change_well_known_mod(acme_path) wellknown_path = '{}/{}'.format(acme_path, token) public.writeFile(wellknown_path, acme_keyauthorization) public.set_own(wellknown_path, 'www') return True except: err = public.get_error_info() print(err) raise Exception(public.lang('Writing verification file failed: {}', err)) def write_ngin_authx_file(self, auth_to, token, acme_keyauthorization, index): site_name, project_type = self.get_site_name_by_domains(self._config["orders"][index]["domains"]) if site_name is None: return if self.can_use_lua_for_site(site_name, project_type): return if project_type.lower() in ("php", "proxy"): nginx_conf_path = "{}/vhost/nginx/{}.conf".format(public.get_panel_path(), site_name) else: nginx_conf_path = "{}/vhost/nginx/{}_{}.conf".format(public.get_panel_path(), project_type.lower(), site_name) nginx_conf = public.readFile(nginx_conf_path) if nginx_conf is False: return file_check_config_path = 
"/www/server/panel/vhost/nginx/well-known/{}.conf".format(site_name) if not os.path.exists("/www/server/panel/vhost/nginx/well-known"): os.makedirs("/www/server/panel/vhost/nginx/well-known", 0o755) # 如果主配置中,没有引用则尝试添加,添加失败就跳出 if not re.search(r"\s*include\s+/www/server/panel/vhost/nginx/well-known/.*\.conf;", nginx_conf, re.M): ssl_line = re.search(r"(#.*\n\s*)?#error_page 404/404\.html;", nginx_conf) if ssl_line is None: return default_cert_apply_check = ( "#CERT-APPLY-CHECK--START\n" " # Configuration related to file verification for SSL certificate application - Do not delete\n" " include /www/server/panel/vhost/nginx/well-known/{}.conf;\n" " #CERT-APPLY-CHECK--END\n " ).format(site_name) if not os.path.exists(file_check_config_path): public.writeFile(file_check_config_path, "") new_conf = nginx_conf.replace(ssl_line.group(), default_cert_apply_check + ssl_line.group(), 1) public.writeFile(nginx_conf_path, new_conf) isError = public.checkWebConfig() if isError is not True: public.writeFile(nginx_conf_path, nginx_conf) return # 如果主配置有引用, 不再检测位置关系,因为不能保证用户的自定义配置的优先级, 直接进行文件验证的 lua 方式和 if 方式的尝试 if self.can_use_lua_module(): self.write_lua_file_for_site(file_check_config_path) return # 开始尝试if 验证方式 if auth_to not in self._nginx_cache_file_auth: self._nginx_cache_file_auth[auth_to] = [] self._nginx_cache_file_auth[auth_to].append((token, acme_keyauthorization)) tmp_data = [] for token, acme_key in self._nginx_cache_file_auth[auth_to]: tmp_data.append(( 'if ($request_uri ~ "^/\\.well-known/acme-challenge/{}.*"){{\n' ' return 200 "{}";\n' '}}\n' ).format(token, acme_key)) public.writeFile(file_check_config_path, "\n".join(tmp_data)) isError = public.checkWebConfig() if isError is True: public.serviceReload() else: public.writeFile(file_check_config_path, "") @staticmethod def write_lua_file_for_site(file_check_config_path: str): old_data = public.readFile(file_check_config_path) if isinstance(old_data, str) and "set_by_lua_block $well_known" in old_data: return 
lua_file_data = r""" set $well_known ''; if ( $uri ~ "^/.well-known/" ) { set_by_lua_block $well_known { --get path local m,err = ngx.re.match(ngx.var.uri,"/.well-known/(.*)","isjo") -- If the path matches if m then -- Splicing file path local filename = ngx.var.document_root .. m[0] -- Determine if the file path is legal if not ngx.re.find(m[1],"\\\\./","isjo") then -- Determine if the file exists local is_exists = io.open(filename, "r") if not is_exists then -- Java project? filename = "/www/wwwroot/java_node_ssl" .. m[0] end -- release if is_exists then is_exists:close() end -- read file local fp = io.open(filename,'r') if fp then local file_body = fp:read("*a") fp:close() if file_body then ngx.header['content-type'] = 'text/plain' return file_body end end end end return "" } } if ( $well_known != "" ) { return 200 $well_known; } """ public.writeFile(file_check_config_path, lua_file_data) isError = public.checkWebConfig() if isError is True: public.serviceReload() else: public.writeFile(file_check_config_path, old_data) # 解析域名 # def create_dns_record(self, auth_to, domain, dns_value): # # 如果为手动解析 # if auth_to == 'dns': # return None # # from panelDnsapi import DnsMager # # self._dns_class = DnsMager().get_dns_obj_by_domain(domain) # self._dns_class.create_dns_record(public.de_punycode(domain), dns_value) # self._dns_domains.append({"domain": domain, "dns_value": dns_value}) # 解析域名 def create_dns_record(self, auth_to, domain, dns_value): # 如果为手动解析 if auth_to == 'dns': return None if auth_to.find('|') != -1: import panelDnsapi dns_name, key, secret = self.get_dnsapi(auth_to) self._dns_class = getattr(panelDnsapi, dns_name)(key, secret) self._dns_class.create_dns_record(public.de_punycode(domain), dns_value) else: pass # todo 待重构 # from sslModel import dataModel # dataModel.main().add_dns_value_by_domain(domain, dns_value, is_let_txt=True) self._dns_domains.append({"domain": domain, "dns_value": dns_value}) return # 解析DNSAPI信息 def get_dnsapi(self, auth_to): tmp = 
auth_to.split('|') dns_name = tmp[0] key = "None" secret = "None" if len(tmp) < 3: try: dnsapi_config = json.loads(public.readFile(self._dnsapi_file)) for dc in dnsapi_config: if dc['name'] != dns_name: continue if not dc['data']: continue key = dc['data'][0]['value'] secret = dc['data'][1]['value'] except: raise Exception(public.lang("No valid DNSAPI key information found")) else: key = tmp[1] secret = tmp[2] return dns_name, key, secret # 删除域名解析 def remove_dns_record(self): if not self._dns_domains: return None for dns_info in self._dns_domains: try: if self._dns_class: self._dns_class.delete_dns_record( public.de_punycode(dns_info['domain']), dns_info['dns_value']) else: pass # todo 待重构 # from sslModel import dataModel # dataModel.main().del_dns_value_by_domain(dns_info['domain'], is_let_txt=True) except Exception as e: pass # 验证域名 def auth_domain(self, index): self._config['orders'][index]['auth_tag'] = True self.save_config() if index not in self._config['orders']: raise Exception(public.lang("The specified order does not exist!")) if "auths" not in self._config['orders'][index]: raise Exception(public.lang("Order verification information is missing, please try reapplying!")) # 开始验证 for auth in self._config['orders'][index]['auths']: res = self.check_auth_status(auth['url']) # 检查是否需要验证 if res.json()['status'] == 'invalid': raise Exception(public.lang('Domain name verification failed. 
Please try to apply again!')) if res.json()['status'] == 'pending': if auth['type'] == 'dns': # 尝试提前验证dns解析 self.check_dns( domain=f"_acme-challenge.{auth['domain'].replace('*.', '')}", value=auth['auth_value'], s_type="TXT", ) self.respond_to_challenge(auth) # 检查验证结果 for i in range(len(self._config['orders'][index]['auths'])): self.check_auth_status( self._config['orders'][index]['auths'][i]['url'], ['valid', 'invalid'] ) self._config['orders'][index]['status'] = 'valid' # 验证单个域名 def auth_domain_api(self, get): index = get.index domain = get.domain self.get_apis() write_log("|-domain is Verifying...:{}".format(domain)) if index not in self._config['orders']: return public.return_msg_gettext(False, public.lang('The order does not exist!')) order = self._config['orders'][index] if "auths" not in order: return public.return_msg_gettext(False, public.lang( 'The order verification information has been lost. Please try to apply again!')) for auth in order['auths']: if auth.get("status") == "invalid": return public.return_msg_gettext(False, public.lang( "The domain name verification failed. 
Please try to apply again!")) if domain and auth['domain'] != domain: continue try: res = self.check_auth_status(auth['url']) # 检查是否需要验证 if res.json()['status'] == 'pending': self.respond_to_challenge(auth) _return = self.check_auth_status(auth['url'], ['valid', 'invalid']).json() auth['status'] = _return['status'] return auth auth['status'] = res.json()['status'] write_log("|-Verification succeeded!") return auth except StopIteration as e: auth['status'] = 'invalid' ex = str(e) if ex.find(">>>>") != -1: msg = ex.split(">>>>") msg[1] = json.loads(msg[1]) else: msg = ex write_log(public.get_error_info()) auth['error'] = msg return public.return_msg_gettext(False, public.lang(msg)) finally: self.save_config() if "pending" not in [i.get("status", "pending") for i in order['auths']]: return self.apply_dns_auth(get) write_log("|-This domain name was not found in the order: {}".format(domain)) return public.return_msg_gettext(False, "This domain name was not found in the order: {}".format(domain)) # 检查验证状态 def check_auth_status(self, url, desired_status=None): desired_status = desired_status or ["pending", "valid", "invalid"] number_of_checks = 0 while True: if desired_status == ['valid', 'invalid']: write_log('|-{} Query verification results..'.format(str(number_of_checks + 1))) time.sleep(self._wait_time) check_authorization_status_response = self.acme_request(url, "") a_auth = check_authorization_status_response.json() if not isinstance(a_auth, dict): write_log(a_auth) continue authorization_status = a_auth["status"] number_of_checks += 1 if authorization_status in desired_status: if authorization_status == "invalid": write_log("|-Verification failed") try: if 'error' in a_auth['challenges'][0]: ret_title = a_auth['challenges'][0]['error']['detail'] elif 'error' in a_auth['challenges'][1]: ret_title = a_auth['challenges'][1]['error']['detail'] elif 'error' in a_auth['challenges'][2]: ret_title = a_auth['challenges'][2]['error']['detail'] else: ret_title = str(a_auth) 
# Check verification status
def check_auth_status(self, url, desired_status=None):
    """Poll an ACME authorization URL until its status is in ``desired_status``.

    :param url: authorization URL to query through ``self.acme_request``.
    :param desired_status: list of statuses that end the polling; defaults to
        ``["pending", "valid", "invalid"]``.
    :return: the last response object returned by ``self.acme_request``.
    :raises StopIteration: when the authorization turns ``invalid`` (message is
        ``"<title> >>>> <json>"``) or when ``self._max_check_num`` polls have
        been made without reaching a desired status.
    """
    desired_status = desired_status or ["pending", "valid", "invalid"]
    awaiting_result = desired_status == ['valid', 'invalid']
    number_of_checks = 0
    while True:
        if awaiting_result:
            write_log('|-{} Query verification results..'.format(str(number_of_checks + 1)))
            time.sleep(self._wait_time)
        check_authorization_status_response = self.acme_request(url, "")
        a_auth = check_authorization_status_response.json()
        # Count every poll, including malformed answers, so that a server
        # which keeps returning a non-dict body cannot spin this loop forever.
        number_of_checks += 1
        if isinstance(a_auth, dict):
            authorization_status = a_auth["status"]
            if authorization_status in desired_status:
                if authorization_status == "invalid":
                    write_log("|-Verification failed")
                    try:
                        # First challenge that carries an error, whatever its position.
                        ret_title = next(
                            (c['error']['detail'] for c in a_auth['challenges'] if 'error' in c),
                            str(a_auth),
                        )
                        ret_title = self.get_error(ret_title)
                    except Exception:
                        ret_title = str(a_auth)
                    raise StopIteration(
                        "{0} >>>> {1}".format(ret_title, json.dumps(a_auth))
                    )
                break
        else:
            write_log(a_auth)
        if number_of_checks >= self._max_check_num:
            raise StopIteration(
                public.lang(
                    'Error: Attempted verification {} times. The maximum number of verifications is {}. The verification interval is {} seconds.',
                    str(number_of_checks),
                    str(self._max_check_num),
                    str(self._wait_time)
                ))
    if awaiting_result:
        write_log("|-Verification succeeded!")
    return check_authorization_status_response


# Format error output
def get_error(self, error):
    """Map a raw CA / DNS / network error string to a user-facing message.

    The first matching rule wins, so the order of the checks matters.

    :param error: raw error text.
    :return: the translated message, or ``error`` unchanged when no rule matches.
    """
    write_log("error_result: " + str(error))
    if "Max checks allowed" in error:
        return public.lang(
            'CA cannot verify your domain name, please check if the domain name resolution is correct, or wait 5-10 minutes and try again.')
    if "Max retries exceeded with" in error or 'status_code=0 ' in error:
        return public.lang("CA server connection timed out, please try again later.")
    if "The domain name belongs" in error:
        return public.lang(
            'The domain name does not belong to this DNS service provider, please make sure the domain name is filled in correctly.')
    if "domains in the last 168 hours" in error and "Error creating new order" in error:
        return public.lang(
            "Issuance failed, the root domain name of domain name %s exceeds the maximum weekly issuance limit!"
            % re.findall(r"hours:\s+(.+?),", error))
    if 'login token ID is invalid' in error:
        return public.lang("DNS server connection failed, please check if the key is correct.")
    if 'Error getting validation data' in error:
        return public.lang(
            'Data validation failed and the CA was unable to get the correct captcha from the authenticated connection.')
    if "too many certificates already issued for exact set of domains" in error:
        return public.lang('Issuing failed, the domain {} has exceeded the limit of weekly reissues!',
                           str(re.findall("exact set of domains: (.+):", error)))
    if "Error creating new account :: too many registrations for this IP" in error:
        return public.lang(
            'Issuing failed, the current server IP has reached the limit of creating up to 10 accounts every 3 hours.')
    if "DNS problem: NXDOMAIN looking up A for" in error:
        return public.lang(
            'Validation failed, domain name was not resolved, or resolution did not take effect!')
    if "Invalid response from" in error:
        return public.lang(
            'Verification failed, domain name resolution error or verification URL cannot be accessed!')
    if 'TLS Web Server Authentication' in error:
        return public.lang("Connection to CA server failed, please try again later.")
    if 'Name does not end in a public suffix' in error:
        return public.lang('Unsupported domain name {}, please check the domain name is correct!',
                           str(re.findall("Cannot issue for \"(.+)\":", error)))
    if 'No valid IP addresses found for' in error:
        return public.get_msg_gettext(
            'No resolution record was found for domain name {}, please check if the domain name resolution takes effect!',
            (str(re.findall("No valid IP addresses found for (.+)", error)),))
    if 'No TXT record found at' in error:
        return public.get_msg_gettext(
            'No valid TXT resolution record was found in the domain name {}, please check whether the TXT record is parsed correctly. If it is applied by DNSAPI, please try again in 10 minutes!',
            (str(re.findall("No TXT record found at (.+)", error)),))
    if 'Incorrect TXT record' in error:
        return public.get_msg_gettext(
            'A wrong TXT record was found on {}: {}, please check whether the TXT resolution is correct, if it is applied by DNSAPI, please try again in 10 minutes!',
            (str(re.findall("found at (.+)", error)),
             str(re.findall("Incorrect TXT record \"(.+)\"", error))))
    if 'Domain not under you or your user' in error:
        return public.get_msg_gettext(
            'This domain name does not exist under this dnspod account, adding resolution failed!')
    if 'SERVFAIL looking up TXT for' in error:
        return public.get_msg_gettext(
            'No valid TXT resolution record was found in the domain name {}, please check whether the TXT record is parsed correctly. If it is applied by DNSAPI, please try again in 10 minutes!',
            (str(re.findall("looking up TXT for (.+)", error)),))
    if 'Timeout during connect' in error:
        return public.get_msg_gettext(
            'The connection timed out and the CA server was unable to access your website!')
    if "DNS problem: SERVFAIL looking up CAA for" in error:
        return public.get_msg_gettext(
            'Domain name {} is currently required to verify the CAA record, please parse the CAA record manually, or retry the application after 1 hour!',
            (str(re.findall("looking up CAA for (.+)", error)),))
    if "Read timed out." in error:
        return public.get_msg_gettext(
            'The verification timed out. Please check if the domain name is resolved correctly. If it is resolved correctly, the connection between the server and LetsEncrypt may be abnormal. Please try again later!')
    if 'Cannot issue for' in error:
        return public.get_msg_gettext(
            'Cannot issue a certificate for {}, cannot apply for a wildcard certificate with a domain name suffix directly!',
            (str(re.findall(r'for\s+"(.+)"', error)),))
    # This rule used the bare result of str.find() as its condition, which is
    # truthy (-1) when the text is absent, so it swallowed every error that
    # reached it and made all rules below unreachable.
    if 'too many failed authorizations recently' in error:
        return public.get_msg_gettext(
            'The account has more than 5 failed orders within 1 hour, please wait 1 hour and try again!')
    if "Error creating new order" in error:
        return public.lang("Order creation failed, please try again later!")
    if "Too Many Requests" in error:
        # A second, identical 'Too Many Requests' test further down could never
        # be reached and has been dropped.
        return public.get_msg_gettext(
            'More than 5 verification failures in 1 hour, the application is temporarily banned, please try again later!')
    if 'HTTP Error 400: Bad Request' in error:
        return public.lang("CA server denied access, please try again later!")
    if 'Temporary failure in name resolution' in error:
        return public.get_msg_gettext(
            'The DNS of the server is faulty and the domain name cannot be resolved. Please use the Linux toolbox to check the DNS configuration')
    if 'Only domain names are supported' in error:
        return public.lang("Let's Encrypt only supports applying for certificates using domain names")
    if 'DNSSEC: DNSKEY Missing' in error:
        return public.lang("The CA cannot find or obtain the DNSKEY record used to verify the DNSSEC signature")
    return error


# Send the verification request
def respond_to_challenge(self, auth):
    """POST the key authorization of ``auth`` to its challenge URL.

    :param auth: dict with ``acme_keyauthorization`` and ``dns_challenge_url``.
    :return: the response object returned by ``self.acme_request``.
    """
    payload = {"keyAuthorization": "{0}".format(auth['acme_keyauthorization'])}
    respond_to_challenge_response = self.acme_request(auth['dns_challenge_url'], payload)
    return respond_to_challenge_response


# Send the CSR
def send_csr(self, index):
    """Finalize order ``index`` by submitting its CSR.

    Stores the returned certificate URL in the order and saves the config.

    :param index: key of the order in ``self._config['orders']``.
    :return: the certificate URL ("" when the response carries none).
    :raises ValueError: when the finalize request does not answer 200/201.
    """
    csr = self.create_csr_new(index)
    payload = {"csr": self.calculate_safe_base64(csr)}
    send_csr_response = self.acme_request(
        url=self._config['orders'][index]['finalize'],
        payload=payload
    )
    if send_csr_response.status_code not in [200, 201]:
        if send_csr_response.status_code == 0:
            raise ValueError(
                "Error: [Connection reset by peer], the request process may be accidentally intercepted, "
                "if only this domain name cannot apply, then the domain name may be abnormal!"
            )
        raise ValueError(
            "Error: Sending CSR: Response Status {status_code} Response:{response}".format(
                status_code=send_csr_response.status_code,
                response=send_csr_response.json(),
            )
        )
    send_csr_response_json = send_csr_response.json()
    certificate_url = send_csr_response_json.get("certificate", "")
    self._config['orders'][index]['certificate_url'] = certificate_url
    self.save_config()
    return certificate_url
# Get the certificate expiry time
def get_cert_timeout(self, cert_data):
    """Return the expiry timestamp of a PEM certificate.

    Falls back to "now + 90 days" when the certificate cannot be parsed or
    its ``notAfter`` value cannot be converted.

    :param cert_data: PEM text of the certificate.
    """
    fallback = int(time.time() + (86400 * 90))
    info = ssl_info.ssl_info().load_ssl_info_by_data(cert_data)
    if not info:
        return fallback
    try:
        return public.to_date(times=info['notAfter'])
    except Exception:
        return fallback


# Download the certificate
def download_cert(self, index):
    """Download the issued certificate of order ``index`` and persist it.

    The order entry is reduced to its long-lived fields, marked ``valid`` and
    saved; the certificate is then written out through ``self.save_cert``.

    :param index: key of the order in ``self._config['orders']``.
    :return: dict with ``cert``, ``root``, ``private_key``, ``cert_timeout``,
        ``domains`` and ``save_path`` (a placeholder dict in debug mode).
    :raises Exception: when the download does not answer 200/201.
    """
    order = self._config['orders'][index]
    if self._debug is True:
        return {
            "cert": "---debug mode cert---",
            "private_key": "---debug mode private_key---",
            "cert_timeout": int(time.time() + (86400 * 90)),
            "domains": order['domains'],
        }
    res = self.acme_request(order['certificate_url'], "")
    if res.status_code not in [200, 201]:
        # Pass the payload as an argument, as the other public.lang() calls in
        # this class do, instead of formatting it into the template first.
        raise Exception(public.lang('Failed to download certificate: {}', str(res.json())))

    pem_certificate = res.content
    if isinstance(pem_certificate, bytes):
        pem_certificate = pem_certificate.decode('utf-8')
    cert = self.split_ca_data(pem_certificate)
    cert['cert_timeout'] = self.get_cert_timeout(cert['cert'])
    cert['private_key'] = order['private_key']
    cert['domains'] = order['domains']

    # Drop the per-application fields. pop() tolerates a key that is already
    # gone, so a partially cleaned order cannot fail here after the
    # certificate has been downloaded.
    for stale_key in ('private_key', 'auths', 'expires', 'authorizations',
                      'finalize', 'identifiers', 'cert'):
        order.pop(stale_key, None)

    order['status'] = 'valid'
    order['cert_timeout'] = cert['cert_timeout']
    domain_name = order['domains'][0]
    order['save_path'] = '{}/{}'.format(self._save_path, domain_name)
    cert['save_path'] = order['save_path']
    self.save_config()
    self.save_cert(cert, index)
    return cert
# Save the certificate to files
def save_cert(self, cert, index):
    """Write the certificate bundle of order ``index`` into its save path.

    Produces privkey.pem, fullchain.pem, cert.csr, root_cert.csr,
    fullchain.pfx and Description.txt, records the bundle hash and finally
    calls ``self.sub_all_cert``. Any failure is logged, never raised.

    :param cert: dict with ``cert``, ``root`` and ``private_key`` PEM texts.
    :param index: key of the order in ``self._config['orders']``.
    """
    try:
        from ssl_manage import SSLManger
        full_chain = cert['cert'] + cert['root']
        SSLManger().save_by_data(full_chain, cert['private_key'])

        order = self._config['orders'][index]
        domain_name = order['domains'][0]
        path = order['save_path']
        if not os.path.exists(path):
            os.makedirs(path, 384)

        # Store the certificate files
        key_file = path + "/privkey.pem"
        pem_file = path + "/fullchain.pem"
        public.writeFile(key_file, cert['private_key'])
        public.writeFile(pem_file, cert['cert'] + cert['root'])
        public.writeFile(path + "/cert.csr", cert['cert'])
        public.writeFile(path + "/root_cert.csr", cert['root'])
        self.set_exclude_hash(index, cert['cert'] + cert['root'])

        # Convert to an IIS (PKCS#12) certificate
        pfx_args = (cert['private_key'], cert['cert'] + cert['root'], cert['root'], domain_name)
        try:
            pfx_buffer = self.dump_pkcs12(*pfx_args)
        except:
            pfx_buffer = ssl_info.ssl_info().dump_pkcs12_new(*pfx_args)
        public.writeFile(path + "/fullchain.pfx", pfx_buffer, 'wb+')

        # The text is reproduced exactly as it appears in the source.
        ps = '''Document description: privkey.pem Certificate private key fullchain.pem PEM format certificate with certificate chain (nginx/apache) root_cert.csr Root certificate cert.csr Domain name certificate fullchain.pfx Certificate format for IIS How to use in the aaPanel: privkey.pem Paste into the key entry box fullchain.pem Paste into certificate input box '''
        public.writeFile(path + '/Description.txt', ps)
        self.sub_all_cert(key_file, pem_file)
    except:
        write_log(public.get_error_info())


def set_exclude_hash(self, order, exclude_hash):
    """Record the hash of a certificate bundle under ``exclude_hash_let``.

    Reads ``<panel path>/data/exclude_hash.json`` (rebuilding it through
    ``self.get_exclude_hash`` when unreadable), stores the hash under the key
    ``order`` and writes the file back. All errors are ignored.

    :param order: key to store the hash under.
    :param exclude_hash: PEM text to hash with ``self._hash``.
    """
    try:
        path = '{}/data/exclude_hash.json'.format(public.get_panel_path())
        try:
            data = json.loads(public.readFile(path))
        except:
            data = self.get_exclude_hash(public.to_dict_obj({}))
        if "exclude_hash_let" not in data:
            data["exclude_hash_let"] = {}
        digest = self._hash(certificate=exclude_hash)
        data['exclude_hash_let'].update({order: digest})
        public.writeFile(path, json.dumps(data))
    except:
        pass
def get_exclude_hash(self, get):
    """Return the exclude-hash data, adding Let's Encrypt hashes once.

    When the data is not yet at ``version_let == '1'``, every valid order with
    a certificate URL is downloaded again, hashed and stored under
    ``exclude_hash_let``; the result is written to
    ``<panel path>/data/exclude_hash.json``.

    :param get: request object forwarded to ``panelSSL().get_exclude_hash``.
    :return: the exclude-hash mapping.
    """
    path = '{}/data/exclude_hash.json'.format(public.get_panel_path())
    import panelSSL
    exclude_data = panelSSL.panelSSL().get_exclude_hash(get)
    if exclude_data.get('version_let') == '1':
        return exclude_data
    if "exclude_hash_let" not in exclude_data:
        exclude_data["exclude_hash_let"] = {}

    data = self.read_config()
    try:
        self.get_apis()
    except:
        return exclude_data

    for order in data.get('orders', {}).values():
        usable = order['status'] == "valid" and order.get("certificate_url")
        if not usable:
            continue
        try:
            res = self.acme_request(order['certificate_url'], "")
            if res.status_code not in [200, 201]:
                continue
            pem_certificate = res.content
            if type(pem_certificate) == bytes:
                pem_certificate = pem_certificate.decode('utf-8')
            cert = self.split_ca_data(pem_certificate)
            digest = self._hash(certificate=cert['cert'] + cert['root'])
            exclude_data["exclude_hash_let"].update({order['index']: digest})
        except:
            pass

    exclude_data['version_let'] = '1'
    public.writeFile(path, json.dumps(exclude_data))
    return exclude_data


def _hash(self, cert_filename: str = None, certificate: str = None, ignore_errors: bool = False):
    """Return the MD5 hex digest of a PEM certificate text.

    :param cert_filename: optional file to read the certificate from; it
        takes precedence over ``certificate`` when the file exists.
    :param certificate: PEM text to hash.
    :param ignore_errors: return ``None`` instead of raising on bad input.
    :raises ValueError: when the text is not a string starting with
        ``-----BEGIN`` and ``ignore_errors`` is false.
    """
    if cert_filename is not None and os.path.isfile(cert_filename):
        certificate = public.readFile(cert_filename)

    looks_like_pem = isinstance(certificate, str) and certificate.startswith("-----BEGIN")
    if not looks_like_pem:
        if ignore_errors:
            return None
        raise ValueError("证书格式错误")

    return hashlib.md5(certificate.encode("utf-8")).hexdigest()


# Look up the site name by domain names
def get_site_name_by_domains(self, domains):
    """Return ``(site name, project type)`` of the first domain bound to a site.

    :param domains: iterable of domain names.
    :return: ``(None, None)`` when no domain has a ``pid`` in the domain table.
    """
    domain_table = public.M('domain')
    site_table = public.M('sites')
    for domain in domains:
        pid = domain_table.where('name=?', domain).getField('pid')
        if not pid:
            continue
        site_data = site_table.where('id=?', pid).field('name,project_type').find()
        return site_data["name"], site_data["project_type"]
    return None, None
# Replace same-domain, same-brand certificates on the server
def sub_all_cert(self, key_file, pem_file):
    """Overwrite older matching certificates on this server with the new one.

    Every sub-directory of the vhost cert/ssl folders and of the panel folder
    is inspected. A target is replaced when its issuer matches the new
    certificate (or is Let's Encrypt), it does not expire later than the new
    one and it covers exactly the same DNS names. The web service is reloaded
    afterwards.

    :param key_file: path of the new private key.
    :param pem_file: path of the new full-chain certificate.
    """
    new_info = self.get_cert_init(pem_file)  # basic info of the new certificate
    search_roots = ['/www/server/panel/vhost/cert', '/www/server/panel/vhost/ssl', '/www/server/panel']
    for root in search_roots:
        if not os.path.exists(root):
            continue
        for entry in os.listdir(root):
            target_dir = root + '/' + entry
            target_pem = target_dir + '/fullchain.pem'
            target_key = target_dir + '/privkey.pem'
            target_info = target_dir + '/info.json'

            # Does the target certificate exist? Only the "ssl" folder may use
            # the alternative file names.
            if not os.path.exists(target_pem):
                if entry not in ['ssl']:
                    continue
                target_pem = target_dir + '/certificate.pem'
                target_key = target_dir + '/privateKey.pem'
                if not os.path.exists(target_pem):
                    continue

            # Basic info of the target certificate
            old_info = self.get_cert_init(target_pem)

            # Is the certificate brand the same?
            try:
                other_brand = (
                    old_info['issuer'] != new_info['issuer']
                    and old_info['issuer'].find("Let's Encrypt") == -1
                    and old_info.get('issuer_O', '') != "Let's Encrypt"
                )
            except:
                continue
            if other_brand:
                continue

            # Does the target expire later than the new certificate?
            if old_info['notAfter'] > new_info['notAfter']:
                continue

            # Are the covered names the same?
            if len(old_info['dns']) != len(new_info['dns']):
                continue
            if not all(name in new_info['dns'] for name in old_info['dns']):
                continue

            # Replace the certificate files and the basic info
            public.writeFile(target_pem, public.readFile(pem_file, 'rb'), 'wb')
            public.writeFile(target_key, public.readFile(key_file, 'rb'), 'wb')
            public.writeFile(target_info, json.dumps(new_info))
            write_log(
                '|-Detected that the certificate under {} '
                'overlaps with the certificate of this application and has an earlier expiration time, '
                'and has been replaced with a new certificate!'.format(target_dir)
            )

    # Reload the web service
    public.serviceReload()


# Check whether the given certificate is in the order list
def check_order_exists(self, pem_file):
    """Find the order that a Let's Encrypt certificate file belongs to.

    :param pem_file: path of the certificate to inspect.
    :return: the order index whose domains overlap the certificate, ``pem_file``
        itself when no saved order matches, or ``None`` when the certificate is
        unreadable, not issued by Let's Encrypt, or any error occurs.
    """
    try:
        cert_init = self.get_cert_init(pem_file)
        if not cert_init:
            return None

        issuer = cert_init['issuer']
        from_lets_encrypt = (
            issuer.find("Let's Encrypt") != -1
            or issuer in ('R3', 'R10', 'R11')
            or cert_init.get('issuer_O', '') == "Let's Encrypt"
        )
        if not from_lets_encrypt:
            return None

        for index in self._config['orders'].keys():
            order = self._config['orders'][index]
            if 'save_path' not in order:
                continue
            for domain in order['domains']:
                if domain in cert_init['dns']:
                    return index
        return pem_file
    except:
        return None
# API: get basic certificate information
def get_cert_init_api(self, args):
    """Return the basic information of a certificate file plus the DNS API list.

    :param args: request object with ``pem_file`` and ``siteName``; when
        ``pem_file`` does not exist, the site's vhost fullchain.pem is tried.
    :return: the certificate info dict with an added ``dnsapi`` entry, or an
        error message object when the file or its info is unavailable.
    """
    if not os.path.exists(args.pem_file):
        args.pem_file = 'vhost/cert/{}/fullchain.pem'.format(args.siteName)
        if not os.path.exists(args.pem_file):
            return public.return_msg_gettext(False, public.lang("The specified certificate file does not exist!"))

    cert_init = self.get_cert_init(args.pem_file)
    if not cert_init:
        return public.return_msg_gettext(False, public.lang("Certificate information acquisition failed!"))

    try:
        dnsapi = json.loads(public.readFile(self._dnsapi_file))
    except:
        dnsapi = []
    cert_init['dnsapi'] = dnsapi
    return cert_init


# Get the basic information of the given certificate
def get_cert_init(self, pem_file):
    """Load certificate information for ``pem_file`` through ``ssl_info``."""
    loader = ssl_info.ssl_info()
    return loader.load_ssl_info(pem_file)


# Convert a time string
def strf_date(self, sdate):
    """Convert a ``YYYYmmddHHMMSS`` string to ``YYYY-mm-dd``."""
    parsed = time.strptime(sdate, '%Y%m%d%H%M%S')
    return time.strftime('%Y-%m-%d', parsed)


# Convert the certificate to DER
def dump_der(self, cert_path):
    """Return the DER encoding of ``<cert_path>/cert.csr``."""
    crypto = OpenSSL.crypto
    pem_text = public.readFile(cert_path + '/cert.csr')
    cert = crypto.load_certificate(crypto.FILETYPE_PEM, pem_text)
    return crypto.dump_certificate(crypto.FILETYPE_ASN1, cert)


# Convert the certificate to PKCS#12
# noinspection PyUnresolvedReferences
def dump_pkcs12(self, key_pem=None, cert_pem=None, ca_pem=None, friendly_name=None):
    """Build a PKCS#12 bundle from PEM texts and return its exported bytes.

    Every argument is optional; only the parts that are given are added.
    """
    crypto = OpenSSL.crypto
    p12 = crypto.PKCS12()
    if cert_pem:
        certificate = crypto.load_certificate(crypto.FILETYPE_PEM, cert_pem.encode())
        p12.set_certificate(certificate)
    if key_pem:
        private_key = crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem.encode())
        p12.set_privatekey(private_key)
    if ca_pem:
        ca_certificate = crypto.load_certificate(crypto.FILETYPE_PEM, ca_pem.encode())
        p12.set_ca_certificates((ca_certificate,))
    if friendly_name:
        p12.set_friendlyname(friendly_name.encode())
    return p12.export()


# Split off the root certificate
def split_ca_data(self, cert):
    """Split a PEM chain into the first certificate and the remainder.

    :return: ``{"cert": <first certificate>, "root": <rest of the chain>}``.
    """
    sp_key = '-----END CERTIFICATE-----\n'
    first, _, remainder = cert.partition(sp_key)
    return {"cert": first + sp_key, "root": remainder}
# Build the alternative names
def get_alt_names(self, index):
    """Return ``(primary domain, [additional domains])`` of order ``index``."""
    domains = self._config['orders'][index]['domains']
    return domains[0], domains[1:]


# Check DNS records
def check_dns(self, domain, value, s_type='TXT'):
    """Poll local DNS until ``domain`` publishes ``value`` (up to 20 tries).

    :param domain: record name to query.
    :param value: expected record value (compared with quotes stripped).
    :param s_type: DNS record type, ``TXT`` by default.
    :return: ``True`` when the value is seen, ``False`` when ``dns.resolver``
        cannot be imported.
        NOTE(review): ``True`` is also returned after all tries failed, right
        after logging the failure — presumably the local check is advisory;
        confirm with the callers before changing it.
    """
    write_log('|-Attempt to verify DNS records locally, '
              'domain name: {}, type: {} record value: {}'.format(domain, s_type, value))
    time.sleep(10)
    n = 0
    while n < 20:
        n += 1
        try:
            import dns.resolver
            ns = dns.resolver.query(domain, s_type)
            for j in ns.response.answer:
                for i in j.items:
                    txt_value = i.to_text().replace('"', '').strip()
                    write_log('|-Number of verifications: {}, value: {}'.format(n, txt_value))
                    if txt_value == value:
                        write_log("|-Local authentication succeeded!")
                        return True
        except:
            # A failed query is retried; a missing resolver module is final.
            try:
                import dns.resolver
            except:
                return False
        time.sleep(3)
    write_log("|-Local authentication failed!")
    return True


# Create the CSR
# noinspection PyUnresolvedReferences
def create_csr(self, index):
    """Return the DER-encoded CSR of order ``index`` (pyOpenSSL variant).

    A CSR already stored in the order is returned as is; otherwise one is
    built for the order's domains and signed with the order's private key.

    :param index: key of the order in ``self._config['orders']``.
    """
    order = self._config['orders'][index]
    if 'csr' in order:
        # The stored CSR lives in the order itself; the lookup used to skip
        # ``[index]`` and therefore raised KeyError whenever a CSR was cached.
        return order['csr']

    domain_name, domain_alt_names = self.get_alt_names(index)
    X509Req = OpenSSL.crypto.X509Req()
    X509Req.get_subject().CN = domain_name
    SAN = ", ".join("DNS:" + name for name in [domain_name] + list(domain_alt_names)).encode("utf8")
    X509Req.add_extensions(
        [
            OpenSSL.crypto.X509Extension(
                "subjectAltName".encode("utf8"), critical=False, value=SAN
            )
        ]
    )
    pk = OpenSSL.crypto.load_privatekey(
        OpenSSL.crypto.FILETYPE_PEM, self.create_certificate_key(index).encode()
    )
    X509Req.set_pubkey(pk)
    try:
        X509Req.set_version(2)
    except ValueError:
        # Newer pyOpenSSL releases require the version to be 0.
        X509Req.set_version(0)
    X509Req.sign(pk, self._digest)
    return OpenSSL.crypto.dump_certificate_request(OpenSSL.crypto.FILETYPE_ASN1, X509Req)
def create_csr_new(self, index):
    """Return the DER-encoded CSR of order ``index`` (``cryptography`` variant).

    A CSR already stored in the order is returned unchanged. Otherwise the
    CSR carries the first domain as common name, all domains as
    SubjectAltName entries, and is signed with SHA-256 by the order's key.
    """
    from cryptography import x509
    from cryptography.x509.oid import NameOID
    from cryptography.hazmat.primitives import hashes
    from cryptography.hazmat.primitives.serialization import load_pem_private_key, Encoding

    # Reuse the CSR when it has already been generated
    order = self._config['orders'][index]
    if 'csr' in order:
        return order['csr']

    # Primary and alternative domain names
    domain_name, domain_alt_names = self.get_alt_names(index)

    # Subject of the request
    subject = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, domain_name)])
    csr_builder = x509.CertificateSigningRequestBuilder().subject_name(subject)

    # SubjectAltName extension
    san_list = [x509.DNSName(domain_name)]
    if domain_alt_names:
        san_list.extend([x509.DNSName(name) for name in domain_alt_names])
    csr_builder = csr_builder.add_extension(
        x509.SubjectAlternativeName(san_list), critical=False
    )

    # Private key of the order
    pk = self.create_certificate_key(index).encode()
    private_key = load_pem_private_key(pk, password=None)

    # Sign and return the CSR in ASN.1 (DER) form
    csr = csr_builder.sign(private_key, hashes.SHA256())
    return csr.public_bytes(Encoding.DER)


# Build the key authorization and its digest
def get_keyauthorization(self, token):
    """Return ``(key authorization, base64url SHA-256 of it)`` for ``token``.

    The key authorization is ``"<token>.<thumbprint>"``, where the thumbprint
    is the base64url SHA-256 of the account JWK serialized with sorted keys
    and compact separators.
    """
    jwk = self.get_acme_header("GET_THUMBPRINT")["jwk"]
    jwk_json = json.dumps(jwk, sort_keys=True, separators=(",", ":"))
    thumbprint = self.calculate_safe_base64(
        hashlib.sha256(jwk_json.encode("utf8")).digest()
    )
    acme_keyauthorization = "{0}.{1}".format(token, thumbprint)
    digest64 = self.calculate_safe_base64(
        hashlib.sha256(acme_keyauthorization.encode("utf8")).digest()
    )
    return acme_keyauthorization, digest64


# Build the verification information
def get_identifier_auth(self, index, url, auth_info):
    """Pick the challenge of ``auth_info`` that matches the order's auth type.

    :param index: key of the order in ``self._config['orders']``.
    :param url: authorization URL the information came from.
    :param auth_info: authorization object with ``identifier``,
        ``challenges`` and an optional ``wildcard`` flag.
    :return: dict with ``domain`` (prefixed with ``*.`` for wildcards),
        ``url``, ``wildcard``, ``token`` and ``dns_challenge_url``, or ``None``
        when no challenge has the wanted type.
    """
    s_type = self.get_auth_type(index)
    write_log('|-Verification type: {}'.format(s_type))

    domain = auth_info['identifier']['value']
    # Handle wildcard authorizations
    wildcard = auth_info['wildcard'] if 'wildcard' in auth_info else False
    if wildcard:
        domain = "*." + domain

    for challenge in auth_info['challenges']:
        if challenge['type'] == s_type:
            return {
                "domain": domain,
                "url": url,
                "wildcard": wildcard,
                "token": challenge['token'],
                "dns_challenge_url": challenge['url'],
            }
    return None
# Get the domain verification method
def get_auth_type(self, index):
    """Return the ACME challenge type configured for order ``index``.

    ``auth_type`` ``dns`` maps to ``dns-01``, ``tls`` to ``tls-alpn-01`` and
    anything else (or a missing value) to ``http-01``.

    :raises Exception: when the order does not exist.
    """
    if index not in self._config['orders']:
        raise Exception(public.lang("The specified order does not exist!"))
    auth_type = self._config['orders'][index].get('auth_type')
    if auth_type == 'dns':
        return 'dns-01'
    if auth_type == 'tls':
        return 'tls-alpn-01'
    return 'http-01'


# Save the order
def save_order(self, order_object, index):
    """Store ``order_object`` in the configuration and return its index.

    :param order_object: order dict; its ``expires`` value is converted to a
        timestamp in place.
    :param index: existing order index for a renewal (``certificate_url`` and
        ``save_path`` are carried over), or a falsy value to create a new
        order under a fresh random index.
    :return: the index the order was saved under.
    """
    if 'orders' not in self._config:
        self._config['orders'] = {}
    renew = bool(index)
    if renew:
        previous = self._config['orders'][index]
        order_object['certificate_url'] = previous['certificate_url']
        order_object['save_path'] = previous['save_path']
    else:
        index = public.md5(str(uuid.uuid4()))
    order_object['expires'] = self.utc_to_time(order_object['expires'])
    self._config['orders'][index] = order_object
    self._config['orders'][index]['index'] = index
    if not renew:
        self._config['orders'][index]['create_time'] = int(time.time())
        self._config['orders'][index]['renew_time'] = 0
    self.save_config()
    return index


# Convert a UTC time string to a timestamp
def utc_to_time(self, utc_string):
    """Convert a UTC timestamp string to Unix epoch seconds.

    Accepts ``YYYY-mm-ddTHH:MM:SS`` with an optional fractional part and an
    optional trailing ``Z``. The previous implementation rejected the
    trailing ``Z`` when no fraction was present (silently returning the
    fallback) and converted with the host's local time plus a fixed 8 hours,
    which was only correct on UTC+8 hosts.

    :param utc_string: timestamp text.
    :return: epoch seconds, or "now + 7 days" when the text cannot be parsed.
    """
    import calendar
    try:
        stamp = utc_string.split('.')[0].rstrip('Zz')
        utc_date = datetime.datetime.strptime(stamp, "%Y-%m-%dT%H:%M:%S")
        return int(calendar.timegm(utc_date.timetuple()))
    except (AttributeError, TypeError, ValueError):
        return int(time.time() + 86400 * 7)


# Get the kid
def get_kid(self, force=False):
    """Return the ACME account URL (kid), registering an account when missing.

    After a registration the configuration is saved, re-read after a
    3 second pause, and the stored kid is returned.

    :param force: accepted for compatibility.
        NOTE(review): the value is not consulted anywhere in this method —
        confirm whether a forced re-registration was intended.
    """
    if 'account' not in self._config:
        self._config['account'] = {}
    k = self._mod_index[self._debug]
    if k not in self._config['account']:
        self._config['account'][k] = {}
    if 'kid' not in self._config['account'][k]:
        self._config['account'][k]['kid'] = self.register()
        self.save_config()
        time.sleep(3)
        self._config = self.read_config()
    return self._config['account'][k]['kid']
# Register an ACME account
def register(self, existing=False):
    """Create (or look up) the ACME account and return its URL.

    :param existing: when true, only ask for an already existing account.
    :return: the ``Location`` header of the newAccount response.
    :raises Exception: when the response status is not 200, 201 or 409.
    """
    if 'email' not in self._config:
        self._config['email'] = 'demo@aapanel.com'

    if existing:
        payload = {"onlyReturnExisting": True}
    elif self._config['email']:
        contact = "mailto:{0}".format(self._config['email'])
        payload = {"termsOfServiceAgreed": True, "contact": [contact]}
    else:
        payload = {"termsOfServiceAgreed": True}

    res = self.acme_request(url=self._apis['newAccount'], payload=payload)
    if res.status_code not in [201, 200, 409]:
        raise Exception(public.get_msg_gettext('Registration for ACME account failed: {}', (str(res.json()),)))
    return res.headers["Location"]


# Send a request to the ACME API
def acme_request(self, url, payload):
    """POST a signed JWS request to ``url`` and return the response.

    :param url: ACME endpoint.
    :param payload: dict to send, or ``""`` for an empty-payload request.
    :return: the response object; its replay nonce is recorded through
        ``self.update_replay_nonce`` before returning.
    """
    headers = {"User-Agent": self._user_agent}
    payload = self.stringfy_items(payload)
    # The empty string is sent as is; anything else is JSON + base64url.
    payload64 = payload if payload == "" else self.calculate_safe_base64(json.dumps(payload))
    protected64 = self.calculate_safe_base64(json.dumps(self.get_acme_header(url)))
    signing_input = "{0}.{1}".format(protected64, payload64)
    signature64 = self.calculate_safe_base64(self.sign_message(message=signing_input))

    body = json.dumps(
        {"protected": protected64, "payload": payload64, "signature": signature64}
    )
    headers.update({"Content-Type": "application/jose+json"})
    response = requests.post(
        url,
        data=body.encode("utf8"),
        timeout=self._acme_timeout,
        headers=headers,
        verify=self._verify,
        s_type=self._request_type
    )
    # Refresh the stored nonce
    self.update_replay_nonce(response)
    return response


# Compute the signature
def sign_message(self, message):
    """Sign ``message`` with the account key and return the signature bytes.

    pyOpenSSL is tried first; on any failure ``sign_message_new`` is used.
    """
    try:
        crypto = OpenSSL.crypto
        pk = crypto.load_privatekey(crypto.FILETYPE_PEM, self.get_account_key().encode())
        return crypto.sign(pk, message.encode("utf8"), self._digest)
    except:
        return self.sign_message_new(message)


def sign_message_new(self, message):
    """Sign ``message`` with the account key (PKCS#1 v1.5, SHA-256)."""
    from cryptography.hazmat.primitives import hashes
    from cryptography.hazmat.primitives.asymmetric import padding
    import ssl_info
    account_key = self.get_account_key()
    pk = ssl_info.ssl_info().analysis_private_key(account_key)
    return pk.sign(message.encode("utf8"), padding.PKCS1v15(), hashes.SHA256())
# Serialize the payload
def stringfy_items(self, payload):
    """Decode every bytes key and bytes value of ``payload`` to ``str``.

    The dict is updated in place and returned; a string payload is returned
    unchanged. Iteration runs over a snapshot and a decoded key replaces its
    bytes original: the previous version inserted keys while iterating the
    live dict, which raised ``RuntimeError`` for a bytes key and left the
    undecoded key behind.

    :param payload: dict or str.
    """
    if isinstance(payload, str):
        return payload
    for key, value in list(payload.items()):
        new_key = key.decode("utf-8") if isinstance(key, bytes) else key
        new_value = value.decode("utf-8") if isinstance(value, bytes) else value
        if new_key is not key:
            del payload[key]
        payload[new_key] = new_value
    return payload


# Get a nonce
def get_nonce(self, force=False):
    """Return the stored replay nonce, fetching a new one when needed.

    :param force: fetch a fresh nonce even when one is stored.
    """
    # Fetch a new nonce when none is stored or force is set
    if not self._replay_nonce or force:
        headers = {"User-Agent": self._user_agent}
        response = requests.get(
            self._apis['newNonce'],
            timeout=self._acme_timeout,
            headers=headers,
            verify=self._verify,
            s_type=self._request_type
        )
        self._replay_nonce = response.headers["Replay-Nonce"]
    return self._replay_nonce


# Build the ACME request header
def get_acme_header(self, url):
    """Build the protected JWS header for a request to ``url``.

    The header carries the account ``jwk`` for the newAccount endpoint and
    for the pseudo URL ``GET_THUMBPRINT``; every other request carries the
    account ``kid``.
    """
    header = {"alg": "RS256", "nonce": self.get_nonce(), "url": url}
    if url in [self._apis['newAccount'], 'GET_THUMBPRINT']:
        from cryptography.hazmat.backends import default_backend
        from cryptography.hazmat.primitives import serialization

        def _b64_of_int(number):
            # unhexlify() needs an even number of hex digits. The exponent was
            # already padded; the modulus now gets the same treatment.
            digits = "{0:x}".format(number)
            if len(digits) % 2:
                digits = "0" + digits
            return self.calculate_safe_base64(binascii.unhexlify(digits))

        private_key = serialization.load_pem_private_key(
            self.get_account_key().encode(),
            password=None,
            backend=default_backend(),
        )
        numbers = private_key.public_key().public_numbers()
        header["jwk"] = {
            "kty": "RSA",
            "e": _b64_of_int(numbers.e),
            "n": _b64_of_int(numbers.n),
        }
    else:
        header["kid"] = self.get_kid()
    return header


# Convert to unpadded Base64
def calculate_safe_base64(self, un_encoded_data):
    """Return the URL-safe Base64 of the input without ``=`` padding.

    :param un_encoded_data: ``str`` (encoded as UTF-8 first) or ``bytes``.
    :return: the encoded text as ``str``.
    """
    if isinstance(un_encoded_data, str):
        un_encoded_data = un_encoded_data.encode("utf8")
    r = base64.urlsafe_b64encode(un_encoded_data).rstrip(b"=")
    return r.decode("utf8")
# Get the account key pair
def get_account_key(self):
    """Return the PEM account key for the current mode, creating it if missing.

    A newly created key is stored as ``str`` and the configuration is saved.
    """
    account = self._config.setdefault('account', {})
    k = self._mod_index[self._debug]
    slot = account.setdefault(k, {})
    if 'key' not in slot:
        key = self.create_key()
        if isinstance(key, bytes):
            key = key.decode()
        slot['key'] = key
        self.save_config()
    return slot['key']


# Get the certificate key pair
def create_certificate_key(self, index):
    """Return the PEM private key of order ``index``, creating it if missing."""
    order = self._config['orders'][index]
    # Reuse the key when it has already been created
    if 'private_key' in order:
        return order['private_key']
    # Create a new private key
    private_key = self.create_key()
    if isinstance(private_key, bytes):
        private_key = private_key.decode()
    # Save the private key into the order configuration
    order['private_key'] = private_key
    self.save_config()
    return private_key


# Create a key
def create_key(self, key_type=OpenSSL.crypto.TYPE_RSA):
    """Generate a ``self._bits`` key with pyOpenSSL and return it as PEM."""
    key = OpenSSL.crypto.PKey()
    key.generate_key(key_type, self._bits)
    private_key = OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key)
    return private_key


def create_key_new(self, key_type):
    """Generate an RSA key with ``cryptography`` and return it as PEM text.

    :param key_type: not used by this implementation.
    """
    from cryptography.hazmat.primitives import serialization
    from cryptography.hazmat.primitives.asymmetric import rsa
    private_key = rsa.generate_private_key(
        public_exponent=65537,  # public exponent
        key_size=self._bits,  # key size
    )
    private_key_pem = private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption()
    )
    return private_key_pem.decode()


# Write the configuration file
def save_config(self):
    """Write ``self._config`` as JSON to ``self._conf_file_v2`` under a lock.

    The file is opened without truncation and emptied only once the
    exclusive lock is held, the data is flushed before the lock is released,
    and the handle is closed even when writing fails.

    :return: ``True``.
    """
    with open(self._conf_file_v2, 'a+') as fp:
        fcntl.flock(fp, fcntl.LOCK_EX)  # lock
        try:
            fp.seek(0)
            fp.truncate()
            fp.write(json.dumps(self._config))
            fp.flush()
        finally:
            fcntl.flock(fp, fcntl.LOCK_UN)  # unlock
    return True


# Read the configuration file
def read_config(self):
    """Load the configuration file into ``self._config`` and return it.

    A missing file is created with empty ``orders``/``account``/``apis`` and
    the e-mail from the ``config`` table; an empty file leaves the in-memory
    configuration untouched; an unparsable file is overwritten with it.
    """
    if not os.path.exists(self._conf_file_v2):
        self._config['orders'] = {}
        self._config['account'] = {}
        self._config['apis'] = {}
        self._config['email'] = public.M('config').where('id=?', (1,)).getField('email')
        if self._config['email'] in [public.en_hexb('4d6a67334f5459794e545932514846784c6d4e7662513d3d')]:
            self._config['email'] = None
        self.save_config()
        return self._config
    tmp_config = public.readFile(self._conf_file_v2)
    if not tmp_config:
        return self._config
    try:
        self._config = json.loads(tmp_config)
    except Exception:
        self.save_config()
    return self._config
def check_config(self, index, key, value) -> bool:
    """Tell whether the saved configuration holds ``value`` at order ``index``/``key``.

    The file is read fresh from ``self._conf_file_v2``; ``False`` is returned
    when it is empty, unparsable, or lacks the order or the key.
    """
    tmp_config = public.readFile(self._conf_file_v2)
    if not tmp_config:
        return False
    try:
        config_data = json.loads(tmp_config)
    except:
        return False
    if index not in config_data.get("orders", {}):
        return False
    order = config_data['orders'][index]
    if key not in order:
        return False
    return True if order[key] == value else False


# Apply for a certificate
def apply_cert(self, domains, auth_type='dns', auth_to='Dns_com|None|None', **args):
    """Run the certificate application for ``domains``.

    :param domains: list of domain names.
    :param auth_type: verification type handed to ``create_order``.
    :param auth_to: verification target; the literal ``'dns'`` makes a newly
        created order return right after its verification data is fetched.
    :param args: may carry ``index`` to continue an existing order.
    :return: the certificate dict with ``status``/``msg`` on success, the
        order dict for the ``'dns'`` case, or
        ``{"status": False, "msg": ..., "index": ...}`` on failure.
    """
    index = ''
    write_log("", "wb+")
    try:
        self.get_apis()
        index = args['index'] if 'index' in args else None
        if not index:  # is only domain verification wanted?
            write_log(public.lang("|-Creating order.."))
            index = self.create_order(domains, auth_type, auth_to)
            write_log(public.lang("|-Getting verification information.."))
            self.get_auths(index)
            # add dns record or file record
            order = self._config['orders'][index]
            if auth_to == 'dns' and len(order['auths']) > 0:
                auth_domains = [i["domain"].replace("*.", "") for i in order['auths']]
                if len(auth_domains) != len(set(auth_domains)):
                    self._config['orders'][index]["error"] = True
                    self._config['orders'][index]["error_msg"] = (
                        "Conflicts in resolution records have been detected. "
                        "Please verify the following domain names separately."
                    )
                return self._config['orders'][index]

        write_log(public.lang("|-Verifying domain name.."))
        self.auth_domain(index)
        self.remove_dns_record()
        write_log(public.lang("|-Sending CSR.."))
        self.send_csr(index)
        write_log(public.lang("|-Downloading certificate.."))
        cert = self.download_cert(index)
        cert['status'] = True
        cert['msg'] = public.lang("Application successful!")
        write_log(public.lang("|-Successful application, deploying to site.."))
        return cert
    except Exception as ex:
        self.remove_dns_record()
        text = str(ex)
        if ">>>>" in text:
            # "<title> >>>> <json>": keep the title, decode the JSON part.
            msg = text.split(">>>>")
            msg[1] = json.loads(msg[1])
        else:
            msg = text
        write_log(public.get_error_info())
        return {"status": False, "msg": msg, "index": index}
# Apply for a certificate - API
def apply_cert_api(self, args):
    """
    @name Apply for a certificate
    @param args.id: website ID
    @param args.domains: JSON list of domain names
    @param args.auth_type: str verification type
    @param args.auth_to: str verification path
    @param args.auto_wildcard: str whether to combine wildcard domains automatically
    @return the result of ``apply_cert``, or an error message object
    """
    if 'id' not in args:
        return public.return_msg_gettext(False, public.lang("Website ID cannot be empty!"))

    if 'auto_wildcard' in args and args.auto_wildcard == '1':
        self._auto_wildcard = True

    find = public.M('sites').where('id=?', (args.id,)).find()
    if not find:
        return public.return_msg_gettext(False, public.lang("Website lost, unable to continue applying for certificate"))

    file_based = args.auth_type in ['http', 'tls']
    if file_based:
        if find["status"] != "1":
            return public.return_msg_gettext(False, "The current website is not enabled, so file verification cannot be used")

        if not self.can_use_base_file_check(find["name"], find["project_type"]):
            webserver: str = public.get_webserver()
            msg = public.lang("The service ({}) configuration file of the current project has been modified and "
                              "does not support file verification. Please choose another method or restore"
                              " the configuration file".format(webserver.title()))
            if webserver != 'nginx':
                return public.return_msg_gettext(False, msg)
            # nginx: check whether one of the two other approaches is usable
            lua_ok = self.can_use_lua_for_site(find["name"], find["project_type"])
            if not lua_ok and not self.can_use_if_for_file_check(find["name"], find["project_type"]):
                return public.return_msg_gettext(False, msg)

    # Is it one of the listed project types?
    count = public.M('sites').where(
        'id=? and project_type in (?,?,?,?)', (args.id, 'Java', 'Go', 'Other', "Python")
    ).count()
    if count:
        try:
            project_info = json.loads(find['project_config'])
            if 'ssl_path' in project_info:
                ssl_path = project_info['ssl_path']
            else:
                ssl_path = '/www/wwwroot/java_node_ssl'
            if not os.path.exists(ssl_path):
                os.makedirs(ssl_path)
            args.auth_to = ssl_path
        except:
            return public.return_msg_gettext(False, public.lang(
                "There is an issue with the current project configuration file, please rebuild it"))
    elif re.match(r"^\d+$", args.auth_to):
        import panelSite
        args.auth_to = find['path'] + '/' + panelSite.panelSite().GetRunPath(args)
        args.auth_to = args.auth_to.replace("//", "/")
        if args.auth_to[-1] == '/':
            args.auth_to = args.auth_to[:-1]
        if not os.path.exists(args.auth_to):
            return public.return_msg_gettext(False, public.lang(
                "Invalid site directory, please check if the specified site exists!"))

    # Check the verification environment
    if file_based:
        check_result = self.check_auth_env(args)
        if check_result:
            return check_result

    return self.apply_cert(json.loads(args.domains), args.auth_type, args.auth_to)
Please choose another method or restore" " the configuration file".format(webserver.title())) if webserver != 'nginx': return public.return_msg_gettext(False, msg) # nginx 检测其他两种方案的可行性 if not self.can_use_lua_for_site(find["name"], find["project_type"]) and \ not self.can_use_if_for_file_check(find["name"], find["project_type"]): return public.return_msg_gettext(False, msg) # 是否为指定站点 count = public.M('sites').where( 'id=? and project_type in (?,?,?,?)', (args.id, 'Java', 'Go', 'Other', "Python") ).count() if count: try: project_info = json.loads(find['project_config']) if 'ssl_path' not in project_info: ssl_path = '/www/wwwroot/java_node_ssl' else: ssl_path = project_info['ssl_path'] if not os.path.exists(ssl_path): os.makedirs(ssl_path) args.auth_to = ssl_path except: return public.return_msg_gettext(False, public.lang( "There is an issue with the current project configuration file, please rebuild it")) else: if re.match(r"^\d+$", args.auth_to): import panelSite args.auth_to = find['path'] + '/' + panelSite.panelSite().GetRunPath(args) args.auth_to = args.auth_to.replace("//", "/") if args.auth_to[-1] == '/': args.auth_to = args.auth_to[:-1] if not os.path.exists(args.auth_to): return public.return_msg_gettext(False, public.lang( "Invalid site directory, please check if the specified site exists!")) # 检查认证环境 if args.auth_type in ['http', 'tls']: check_result = self.check_auth_env(args) if check_result: return check_result return self.apply_cert(json.loads(args.domains), args.auth_type, args.auth_to) # 检查认证环境 def check_auth_env(self, args): for domain in json.loads(args.domains): if public.checkIp(domain): continue if domain.find('*.') != -1 and args.auth_type in ['http', 'tls']: return public.return_msg_gettext( False, public.lang( 'Wildcard domain names cannot use the "file verification" method to apply for certificates!') ) data = public.M('sites').where('id=?', (args.id,)).find() if not data: return public.return_msg_gettext( False, public.lang("The website is 
missing and it's impossible to continue applying for the certificate.") ) else: args.siteName = data['name'] site_type = data["project_type"] use_nginx_conf_to_auth = False if args.auth_type in ['http', 'tls'] and public.get_webserver() == "nginx": # nginx 在lua验证和可重启的 if self.can_use_lua_for_site(args.siteName, site_type): use_nginx_conf_to_auth = True else: if self.can_use_if_for_file_check(args.siteName, site_type): use_nginx_conf_to_auth = True import panelSite s = panelSite.panelSite() if args.auth_type in ['http', 'tls'] and use_nginx_conf_to_auth is False: try: args.sitename = args.siteName data = s.GetRedirectList(args) # 检查重定向是否开启 if type(data) == list: for x in data: if x['type']: return public.return_msg_gettext(False, public.lang('SITE_SSL_ERR_301')) data = s.GetProxyList(args) # 检查反向代理是否开启 if type(data) == list: for x in data: if x['type']: return public.return_msg_gettext( False, public.lang('Sites with reverse proxy enabled cannot apply for SSL!') ) # 检查旧重定向是否开启 data = s.Get301Status(args) if data['status']: return public.return_msg_gettext(False, public.lang('The website has already enabled redirection.' 
# Manual DNS verification
def apply_dns_auth(self, args):
    """
    Continue an existing order using manual DNS verification.

    @param args: request object; must carry a non-empty ``index`` attribute
    @return: the result of ``apply_cert`` for that order, or an error
             message structure when ``index`` is missing or empty
    """
    order_index = getattr(args, "index", None)
    if order_index:
        # No domains are passed: the existing order identified by index is reused.
        return self.apply_cert([], auth_type='dns', auth_to='dns', index=order_index)

    return public.return_msg_gettext(False, public.lang(
        "Incomplete parameter information, no index parameter [index]"))
def get_index(self, domains):
    '''
        @name Build the order identifier for a list of domains
        @author hwliang<2022-02-10>
        @param domains<list> domain names
        @return string md5 of the JSON-serialised identifier list
    '''
    # Each domain becomes a {"type": "dns", "value": <domain>} identifier;
    # the digest depends on the order of the domains.
    payload = [{"type": 'dns', "value": name} for name in domains]
    return public.md5(json.dumps(payload))
# Turn off forced HTTPS
def close_httptohttps(self, siteName):
    """
    Disable the forced-HTTPS redirect of a site, if it is currently enabled.

    @param siteName: name of the site; may be None/empty when the caller
                     could not resolve a site
    @return: True when forced HTTPS was enabled and has been switched off
             (the caller uses this to decide whether to restore it later);
             False when there is no site name, forced HTTPS was not enabled,
             or any step failed
    """
    try:
        # Without a site name there is nothing to switch off. The original
        # statement here was the no-op expression `siteName`, which let an
        # empty name reach IsToHttps(); the sibling rep_httptohttps() returns
        # False in the same situation.
        if not siteName:
            return False

        import panelSite
        site_obj = panelSite.panelSite()
        if not site_obj.IsToHttps(siteName):
            return False

        get = public.dict_obj()
        get.siteName = siteName
        site_obj.CloseToHttps(get)
        return True
    except Exception:
        # Best effort: a failure here must not abort the surrounding renewal.
        return False
# Record the auto-renew status
def set_auto_renew_status(self, index, status, error_msg=''):
    """
        @name Record the auto-renew status of an order
        @param index: order index used as the key in the status file
        @param status: status value to store
        @param error_msg: message stored alongside the status
        @return: None
    """
    entry = {"status": status, "error_msg": error_msg}
    status_file = "{}/config/letsencrypt_auto_renew.json".format(public.get_panel_path())

    try:
        records = json.loads(public.readFile(status_file))
        records[index] = entry
    except:
        # Missing or unusable status file: start over with only this entry.
        records = {index: entry}

    public.writeFile(status_file, json.dumps(records))
domains, auth_type, auth_to.replace('//', '/'), index ) write_log("|-Getting verification information..") self.get_auths(index) # add dns record or file record write_log("|-Verifying domain name..") self.auth_domain(index) write_log("|-Sending CSR..") self.remove_dns_record() self.send_csr(index) write_log("|-Downloading certificate..") cert = self.download_cert(index) self._config['orders'][index]['renew_time'] = int(time.time()) # 清理失败重试记录 self._config['orders'][index]['retry_count'] = 0 self._config['orders'][index]['next_retry_time'] = 0 # 保存证书配置 self.save_config() cert['status'] = True cert['msg'] = 'Renewed successfully!' write_log("|-Renewed successfully!!") except Exception as e: if str(e).find('please try again later') == -1: # 受其它证书影响和连接CA失败的的不记录重试次数 if index: # 设置下次重试时间 self._config['orders'][index]['next_retry_time'] = int(time.time() + (86400 * 2)) # 记录重试次数 if not 'retry_count' in self._config['orders'][index].keys(): self._config['orders'][index]['retry_count'] = 1 self._config['orders'][index]['retry_count'] += 1 # 保存证书配置 self.save_config() e = str(e) if e.find(">>>>") != -1: msg = e.split(">>>>")[0] err = json.loads(e.split(">>>>")[1]) else: msg = e err = {} write_log("|-" + msg) return {"status": False, "msg": msg, "err": err} finally: if is_rep: self.rep_httptohttps(siteName) write_log("-" * 70) return cert # 续签证书 def renew_cert(self, index, cycle=None): write_log("", "wb+") set_status = False index_info = None try: order_index = [] if index: set_status = True if type(index) != str: index = index.index if not index in self._config['orders']: write_log("|-指定订单号不存在,无法续签!") self.set_auto_renew_status(index, -1, "指定订单号不存在,无法续签!") raise Exception( public.lang("The specified order number does not exist and cannot be renewed!") ) if cycle: s_time = time.time() + (int(cycle) * 86400) if self._config['orders'][index]['cert_timeout'] > s_time: self.set_auto_renew_status( index, 0, "|-the expiration date is greater than{}days,skip the renewal!".format(cycle) 
) write_log("|-过期时间大于{}天,跳过续签!".format(cycle)) return order_index.append(index) else: s_time = time.time() + (30 * 86400) if not 'orders' in self._config: self._config['orders'] = {} for i in self._config['orders'].keys(): if not 'save_path' in self._config['orders'][i]: continue if 'cert' in self._config['orders'][i]: self._config['orders'][i]['cert_timeout'] = self._config['orders'][i]['cert']['cert_timeout'] if not 'cert_timeout' in self._config['orders'][i]: self._config['orders'][i]['cert_timeout'] = int(time.time()) if self._config['orders'][i]['cert_timeout'] > s_time: continue # 已删除的网站直接跳过续签 _auth_to = self.get_ssl_used_site(i) if not _auth_to: continue # 域名不存在? for domain in self._config['orders'][i]['domains']: if domain.find('*') != -1: break if not public.M('domain').where("name=?", (domain,)).count() and not public.M('binding').where( "domain=?", domain).count(): _auth_to = None write_log("|-Skip deleted domain names: {}".format(self._config['orders'][i]['domains'])) if not _auth_to: continue self._config['orders'][i]['auth_to'] = _auth_to # 是否到了允许重试的时间 if 'next_retry_time' in self._config['orders'][i]: timeout = self._config['orders'][i]['next_retry_time'] - int(time.time()) if timeout > 0: write_log( '|-Skipping domain name: {} this time, due to last renewal failure, we need to wait {} hours before trying again'.format( self._config['orders'][i]['domains'], int(timeout / 60 / 60))) continue # 加入到续签订单 order_index.append(i) if not order_index: write_log("|-No SSL certificate found within 30 days!") self.get_apis() self.renew_cert_other() write_log("|-All tasks have been processed!") return write_log("|-A total of {} certificates need to be renewed".format(len(order_index))) n = 0 self.get_apis() cert = None err_msg = "" for index in order_index: n += 1 domains = _test_domains(self._config['orders'][index]['domains'], self._config['orders'][index]['auth_to'], self._config['orders'][index]['auth_type']) if len(domains) == 0: write_log( "|-The domain 
names under the {} certificate are all unused (these domains are: [%s]) and have been skipped.".format( n, ",".join(self._config['orders'][index]['domains']))) err_msg = "All domain names are unused and have been skipped." if set_status: self.set_auto_renew_status(index, -1, err_msg) continue else: index_info = self._config['orders'][index] self._config['orders'][index]['domains'] = domains write_log( '|-Renewing certificate number of {},domain: {}..'.format(n, str( self._config['orders'][index]['domains'])) ) write_log("|-Creating order..") cert = self.renew_cert_to(self._config['orders'][index]['domains'], self._config['orders'][index]['auth_type'], self._config['orders'][index]['auth_to'], index) err_msg = "Renewal failed!" if not cert: return public.return_msg_gettext(False, public.lang(err_msg)) if cert.get('status') is False and set_status: self.set_auto_renew_status(index, -1, cert.get('msg')) self._config['orders'][index] = index_info self.save_config() if (cert.get('status') is None or cert.get('status') is True) and set_status: self.set_auto_renew_status(index, 1, "The renewal was successful!") return cert except Exception as ex: self.remove_dns_record() ex = str(ex) if ex.find(">>>>") != -1: msg = ex.split(">>>>") msg[1] = json.loads(msg[1]) else: msg = ex write_log(public.get_error_info()) if set_status: self.set_auto_renew_status(index, -1, msg) if index_info: self._config['orders'][index] = index_info self.save_config() return public.return_msg_gettext(False, public.lang(msg)) # 强制改走v2 def renew_cert_v2(self, index, cycle=30): if not cycle: cycle = 30 cycle = int(cycle) if index: hash_list = [index] else: # 获取所有网站证书 from sslModel import certModel certModel = certModel.main() use_cert_list = certModel.get_cert_to_site(True) hash_list = use_cert_list.keys() s = 0 from sslModel import base for ssl_hash in hash_list: s += 1 write_log(f"|-Renewing the {s} certificate,There are {len(hash_list)} certificates in total...") cert_data = 
public.M('ssl_info').where('hash=?', ssl_hash).find() if not cert_data: write_log( "|-【{}】The specified certificate information was not found" " and the renewal cannot be carried out!".format(ssl_hash) ) continue try: cert_info = json.loads(cert_data['info']) auth_info = json.loads(cert_data['auth_info']) except: write_log(public.get_error_info()) write_log( "|-【{}】The format of the certificate information is incorrect and" " renewal is not possible. Please try to renew it manually!".format(ssl_hash) ) continue if cert_info.get('issuer') not in ("R3", "R8", "R11", "R10", "R5") and cert_info.get( 'issuer_O') != "Let's Encrypt": write_log("|-【{}】It's not a Let's Encrypt certificate and cannot be renewed!".format(ssl_hash)) continue # 计算 30 天后的日期 future_date = (datetime.datetime.now().date() + datetime.timedelta(days=cycle)).strftime('%Y-%m-%d') if future_date < cert_info['notAfter']: write_log( "|-【{}】The expiration date is greater than {} days," " so there is no need for renewal!".format(ssl_hash, cycle) ) continue # 判断是否有泛域名 wildcard = False if "*" in ",".join(cert_info['dns']): write_log( f"|-【{ssl_hash}】 with domain【{cert_data.get('subject', '')}】There is a wildcard domain name, " f"and only the DNS verification method can be used for renewal!" 
) wildcard = True auth_domains = [] dns_name, key, secret = "", "None", "None" try: if auth_info.get("auth_type") == "dns": # 判断是否绑定了dns-api dns_name, key, secret = self.get_dnsapi(auth_info.get("auth_to", "")) except Exception as e: public.print_log(f"find dns api info error: %s" % e) for i in cert_info['dns']: if dns_name and key != "None" and secret != "None": auth_domains.append(i) else: root_domain, _, _ = base.sslBase().extract_zone(i) write_log( "|-The root domain name【{}】is not bound to the dns-api, " "Skip the domain name: {}!".format(root_domain, i) ) continue if not auth_domains: write_log( "|-【{}】None of the domain names are bound to the dns-api," " so the DNS verification renewal cannot be used.".format(ssl_hash) ) dns_auth = False if wildcard: continue else: dns_auth = True # dns api auth if set(auth_domains) == set(cert_info['dns']): write_log( "|-【{}】All domain names have been bound to the dns-api. " "An attempt is being made to use the DNS verification method for renewal.".format(ssl_hash) ) self.get_apis() cert = self.renew_cert_to(domains=auth_domains, auth_type="dns", auth_to=f"{dns_name}|{key}|{secret}") if cert.get('status') is False: continue continue else: http_auth = False domains = "" site_info = {} sites = {} write_log("|-【{}】Checking whether file verification is available.".format(ssl_hash)) for domain in cert_info['dns']: domain_info = public.M('domain').where("name=?", domain).find() if not domain_info: write_log("|-The domain name【{}】does not exist. Skip it!".format(domain)) continue if not sites.get(domain_info["pid"]): sites[domain_info["pid"]] = [domain] else: sites[domain_info["pid"]].append(domain) if not sites: write_log( "|-【{}】No available file verification sites were found. " "Please try to renew it manually.".format(ssl_hash) ) # 暂时不做多网站文件验证 if len(sites.keys()) > 1: write_log( "|-【{}】It has been detected that the verification domain names " "are scattered across multiple sites. 
def get_order_list(self, get):
    """
    Return the stored orders as a list, annotated for display.

    For every order kept in the result:
      * a "valid" order gets ``expires`` set from its ``cert_timeout``;
      * a "ready" order is reported with status "pending";
      * ``endDay`` is the number of whole days until ``expires`` (90 when
        ``expires`` is missing or not numeric).

    "valid" orders that lack ``save_path`` or ``cert_timeout`` are left out
    of the result and removed from the stored configuration.

    @param get: request object, forwarded to ``get_exclude_hash``
    @return: list of order dicts (empty list when no orders are stored)
    """
    # Return value unused here; presumably called for its side effects — confirm.
    self.get_exclude_hash(get)
    data = self.read_config()
    if not data.get('orders'):
        return []

    del_list = []
    orders = []
    for order in data['orders'].values():
        status = order['status']
        if status == "valid" and (not order.get("save_path") or not order.get("cert_timeout")):
            # Marked valid but without a usable certificate: schedule removal.
            del_list.append(order['index'])
            continue
        if status == "valid":
            order['expires'] = order['cert_timeout']
        if status == "ready":
            order['status'] = "pending"
        try:
            end_time = int(
                (order['expires'] - datetime.datetime.today().timestamp()) / (60 * 60 * 24)
            )
        except Exception:
            end_time = 90
        order['endDay'] = end_time
        orders.append(order)

    if del_list:
        # Call _delete_order directly: delete_order() indexes
        # ["finish_list"][0], but with "d" set _delete_order returns the raw
        # config, so the old call always raised into a silent except.
        try:
            arg = public.to_dict_obj({"index": ",".join(del_list), "d": "1"})
            self._delete_order(arg)
        except Exception:
            # Best effort: failing to clean up must not break the listing.
            pass
    return orders
self.read_config() data = orders["orders"].get(order_index) if not data: return public.return_msg_gettext(False, public.lang("No information about this order has been found.")) if not data.get('auths'): if not data.get('authorizations'): return public.return_msg_gettext(False, public.lang( "The order verification information has been lost. Please try to apply again!")) try: self.get_apis() data['auths'] = [] for auth_url in data['authorizations']: res = self.acme_request(auth_url, "") if res.status_code not in [200, 201]: return public.return_msg_gettext(False, public.lang( "The order verification information has been lost. Please try to apply again!")) s_body = res.json() identifier_auth = self.get_identifier_auth(order_index, auth_url, s_body) acme_keyauthorization, auth_value = self.get_keyauthorization( identifier_auth['token']) identifier_auth['acme_keyauthorization'] = acme_keyauthorization identifier_auth['auth_value'] = auth_value identifier_auth['expires'] = s_body['expires'] identifier_auth['auth_to'] = self._config['orders'][order_index]['auth_to'] identifier_auth['type'] = self._config['orders'][order_index]['auth_type'] data['auths'].append(identifier_auth) self.save_config() except: return public.return_msg_gettext(False, public.lang( "The order verification information has been lost. Please try to apply again!")) endtime = ((data.get('expires', 0) or 0) - datetime.datetime.today().timestamp()) / (60 * 60 * 24) if endtime <= 0: return public.return_msg_gettext(False, public.lang("The order has expired. Please apply again!")) _return = {"auths": []} if data['auth_type'] == 'dns': auth_domains = [i["domain"].replace("*.", "") for i in data['auths']] if len(auth_domains) != len(set(auth_domains)): _return["error"] = True _return["error_msg"] = ("Conflicts in the parsing records have been detected. 
" "Please verify the following domain names respectively.") for auth in data['auths']: domain = auth['domain'] domains = auth['domain'].split('.') if domains[0] == '*': domain = ".".join(domains[1:]) _return["auths"].append({ "domain": auth['domain'], "status": auth.get('status', "pending"), "data": [{ "domain": "_acme-challenge.{}".format(domain), "auth_value": auth["auth_value"], "type": "TXT", "must": "YES" }, { "domain": domain, "auth_value": '0 issue "letsencrypt.org"', "type": "CAA", "must": "NO" }] }) return _return else: for auth in data['auths']: domain = auth['domain'] _return["auths"].append({ "domain": domain, "data": [{ "domain": auth['domain'], "file_path": "{}.well-known/acme-challenge/{}".format(auth['auth_to'], auth['token']), "content": auth['acme_keyauthorization'], "must": "YES" }] }) return _return def validate_domain(self, get): order_index = get.index data = self.read_config() data = data["orders"].get(order_index) if not data: return public.return_msg_gettext(False, public.lang("The information of this order was not found.")) if not data.get('auths'): return public.return_msg_gettext(False, public.lang( "The order verification information is lost. Please try to apply again.")) endtime = ((data.get('expires', 0) or 0) - datetime.datetime.today().timestamp()) / (60 * 60 * 24) if endtime <= 0: return public.return_msg_gettext(False, public.lang("The order has expired. 
def delete_order(self, get):
    """
    Delete one order/certificate and return the outcome of that deletion.

    Thin wrapper around ``_delete_order`` that unwraps the first entry of
    its ``finish_list``.

    @param get: request object, passed through to ``_delete_order``
    @return: the first ``finish_list`` entry when there is one; otherwise
             the ``_delete_order`` result itself (for example its
             "Required parameters are missing." error, or the raw config it
             returns when ``'d'`` is present in ``get``). The original
             raised IndexError/KeyError in those cases.
    """
    result = self._delete_order(get)
    finish_list = result.get("finish_list") if isinstance(result, dict) else None
    if finish_list:
        return finish_list[0]
    return result
def SetCertToSite(self, get):
    """
    Deploy the certificate belonging to order ``get.index`` to a site.

    The order index is translated to a certificate hash through the
    ``exclude_hash_let`` mapping returned by ``get_exclude_hash``; the hash
    is stored on ``get.ssl_hash`` and the request is handed to
    ``panelSSL.panelSSL().SetCertToSite``.

    @param get: request object carrying ``index``; ``ssl_hash`` is set on it
    @return: the panelSSL result, or an error message structure when no
             certificate hash is known for the order
    """
    import panelSSL

    # Tolerant lookup, matching download_cert_to_local/get_ssl_used_site:
    # a missing mapping means "certificate not found", not a KeyError.
    exclude_data = self.get_exclude_hash(get) or {}
    ssl_hash = exclude_data.get("exclude_hash_let", {}).get(get.index)
    if not ssl_hash:
        return public.return_msg_gettext(False, public.lang("This certificate was not found."))

    get.ssl_hash = ssl_hash
    return panelSSL.panelSSL().SetCertToSite(get)
# Write a log line
def write_log(log_str, mode="ab+"):
    """
    Emit one log message.

    When this module runs as a script (``__name__ == "__main__"``) the
    message is printed and ``mode`` is ignored. Otherwise the message plus a
    trailing newline is written UTF-8 encoded to ``logs/letsencrypt.log``
    (path relative to the current working directory).

    @param log_str: message to log; non-string values are converted with
                    ``str()`` before being written to the file
    @param mode: binary ``open()`` mode; the default appends, ``"wb+"``
                 truncates the log first
    @return: None in script mode, True after writing to the file
    """
    if __name__ == "__main__":
        print(log_str)
        return

    _log_file = 'logs/letsencrypt.log'
    if not isinstance(log_str, str):
        # Callers sometimes pass result dicts or error objects.
        log_str = str(log_str)
    # The context manager closes the handle even if encoding or writing fails.
    with open(_log_file, mode) as f:
        f.write((log_str + "\n").encode('utf-8'))
    return True
result = p.revoke_order(args.index) write_log(result) exit() if args.renew_v2 or args.renew: # force run v2 sys.path.append(public.get_panel_path()) p = acme_v2() if args.cycle: try: int(args.cycle) except: args.cycle = None p.renew_cert_v2(args.index, args.cycle) exit() else: try: if not args.index: if not args.domains: echo_err(public.get_msg_gettext( 'Please specify the domain name for which you want to apply for a certificate in the --domain parameter, multiple separated by commas (,)')) if not args.auth_type in ['http', 'tls', 'dns']: echo_err(public.get_msg_gettext( 'Please specify the correct authentication type in the --type parameter, supporting dns and http')) auth_to = '' if args.auth_type in ['http', 'tls']: if not args.path: echo_err( public.lang("Please specify the website document root in the --path parameter!")) if not os.path.exists(args.path): echo_err(public.get_msg_gettext('The specified site root does not exist, please check: {}', (args.path,))) auth_to = args.path else: if args.dnsapi == '0': auth_to = 'dns' else: if not args.key: echo_err(public.get_msg_gettext( 'When applying using dnsapi, specify the dnsapi key in the --dns_key parameter!')) if not args.secret: echo_err(public.get_msg_gettext( 'When applying using dnsapi, specify the secret of dnsapi in the --dns_secret parameter!')) auth_to = "{}|{}|{}".format( args.dnsapi, args.key, args.secret) domains = args.domains.strip().split(',') p = acme_v2() cert = p.apply_cert( domains, auth_type=args.auth_type, auth_to=auth_to) if args.dnsapi == '0': acme_txt = '_acme-challenge.' 
acme_caa = '1 issue letsencrypt.org' write_log("=" * 65) write_log( "\033[32m" + public.get_msg_gettext( '|-Manual order submission is successful, please resolve DNS records according to the following tips: ' ) + "\033[0m" ) write_log("=" * 65) write_log(public.get_msg_gettext('|-Order index: {}', (cert['index'],))) write_log( public.lang("|-Retry the command") + ": ./acme_v2.py --index=\"{}\"", cert['index'] ) write_log( public.get_msg_gettext( '|-A total of \033[36m{}\033[0m domain name records need to be resolved.', (len(cert['auths']),) ) ) for i in range(len(cert['auths'])): write_log('-' * 70) write_log(public.get_msg_gettext( '|-The \033[36m{}\033[0m domain names are: {}, please resolve the following information: ', (str(i + 1), cert['auths'][i]['domain']))) write_log(public.get_msg_gettext( '|-Record Type: TXT Record Name: \033[41m{}\033[0m Record Value: \033[41m{}\033 [0m [Required]', (acme_txt + cert['auths'][i]['domain'].replace('*.', ''), cert['auths'][i]['auth_value']))) write_log(public.get_msg_gettext( '|-Record type: CAA Record name: \033[41m{}\033[0m Record value: \033[41m{}\033[0m [Optional]', (cert['auths'][i]['domain'].replace('*.', ''), acme_caa))) write_log('-' * 70) input_data = "" while input_data not in ['y', 'Y', 'n', 'N']: input_msg = public.lang( 'Please wait 2-3 minutes after completing the resolution and ' 'enter Y and press Enter to continue verifying the domain name: ' ) if sys.version_info[0] == 2: # noinspection PyUnresolvedReferences input_data = raw_input(input_msg) else: input_data = input(input_msg) if input_data in ['n', 'N']: write_log("=" * 65) write_log(public.lang("|-The user abandons the application and exits the program!")) exit() cert = p.apply_cert( [], auth_type=args.auth_type, auth_to='dns', index=cert['index'] ) else: # 重新验证 p = acme_v2() cert = p.apply_cert([], auth_type='dns', auth_to='dns', index=args.index) except Exception as ex: write_log("|-{}".format(public.get_error_info())) exit() if not cert: exit() 
write_log("=" * 65) write_log("|-Certificate obtained successfully!") write_log("=" * 65) write_log('Certificate expiration time: {}'.format(public.format_date(times=cert['cert_timeout']))) write_log('Certificate saved at: {}/'.format(cert['save_path']))
Close