Linux ip-172-26-2-223 5.4.0-1018-aws #18-Ubuntu SMP Wed Jun 24 01:15:00 UTC 2020 x86_64
Apache
: 172.26.2.223 | : 3.147.8.67
Cant Read [ /etc/named.conf ]
8.1.13
www
www.github.com/MadExploits
Terminal
AUTO ROOT
Adminer
Backdoor Destroyer
Linux Exploit
Lock Shell
Lock File
Create User
CREATE RDP
PHP Mailer
BACKCONNECT
UNLOCK SHELL
HASH IDENTIFIER
CPANEL RESET
CREATE WP USER
BLACK DEFEND!
README
+ Create Folder
+ Create File
/
www /
server /
panel /
class_v2 /
databaseModelV2 /
[ HOME SHELL ]
Name
Size
Permission
Action
__init__.py
1
B
-rw-r--r--
base.py
21.68
KB
-rw-r--r--
mongodbModel.py
38.86
KB
-rw-r--r--
pgsqlModel.py
31.87
KB
-rw-r--r--
redisModel.py
20.5
KB
-rw-r--r--
sqliteModel.py
667
B
-rw-r--r--
sqlserverModel.py
26.34
KB
-rw-r--r--
Delete
Unzip
Zip
${this.title}
Close
Code Editor : pgsqlModel.py
# coding: utf-8
# -------------------------------------------------------------------
# aaPanel
# -------------------------------------------------------------------
# Copyright (c) 2015-2099 aaPanel(www.aapanel.com) All rights reserved.
# -------------------------------------------------------------------
# Author: hezhihong <bt_ahong@aapanel.com>
# -------------------------------------------------------------------

# ------------------------------
# PostgreSQL model
# ------------------------------
import os, re, json, time
import shlex

from databaseModelV2.base import databaseBase
import public

try:
    from BTPanel import session
except Exception:
    pass
try:
    import psycopg2
    from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
except Exception:
    pass


def _root_password_from(result):
    """
        @name Unwrap the password carried by a get_root_pwd() result
        @param result value returned by main.get_root_pwd()
        @return the password payload, or the value unchanged when it is not a dict
    """
    # NOTE(review): the shape of public.return_message() is not visible in this
    # file; 'message' and 'msg' are both tried -- confirm against the helper.
    if isinstance(result, dict):
        for key in ('message', 'msg'):
            if key in result:
                return result[key]
        return None
    return result


def _pg_literal(value):
    """
        @name Render a value as a single-quoted SQL string literal
        @param value text to embed (converted with str())
        @return the literal, with embedded single quotes doubled
    """
    # NOTE(review): assumes standard_conforming_strings is on, so backslashes
    # need no escaping -- confirm for the supported server versions.
    return "'{}'".format(str(value).replace("'", "''"))


def _pg_conninfo(host, port, user, dbname, password):
    """
        @name Build a libpq keyword/value connection string
        @return "host='..' port='..' user='..' dbname='..' password='..'"
    """
    pairs = (('host', host), ('port', port), ('user', user),
             ('dbname', dbname), ('password', password))
    parts = []
    for key, value in pairs:
        # libpq: quoted values escape backslash and single quote with a backslash.
        text = str(value).replace('\\', '\\\\').replace("'", "\\'")
        parts.append("{}='{}'".format(key, text))
    return ' '.join(parts)


