Linux ip-172-26-2-223 5.4.0-1018-aws #18-Ubuntu SMP Wed Jun 24 01:15:00 UTC 2020 x86_64
Apache
: 172.26.2.223 | : 18.221.70.17
Cant Read [ /etc/named.conf ]
8.1.13
www
www.github.com/MadExploits
Terminal
AUTO ROOT
Adminer
Backdoor Destroyer
Linux Exploit
Lock Shell
Lock File
Create User
CREATE RDP
PHP Mailer
BACKCONNECT
UNLOCK SHELL
HASH IDENTIFIER
CPANEL RESET
CREATE WP USER
BLACK DEFEND!
README
+ Create Folder
+ Create File
/
www /
server /
panel /
class_v2 /
firewallModelV2 /
app /
[ HOME SHELL ]
Name
Size
Permission
Action
appBase.py
4.11
KB
-rw-r--r--
firewalld.py
37.52
KB
-rw-r--r--
iptables.py
37.3
KB
-rw-r--r--
ufw.py
29.77
KB
-rw-r--r--
Delete
Unzip
Zip
${this.title}
Close
Code Editor : ufw.py
#!/www/server/panel/pyenv/bin/python3.7 # coding: utf-8 # ------------------------------------------------------------------- # aapanel # ------------------------------------------------------------------- # Copyright (c) 2014-2099 aapanel(http://www.aapanel.com) All rights reserved. # ------------------------------------------------------------------- # Author: wzz <wzz@bt.cn> # ------------------------------------------------------------------- # ------------------------------ # 系统防火墙模型 - ufw封装库 # ------------------------------ import subprocess import os import sys if "/www/server/panel/class" not in sys.path: sys.path.insert(0, "/www/server/panel/class") import public # import re from firewallModelV2.app.appBase import Base class Ufw(Base): def __init__(self): self.cmd_str = self._set_cmd_str() def _set_cmd_str(self): return "ufw" # 2024/3/19 下午 5:00 获取系统防火墙的运行状态 def status(self): ''' @name 获取系统防火墙的运行状态 @author wzz <2024/3/19 下午 5:00> @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} ''' try: result = subprocess.run([self.cmd_str, "status"], capture_output=True, text=True, check=True) if "Status: active" in result.stdout: return "running" elif "状态: 激活" in result.stdout: return "running" else: return "not running" except subprocess.CalledProcessError: return "not running" except Exception as e: return "not running" # 2024/3/19 下午 5:00 获取系统防火墙的版本号 def version(self): ''' @name 获取系统防火墙的版本号 @author wzz <2024/3/19 下午 5:00> @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} ''' try: result = subprocess.run([self.cmd_str, "version"], capture_output=True, text=True, check=True) info = result.stdout.replace("\n", "") return info.replace("ufw ", "") except Exception as e: return "Unknown version" # 2024/3/19 下午 5:00 启动防火墙 def start(self): ''' @name 启动防火墙 @author wzz <2024/3/19 下午 5:00> @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} ''' try: stdout, stderr = public.ExecShell("echo y | {} enable".format(self.cmd_str)) if stderr: return self._result(False, public.lang("Failed to start firewall:{}",stderr)) return self._result(True, public.lang("The firewall was started successfully")) except Exception as e: return self._result(False, public.lang("Failed to start firewall:{}",str(e))) # 2024/3/19 下午 5:00 停止防火墙 def stop(self): ''' @name 停止防火墙 @author wzz <2024/3/19 下午 5:00> @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} ''' try: stdout, stderr = public.ExecShell("{} disable".format(self.cmd_str)) if stderr: return self._result(False, public.lang("Failed to stop the firewall:{}",stderr)) return self._result(True, public.lang("The firewall was stopped")) except Exception as e: return self._result(False, public.lang("Failed to stop the firewall:{}",str(e))) # 2024/3/19 下午 4:59 重启防火墙 def restart(self): ''' @name 重启防火墙 @author wzz <2024/3/19 下午 4:59> @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} ''' try: self.stop() self.start() except Exception as e: return self._result(False, public.lang("Failed to restart the firewall:{}",str(e))) # 2024/3/19 下午 4:59 重载防火墙 def reload(self): ''' @name 重载防火墙 @author wzz <2024/3/19 下午 4:59> @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} ''' try: subprocess.run([self.cmd_str, "reload"], check=True, stdout=subprocess.PIPE) except Exception as e: return self._result(False, public.lang("Overloaded firewall failed:{}",str(e))) # 2024/3/19 上午 10:39 列出防火墙中所有端口规则 def list_port(self): ''' @name 列出防火墙中所有端口规则 @author wzz <2024/3/19 上午 10:39> @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} ''' try: result = subprocess.run( [self.cmd_str, "status", "verbose"], capture_output=True, text=True, check=True ) port_infos = result.stdout.split("\n") datas = [] is_start = False for line in port_infos: if line.startswith("-"): is_start = True continue if not is_start: continue item_fire = self._load_info(line, "port") if item_fire.get("Port") and item_fire["Port"] != "Anywhere" and "." not in item_fire["Port"]: item_fire["Port"] = item_fire["Port"].replace(":", "-") item_fire["Address"] = "all" if item_fire["Address"] == "Anywhere" else item_fire["Address"] datas.append(item_fire) return datas except Exception as e: return [] # 2024/3/19 上午 10:39 列出防火墙中所有input端口规则 def list_input_port(self): ''' @name 列出防火墙中所有input端口规则 @author wzz <2024/3/19 上午 10:39> @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} ''' try: result = subprocess.run( [self.cmd_str, "status", "verbose"], capture_output=True, text=True, check=True ) port_infos = result.stdout.split("\n") datas = [] is_start = False for line in port_infos: if line.startswith("-"): is_start = True continue if not is_start: continue item_fire = self._load_info(line, "port") if item_fire.get("Port") and item_fire["Port"] != "Anywhere" and "." not in item_fire["Port"]: item_fire["Port"] = item_fire["Port"].replace(":", "-") if item_fire["Chain"] == "INPUT": datas.append(item_fire) return datas except Exception as e: return [] # 2024/3/19 上午 10:39 列出防火墙中所有output端口规则 def list_output_port(self): ''' @name 列出防火墙中所有output端口规则 @author wzz <2024/3/19 上午 10:39> @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} ''' try: result = subprocess.run( [self.cmd_str, "status", "verbose"], capture_output=True, text=True, check=True ) port_infos = result.stdout.split("\n") datas = [] is_start = False for line in port_infos: if line.startswith("-"): is_start = True continue if not is_start: continue item_fire = self._load_info(line, "port") if item_fire.get("Port") and item_fire["Port"] != "Anywhere" and "." not in item_fire["Port"]: item_fire["Port"] = item_fire["Port"].replace(":", "-") if item_fire["Chain"] == "OUTPUT": datas.append(item_fire) return datas except Exception as e: return [] # 2024/3/19 上午 10:39 列出防火墙中所有的ip规则 def list_address(self): ''' @name 列出防火墙中所有的ip规则 @author wzz <2024/3/19 上午 10:39> @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} ''' try: result = subprocess.run( [self.cmd_str, "status", "verbose"], capture_output=True, text=True, check=True ) port_infos = result.stdout.split("\n") datas = [] is_start = False for line in port_infos: if line.startswith("-"): is_start = True continue if not is_start: continue item_fire = self._load_info(line, "address") if "Port" in item_fire: continue if item_fire.get("Address"): datas.append(item_fire) return datas except Exception as e: return [] # 2024/3/19 上午 10:39 列出防火墙中所有input的ip规则 def list_input_address(self): ''' @name 列出防火墙中所有input的ip规则 @author wzz <2024/3/19 上午 10:39> @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} ''' try: result = subprocess.run( [self.cmd_str, "status", "verbose"], capture_output=True, text=True, check=True ) port_infos = result.stdout.split("\n") datas = [] is_start = False for line in port_infos: if line.startswith("-"): is_start = True continue if not is_start: continue if " IN" not in line: continue item_fire = self._load_info(line, "address") if "Port" in item_fire: continue if item_fire.get("Address"): datas.append(item_fire) return datas except Exception as e: return [] # 2024/3/19 上午 10:39 列出防火墙中所有output的ip规则 def list_output_address(self): ''' @name 列出防火墙中所有output的ip规则 @author wzz <2024/3/19 上午 10:39> @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} ''' try: result = subprocess.run( [self.cmd_str, "status", "verbose"], capture_output=True, text=True, check=True ) port_infos = result.stdout.split("\n") datas = [] is_start = False for line in port_infos: if line.startswith("-"): is_start = True continue if not is_start: continue if " OUT" not in line: continue item_fire = self._load_info(line, "address") if "Port" in item_fire: continue if item_fire.get("Address"): datas.append(item_fire) return datas except Exception as e: return [] # 2024/3/19 下午 4:59 添加端口规则 def input_port(self, info, operation): ''' @name 添加端口规则 @author wzz <2024/3/19 下午 4:59> @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} ''' try: if info["Strategy"] == "accept": info["Strategy"] = "allow" elif info["Strategy"] == "drop": info["Strategy"] = "deny" if info["Port"].find('-') != -1: info["Port"] = info["Port"].replace('-', ':') if operation == "add": if info['Protocol'].find("/") != -1: rich_rule = self.cmd_str + " insert 1 {} {}".format(info['Strategy'], info['Port']) stdout, stderr = public.ExecShell(rich_rule) elif info['Protocol'] == "tcp/udp": stdout, stderr = public.ExecShell(self.cmd_str + " allow " + info['Port']) else: stdout, stderr = public.ExecShell(self.cmd_str + " allow " + info['Port'] + "/" + info['Protocol']) else: if info['Protocol'].find("/") != -1: rich_rule = "{} delete {} {}".format(self.cmd_str, info['Strategy'], info['Port']) stdout, stderr = public.ExecShell(rich_rule) elif info['Protocol'] == "tcp/udp": stdout, stderr = public.ExecShell(self.cmd_str + " delete allow " + info['Port']) else: stdout, stderr = public.ExecShell(self.cmd_str + " delete allow " + info['Port'] + "/" + info['Protocol']) if stderr: if "setlocale" in stderr: return self._result(True, public.lang("The port rule was successfully configured")) return self._result(False, public.lang("Failed to set a port rule:{}",stderr)) return self._result(True, public.lang("The port rule was successfully configured")) except Exception as e: if "setlocale" in str(e): return self._result(True, public.lang("The port rule was successfully configured")) return self._result(False, public.lang("Failed to set a port rule:{}",str(e))) # 2024/3/24 下午 11:28 设置output端口策略 def output_port(self, info, operation): ''' @name 设置output端口策略 @param info: 端口号 @param operation: 操作 @return None ''' try: if info["Strategy"] == "accept": info["Strategy"] = "allow" elif info["Strategy"] == "drop": info["Strategy"] = "deny" if operation == "add": if info['Protocol'].find('/') != -1: cmd = "{} {} out {}".format(self.cmd_str, info['Strategy'], info['Port']) else: cmd = "{} {} out {}/{}".format(self.cmd_str, info['Strategy'], info['Port'], info['Protocol']) else: if info['Protocol'].find('/') != -1: cmd = "{} delete {} out {}".format(self.cmd_str, info['Strategy'], info['Port']) else: cmd = "{} delete {} out {}/{}".format(self.cmd_str, info['Strategy'], info['Port'], info['Protocol']) stdout, stderr = public.ExecShell(cmd) if stderr: if "setlocale" in stderr: return self._result(True, public.lang("The port rule was successfully configured")) return self._result(False, public.lang("Failed to set the output port rule:{}",stderr)) return self._result(True, public.lang("The output port rule is set successfully")) except Exception as e: if "setlocale" in str(e): return self._result(True, public.lang("The output port rule is set successfully")) return self._result(False, public.lang("Failed to set the output port rule:{}",str(e))) # 2024/3/19 下午 4:58 复杂一些的规则管理 def rich_rules(self, info, operation): ''' @name 复杂一些的规则管理 @author wzz <2024/3/19 下午 4:58> @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} ''' try: if info["Strategy"] == "accept": info["Strategy"] = "allow" elif info["Strategy"] == "drop": info["Strategy"] = "deny" else: return self._result(False, public.lang("Unknown policy parameters:{}",info["Strategy"])) if info.get('Port', "") != "": if info["Port"].find('-') != -1: info["Port"] = info["Port"].replace('-', ':') rule_str = "{} insert 1 {} ".format(self.cmd_str, info["Strategy"]) if "Address" in info and public.is_ipv6(info['Address']): rule_str = "{} {} ".format(self.cmd_str, info["Strategy"]) if operation == "remove": rule_str = "{} delete {} ".format(self.cmd_str, info["Strategy"]) if "Address" in info and info['Address'] != "all": rule_str += "from {} ".format(info['Address']) if len(info.get("Protocol", "")) != 0 and "/" not in info['Protocol']: rule_str += "proto {} ".format(info['Protocol']) if len(info.get("Port", "")) != 0: rule_str += "to any port {} ".format(info['Port']) stdout, stderr = public.ExecShell(rule_str) if stderr: if "Rule added" in stdout or "Rule deleted" in stdout or "Rule updated" in stdout or "Rule inserted" in stdout or "Skipping adding existing rule" in stdout: return self._result(True, public.lang("The rule is set successfully")) if "setlocale" in stderr: return self._result(True, public.lang("The rule is set successfully")) return self._result(False, public.lang("The rule setup failed:{}",stderr)) return self._result(True, public.lang("The rule is set successfully")) except Exception as e: public.print_log(public.get_error_info()) if "setlocale" in str(e): return self._result(True, public.lang("The rule is set successfully")) return self._result(False, public.lang("The rule setup failed:{}",e)) # 2024/3/24 下午 11:29 设置output rich_rules def output_rich_rules(self, info, operation): ''' @name 设置output rich_rules @param info: 规则 @param operation: 操作 @return None ''' try: if info["Strategy"] == "accept": info["Strategy"] = "allow" elif info["Strategy"] == "drop": info["Strategy"] = "deny" else: return self._result(False, public.lang("Unknown strategy: {}",info["Strategy"])) rule_str = "{} insert 1 {} ".format(self.cmd_str, info["Strategy"]) if "Address" in info and public.is_ipv6(info['Address']): rule_str = "{} {} ".format(self.cmd_str, info["Strategy"]) if operation == "remove": rule_str = "{} delete {} ".format(self.cmd_str, info["Strategy"]) if len(info.get("Address", "")) != 0: rule_str += "out from {} ".format(info['Address']) if len(info.get("Protocol", "")) != 0: rule_str += "proto {} ".format(info['Protocol']) if len(info.get("Port", "")) != 0: rule_str += "to any port {} ".format(info['Port']) stdout, stderr = public.ExecShell(rule_str) if stderr: if "Rule added" in stdout or "Rule deleted" in stdout or "Rule updated" in stdout or "Rule inserted" in stdout or "Skipping adding existing rule" in stdout: return self._result(True, public.lang("The output rule is set successfully")) if "setlocale" in stderr: return self._result(True, public.lang("The rule is set successfully")) return self._result(False, public.lang("outpuThe rule setup failed:{}",stderr)) return self._result(True, public.lang("The output rule is set successfully")) except Exception as e: if "setlocale" in str(e): return self._result(True, public.lang("The output rule is set successfully")) return self._result(False, public.lang("outpuThe rule setup failed:{}",e)) # 2024/3/19 下午 5:01 解析防火墙规则信息,返回字典格式数据,用于添加或删除防火墙规则 def _load_info(self, line, fire_type): ''' @name 解析防火墙规则信息,返回字典格式数据,用于添加或删除防火墙规则 @author wzz <2024/3/19 上午 10:38> @param "data":{"参数名":""} <数据类型> 参数描述 @return dict{"status":True/False,"msg":"提示信息"} ''' fields = line.split() item_info = {} if "LIMIT" in line or "ALLOW FWD" in line: return item_info if len(fields) < 4: return item_info if fields[0] == "Anywhere" and fire_type != "port": item_info["Strategy"] = "drop" if fields[1] != "(v6)": if fields[1] == "ALLOW": item_info["Strategy"] = "accept" if fields[2] == "IN": item_info["Chain"] = "INPUT" elif fields[2] == "OUT": item_info["Chain"] = "OUTPUT" item_info["Address"] = fields[3] if fields[3] != "Anywhere" else "all" item_info["Family"] = "ipv4" else: if fields[2] == "ALLOW": item_info["Strategy"] = "accept" if fields[3] == "IN": item_info["Chain"] = "INPUT" elif fields[3] == "OUT": item_info["Chain"] = "OUTPUT" item_info["Address"] = fields[4] if fields[4] != "Anywhere" else "all" item_info["Family"] = "ipv6" return item_info if "/" in fields[0]: item_info["Port"] = fields[0].split("/")[0] item_info["Protocol"] = fields[0].split("/")[1] else: item_info["Port"] = fields[0] item_info["Protocol"] = "tcp/udp" if "v6" in fields[1]: item_info["Family"] = "ipv6" if fields[2] == "ALLOW": item_info["Strategy"] = "accept" else: item_info["Strategy"] = "drop" if fields[3] == "IN": item_info["Chain"] = "INPUT" elif fields[3] == "OUT": item_info["Chain"] = "OUTPUT" item_info["Address"] = fields[4] if fields[4] != "Anywhere" else "all" else: item_info["Family"] = "ipv4" if ":" not in fields[3] else "ipv6" if fields[1] == "ALLOW": item_info["Strategy"] = "accept" else: item_info["Strategy"] = "drop" if fields[2] == "IN": item_info["Chain"] = "INPUT" elif fields[2] == "OUT": item_info["Chain"] = "OUTPUT" item_info["Address"] = fields[3] if fields[3] != "Anywhere" else "all" return item_info # 2024/3/25 下午 2:29 设置端口转发 def port_forward(self, info, operation): ''' @name 设置端口转发 @param port: 端口号 @param ip: ip地址 @param operation: 操作 @return None ''' from firewallModelV2.app.iptables import Iptables self.firewall = Iptables() return self.firewall.port_forward(info, operation) # 2024/3/25 下午 2:34 获取所有端口转发列表 def list_port_forward(self): ''' @name 获取所有端口转发列表 @return None ''' from firewallModelV2.app.iptables import Iptables self.firewall = Iptables() return self.firewall.get_nat_prerouting_rules() if __name__ == '__main__': args = sys.argv firewall = Ufw() ufw_status = firewall.status() if len(args) < 2: print("Welcome to the UFW (Uncomplicated Firewall) command-line interface!") print("Firewall status is :", ufw_status) print("Firewall version: ", firewall.version()) if ufw_status == "not running": print("ufw is not started, please start ufw and then execute the command!") print("Start the command: start") print() sys.exit(1) print() print("Available options:") print("1. Check Firewall Status: status") print("2. Check Firewall Version: version") print("3. Start Firewall: start") print("4. Stop Firewall: stop") print("5. Restart Firewall: restart") print("6. Reload Firewall: reload") print("7. List All Ports: list_port") print("8. List All IP Addresses: list_address") print("9. Add Port: add_port <port> <protocol>") print("10. Remove Port: remove_port <port> <protocol>") print("11. Add Port Rule: add_port_rule <address> <port> <protocol> <strategy> <operation>") print("12. Remove Port Rule: remove_port_rule <address> <port> <protocol> <strategy> <operation>") print("13. Add IP Rule: add_ip_rule <address> <strategy> <operation>") print("14. Remove IP Rule: remove_ip_rule <address> <strategy> <operation>") print() sys.exit(1) if args[1] == "status": print(firewall.status()) elif args[1] == "version": print(firewall.version()) elif args[1] == "start": error = firewall.start() if error: print(f"Error: {error}") else: print("Firewall started successfully.") elif args[1] == "stop": error = firewall.stop() if error: print(f"Error: {error}") else: print("Firewall stopped successfully.") elif args[1] == "restart": error = firewall.restart() if error: print(f"Error: {error}") else: print("Firewall restarted successfully.") elif args[1] == "reload": error = firewall.reload() if error: print(f"Error: {error}") else: print("Firewall reloaded successfully.") elif args[1] == "list_input_port": ports = firewall.list_input_port() for p in ports: print(p) elif args[1] == "list_output_port": ports = firewall.list_output_port() for p in ports: print(p) elif args[1] == "list_input_address": addresses = firewall.list_input_address() for a in addresses: print(a) elif args[1] == "list_output_address": addresses = firewall.list_output_address() for a in addresses: print(a) elif args[1] == "add_port": port = args[2] protocol = args[3] error = firewall.input_port(f"{port}/{protocol}", "allow") if error: print(f"Error: {error}") else: print(f"Port {port}/{protocol} added successfully.") elif args[1] == "remove_port": port = args[2] protocol = args[3] error = firewall.input_port(f"{port}/{protocol}", "remove") if error: print(f"Error: {error}") else: print(f"Port {port}/{protocol} removed successfully.") elif args[1] == "add_port_rule": address = args[2] port = args[3] protocol = args[4] strategy = args[5] operation = args[6] error = firewall.rich_rules( {"Address": address, "Port": port, "Protocol": protocol, "Strategy": strategy}, operation) if error: print(f"Error: {error}") else: print("Rich rule added successfully.") elif args[1] == "remove_port_rule": address = args[2] port = args[3] protocol = args[4] strategy = args[5] operation = args[6] error = firewall.rich_rules( {"Address": address, "Port": port, "Protocol": protocol, "Strategy": strategy}, operation) if error: print(f"Error: {error}") else: print("Rich rule removed successfully.") elif args[1] == "add_ip_rule": address = args[2] strategy = args[3] operation = args[4] error = firewall.rich_rules( {"Address": address, "Strategy": strategy}, operation) if error: print(f"Error: {error}") else: print("Rich rule added successfully.") elif args[1] == "remove_ip_rule": address = args[2] strategy = args[3] operation = args[4] error = firewall.rich_rules( {"Address": address, "Strategy": strategy}, operation) if error: print(f"Error: {error}") else: print("Rich rule removed successfully.") elif args[1] == "output_port": port = args[2] operation = args[3] error = firewall.output_port(port, operation) if error: print(f"Error: {error}") else: print(f"Output port {port} {operation} successfully.") elif args[1] == "output_rich_rules": address = args[2] port = args[3] protocol = args[4] strategy = args[5] operation = args[6] error = firewall.output_rich_rules( {"Address": address, "Port": port, "Protocol": protocol, "Strategy": strategy}, operation) if error: print(f"Error: {error}") else: print("Output rich rule added successfully.") else: print("Invalid args") sys.exit(1)
Close