Source code for expliot_finder.main_executor
"""Main executor of 'exploit-finder' module."""
__all__ = ("ExploitFinderExecutor",)
import asyncio
from collections import namedtuple
from rich import console, table
from expliot_finder.vulnerability_scanner import VulnerabilityScannerExecutor
from expliot_finder.scraper import FindExploit
from expliot_finder.vulnerability_scanner.ui import display_scanning_progress
# TODO Needs to be refactored in the feature
[docs]class ExploitFinderExecutor:
"""This class collect all submodules calls."""
__slots__ = ("_filtered_kw", "founded_vulnerabilities", "output_table",)
def __init__(self) -> None:
"""Init ExploitFinderExecutor class."""
self.filtered_kw: dict[str, str] = {}
self.founded_vulnerabilities: dict[str, str] = {}
self.output_table = table.Table(show_lines=True)
def __call__(self, *args, **kwargs) -> None:
"""Call in right order vulnerability scanners and exploits finder against chosen target."""
self.scan_selected_device()
self.find_exploit()
self.create_output_tb()
self.show_final_output()
def __repr__(self) -> str:
"""Print class name and class attributes.
Returns:
'ExploitFinderExecutor' as the class name and attributes of
this class.
"""
return f"{self.__class__.__name__}({vars(self)!r})"
@property
def filtered_kw(self) -> dict[str, str]:
"""Return the filtered parameters provided by the user."""
return self._filtered_kw
@filtered_kw.setter
def filtered_kw(self, cli_kwargs: dict[str, str]) -> None:
"""Filter command line keyword arguments provided by user.
Args:
cli_kwargs:
Command line keyword arguments provided by user.
"""
self._filtered_kw = {k: v for k, v in cli_kwargs.items() if v is not None}
[docs] def scan_selected_device(self):
"""Run scanners in order to find out what vulnerabilities the selected device to scann has.
Save results into 'target_vulnerability'
"""
self.founded_vulnerabilities = asyncio.run(
VulnerabilityScannerExecutor(**self.filtered_kw)(display_scanning_progress))
[docs] def find_exploit(self):
"""Based on the vulnerabilities found, the scrapper will find suitable exploits."""
cve_urls, exploit_urls = [], []
for index_, collected_info in enumerate(
self.founded_vulnerabilities["ports_services"]):
if collected_info.service_version != "Unknown":
cve_urls, exploit_urls = asyncio.run(
FindExploit(service_version=collected_info.service_version).run_web_scrappers())
port_service_vulnerability = namedtuple(
"Vulnerability",
"port_number service_name service_version cve_link exploit_link",
)
self.founded_vulnerabilities["ports_services"].insert(
index_ + 1,
port_service_vulnerability(
port_number=collected_info.port_number,
service_name=collected_info.service_name,
service_version=collected_info.service_version,
cve_link=cve_urls[0] if cve_urls else "Unknown",
exploit_link=exploit_urls[0]
if exploit_urls else "Unknown",
),
)
del self.founded_vulnerabilities["ports_services"][index_]
if cve_urls:
del cve_urls[0]
if exploit_urls:
del exploit_urls[0]
[docs] def create_output_tb(self):
"""Create table with all detected ports, services, services versions and found exploits."""
self.output_table.add_column("PORT", justify="center")
self.output_table.add_column("SERVICE NAME", justify="center")
self.output_table.add_column("SERVICE VERSION", justify="center")
self.output_table.add_column("VULNERABILITY INFORMATION",
justify="center",
header_style="yellow")
self.output_table.add_column("FOUNDED EXPLOIT",
justify="center",
header_style="red")
for row in self.founded_vulnerabilities["ports_services"]:
self.output_table.add_row(
str(row.port_number),
row.service_name,
row.service_version,
row.cve_link,
row.exploit_link,
)
[docs] def show_final_output(self) -> None:
"""Show final output to the end user."""
console_ = console.Console()
console_.print(self.output_table)