Linux ip-172-26-2-223 5.4.0-1018-aws #18-Ubuntu SMP Wed Jun 24 01:15:00 UTC 2020 x86_64
Apache
: 172.26.2.223 | : 3.148.248.235
Cant Read [ /etc/named.conf ]
8.1.13
www
www.github.com/MadExploits
Terminal
AUTO ROOT
Adminer
Backdoor Destroyer
Linux Exploit
Lock Shell
Lock File
Create User
CREATE RDP
PHP Mailer
BACKCONNECT
UNLOCK SHELL
HASH IDENTIFIER
CPANEL RESET
CREATE WP USER
BLACK DEFEND!
README
+ Create Folder
+ Create File
/
www /
server /
panel /
class_v2 /
[ HOME SHELL ]
Name
Size
Permission
Action
__pycache__
[ DIR ]
drwxr-xr-x
btdockerModelV2
[ DIR ]
drwxr-xr-x
crontabModelV2
[ DIR ]
drwxr-xr-x
databaseModelV2
[ DIR ]
drwxr-xr-x
firewallModelV2
[ DIR ]
drwxr-xr-x
logsModelV2
[ DIR ]
drwxr-xr-x
monitorModelV2
[ DIR ]
drwxr-xr-x
panelModelV2
[ DIR ]
drwxr-xr-x
power_mta
[ DIR ]
drwxr-xr-x
projectModelV2
[ DIR ]
drwxr-xr-x
safeModelV2
[ DIR ]
drwxr-xr-x
safe_warning_v2
[ DIR ]
drwxr-xr-x
ssl_domainModelV2
[ DIR ]
drwxr-xr-x
virtualModelV2
[ DIR ]
drwxr-xr-x
wp_toolkit
[ DIR ]
drwxr-xr-x
acme_v3.py
133.98
KB
-rw-r--r--
ajax_v2.py
95.41
KB
-rw-r--r--
apache_v2.py
17.28
KB
-rw-r--r--
backup_bak_v2.py
24.86
KB
-rw-r--r--
breaking_through.py
47.94
KB
-rw-r--r--
cloud_stora_upload_v2.py
19.27
KB
-rw-r--r--
common_v2.py
12.45
KB
-rw-r--r--
config_v2.py
165.36
KB
-rw-r--r--
crontab_ssl_v2.py
1.85
KB
-rw-r--r--
crontab_v2.py
111.93
KB
-rw-r--r--
data_v2.py
36.54
KB
-rw-r--r--
database_v2.py
125.54
KB
-rw-r--r--
datatool_v2.py
5.83
KB
-rw-r--r--
db_mysql_v2.py
11.41
KB
-rw-r--r--
db_v2.py
11.04
KB
-rw-r--r--
dk_db.py
18.34
KB
-rw-r--r--
download_file_v2.py
2.54
KB
-rw-r--r--
fastcgi_client_two_v2.py
12.26
KB
-rw-r--r--
fastcgi_client_v2.py
6.89
KB
-rw-r--r--
file_execute_deny_v2.py
10.34
KB
-rw-r--r--
files_v2.py
149.12
KB
-rw-r--r--
firewall_new_v2.py
22.4
KB
-rw-r--r--
firewalld_v2.py
11.09
KB
-rw-r--r--
firewalls_v2.py
17.44
KB
-rw-r--r--
flask_compress_v2.py
5.12
KB
-rw-r--r--
flask_sockets_v2.py
3.75
KB
-rw-r--r--
ftp_log_v2.py
21.72
KB
-rw-r--r--
ftp_v2.py
16.17
KB
-rw-r--r--
http_requests_v2.py
24.25
KB
-rw-r--r--
jobs_v2.py
36.98
KB
-rw-r--r--
letsencrypt_v2.py
12.85
KB
-rw-r--r--
log_analysis_v2.py
12.23
KB
-rw-r--r--
monitor_v2.py
13.53
KB
-rw-r--r--
one_key_wp_v2.py
75.79
KB
-rw-r--r--
panelControllerV2.py
4.97
KB
-rw-r--r--
panelDatabaseControllerV2.py
5.76
KB
-rw-r--r--
panelDockerControllerV2.py
5.86
KB
-rw-r--r--
panelFireControllerV2.py
4.65
KB
-rw-r--r--
panelModControllerV2.py
5.13
KB
-rw-r--r--
panelProjectControllerV2.py
6.07
KB
-rw-r--r--
panelSafeControllerV2.py
4.65
KB
-rw-r--r--
panel_api_v2.py
10.43
KB
-rw-r--r--
panel_auth_v2.py
33.21
KB
-rw-r--r--
panel_backup_v2.py
102.56
KB
-rw-r--r--
panel_dns_api_v2.py
22.2
KB
-rw-r--r--
panel_http_proxy_v2.py
11.33
KB
-rw-r--r--
panel_lets_v2.py
43.61
KB
-rw-r--r--
panel_mssql_v2.py
4.48
KB
-rw-r--r--
panel_mysql_v2.py
7.55
KB
-rw-r--r--
panel_php_v2.py
24.78
KB
-rw-r--r--
panel_ping_v2.py
2.88
KB
-rw-r--r--
panel_plugin_v2.py
125.11
KB
-rw-r--r--
panel_push_v2.py
23.78
KB
-rw-r--r--
panel_redirect_v2.py
34.02
KB
-rw-r--r--
panel_restore_v2.py
11.04
KB
-rw-r--r--
panel_site_v2.py
343.73
KB
-rw-r--r--
panel_ssl_v2.py
75.34
KB
-rw-r--r--
panel_task_v2.py
28.7
KB
-rw-r--r--
panel_video_V2.py
1.88
KB
-rw-r--r--
panel_warning_v2.py
68.71
KB
-rw-r--r--
password_v2.py
8.09
KB
-rw-r--r--
plugin_auth_v2.py
3.14
KB
-rw-r--r--
plugin_deployment_v2.py
28.85
KB
-rw-r--r--
san_baseline_v2.py
51.13
KB
-rw-r--r--
site_dir_auth_v2.py
17.67
KB
-rw-r--r--
ssh_security_v2.py
45.66
KB
-rw-r--r--
ssh_terminal_v2.py
58.86
KB
-rw-r--r--
system_v2.py
44.77
KB
-rw-r--r--
userRegister_v2.py
6.74
KB
-rw-r--r--
user_login_v2.py
21.2
KB
-rw-r--r--
vilidate_v2.py
4.94
KB
-rw-r--r--
wxapp_v2.py
5.62
KB
-rw-r--r--
Delete
Unzip
Zip
${this.title}
Close
Code Editor : ssh_security_v2.py
#coding: utf-8 #------------------------------------------------------------------- # aaPanel #------------------------------------------------------------------- # Copyright (c) 2015-2017 aaPanel(www.aapanel.com) All rights reserved. #------------------------------------------------------------------- # Author: lkqiang <lkq@aapanel.com> #------------------------------------------------------------------- # SSH 安全类 #------------------------------ import public,os,re,send_mail,json from datetime import datetime from public.validate import Param class ssh_security: __type_list = ['ed25519','ecdsa','rsa', 'dsa'] __key_type_file = '{}/data/ssh_key_type.pl'.format(public.get_panel_path()) __key_files = ['/root/.ssh/id_ed25519','/root/.ssh/id_ecdsa','/root/.ssh/id_rsa','/root/.ssh/id_rsa_bt'] __type_files = { "ed25519": "/root/.ssh/id_ed25519", "ecdsa": "/root/.ssh/id_ecdsa", "rsa": "/root/.ssh/id_rsa", "dsa": "/root/.ssh/id_dsa" } open_ssh_login = public.get_panel_path() + '/data/open_ssh_login.pl' __SSH_CONFIG='/etc/ssh/sshd_config' __ip_data = None __ClIENT_IP='/www/server/panel/data/host_login_ip.json' __pyenv = 'python' __REPAIR={"1":{"id":1, "type":"file", "harm":"High", "repaired":"1", "level":"3", "name":"Make sure SSH MaxAuthTries is set between 3-6", "file":"/etc/ssh/sshd_config", "Suggestions":"Remove the MaxAuthTries comment symbol # in /etc/ssh/sshd_config, set the maximum number of failed password attempts 3-6 recommended 4", "repair":"MaxAuthTries 4", "rule":[{"re":"\nMaxAuthTries\\s*(\\d+)","check":{"type":"number","max":7,"min":3}}], "repair_loophole":[{"re":"\n?#?MaxAuthTries\\s*(\\d+)","check":"\nMaxAuthTries 4"}]}, "2":{"id":2, "repaired":"1", "type":"file", "harm":"High", "level":"3", "name":"SSHD Mandatory use of V2 security protocol", "file":"/etc/ssh/sshd_config", "Suggestions":"Set parameters in the /etc/ssh/sshd_config file as follows", "repair":"Protocol 2", "rule":[{"re":"\nProtocol\\s*(\\d+)", "check":{"type":"number","max":3,"min":1}}], "repair_loophole":[{"re":"\n?#?Protocol\\s*(\\d+)","check":"\nProtocol 2"}]}, "3":{"id":3, "repaired":"1", "type":"file", "harm":"High", "level":"3", "name":"Set SSH idle exit time", "file":"/etc/ssh/sshd_config", "Suggestions":"Set ClientAliveInterval to 300 to 900 in /etc/ssh/sshd_config, which is 5-15 minutes, and set ClientAliveCountMax to 0-3", "repair":"ClientAliveInterval 600 ClientAliveCountMax 2", "rule":[{"re":"\nClientAliveInterval\\s*(\\d+)","check":{"type":"number","max":900,"min":300}}], "repair_loophole":[{"re":"\n?#?ClientAliveInterval\\s*(\\d+)","check":"\nClientAliveInterval 600"}]}, "4":{"id":4, "repaired":"1", "type":"file", "harm":"High", "level":"3", "name":"Make sure SSH LogLevel is set to INFO", "file":"/etc/ssh/sshd_config", "Suggestions":"Set parameters in the /etc/ssh/sshd_config file as follows (uncomment)", "repair":"LogLevel INFO", "rule":[{"re":"\nLogLevel\\s*(\\w+)","check":{"type":"string","value":["INFO"]}}], "repair_loophole":[{"re":"\n?#?LogLevel\\s*(\\w+)","check":"\nLogLevel INFO"}]}, "5":{"id":5, "repaired":"1", "type":"file", "harm":"High", "level":"3", "name":"Disable SSH users with empty passwords from logging in", "file":"/etc/ssh/sshd_config", "Suggestions":"Configure PermitEmptyPasswords to no in /etc/ssh/sshd_config", "repair":"PermitEmptyPasswords no", "rule":[{"re":"\nPermitEmptyPasswords\\s*(\\w+)","check":{"type":"string","value":["no"]}}], "repair_loophole":[{"re":"\n?#?PermitEmptyPasswords\\s*(\\w+)","check":"\nPermitEmptyPasswords no"}]}, "6":{"id":6, "repaired":"1", "type":"file", "name":"SSH uses the default port 22", "harm":"High", "level":"3", "file":"/etc/ssh/sshd_config", "Suggestions":"Set Port to 6000 to 65535 in / etc / ssh / sshd_config", "repair":"Port 60151", "rule":[{"re":"Port\\s*(\\d+)","check":{"type":"number","max":65535,"min":22}}], "repair_loophole":[{"re":"\n?#?Port\\s*(\\d+)","check":"\nPort 65531"}]}} __root_login_types = {'yes':'yes - keys and passwords','no':'no - no login','without-password':'without-password - only key login','forced-commands-only':'forced-commands-only - can only execute commands'} def __init__(self): if not public.M('sqlite_master').where('type=? AND name=?', ('table', 'ssh_login_record')).count(): public.M('').execute('''CREATE TABLE ssh_login_record ( id INTEGER PRIMARY KEY AUTOINCREMENT, addr TEXT, server_ip TEXT, user_agent TEXT, ssh_user TEXT, login_time INTEGER DEFAULT 0, close_time INTEGER DEFAULT 0, video_addr TEXT);''') public.M('').execute('CREATE INDEX ssh_login_record ON ssh_login_record (addr);') if not os.path.exists(self.__ClIENT_IP): public.WriteFile(self.__ClIENT_IP,json.dumps([])) self.__mail=send_mail.send_mail() self.__mail_config=self.__mail.get_settings() self._check_pyenv() try: self.__ip_data = json.loads(public.ReadFile(self.__ClIENT_IP)) except: self.__ip_data=[] def _check_pyenv(self): if os.path.exists('/www/server/panel/pyenv'): self.__pyenv = 'btpython' def get_ssh_key_type(self): ''' 获取ssh密钥类型 @author hwliang :return: ''' default_type = 'rsa' if not os.path.exists(self.__key_type_file): return default_type new_type = public.ReadFile(self.__key_type_file) if new_type in self.__type_list: return new_type return default_type def return_python(self): if os.path.exists('/www/server/panel/pyenv/bin/python'):return '/www/server/panel/pyenv/bin/python' if os.path.exists('/usr/bin/python'):return '/usr/bin/python' if os.path.exists('/usr/bin/python3'):return '/usr/bin/python3' return 'python' def return_profile(self): if os.path.exists('/root/.bash_profile'): return '/root/.bash_profile' if os.path.exists('/etc/profile'): return '/etc/profile' fd = open('/root/.bash_profil', mode="w", encoding="utf-8") fd.close() return '/root/.bash_profil' def return_bashrc(self): if os.path.exists('/root/.bashrc'):return '/root/.bashrc' if os.path.exists('/etc/bashrc'):return '/etc/bashrc' if os.path.exists('/etc/bash.bashrc'):return '/etc/bash.bashrc' fd = open('/root/.bashrc', mode="w", encoding="utf-8") fd.close() return '/root/.bashrc' def check_files(self): try: json.loads(public.ReadFile(self.__ClIENT_IP)) except: public.WriteFile(self.__ClIENT_IP, json.dumps([])) def get_ssh_port(self): conf = public.readFile(self.__SSH_CONFIG) if not conf: conf = '' rep = r"#*Port\s+([0-9]+)\s*\n" tmp1 = re.search(rep,conf) port = '22' if tmp1: port = tmp1.groups(0)[0] return port # 主判断函数 def check_san_baseline(self, base_json): if base_json['type'] == 'file': if 'check_file' in base_json: if not os.path.exists(base_json['check_file']): return False else: if os.path.exists(base_json['file']): ret = public.ReadFile(base_json['file']) for i in base_json['rule']: valuse = re.findall(i['re'], ret) if i['check']['type'] == 'number': if not valuse: return False if not valuse[0]: return False valuse = int(valuse[0]) if valuse > i['check']['min'] and valuse < i['check']['max']: return True else: return False elif i['check']['type'] == 'string': if not valuse: return False if not valuse[0]: return False valuse = valuse[0] if valuse in i['check']['value']: return True else: return False return True def san_ssh_security(self,get): data={"num":100,"result":[]} result = [] ret = self.check_san_baseline(self.__REPAIR['1']) if not ret: result.append(self.__REPAIR['1']) ret = self.check_san_baseline(self.__REPAIR['2']) if not ret: result.append(self.__REPAIR['2']) ret = self.check_san_baseline(self.__REPAIR['3']) if not ret: result.append(self.__REPAIR['3']) ret = self.check_san_baseline(self.__REPAIR['4']) if not ret: result.append(self.__REPAIR['4']) ret = self.check_san_baseline(self.__REPAIR['5']) if not ret: result.append(self.__REPAIR['5']) ret = self.check_san_baseline(self.__REPAIR['6']) if not ret: result.append(self.__REPAIR['6']) data["result"]=result if len(result)>=1: data['num']=data['num']-(len(result)*10) return data ################## SSH 登陆报警设置 #################################### def send_mail_data(self, title, body, login_ip, type=None): # public.print_log(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>") # public.print_log((title, body, login_ip)) from panel_msg.collector import SitePushMsgCollect msg = SitePushMsgCollect.ssh_login(body) push_data = { "login_ip": "" if body.find("backdoor user") != -1 else ( login_ip if login_ip != "" else "unknown ip"), "msg_list": ['>Send content:' + body] } # public.print_log(push_data) try: import sys if "/www/server/panel" not in sys.path: sys.path.insert(0, "/www/server/panel") from mod.base.push_mod import push_by_task_keyword # public.print_log(push_data) res = push_by_task_keyword("ssh_login", "ssh_login", push_data=push_data) if res: return except: pass try: login_send_type_conf = "/www/server/panel/data/ssh_send_type.pl" if not os.path.exists(login_send_type_conf): return # login_type = "mail" else: login_type = public.readFile(login_send_type_conf).strip() if not login_type: login_type = "mail" object = public.init_msg(login_type.strip()) if not object: return False if login_type=="mail": data={} data['title'] = title data['msg'] = body object.push_data(data) elif login_type=="wx_account": from push.site_push import ToWechatAccountMsg if body.find("backdoor user") != -1: msg = ToWechatAccountMsg.ssh_login("") else: msg = ToWechatAccountMsg.ssh_login(login_ip if login_ip != "" else "unknown ip") object.send_msg(msg) else: msg = public.get_push_info("SSH logon alarm", ['>Send content:' + body]) msg['push_type'] = "SSH logon alarm" object.push_data(msg) except: pass #检测非UID为0的账户 def check_user(self): ret = [] cfile = '/etc/passwd' if os.path.exists(cfile): f = open(cfile, 'r') for i in f: i = i.strip().split(":") if i[2] == '0' and i[3] == '0': if i[0] == 'root': continue ret.append(i[0]) if ret: data=''.join(ret) public.run_thread(self.send_mail_data,args=(public.GetLocalIp()+' There is a backdoor user in the server',public.GetLocalIp()+' There is a backdoor user in the server '+data+' please check/etc/passwd',)) return True else: return False #记录root 的登陆日志 #返回登陆IP def return_ip(self,get): self.check_files() # return public.returnMsg(True, self.__ip_data) return public.return_message(0, 0, self.__ip_data) #添加IP白名单 def add_return_ip(self, get): # 校验参数 try: get.validate([ Param('ip').Require().String().Ip(), ], [ public.validate.trim_filter(), ]) except Exception as ex: public.print_log("error info: {}".format(ex)) return public.return_message(-1, 0, str(ex)) self.check_files() if get.ip.strip() in self.__ip_data: # return public.returnMsg(False, public.lang("Already exists")) return public.return_message(-1, 0, public.lang("Already exists")) else: self.__ip_data.append(get.ip.strip()) public.writeFile(self.__ClIENT_IP, json.dumps(self.__ip_data)) # return public.returnMsg(True, public.lang("Added successfully")) return public.return_message(0, 0, public.lang("Added successfully")) def del_return_ip(self, get): # 校验参数 try: get.validate([ Param('ip').Require().Ip(), ], [ public.validate.trim_filter(), ]) except Exception as ex: public.print_log("error info: {}".format(ex)) return public.return_message(-1, 0, str(ex)) self.check_files() if get.ip.strip() in self.__ip_data: self.__ip_data.remove(get.ip.strip()) public.writeFile(self.__ClIENT_IP, json.dumps(self.__ip_data)) # return public.returnMsg(True, public.lang("Successfully deleted")) return public.return_message(0, 0, public.lang("Successfully deleted")) else: # return public.returnMsg(False, public.lang("IP does not exist")) return public.return_message(-1, 0, public.lang("IP does not exist")) #取登陆的前50个条记录 def login_last(self): self.check_files() data=public.ExecShell('last -n 50') data=re.findall(r"(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)",data[0]) if data>=1: data2=list(set(data)) for i in data2: if not i in self.__ip_data: self.__ip_data.append(i) public.writeFile(self.__ClIENT_IP, json.dumps(self.__ip_data)) return self.__ip_data #获取ROOT当前登陆的IP def get_ip(self): data = public.ExecShell(''' who am i |awk ' {print $5 }' ''') data = re.findall(r"(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)",data[0]) return data def get_logs(self, get): # 分页校验参数 try: get.validate([ Param('p_size').Integer(), Param('p').Integer(), ], [ public.validate.trim_filter(), ]) except Exception as ex: public.print_log("error info: {}".format(ex)) return public.return_message(-1, 0, str(ex)) import page page = page.Page() count = public.M('logs').where('type=?', ('SSH security',)).count() limit = 10 info = {} info['count'] = count info['row'] = limit info['p'] = 1 if hasattr(get, 'p'): info['p'] = int(get['p']) info['uri'] = get info['return_js'] = '' if hasattr(get, 'tojs'): info['return_js'] = get.tojs data = {} # 获取分页数据 data['page'] = page.GetPage(info, '1,2,3,4,5,8') data['data'] = public.M('logs').where('type=?', (u'SSH security',)).order('id desc').limit( str(page.SHIFT) + ',' + str(page.ROW)).field('log,addtime').select() # return data return public.return_message(0, 0, data) def get_server_ip(self): if os.path.exists('/www/server/panel/data/iplist.txt'): data=public.ReadFile('/www/server/panel/data/iplist.txt') return data.strip() else:return '127.0.0.1' #登陆的情况下 def login(self): self.check_files() self.check_user() self.__ip_data = json.loads(public.ReadFile(self.__ClIENT_IP)) ip=self.get_ip() if len(ip[0])==0:return False try: import time mDate = time.strftime('%Y-%m-%d %X', time.localtime()) if ip[0] in self.__ip_data: if public.M('logs').where('type=? addtime', ('SSH security',mDate,)).count():return False public.WriteLog('SSH security', 'The server {} login IP is {}, login user is root'.format(public.GetLocalIp(),ip[0])) return False else: if public.M('logs').where('type=? addtime', ('SSH security', mDate,)).count(): return False self.send_mail_data('Server {} login alarm'.format(public.GetLocalIp()),'There is a login alarm on the server {}, the login IP is {}, the login user is root'.format(public.GetLocalIp(),ip[0])) public.WriteLog('SSH security','There is a login alarm on the server {}, the login IP is {}, login user is root'.format(public.GetLocalIp(),ip [0])) return True except: pass #修复bashrc文件 def repair_bashrc(self): data = public.ReadFile(self.return_bashrc()) if re.search(self.return_python() + ' /www/server/panel/class/ssh_security.py', data): public.WriteFile(self.return_bashrc(),data.replace(self.return_python()+' /www/server/panel/class/ssh_security.py login','')) #遗留的错误信息 datassss = public.ReadFile(self.return_bashrc()) if re.search(self.return_python(),datassss): public.WriteFile(self.return_bashrc(),datassss.replace(self.return_python(),'')) #开启监控 def start_jian(self,get): self.repair_bashrc() data = public.ReadFile(self.return_profile()) if not re.search(self.return_python() + ' /www/server/panel/class/ssh_security.py', data): cmd = '''shell="%s /www/server/panel/class/ssh_security.py login" nohup `${shell}` &>/dev/null & disown $!''' % (self.return_python()) public.WriteFile(self.return_profile(), data.strip() + '\n' + cmd) return public.returnMsg(True, public.lang("Open successfully")) return public.returnMsg(False, public.lang("Open failed")) #关闭监控 def stop_jian(self,get): data = public.ReadFile(self.return_profile()) if re.search(self.return_python()+' /www/server/panel/class/ssh_security.py', data): cmd='''shell="%s /www/server/panel/class/ssh_security.py login"'''%(self.return_python()) data=data.replace(cmd, '') cmd='''nohup `${shell}` &>/dev/null &''' data=data.replace(cmd, '') cmd='''disown $!''' data=data.replace(cmd, '') public.WriteFile(self.return_profile(),data) #检查是否还存在遗留 if re.search(self.return_python()+' /www/server/panel/class/ssh_security.py', data): public.WriteFile(self.return_profile(),data.replace(self.return_python()+' /www/server/panel/class/ssh_security.py login','')) #遗留的错误信息 datassss = public.ReadFile(self.return_profile()) if re.search(self.return_python(),datassss): public.WriteFile(self.return_profile(),datassss.replace(self.return_python(),'')) return public.returnMsg(True, public.lang("Closed successfully")) else: return public.returnMsg(True, public.lang("Closed successfully")) #监控状态 def get_jian(self,get): data = public.ReadFile(self.return_profile()) #if re.search(r'{}\/www\/server\/panel\/class\/ssh_security.py\s+login'.format(r".*python\s+"), data): if re.search('/www/server/panel/class/ssh_security.py login', data): return public.returnMsg(True, public.lang("1")) else: return public.returnMsg(False, public.lang("1")) def set_password(self, get): ''' 开启密码登陆 get: 无需传递参数 ''' ssh_password = r'\n#?PasswordAuthentication\s\w+' file = public.readFile(self.__SSH_CONFIG) if not file: return public.return_message(-1, 0, public.lang("ERROR: sshd config configuration file does not exist, cannot continue!")) # return public.returnMsg(False, public.lang("ERROR: sshd config configuration file does not exist, cannot continue!")) if len(re.findall(ssh_password, file)) == 0: file_result = file + '\nPasswordAuthentication yes' else: file_result = re.sub(ssh_password, '\nPasswordAuthentication yes', file) self.wirte(self.__SSH_CONFIG, file_result) self.restart_ssh() public.WriteLog('SSH management', 'Enable password login') # return public.returnMsg(True, public.lang("Open successfully")) return public.return_message(0, 0, public.lang("Open successfully")) def set_sshkey(self, get): ''' 设置ssh 的key 参数 ssh=rsa&type=yes ''' # 分页校验参数 try: get.validate([ Param('ssh').Require().String('in', ['yes', 'no']).Xss(), Param('type').Require().Xss(), ], [ public.validate.trim_filter(), ]) except Exception as ex: public.print_log("error info: {}".format(ex)) return public.return_message(-1, 0, str(ex)) # ssh_type = ['yes', 'no'] ssh = get.ssh # if not ssh in ssh_type: return public.returnMsg(False, public.lang("ssh option failed")) s_type = get.type if not s_type in self.__type_list: # return public.returnMsg(False, public.lang("Wrong encryption method")) return public.return_message(-1, 0, public.lang("Wrong encryption method")) authorized_keys = '/root/.ssh/authorized_keys' file = ['/root/.ssh/id_{}.pub'.format(s_type), '/root/.ssh/id_{}'.format(s_type)] for i in file: if os.path.exists(i): public.ExecShell(r'sed -i "\~$(cat %s)~d" %s' % (file[0], authorized_keys)) os.remove(i) os.system("ssh-keygen -t {s_type} -P '' -f /root/.ssh/id_{s_type} |echo y".format(s_type = s_type)) if os.path.exists(file[0]): public.ExecShell('cat %s >> %s && chmod 600 %s' % (file[0], authorized_keys, authorized_keys)) rec = r'\n#?RSAAuthentication\s\w+' rec2 = r'\n#?PubkeyAuthentication\s\w+' file = public.readFile(self.__SSH_CONFIG) if not file: # return public.returnMsg(False, public.lang("ERROR: sshd config configuration file does not exist, cannot continue!")) return public.return_message(-1, 0, public.lang("ERROR: sshd config configuration file does not exist")) if len(re.findall(rec, file)) == 0: file = file + '\nRSAAuthentication yes' if len(re.findall(rec2, file)) == 0: file = file + '\nPubkeyAuthentication yes' file_ssh = re.sub(rec, '\nRSAAuthentication yes', file) file_result = re.sub(rec2, '\nPubkeyAuthentication yes', file_ssh) if ssh == 'no': ssh_password = r'\n#?PasswordAuthentication\s\w+' if len(re.findall(ssh_password, file_result)) == 0: file_result = file_result + '\nPasswordAuthentication no' else: file_result = re.sub(ssh_password, '\nPasswordAuthentication no', file_result) self.wirte(self.__SSH_CONFIG, file_result) public.writeFile(self.__key_type_file, s_type) self.restart_ssh() public.WriteLog('SSH management', 'Set up SSH key authentication and successfully generate the key') # return public.returnMsg(True, public.lang("Open successfully")) return public.return_message(0, 0, public.lang("Open successfully")) else: public.WriteLog('SSH management', 'Failed to set SSH key authentication') # return public.returnMsg(False, public.lang("Open failed")) return public.return_message(-1, 0, public.lang("Open failed")) # 取SSH信息 def get_msg_push_list(self,get): """ @name 获取消息通道配置列表 @auther: cjxin @date: 2022-08-16 @ """ cpath = 'data/msg.json' try: if 'force' in get or not os.path.exists(cpath): public.downloadFile('{}/linux/panel/msg/msg.json'.format("https://node.aapanel.com"),cpath) except : pass data = {} if os.path.exists(cpath): msgs = json.loads(public.readFile(cpath)) for x in msgs: x['setup'] = False x['info'] = False key = x['name'] try: obj = public.init_msg(x['name']) if obj: x['setup'] = True x['info'] = obj.get_version_info(None) except : print(public.get_error_info()) pass data[key] = x # return data return public.return_message(0, 0, data) def _get_msg_push_list(self,get): """ @name 获取消息通道配置列表 @auther: cjxin @date: 2022-08-16 @ """ cpath = 'data/msg.json' try: if 'force' in get or not os.path.exists(cpath): public.downloadFile('{}/linux/panel/msg/msg.json'.format("https://node.aapanel.com"),cpath) except : pass data = {} if os.path.exists(cpath): msgs = json.loads(public.readFile(cpath)) for x in msgs: x['setup'] = False x['info'] = False key = x['name'] try: obj = public.init_msg(x['name']) if obj: x['setup'] = True x['info'] = obj.get_version_info(None) except : print(public.get_error_info()) pass data[key] = x return data # return public.return_message(0, 0, data) #取消告警 def clear_login_send(self,get): # 校验参数 try: get.validate([ Param('type').Require().String().Xss(), ], [ public.validate.trim_filter(), ]) except Exception as ex: public.print_log("error info: {}".format(ex)) return public.return_message(-1, 0, str(ex)) login_send_type_conf = "/www/server/panel/data/ssh_send_type.pl" os.remove(login_send_type_conf) self.stop_jian(get) # return public.returnMsg(True, public.lang("Successfully cancel the login alarm!")) return public.return_message(0, 0, public.lang("Successfully cancel the login alarm")) #设置告警 def set_login_send(self,get): # 校验参数 try: get.validate([ Param('type').Require().String().Xss(), ], [ public.validate.trim_filter(), ]) except Exception as ex: public.print_log("error info: {}".format(ex)) return public.return_message(-1, 0, str(ex)) login_send_type_conf = "/www/server/panel/data/ssh_send_type.pl" set_type=get.type.strip() msg_configs = self._get_msg_push_list(get) # public.print_log("22222 --{}".format(msg_configs.keys())) if set_type not in msg_configs.keys(): # return public.returnMsg(False, public.lang("This send type is not supported")) return public.return_message(-1, 0, public.lang("This send type is not supported")) from panelMessage import panelMessage pm = panelMessage() obj = pm.init_msg_module(set_type) if not obj: # return public.returnMsg(False, public.lang("The message channel is not installed.")) return public.return_message(-1, 0, public.lang("The message channel is not installed")) public.writeFile(login_send_type_conf, set_type) self.start_jian(get) # return public.returnMsg(True, public.lang("Successfully set")) return public.return_message(0, 0, public.lang("Successfully set")) #查看告警 def get_login_send(self, get): login_send_type_conf = "/www/server/panel/data/ssh_send_type.pl" if os.path.exists(login_send_type_conf): send_type = public.readFile(login_send_type_conf).strip() else: send_type ="error" # return public.returnMsg(True, send_type) return public.return_message(0, 0, send_type) def GetSshInfo(self, get): # port = public.get_ssh_port() pid_file = '/run/sshd.pid' if os.path.exists(pid_file): pid = int(public.readFile(pid_file)) status = public.pid_exists(pid) else: import system panelsys = system.system() version = panelsys.GetSystemVersion() if os.path.exists('/usr/bin/apt-get'): if os.path.exists('/etc/init.d/sshd'): status = public.ExecShell("service sshd status | grep -P '(dead|stop)'|grep -v grep") else: status = public.ExecShell("service ssh status | grep -P '(dead|stop)'|grep -v grep") else: if version.find(' 7.') != -1 or version.find(' 8.') != -1 or version.find('Fedora') != -1: status = public.ExecShell("systemctl status sshd.service | grep 'dead'|grep -v grep") else: status = public.ExecShell("/etc/init.d/sshd status | grep -e 'stopped' -e '已停'|grep -v grep") # return status; if len(status[0]) > 3: status = False else: status = True return status def stop_key(self, get): ''' 关闭key 无需参数传递 ''' is_ssh_status=self.GetSshInfo(get) rec = r'\n\s*#?\s*RSAAuthentication\s+\w+' rec2 = r'\n\s*#?\s*PubkeyAuthentication\s+\w+' file = public.readFile(self.__SSH_CONFIG) if not file: # return public.returnMsg(False, public.lang("错误:sshd_config配置文件不存在,无法继续!")) return public.return_message(-1, 0, public.lang("Error: sshd config configuration file does not exist")) file_ssh = re.sub(rec, '\nRSAAuthentication no', file) file_result = re.sub(rec2, '\nPubkeyAuthentication no', file_ssh) self.wirte(self.__SSH_CONFIG, file_result) if is_ssh_status: self.set_password(get) self.restart_ssh() public.WriteLog('SSH management','Disable SSH key login') # return public.returnMsg(True, public.lang("Disable successfully")) return public.return_message(0, 0, public.lang("Disable successfully")) def get_config(self, get): ''' 获取配置文件 无参数传递 ''' result = {} file = public.readFile(self.__SSH_CONFIG) if not file: # return public.returnMsg(False, public.lang("Error: sshd config does not exist")) return public.return_message(-1, 0, public.lang("Error: sshd config does not exist")) # ======== 以下在2022-10-12重构 ========== # author : hwliang # 是否开启RSA公钥认证 # 默认开启(最新版openssh已经不支持RSA公钥认证) # yes = 开启 # no = 关闭 result['rsa_auth'] = 'yes' rec = r'^\s*RSAAuthentication\s*(yes|no)' rsa_find = re.findall(rec, file, re.M|re.I) if rsa_find and rsa_find[0].lower() == 'no': result['rsa_auth'] = 'no' # 获取是否开启公钥认证 # 默认关闭 # yes = 开启 # no = 关闭 result['pubkey'] = 'no' if self._get_key(get)['msg']: # 先检查是否存在可用的公钥 pubkey = r'^\s*PubkeyAuthentication\s*(yes|no)' pubkey_find = re.findall(pubkey, file, re.M|re.I) if pubkey_find and pubkey_find[0].lower() == 'yes': result['pubkey'] = 'yes' # 是否开启密码登录 # 默认开启 # yes = 开启 # no = 关闭 result['password'] = 'yes' ssh_password = r'^\s*PasswordAuthentication\s*([\w\-]+)' ssh_password_find = re.findall(ssh_password, file, re.M|re.I) if ssh_password_find and ssh_password_find[0].lower() == 'no': result['password'] = 'no' #是否允许root登录 # 默认允许 # yes = 允许 # no = 不允许 # without-password = 允许,但不允许使用密码登录 # forced-commands-only = 允许,但只允许执行命令,不能使用终端 result['root_is_login'] = 'yes' result['root_login_type'] = 'yes' root_is_login=r'^\s*PermitRootLogin\s*([\w\-]+)' root_is_login_find = re.findall(root_is_login, file, re.M|re.I) if root_is_login_find and root_is_login_find[0].lower() != 'yes': result['root_is_login'] = 'no' result['root_login_type'] = root_is_login_find[0].lower() result['root_login_types'] = self.__root_login_types result['port'] = public.get_sshd_port() result['key_type'] = public.ReadFile(self.__key_type_file) # return result return public.return_message(0, 0, result) def set_root(self, get): ''' 开启密码登陆 get: 无需传递参数 ''' # without-password yes no forced-commands-only # 分页校验参数 try: get.validate([ Param('p_type').String('in', ['yes', 'no', 'without-password', 'forced-commands-only']).Xss(), ], [ public.validate.trim_filter(), ]) except Exception as ex: public.print_log("error info: {}".format(ex)) return public.return_message(-1, 0, str(ex)) p_type = 'yes' if 'p_type' in get: p_type = get.p_type if p_type not in self.__root_login_types.keys(): # return public.returnMsg(False, public.lang("Parameter passing error!")) return public.return_message(-1, 0, public.lang("Parameter passing error")) ssh_password = r'^\s*#?\s*PermitRootLogin\s*([\w\-]+)' file = public.readFile(self.__SSH_CONFIG) src_line = re.search(ssh_password, file,re.M) new_line = 'PermitRootLogin {}'.format(p_type) if not src_line: file_result = file + '\n{}'.format(new_line) else: file_result = file.replace(src_line.group(),new_line) self.wirte(self.__SSH_CONFIG, file_result) self.restart_ssh() msg = public.lang('Set the root login method as: {}',self.__root_login_types[p_type]) public.WriteLog('SSH management',msg) # return public.returnMsg(True, msg) return public.return_message(0, 0, msg) def set_root_password(self, get): """ @name 设置root密码 @param get: @return: """ password = get.password if "password" in get else "" username = get.username if "username" in get else "" if not password: return public.return_message(-1, 0, public.lang("The password cannot be empty")) if len(password) < 8: return public.return_message(-1, 0, public.lang("The password cannot be less than 8 bits long")) if get.username not in self.get_sys_user(get)['msg']: return public.return_message(-1, 0, public.lang("The username already exists")) has_letter = bool(re.search(r'[a-zA-Z!@#$%^&*()-_+=]', password)) has_digit_or_symbol = bool(re.search(r'[0-9!@#$%^&*()-_+=]', password)) if not has_letter or not has_digit_or_symbol: return public.return_message(-1, 0, public.lang("The password must contain letters and numbers or symbols")) if username == "root": cmd_result, cmd_err = public.ExecShell("echo root:%s|chpasswd" % password) else: cmd_result, cmd_err = public.ExecShell("echo %s:%s|chpasswd" % (username, password)) if cmd_err: return public.return_message(-1, 0, public.lang("Setup failure")) public.WriteLog("SSH", "[Security] - [SSH] - [Set %s password]" % username) return public.return_message(0, 0, public.lang("successfully set")) def get_sys_user(self, get): """获取所有用户名 @param: @return """ from collections import deque user_set = deque() with open('/etc/passwd') as fp: for line in fp.readlines(): user_set.append(line.split(':', 1)[0]) return public.returnMsg(True, list(user_set)) def stop_root(self, get): ''' 开启密码登陆 get: 无需传递参数 ''' ssh_password = r'\n\s*PermitRootLogin\s+\w+' file = public.readFile(self.__SSH_CONFIG) if len(re.findall(ssh_password, file)) == 0: file_result = file + '\nPermitRootLogin no' else: file_result = re.sub(ssh_password, '\nPermitRootLogin no', file) self.wirte(self.__SSH_CONFIG, file_result) self.restart_ssh() public.WriteLog('SSH management','Set the root login method to: no') return public.returnMsg(True, public.lang("Disable successfully")) def stop_password(self, get): ''' 关闭密码访问 无参数传递 ''' file = public.readFile(self.__SSH_CONFIG) ssh_password = r'\n#?PasswordAuthentication\s\w+' file_result = re.sub(ssh_password, '\nPasswordAuthentication no', file) self.wirte(self.__SSH_CONFIG, file_result) self.restart_ssh() public.WriteLog('SSH management','Disable password access') return public.returnMsg(True, public.lang("Closed successfully")) def _get_key(self, get): ''' 获取key 无参数传递 ''' key_type = self.get_ssh_key_type() if key_type in self.__type_files.keys(): key_file = self.__type_files[key_type] key = public.readFile(key_file) return public.returnMsg(True,key) return public.returnMsg(True, public.lang("")) def get_key(self, get): ''' 获取key 无参数传递 ''' key_type = self.get_ssh_key_type() if key_type in self.__type_files.keys(): key_file = self.__type_files[key_type] key = public.readFile(key_file) return public.return_message(0, 0,key) return public.return_message(0, 0, public.lang("")) def download_key(self, get): ''' @name 下载密钥 ''' download_file = '' key_type = self.get_ssh_key_type() if key_type in self.__type_files.keys(): if os.path.exists(self.__type_files[key_type]): download_file = self.__type_files[key_type] else: for file in self.__key_files: if not os.path.exists(file): continue download_file = file break if not download_file: return public.returnMsg(False, public.lang("Key file not found!")) from flask import send_file filename = "{}_{}".format(public.GetHost(),os.path.basename(download_file)) return send_file(download_file,download_name=filename) def wirte(self, file, ret): result = public.writeFile(file, ret) return result def restart_ssh(self): ''' 重启ssh 无参数传递 ''' version = public.readFile('/etc/redhat-release') act = 'restart' if not os.path.exists('/etc/redhat-release'): public.ExecShell('service ssh ' + act) elif version.find(' 7.') != -1 or version.find(' 8.') != -1: public.ExecShell("systemctl " + act + " sshd.service") else: public.ExecShell("/etc/init.d/sshd " + act) #检查是否设置了钉钉 def check_dingding(self, get): ''' 检查是否设置了钉钉 ''' #检查文件是否存在 if not os.path.exists('/www/server/panel/data/dingding.json'):return False dingding_config=public.ReadFile('/www/server/panel/data/dingding.json') if not dingding_config:return False #解析json try: dingding=json.loads(dingding_config) if dingding['dingding_url']: return True except: return False #开启SSH双因子认证 def start_auth_method(self, get): ''' 开启SSH双因子认证 ''' #检查是否设置了钉钉 import ssh_authentication ssh_class=ssh_authentication.ssh_authentication() return ssh_class.start_ssh_authentication_two_factors() #关闭SSH双因子认证 def stop_auth_method(self, get): ''' 关闭SSH双因子认证 ''' #检查是否设置了钉钉 import ssh_authentication ssh_class=ssh_authentication.ssh_authentication() return ssh_class.close_ssh_authentication_two_factors() #获取SSH双因子认证状态 def get_auth_method(self, get): ''' 获取SSH双因子认证状态 ''' #检查是否设置了钉钉 import ssh_authentication ssh_class=ssh_authentication.ssh_authentication() return ssh_class.check_ssh_authentication_two_factors() #判断so文件是否存在 def check_so_file(self, get): ''' 判断so文件是否存在 ''' import ssh_authentication ssh_class=ssh_authentication.ssh_authentication() return ssh_class.is_check_so() #下载so文件 def get_so_file(self, get): ''' 下载so文件 ''' import ssh_authentication ssh_class=ssh_authentication.ssh_authentication() return ssh_class.download_so() #获取pin def get_pin(self, get): ''' 获取pin ''' import ssh_authentication ssh_class=ssh_authentication.ssh_authentication() return public.returnMsg(True, ssh_class.get_pin()) def get_login_record(self,get): if os.path.exists(self.open_ssh_login): return public.returnMsg(True, public.lang("")) else: return public.returnMsg(False, public.lang("")) def start_login_record(self,get): if os.path.exists(self.open_ssh_login): return public.returnMsg(True, public.lang("")) else: public.writeFile(self.open_ssh_login,"True") return public.returnMsg(True, public.lang("")) def stop_login_record(self,get): if os.path.exists(self.open_ssh_login): os.remove(self.open_ssh_login) return public.returnMsg(True, public.lang("")) else: return public.returnMsg(True, public.lang("")) # 获取登录记录列表 def get_record_list(self, get): if 'limit' in get: limit = int(get.limit.strip()) else: limit = 12 import page page = page.Page() count = public.M('ssh_login_record').order("id desc").count() info = {} info['count'] = count info['row'] = limit info['p'] = 1 if hasattr(get, 'p'): info['p'] = int(get['p']) info['uri'] = get info['return_js'] = '' if hasattr(get, 'tojs'): info['return_js'] = get.tojs data = {} # 获取分页数据 data['page'] = page.GetPage(info, '1,2,3,4,5,8') data['data'] = public.M('ssh_login_record').order('id desc').limit( str(page.SHIFT) + ',' + str(page.ROW)).select() return data def get_file_json(self,get): if os.path.exists(get.path): ret=json.loads(public.ReadFile(get.path)) return ret else: return '' if __name__ == '__main__': import sys type = sys.argv[1] if type=='login': try: aa = ssh_security() aa.login() except:pass else: pass
Close