class panelPgsql:
    """
        Thin psycopg2 wrapper: every execute()/query() call opens a
        connection, runs one statement and closes the connection again.
    """
    __DB_PASS = None
    __DB_USER = 'postgres'
    __DB_PORT = 5432
    __DB_HOST = 'localhost'
    __DB_CONN = None
    __DB_CUR = None
    __DB_ERR = None
    __DB_CLOUD = 0  # set to 1 by set_host() (remote database)

    def __init__(self):
        self.__DB_CLOUD = 0
        if self.__DB_USER == 'postgres' and self.__DB_HOST == 'localhost' and self.__DB_PASS is None:
            tmp_args = public.dict_obj()
            tmp_args.is_True = True
            # get_root_pwd() returns a wrapped message; keep only the password.
            self.__DB_PASS = _root_password_from(main().get_root_pwd(tmp_args))

    def set_host(self, host, port, name, username, password, prefix=''):
        """
            @name Point this object at an explicit server
            @return self, so the call can be chained
        """
        self.__DB_HOST = host
        self.__DB_PORT = int(port)
        self.__DB_NAME = name
        if self.__DB_NAME:
            self.__DB_NAME = str(self.__DB_NAME)
        self.__DB_USER = str(username)
        self._USER = str(username)
        self.__DB_PASS = str(password)
        self.__DB_PREFIX = prefix
        self.__DB_CLOUD = 1
        return self

    def check_psycopg(self):
        """
            @name Check that psycopg2 is importable, installing it via btpip if not
            @return bool
        """
        try:
            import psycopg2
            from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
        except Exception:
            os.system('btpip install psycopg2-binary')
            try:
                import psycopg2
                from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
            except Exception:
                return False
        return True

    # Connect to the PostgreSQL server
    def __Conn(self):
        self.check_psycopg()
        try:
            # Imported here as well: the module-level import is skipped when
            # psycopg2 was only installed by check_psycopg() just now.
            import psycopg2
            from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
        except Exception:
            self.__DB_ERR = public.get_error_info()
            return False
        try:
            if self.__DB_USER == 'postgres' and self.__DB_HOST == 'localhost':
                if not self.__DB_PASS:
                    tmp_args = public.dict_obj()
                    try:
                        self.__DB_PASS = _root_password_from(main().get_root_pwd(tmp_args))
                    except Exception:
                        # Best effort: connect() below reports the real failure.
                        pass
            self.__DB_CONN = psycopg2.connect(user=self.__DB_USER, password=self.__DB_PASS,
                                              host=self.__DB_HOST, port=self.__DB_PORT)
            self.__DB_CONN.autocommit = True
            # Statements such as CREATE DATABASE cannot run inside a transaction block.
            self.__DB_CONN.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
            self.__DB_CUR = self.__DB_CONN.cursor()
            return True
        except Exception:
            self.__DB_ERR = public.get_error_info()
            print(self.__DB_ERR)
            return False

    def execute(self, sql):
        """
            @name Run one SQL statement
            @return cursor.execute() result, the connection error text,
                    or the exception object raised by the statement
        """
        if not self.__Conn():
            return self.__DB_ERR
        try:
            result = self.__DB_CUR.execute(sql)
            self.__DB_CONN.commit()
            return result
        except Exception as ex:
            return ex
        finally:
            # Close on the failure path too, so connections do not leak.
            self.__Close()

    def query(self, sql):
        """
            @name Run one SQL query
            @return list of rows (each a list), the connection error text,
                    or the exception object raised by the query
        """
        if not self.__Conn():
            return self.__DB_ERR
        try:
            self.__DB_CUR.execute(sql)
            return [list(row) for row in self.__DB_CUR.fetchall()]
        except Exception as ex:
            return ex
        finally:
            self.__Close()

    # Close cursor and connection (best effort)
    def __Close(self):
        for handle in (self.__DB_CUR, self.__DB_CONN):
            try:
                handle.close()
            except Exception:
                pass


