Source code for expliot_finder.vulnerability_scanner.executor
#!/usr/bin/env python3
"""Executor of all 'vulnerability_scanner' scanning modules.
This module executes all scanners built into the 'vulnerability_scanner' module to
obtain the following information about the target:
- ip_v4 (IP address of the selected target to scan)
- os_name (OS used by target)
- mac_address (MAC address used by target)
- mac_vendor_name (Vendor name of MAC used by target)
- ports_services (Open ports, services running on those open ports and
versions of those services of the chosen target)
All information detected by the scanners is returned in the following form:
.. code-block:: python
{
'ip_v4': '192.168.0.1',
'os_name': 'Linux',
'mac_address': 'D8-0D-17-XX-XX-XX',
'mac_vendor_name': 'TP-LINK TECHNOLOGIES CO.,LTD.',
'ports_services': [
PortService(
port_number=22,
service_name='ssh',
service_version='SSH-2.0-dropbear_2011.54'
),
PortService(
port_number=80,
service_name='http',
service_version='Unknown'
)
],
...
}
"""
# Public API: a star-import of this module exposes only the executor class.
__all__ = ("VulnerabilityScannerExecutor",)
from typing import NamedTuple, Type
from expliot_finder.vulnerability_scanner.core.utils import run_concurrently
from .captured_sensitive_target_info import CapturedSensitiveInfo
from .core.scanners import MACAddressDetector, OSNameDetector, PortServiceScannerTCP
[docs]class VulnerabilityScannerExecutor:
"""Executor of all 'vulnerability_scanner' scanning modules.
Modules that are executed by this executor:
- 'OSNameDetector' (discover OS used by target)
- 'MACAddressDetector' (discover MAC address and MAC vendor name used
by target)
- 'PortServiceScannerTCP' (find out what open ports 'target' has.
If PortServiceScannerTCP detects open ports, it will try to
determine what service is running on these ports and what are
the versions of these services.)
Attributes:
captured_sensitive_info:
Global dataclass that stores detected sensitive information about
the selected target to scan. This dataclass can be easily
transformed to the dictionary.
scanned_ports_count:
Amount of already scanned ports. Needed here to pass this value
from 'PortServiceScannerTCP' trough 'VulnerabilityScannerExecutor'
to 'display_scanning_progress' to show progress of scanning.
port_amount:
Amount of ports that's will be scanned in chosen target. This value
is provided by CLI.
os_name_discoverer:
Instance of class 'OSNameDetector'. Functions in this class will
detect OS used by the chosen target.
mac_discoverer:
Instance of class 'MACAddressDetector'. Functions in this class
will detect MAC address and vendor name of MAC used by the target.
tcp_port_scanner:
Instance of class 'PortServiceScannerTCP'. Functions in this class
will find out open ports, services running on those open ports and
versions of those services in the chosen target.
.. automethod:: __call__
.. automethod:: __run_tcp_port_scanner
"""
__slots__ = (
"captured_sensitive_info",
"scanned_ports_count",
"port_amount",
"os_name_discoverer",
"mac_discoverer",
"tcp_port_scanner",
)
def __init__(self, **kwargs: dict[str, str]) -> None:
"""Init VulnerabilityScannerExecutor class.
Args:
<port_amount>:
User selected number of ports to be scanned in the selected
target.
"""
self.captured_sensitive_info = CapturedSensitiveInfo(
ip_v4=kwargs["<target_ip>"]) # type: ignore
self.scanned_ports_count: int = 0
self.port_amount: int = int(kwargs.get("<port_amount>", 1024)) # type: ignore
# Core 'vulnerability_scanner' scanners against the chosen target
self.os_name_discoverer: Type[OSNameDetector] = OSNameDetector
self.mac_discoverer: Type[MACAddressDetector] = MACAddressDetector
self.tcp_port_scanner: Type[PortServiceScannerTCP] = PortServiceScannerTCP
def __repr__(self) -> str:
"""Print class name and class attributes.
Returns:
'VulnerabilityScannerExecutor' as the class name and attributes of
this class.
"""
return f"{self.__class__.__name__}({vars(self)!r})"
[docs] async def run_tcp_port_scanner(self):
"""Run the asynchronous TCP port scanner in loop be able to track scanning progress."""
async for scanned_ports_count in self.tcp_port_scanner(
self.captured_sensitive_info,
self.port_amount).run_port_scanner():
self.scanned_ports_count = scanned_ports_count
[docs] async def __call__(self, display_scanning_progress) -> dict[str, str | list[NamedTuple]]:
"""Run concurrently scanners from 'vulnerability_scanner' module against the chosen target.
Returns:
Return detected confidential information about the selected target.
"""
await run_concurrently(
display_scanning_progress(self),
self.mac_discoverer(self.captured_sensitive_info).
capture_mac_addr_and_vendor_name(),
self.os_name_discoverer(
self.captured_sensitive_info).capture_os_name(),
self.run_tcp_port_scanner(),
)
return dict(self.captured_sensitive_info)