class main(databaseBase, panelPgsql):
    """
        Panel-facing PostgreSQL database management model.
    """
    __ser_name = None
    __soft_path = '/www/server/pgsql'
    __setup_path = '/www/server/panel/'
    __dbuser_info_path = "{}plugin/pgsql_manager_dbuser_info.json".format(__setup_path)
    __plugin_path = "{}plugin/pgsql_manager/".format(__setup_path)

    def __init__(self):
        s_path = public.get_setup_path()
        v_info = public.readFile("{}/pgsql/version.pl".format(s_path))
        if v_info:
            ver = v_info.split('.')[0]
            self.__ser_name = 'postgresql-x64-{}'.format(ver)
            # Was '{}/pgsql/{}'.format(s_path): two placeholders, one argument.
            self.__soft_path = '{}/pgsql'.format(s_path)

    # Read configuration options
    def get_options(self, get):
        """
            @name Read port / listen_addresses from postgresql.conf plus the root password
        """
        data = {}
        options = ['port', 'listen_addresses']
        if not self.__soft_path:
            self.__soft_path = '{}/pgsql'.format(public.get_setup_path())
        conf = public.readFile('{}/data/postgresql.conf'.format(self.__soft_path))
        if not conf:
            # Unreadable/missing config: report no options instead of crashing.
            conf = ''
        for opt in options:
            tmp = re.findall(r"\s+" + opt + r"\s*=\s*(.+)#", conf)
            if not tmp:
                continue
            data[opt] = tmp[0].strip()
            if opt == 'listen_addresses':
                data[opt] = data[opt].replace('\'', '')
        data['password'] = _root_password_from(self.get_root_pwd(None))
        return public.return_message(0, 0, data)

    def get_list(self, args):
        """
            @name Get the database list (sql_type = pgsql)
        """
        return public.return_message(0, 0, self.get_base_list(args, sql_type='pgsql'))

    def get_sql_obj_by_sid(self, sid=0, conn_config=None):
        """
            @name Get a panelPgsql object by server id
            @param sid server id, 0 = local
            @param conn_config optional row with db_host/db_port/db_user/db_password
        """
        if isinstance(sid, str):
            try:
                sid = int(sid)
            except ValueError:
                sid = 0

        if not sid:
            return panelPgsql()

        if not conn_config:
            conn_config = public.M('database_servers').where("id=?", sid).find()
        db_obj = panelPgsql()
        try:
            db_obj = db_obj.set_host(conn_config['db_host'], conn_config['db_port'], None,
                                     conn_config['db_user'], conn_config['db_password'])
        except Exception as e:
            raise public.PanelError(e)
        return db_obj

    def get_sql_obj(self, db_name):
        """
            @name Get a panelPgsql object for a database, wrapped in return_message
            @param db_name database name
        """
        is_cloud_db = False
        if db_name:
            db_find = public.M('databases').where("name=?", db_name).find()
            if db_find['sid']:
                return public.return_message(0, 0, self.get_sql_obj_by_sid(db_find['sid']))
            is_cloud_db = db_find['db_type'] in ['1', 1]

        if is_cloud_db:
            db_obj = panelPgsql()
            conn_config = json.loads(db_find['conn_config'])
            try:
                db_obj = db_obj.set_host(conn_config['db_host'], conn_config['db_port'],
                                         conn_config['db_name'], conn_config['db_user'],
                                         conn_config['db_password'])
            except Exception as e:
                raise public.PanelError(e)
        else:
            db_obj = panelPgsql()
        return public.return_message(0, 0, db_obj)

    def GetCloudServer(self, args):
        '''
            @name Get the remote server list
            @author hwliang<2021-01-10>
            @return list
        '''
        check_result = os.system('/www/server/pgsql/bin/psql --version')
        if check_result != 0 and not public.M('database_servers').where(
                "LOWER(db_type)=LOWER('pgsql')", ()).count():
            return public.return_message(0, 0, [])
        return public.return_message(0, 0, self.GetBaseCloudServer(args))

    def AddCloudServer(self, args):
        '''
            @name Add a remote database server
        '''
        return self.AddBaseCloudServer(args)

    def RemoveCloudServer(self, args):
        '''
            @name Remove a remote database server
        '''
        return self.RemoveBaseCloudServer(args)

    def ModifyCloudServer(self, args):
        '''
            @name Modify a remote database server
        '''
        return self.ModifyBaseCloudServer(args)

    def AddDatabase(self, args):
        """
            @name Create a database and its user, then record it in the panel
        """
        if not args.get('name/str', 0):
            return public.return_message(-1, 0, public.lang("Database name cannot be empty!"))
        if re.search(r"\W", args.name) is not None:
            return public.return_message(-1, 0, public.lang("The database name cannot contain special characters"))

        res = self.add_base_database(args)
        if not res['status']:
            return public.return_message(-1, 0, res['msg'])

        data_name = res['data_name']
        username = res['username']
        password = res['data_pwd']

        try:
            self.sid = int(args['sid'])
        except Exception:
            self.sid = 0

        dtype = 'PgSql'
        sql_obj = self.get_sql_obj_by_sid(self.sid)
        result = sql_obj.execute("CREATE DATABASE {};".format(data_name))
        isError = self.IsSqlError(result)
        if isError is not None:
            return isError

        # Create the database user
        self.__CreateUsers(data_name, username, password, '127.0.0.1')

        if not hasattr(args, 'ps'):
            args['ps'] = public.getMsg('INPUT_PS')
        addTime = time.strftime('%Y-%m-%d %X', time.localtime())

        pid = 0
        if hasattr(args, 'pid'):
            pid = args.pid
        if hasattr(args, 'contact'):
            site = public.M('sites').where("id=?", (args.contact,)).field('id,name').find()
            if site:
                pid = int(args.contact)
                args['ps'] = site['name']

        db_type = 0
        if self.sid:
            db_type = 2

        public.set_module_logs('pgsql', 'AddDatabase', 1)
        # Record in the panel's SQLite store
        public.M('databases').add('pid,sid,db_type,name,username,password,accept,ps,addtime,type', (
            pid, self.sid, db_type, data_name, username, password, '127.0.0.1', args['ps'], addTime, dtype))
        public.WriteLog("TYPE_DATABASE", 'DATABASE_ADD_SUCCESS', (data_name,))
        return public.return_message(0, 0, public.lang("Added successfully!"))

    def DeleteDatabase(self, get):
        """
            @name Drop a database and its user, then remove the panel record
        """
        id = get['id']
        find = public.M('databases').where("id=?", (id,)).field(
            'id,pid,name,username,password,accept,ps,addtime,db_type,conn_config,sid,type').find()
        if not find:
            return public.return_message(-1, 0, public.lang("The specified database does not exist."))

        # Use the stored name for the record looked up by id; the request's
        # own 'name' field must not decide which database gets dropped.
        name = find['name']
        username = find['username']

        sql_obj = self.get_sql_obj_by_sid(find['sid'])
        sql_obj.execute("drop database {};".format(name))
        sql_obj.execute("drop user {};".format(username))
        # Remove from the panel's SQLite store
        public.M('databases').where("id=?", (id,)).delete()
        public.WriteLog("TYPE_DATABASE", 'DATABASE_DEL_SUCCESS', (name,))
        return public.return_message(0, 0, public.lang("Delete successfully!"))

    def ToBackup(self, args):
        """
            @name Back up a database with pg_dump
            @param args['id'] database id
        """
        id = args['id']
        find = public.M('databases').where("id=?", (id,)).find()
        if not find:
            return public.return_message(-1, 0, public.lang("Database does not exist!"))
        if not find['password'].strip():
            return public.return_message(-1, 0, public.lang("The database password is empty. Set the password first."))

        sql_dump = '{}/bin/pg_dump'.format(self.__soft_path)
        if not os.path.isfile(sql_dump):
            return public.return_message(-1, 0, public.lang("Lack of backup tools, please first through the software store PGSQL manager!"))

        back_path = session['config']['backup_path'] + '/database/pgsql/'
        if not os.path.exists(back_path):
            os.makedirs(back_path)

        fileName = find['name'] + '_' + time.strftime('%Y%m%d_%H%M%S', time.localtime()) + '.sql'
        backupName = back_path + fileName

        if int(find['sid']):
            info = self.get_info_by_db_id(id)
            conninfo = _pg_conninfo(info['db_host'], info['db_port'], info['db_user'],
                                    find['name'], info['db_password'])
        else:
            port = self.get_port(public.dict_obj())
            conninfo = _pg_conninfo('127.0.0.1', port['data'], find['username'],
                                    find['name'], find['password'])
        # Every interpolated value is shell-quoted: passwords and names may
        # contain characters the shell would otherwise interpret.
        shell = '{} {} > {}'.format(shlex.quote(sql_dump), shlex.quote(conninfo), shlex.quote(backupName))
        public.ExecShell(shell)

        if not os.path.exists(backupName):
            return public.return_message(-1, 0, public.lang("Backup error"))

        public.M('backup').add('type,name,pid,filename,size,addtime',
                               (1, fileName, id, backupName, 0, time.strftime('%Y-%m-%d %X', time.localtime())))
        public.WriteLog("TYPE_DATABASE", "DATABASE_BACKUP_SUCCESS", (find['name'],))

        if os.path.getsize(backupName) < 2048:
            return public.return_message(0, 0, public.lang("The backup file size is smaller than 2Kb. Check the backup integrity."))
        return public.return_message(0, 0, public.lang("Backup Succeeded!"))

    def DelBackup(self, args):
        """
            @name Delete a backup file
        """
        return self.delete_base_backup(args)

    def get_port(self, args):
        """
            @name Detect the port postgres listens on (via netstat)
            @return {'data': port, 'status': bool}; data falls back to 5432
        """
        str_shell = '''netstat -luntp|grep postgres|head -1|awk '{print $4}'|awk -F: '{print $NF}' '''
        try:
            port = public.ExecShell(str_shell)[0]
            if port.strip():
                return {'data': port.strip(), "status": True}
            return {'data': 5432, "status": False}
        except Exception:
            return {'data': 5432, "status": False}

    # Import
    def InputSql(self, get):
        """
            @name Import a .sql file into a database with psql
        """
        name = get.name
        file = get.file

        find = public.M('databases').where("name=?", (name,)).find()
        if not find:
            return public.return_message(-1, 0, public.lang("Database does not exist!"))
        if not find['password'].strip():
            return public.return_message(-1, 0, public.lang("The database password is empty. Set the password first."))

        exts = ['sql']
        ext = file.split('.')[-1]
        if ext not in exts:
            return public.return_message(-1, 0, public.lang("Select sql、gz、zip file!"))

        sql_dump = '{}/bin/psql'.format(self.__soft_path)
        if not os.path.exists(sql_dump):
            return public.return_message(-1, 0, public.lang("Lack of recovery tool, please use software management to install PGSQL!"))

        if int(find['sid']):
            info = self.get_info_by_db_id(find['id'])
            conninfo = _pg_conninfo(info['db_host'], info['db_port'], info['db_user'],
                                    find['name'], info['db_password'])
        else:
            port = self.get_port(public.dict_obj())
            conninfo = _pg_conninfo('127.0.0.1', port['data'], find['username'],
                                    find['name'], find['password'])
        # Shell-quote everything, including the user-supplied file path.
        shell = '{} {} < {}'.format(shlex.quote(sql_dump), shlex.quote(conninfo), shlex.quote(file))
        public.ExecShell(shell)

        public.WriteLog("TYPE_DATABASE", 'Description Succeeded in importing database [{}]'.format(name))
        return public.return_message(0, 0, public.lang("Successfully imported database!"))

    def SyncToDatabases(self, get):
        """
            @name Sync panel database records to the server
            @param get['type'] 0 = all pgsql records, otherwise get.ids (JSON list of ids)
        """
        tmp_type = int(get['type'])
        n = 0
        sql = public.M('databases')
        if tmp_type == 0:
            where = "lower(type) = lower('pgsql')"
            data = sql.field('id,name,username,password,accept,type,sid,db_type').where(where, ()).select()
            for value in data:
                if value['db_type'] in ['1', 1]:
                    continue  # skip remote databases
                if self.ToDataBase(value) == 1:
                    n += 1
        else:
            for value in json.loads(get.ids):
                find = sql.where("id=?", (value,)).field(
                    'id,name,username,password,sid,db_type,accept,type').find()
                if self.ToDataBase(find) == 1:
                    n += 1

        if n == 1:
            return public.return_message(0, 0, public.lang("Synchronization succeeded"))
        elif n == 0:
            return public.return_message(-1, 0, public.lang("Sync failed"))
        return public.return_message(0, 0, public.lang("DATABASE_SYNC_SUCCESS {}", n))

    def ToDataBase(self, find):
        """
            @name Create one recorded database (and its user) on the server
            @return 1 when handled, 0 when skipped
        """
        if find['username'] == 'bt_default':
            return 0
        if len(find['password']) < 3:
            # No usable password stored: generate one and persist it first.
            find['username'] = find['name']
            find['password'] = public.md5(str(time.time()) + find['name'])[0:10]
            public.M('databases').where("id=?", (find['id'],)).save(
                'password,username', (find['password'], find['username']))

        self.sid = find['sid']
        sql_obj = self.get_sql_obj_by_sid(self.sid)
        result = sql_obj.execute("CREATE DATABASE {};".format(find['name']))
        isError = self.IsSqlError(result)
        if isError is not None and isError['status'] == False and isError[
                'msg'] == 'The specified database already exists, please do not add it repeatedly.':
            return 1

        self.__CreateUsers(find['name'], find['username'], find['password'], '127.0.0.1')
        return 1

    def SyncGetDatabases(self, get):
        """
            @name Import databases found on the server into the panel
            @param sid 0 = local server, otherwise a remote server id
        """
        n = 0
        db_type = 0
        self.sid = get.get('sid/d', 0)
        if self.sid:
            db_type = 2

        sql_obj = self.get_sql_obj_by_sid(self.sid)
        data = sql_obj.query('SELECT datname FROM pg_database;')
        isError = self.IsSqlError(data)
        if isError is not None:
            return isError
        if isinstance(data, str):
            return public.return_message(-1, 0, data)

        sql = public.M('databases')
        # System / template database names that are never imported
        nameArr = ['information_schema', 'postgres', 'template1', 'template0', 'performance_schema', 'mysql',
                   'sys', 'master', 'model', 'msdb', 'tempdb', 'ReportServerTempDB', 'YueMiao', 'ReportServer']
        for item in data:
            dbname = item[0]
            if sql.where("name=?", (dbname,)).count():
                continue
            if dbname not in nameArr:
                if sql.table('databases').add('name,username,password,accept,ps,addtime,type,sid,db_type', (
                        dbname, dbname, '', '', public.getMsg('INPUT_PS'),
                        time.strftime('%Y-%m-%d %X', time.localtime()), 'pgsql', self.sid, db_type)):
                    n += 1
        return public.return_message(0, 0, public.lang("DATABASE_GET_SUCCESS {}", n))

    def ResDatabasePassword(self, args):
        """
            @name Change a database user's password
        """
        id = args['id']
        username = args['name'].strip()
        newpassword = public.trim(args['password'])
        if not newpassword:
            return public.return_message(-1, 0, public.lang("The database password cannot be empty"))

        find = public.M('databases').where("id=?", (id,)).field(
            'id,pid,name,username,password,type,accept,ps,addtime,sid').find()
        if not find:
            return public.return_message(-1, 0, public.lang("Modify the failure,The specified database does not exist"))

        sql_obj = self.get_sql_obj_by_sid(find['sid'])
        # The password is rendered as an escaped literal so that a quote in it
        # cannot terminate the string and inject SQL.
        # NOTE(review): username is still interpolated as a bare identifier.
        result = sql_obj.execute("alter user {} with password {};".format(username, _pg_literal(newpassword)))
        isError = self.IsSqlError(result)
        if isError is not None:
            return isError

        # Update the panel's SQLite store
        public.M('databases').where("id=?", (id,)).setField('password', newpassword)
        public.WriteLog("TYPE_DATABASE", 'DATABASE_PASS_SUCCESS', (find['name'],))
        return public.return_message(0, 0, public.lang("DATABASE_PASS_SUCCESS {}", find['name'],))

    def get_root_pwd(self, args):
        """
            @name Get the postgres superuser password stored by the panel
            @param args may be None; when it carries a truthy is_True the
                   result is returned with status -1 instead of 0
        """
        check_result = os.system('/www/server/pgsql/bin/psql --version')
        if check_result != 0:
            return public.return_message(-1, 0, public.lang("If PgSQL is not installed or started, install or start it first"))

        password = ''
        path = '{}/data/postgresAS.json'.format(public.get_panel_path())
        if os.path.isfile(path):
            try:
                password = json.loads(public.readFile(path))['password']
            except Exception:
                # Unreadable or malformed file: report an empty password.
                pass

        # get_options() calls this with args=None, so guard the membership test.
        if args is not None and 'is_True' in args and args.is_True:
            return public.return_message(-1, 0, password)
        return public.return_message(0, 0, password)

    def set_root_pwd(self, args):
        """
            @name Set the postgres superuser password
        """
        password = public.trim(args['password'])
        if len(password) < 8:
            return public.return_message(-1, 0, public.lang("The password must not be less than 8 digits."))

        check_result = os.system('/www/server/pgsql/bin/psql --version')
        if check_result != 0:
            return public.return_message(-1, 0, public.lang("If PgSQL is not installed or started, install or start it first"))

        sql_obj = self.get_sql_obj_by_sid('0')
        data = sql_obj.query('SELECT datname FROM pg_database;')
        isError = self.IsSqlError(data)
        if isError is not None:
            return isError

        path = '{}/data/pg_hba.conf'.format(self.__soft_path)
        p_path = '{}/data/postgresAS.json'.format(public.get_panel_path())
        if not os.path.isfile(path):
            return public.return_message(-1, 0, public.lang("{}File does not exist, please check the installation is complete!", path))

        src_conf = public.readFile(path)
        public.writeFile(path, src_conf.replace('md5', 'trust'))
        try:
            pg_obj = panelPgsql()
            pg_obj.execute("ALTER USER postgres WITH PASSWORD {};".format(_pg_literal(password)))

            data = {"username": "postgres", "password": ""}
            try:
                data = json.loads(public.readFile(p_path))
            except Exception:
                pass
            data['password'] = password
            public.writeFile(p_path, json.dumps(data))
        finally:
            # Always put the original pg_hba.conf back, even if something
            # above raised, so 'trust' authentication is never left enabled.
            public.writeFile(path, src_conf)
        return public.return_message(0, 0, public.lang("The administrator password is successfully changed. Procedure."))

    def get_info_by_db_id(self, db_id):
        """
            @name Get connection details for a database
            @param db_id database id
            @return dict with db_host/db_port/db_user/db_password, or False
        """
        find = public.M('databases').where("id=?", db_id).find()
        if not find:
            return False

        data = {
            'db_host': '127.0.0.1',
            'db_port': 5432,
            'db_user': find['username'],
            'db_password': find['password']
        }
        if int(find['sid']):
            conn_config = public.M('database_servers').where("id=?", find['sid']).find()
            data['db_host'] = conn_config['db_host']
            data['db_port'] = int(conn_config['db_port'])
        return data

    def get_database_size_by_id(self, args):
        """
            @name Get database size (used by batch-delete verification)
            @param args json/int database id
            @return size, or 0 on any failure
        """
        # NOTE(review): get_sql_obj() returns a return_message wrapper and the
        # query targets sys.master_files (SQL Server); as written this always
        # falls through to 0 on PostgreSQL -- confirm the intended behaviour.
        total = 0
        db_id = args
        if not isinstance(args, int):
            db_id = args['db_id']
        try:
            name = public.M('databases').where('id=?', db_id).getField('name')
            sql_obj = self.get_sql_obj(name)
            tables = sql_obj.query(
                "select name,size,type from sys.master_files where type=0 and name = '{}'".format(name))
            total = tables[0][1]
            if not total:
                total = 0
        except Exception:
            pass
        return total

    def check_del_data(self, args):
        """
            @name Pre-delete check for databases
        """
        return public.return_message(0, 0, self.check_base_del_data(args))

    # Create the database user
    def __CreateUsers(self, data_name, username, password, address):
        """
            @name Create a database user and grant it all privileges on data_name
        """
        sql_obj = self.get_sql_obj_by_sid(self.sid)
        sql_obj.execute("CREATE USER {} WITH PASSWORD {};".format(username, _pg_literal(password)))
        sql_obj.execute("GRANT ALL PRIVILEGES ON DATABASE {} TO {};".format(data_name, username))
        return True

    def __get_db_list(self, sql_obj):
        """
            @name Get the list of database names on the server
        """
        ret = sql_obj.query('SELECT datname FROM pg_database;')
        if isinstance(ret, list):
            return [row[0] for row in ret]
        return []

    def check_cloud_database_status(self, conn_config):
        """
            @name Check whether a remote database is reachable
            @param conn_config remote settings: db_host, db_port, db_user, db_password, optional db_name
        """
        try:
            if 'db_name' not in conn_config:
                conn_config['db_name'] = None
            sql_obj = panelPgsql().set_host(conn_config['db_host'], conn_config['db_port'],
                                            conn_config['db_name'], conn_config['db_user'],
                                            conn_config['db_password'])
            data = sql_obj.query("SELECT datname FROM pg_database;")
            if isinstance(data, str):
                return public.return_message(-1, 0, public.lang("Connecting to remote PGSQL fails. Perform the following operations to rectify the fault:<br/>1、The database port is correct and the firewall allows access<br/>2、Check whether the database account password is correct<br/>3、pg_hba.confWhether to add a client release record<br/>4、postgresql.conf Add listen_addresses to the correct server IP address."))

            if not conn_config['db_name']:
                return public.return_message(0, 0, True)
            for i in data:
                if i[0] == conn_config['db_name']:
                    return public.return_message(0, 0, True)
            return public.return_message(-1, 0, public.lang("The specified database does not exist!"))
        except Exception as ex:
            return public.return_message(-1, 0, ex)
Close