diff --git a/README.md b/README.md index 5ef86e9..08ec962 100644 --- a/README.md +++ b/README.md @@ -1,21 +1,15 @@ -# Sara: MikroTik RouterOS Security Inspector +# Sara: RouterOS Security Inspector -RouterOS security analyzer for detecting misconfigurations, weak settings, and known vulnerabilities (CVE). +RouterOS configuration analyzer to find security misconfigurations and vulnerabilities. -![](cover/saracover.png) +![](/banner/banner.png) -```bash - _____ - / ___/____ __________ _ - \__ \/ __ `/ ___/ __ `/ - ___/ / /_/ / / / /_/ / - /____/\__,_/_/ \__,_/ +``` +RouterOS Security Inspector. Designed for security engineers - Sara: MikroTik RouterOS Security Inspector - Developer: Magama Bazarov (Caster) - Contact: magamabazarov@mailbox.org - Version: 1.3.0 - Documentation & Usage: https://github.com/caster0x00/Sara +Author: Magama Bazarov, +Alias: Caster +Version: 1.2 ``` # Disclaimer @@ -32,7 +26,7 @@ The author does not take any responsibility for the misuse of this tool, includi # Sara is not an attack tool -**Sara does not bypass authentication, exploit vulnerabilities, or alter RouterOS configurations.** It works in read-only mode and does not modify device configuration. A read-only RouterOS account is sufficient. +**Sara does not bypass authentication, exploit vulnerabilities, or alter RouterOS configurations.** It works in **read-only mode**, requiring no administrative privileges. If you are unsure about the interpretation of the analysis results, consult an experienced network engineer before making any decisions! @@ -44,95 +38,141 @@ Before use, ensure that your device auditing complies with your organization's l - Use it only on your devices or with the owner's permission; - Do not use Sara on other people's networks without the owner's explicit consent - this may violate computer security laws! -# Features +# Mechanism -**Sara** uses [netmiko](https://github.com/ktbyers/netmiko) to remotely connect via SSH to RouterOS devices. It executes RouterOS system commands to extract configuration data and analyze it for potential vulnerabilities and signs of compromise. The user connects to the hardware himself using Sara by entering his username and password. Sara executes only `print` commands and does not change RouterOS configuration in any way. You can even use a read-only (RO) account if you want to. +**Sara** uses [netmiko](https://github.com/ktbyers/netmiko) to remotely connect via SSH to RouterOS devices. It executes RouterOS system commands to extract configuration data and analyze it for potential vulnerabilities and signs of compromise. The user connects to the hardware himself using Sara by entering his username and password. Sara executes exactly `print` based commands, thus not changing the configuration of your hardware in any way. So, by the way, you can even use an RO-only account if you want to. Sara does not use any exploits, payloads or bruteforce attacks. All RouterOS security analysis here is based on pure configuration analysis. -## Profiles +## What exactly is Sara checking for? -Sara uses audit profiles. Each profile covers its own audit scope. +1. **SMB protocol activity** – determines whether SMB is enabled, which may be vulnerable to CVE-2018-7445; -- `system`: This profile covers RouterOS system checks. It checks the system version, account status, remote management services and their availability, IP restrictions, PoE and RouterBOOT status, SSH settings, and password policies. It also analyzes MAC Winbox/Telnet services, NAT rules, connection tracking mode, RoMON, and the scheduler, including for suspicious automatic tasks and possible persistence mechanisms. -- `protocols`: The profile focuses on network protocols and services that are commonly used as entry points for attacks. It checks SMB, UPnP, SOCKS proxies, DNS settings and static DNS records, DDNS cloud services, Neighbor Discovery settings, and SNMP. The profile's task is to identify enabled or improperly restricted network services that could increase the attack surface. -- `wifi`: This profile evaluates the security of the wireless part of RouterOS. It analyzes Wi-Fi interfaces, PMKID parameters, and WPS modes that could open up opportunities for offline attacks or unauthorized connections. The profile helps you quickly understand whether the current Wi-Fi network security configuration is weak. +2. **Check the status of RMI interfaces** – identifies active management services (Telnet, FTP, Winbox, API, HTTP/HTTPS); -# CVE Search +3. **Wi-Fi Security Check** – determines whether WPS and PMKID support are enabled, which can be used in WPA2-PSK attacks; + + > At the moment, this check has minor stability issues, as different versions of RouterOS have different variations of Wi-Fi configurations. Keep that in mind, but feel free to make an issue, we'll look into it; + +4. **Check UPnP** – determines whether UPnP is enabled, which can automatically forward ports and threaten network security; + +5. **Check DNS settings** – detects whether `allow-remote-requests`, which makes the router a DNS server, is enabled; + +6. **Check DDNS** – determines whether dynamic DNS is enabled, which can reveal the real IP address of the device; + +7. **PoE Test** – checks if PoE is enabled, which may cause damage to connected devices; + +8. **Check RouterBOOT security** – determines if RouterBOOT bootloader protection is enabled; + +9. **Check SOCKS Proxy** – identifies an active SOCKS Proxy that could be used by an attacker for pivoting, as well as indicating a potential compromise of the device. + +10. **Bandwidth Server Test (BTest)** – determines whether a bandwidth server is enabled that can be used for a Flood attack by the attacker; + +11. **Check discovery protocols** – determines whether CDP, LLDP, MNDP that can disclose network information are active; + +12. **Check minimum password length** – determines whether the `minimum-password-length` parameter is set to prevent the use of weak passwords; + +13. **SSH Check** – analyzes SSH settings, including the use of strong-crypto and Port Forwarding permission; + +14. **Check Connection Tracking** – determines whether Connection Tracking is enabled, which can increase the load and open additional attack vectors; + +15. **RoMON check** – detects RoMON activity, which allows you to manage devices at Layer 2; + +16. **Check Winbox MAC Server** – analyzes access by MAC address via Winbox and Telnet, which can be a vulnerability on a local network; + +17. **Check SNMP** – detects the use of weak SNMP community strings (`public`, `private`); + +18. **Check NAT rules** – analyzes port forwarding (`dst-nat`, `netmap`) that may allow access to internal services from the outside; + +19. **Check network access to RMI** – determines whether access to critical services (API, Winbox, SSH) is restricted to trusted IPs only; + +20. **Check RouterOS version** – analyzes the current version of RouterOS and compares it to known vulnerable versions; + +21. **RouterOS Vulnerability Check** – checks the RouterOS version against the CVE database and displays a list of known vulnerabilities; + +22. **“Keep Password” in Winbox** – warns of potential use of the “Keep Password” feature + +23. **Check default usernames** – defines the use of standard logins (`admin`, `engineer`, `test`, `mikrotik`); + +24. **Checking the schedulers** – detects malicious tasks that can load remote scripts, perform hidden reboots, or run too often; + +25. **Check static DNS records** – Analyzes static DNS records that can be used for phishing and MITM attacks. + +## A breakdown of one technique + +Sara analyzes MikroTik RouterOS configuration by sending commands via SSH and interpreting the results. Let's consider a basic example of checking an SMB service that may be vulnerable to CVE-2018-7445. + +```python +# SMB Check +def check_smb(connection): + separator("Checking SMB Service") + command = "/ip smb print" + output = connection.send_command(command) + + if "enabled: yes" in output: + print(Fore.RED + Style.BRIGHT + "[*] CAUTION: SMB service is enabled! Are you sure you want it? Also, avoid CVE-2018-7445") + else: + print(Fore.GREEN + "[+] SMB is disabled. No risk detected.") + print(Fore.GREEN + "[+] No issues found.") +``` + +1. Sending a command to the router: command = `/ip smb print` - queries the status of the SMB service; +2. `output = connection.send_command(command)` - executes the command via SSH and receives its output, writing it to the variable memory; +3. If the output contains the string `“enabled: yes”`, then SMB is enabled and the script displays a warning. + +The same principle works for the other checks. Only read the configuration and then analyze it in detail. + +# Vulnerability Search (CVE) Sara performs a security analysis of RouterOS by checking the current firmware version and checking it against a database of known vulnerabilities (CVEs). This process identifies critical vulnerabilities that can be exploited by attackers to compromise the device. ## How does it work? -Sara uses a separate module called `cve_analyzer.py`. -It downloads information from the NVD (National Vulnerability Database), filters it by RouterOS, and generates a local file called `routeros_cves.json` -Next, the RouterOS version is compared with the version ranges specified in CVE records, as well as with additional patterns extracted from vulnerability descriptions. +Sara has a special module called `cve_analyzer.py`, which creates `routeros_cves.json` based on the NVD NIST database containing information about vulnerabilities, including those in MikroTik RouterOS. +Vulnerabilities for the RouterOS version are searched for using the `--cve` argument. The results will show the total number of vulnerabilities, their categorization, as well as the CVE ID and a brief description. -There are two ways to perform the CVE check: - -1. Live Device (SSH) - -Sara will determine the RouterOS version on the device and compare it with the CVE database. ```bash -~$ sara cve 192.168.88.1 admin +caster@kali:~$ sara --ip 192.168.88.1 --username admin --password admin --cve ``` -If necessary, you can specify the SSH key and port: -```bash -~$ sara cve 192.168.88.1 admin ~/.ssh/id_rsa 2222 -``` +## Specifics of checking -> The password or key passphrase is requested interactively ([getpass](https://docs.python.org/3/library/getpass.html)) +- Sara does not verify real-world exploitation of vulnerabilities. It only cross-references the RouterOS version against publicly available CVE databases; +- If the device is running an older version of RouterOS, but vulnerable services have been manually disabled, some warnings may be false positives; +- It is recommended to manually validate your version of RouterOS after the audit to ensure there are no false positives. + +## Example ```bash -[+] CVE Audit (Live) - [*] Target Device: 192.168.88.1 - [*] Transport: SSH (port 22) -[?] SSH password for admin@192.168.88.1: - [✓] SSH connection established: admin@192.168.88.1 -[+] Search for CVEs for a specific version +[+] Detected RouterOS Version: 7.1.1 [!] routeros_cves.json not found. [*] Fetching CVEs from NVD... -[+] Saved 80 CVEs to routeros_cves.json +[+] Saved 74 CVEs to routeros_cves.json +[*] Total matching CVEs: 4 +[*] CRITICAL: 1 +[*] HIGH: 1 +[*] MEDIUM: 2 +[*] Vulnerability details: -Target RouterOS Version: 7.20.5 Matched CVEs: 0 -CRIT: 0 | HIGH: 0 | MED: 0 | LOW: 0 | UNK: 0 +→ CVE-2022-45313 [HIGH] + Mikrotik RouterOs before stable v7.5 was discovered to contain an out-of-bounds read in the hotspot process. This vulnerability allows attackers to execute arbitrary code via a crafted nova message. + CVSS Score: 8.8 -[*] No known CVEs found for this RouterOS version +→ CVE-2022-45315 [CRITICAL] + Mikrotik RouterOs before stable v7.6 was discovered to contain an out-of-bounds read in the snmp process. This vulnerability allows attackers to execute arbitrary code via a crafted packet. + CVSS Score: 9.8 -[*] Disconnected from RouterOS (192.168.88.1) +→ CVE-2023-41570 [MEDIUM] + MikroTik RouterOS v7.1 to 7.11 was discovered to contain incorrect access control mechanisms in place for the Rest API. + CVSS Score: 5.3 + +→ CVE-2024-54772 [MEDIUM] + An issue was discovered in the Winbox service of MikroTik RouterOS long-term release v6.43.13 through v6.49.13 and stable v6.43 through v7.17.2. A patch is available in the stable release v6.49.18. A discrepancy in response size between connection attempts made with a valid username and those with an invalid username allows attackers to enumerate for valid accounts. + CVSS Score: 5.4 ``` -2. Manual +> The quality of entries in the NVD leaves much to be desired; in many cases, fields such as `versionEndExcluding` or `versionStartExcluding` have a value of “null.” Therefore, it is also important to validate your RouterOS version to ensure that a particular vulnerability exists. -If the device is unavailable, you can simply specify the desired version: -```bash -~$ sara cve version 7.13.1 -``` - -Sara will perform a vulnerability scan for a specific version without connecting to the device. -```bash -[+] CVE Audit (Manual Version) - [*] RouterOS Version: 7.13.1 -[+] Search for CVEs for a specific version -[!] routeros_cves.json not found. -[*] Fetching CVEs from NVD... -[+] Saved 80 CVEs to routeros_cves.json - -Target RouterOS Version: 7.13.1 Matched CVEs: 2 -CRIT: 0 | HIGH: 1 | MED: 1 | LOW: 0 | UNK: 0 - -CVE ID SEV CVSS PUBLISHED -CVE-2025-6443 HIGH 7.2 2025-06-25 -CVE-2024-54772 MED 5.4 2025-02-11 -``` - -## Features of CVE verification - -Sara does not determine the possibility of actual exploitation of vulnerabilities, but only analyzes the compliance of the RouterOS version with known CVEs; -A vulnerability is considered "relevant" if the device version falls within the range specified in the CVE, or if the vulnerability description contains a recognized version range; -The NVD database often contains incomplete data (`versionStartExcluding=null`, `versionEndExcluding=null`). That's why it's also a good idea to manually check the results against MikroTik's official changelog. - -# How to Use +# How to use You have two ways to install Sara: @@ -146,68 +186,61 @@ caster@kali:~$ sara -h 2. Manually using Git and Python: ```bash -~$ sudo apt install git python3-colorama python3-netmiko python3-packaging python3-requests -~$ git clone https://github.com/caster0x00/Sara +~$ sudo apt install git python3-colorama python3-netmiko python3-packaging +~$ git clone https://github.com/casterbyte/Sara ~$ cd Sara ~/Sara$ sudo python3 setup.py install ~$ sara -h ``` -## Startup +## Trigger Arguments (CLI Options) -The tool uses subcommands divided by purpose: - -- `audit` - analyze the RouterOS configuration by profiles (system / protocols / wifi); -- `cve` - check RouterOS vulnerabilities based on CVE (live check or manually specified version) +Sara supports the following command line options: ```bash -~$ sara [...] +usage: sara.py [-h] [--ip IP] [--username USERNAME] [--password PASSWORD] [--ssh-key SSH_KEY] [--passphrase PASSPHRASE] [--port PORT] [--cve] [--skip-confirmation] + +options: + -h, --help show this help message and exit + --ip IP The address of your MikroTik router + --username USERNAME SSH username (RO account can be used) + --password PASSWORD SSH password + --ssh-key SSH_KEY SSH key + --passphrase PASSPHRASE + SSH key passphrase + --port PORT SSH port (default: 22) + --cve Check RouterOS version against known CVEs + --skip-confirmation Skips legal usage confirmation prompt ``` -## Authentication +1. `--ip` - this argument specifies the IP address of the MikroTik device to which Sara is connecting; -Sara does not support passwords in command line arguments. Passwords/passphrases are requested securely via `getpass()` +2. `--username` - the SSH username that will be used to connect. Sara supports only authorized access; -## Audit + > You can use read-only (RO) accounts. Sara does not make configuration changes, so you do not need `write` or `full` level access. -The RouterOS configuration audit is performed via SSH with selected profiles. -```bash -~$ sara audit [key] [port] -~$ sara audit 192.168.88.1 admin system -``` +3. `--password` - password for SSH authentication; -With key (SSH): -```bash -~$ sara audit 192.168.88.1 admin system,protocols ~/.ssh/id_rsa -``` +4. `--ssh-key` - specifies the ssh key that should be used to access the RouterOS's shell -With a non-standard port: -```bash -~$ sara audit 192.168.88.1 admin system,protocols ~/.ssh/id_rsa 2222 -``` + > This is muaually exclusive with `--password`. -## Profiles +5. `--passphrase` - specifies the passphrase used to access the ssh-key -Profiles are a comma-separated list: + > This only works when using the `--ssh-key` argument. -- `system` refers to system settings, management, users, RMI, SSH security, NAT, scheduler, etc.; -- `protocols` refers to services and network protocols (SMB, UPnP, DNS, SNMP, DDNS, Neighbor Discovery); -- `wifi` refers to security of Wi-Fi interfaces (PMKID, WPS). +6. `--port` - allows you to specify a non-standard SSH port for connection. The default is **22**, but if you have changed the SSH port number, it must be specified manually. -You can use multiple profiles at once: -```bash -system,protocols,wifi -``` +7. `--cve` - launches a vulnerability search using the NIST NVD database. + +8. `--skip-confirmation` - allows you to skip the audit start confirmation check. Use this if you really have permission to perform a security audit. # Copyright -Copyright (c) 2026 Magama Bazarov. -This project is licensed under the Apache 2.0 License. - -This project is not affiliated with or endorsed by SIA Mikrotīkls - -All MikroTik trademarks and product names are the property of their respective owners. +Copyright (c) 2025 Magama Bazarov. This project is licensed under the Apache 2.0 License # Outro -If you have any suggestions or find any bugs, feel free to create issues in the repository or contact me: [magamabazarov@mailbox.org](mailto:magamabazarov@mailbox.org) +MikroTik devices are widely used around the world. Sara is designed to help engineers improve security - use it wisely. + +E-mail for contact: magamabazarov@mailbox.org diff --git a/banner/banner.png b/banner/banner.png new file mode 100644 index 0000000..9c18be4 Binary files /dev/null and b/banner/banner.png differ diff --git a/cover/saracover.png b/cover/saracover.png deleted file mode 100644 index 675cbc8..0000000 Binary files a/cover/saracover.png and /dev/null differ diff --git a/cve_analyzer.py b/cve_analyzer.py index a73276e..d130f14 100644 --- a/cve_analyzer.py +++ b/cve_analyzer.py @@ -1,99 +1,39 @@ # Auxiliary module cve_analyzer.py for searching CVE using the NIST NVD database -# -# Copyright (c) 2026 Magama Bazarov + +# Copyright (c) 2025 Magama Bazarov # Licensed under the Apache 2.0 License -# This project is not affiliated with or endorsed by SIA Mikrotīkls +# This project is not affiliated with or endorsed by MikroTik import json, re, os, requests, time from packaging.version import Version, InvalidVersion from colorama import Fore, Style -# NVD v2.0 URL and settings +# Constants NVD_URL = "https://services.nvd.nist.gov/rest/json/cves/2.0" KEYWORD = "routeros" RESULTS_PER_PAGE = 2000 OUTPUT_FILE = "routeros_cves.json" -GUTTER = 2 # spaces between columns - -# Simple role-based color mapping -def paint(role: str, text: str) -> str: - role_value = (role or "").lower() - - if role_value in ("crit", "fail"): - return Fore.RED + text + Style.RESET_ALL - if role_value == "warn": - return Fore.YELLOW + text + Style.RESET_ALL - if role_value == "ok": - return Fore.GREEN + text + Style.RESET_ALL - if role_value == "info": - return Fore.CYAN + text + Style.RESET_ALL - if role_value == "label": - return Fore.CYAN + text + Style.RESET_ALL - if role_value == "value": - return Style.BRIGHT + text + Style.RESET_ALL - - return text - - -# ANSI and OSC-8 stripping for width calculation -ANSI_RE = re.compile(r"\x1b\[[0-9;]*[A-Za-z]") -OSC8_BEL = re.compile(r"\x1b]8;;.*?\x07") -OSC8_ST = re.compile(r"\x1b]8;;.*?\x1b\\") - - -def strip_osc8(s: str) -> str: - tmp = OSC8_BEL.sub("", s) - tmp = OSC8_ST.sub("", tmp) - return tmp - - -def visible_len(s: str) -> int: - raw = strip_osc8(s) - no_ansi = ANSI_RE.sub("", raw) - return len(no_ansi) - - -def pad_r(s: str, width: int) -> str: - length = visible_len(s) - pad_len = width - length - if pad_len < 0: - pad_len = 0 - return s + " " * pad_len - - -def pad_l(s: str, width: int) -> str: - length = visible_len(s) - pad_len = width - length - if pad_len < 0: - pad_len = 0 - return " " * pad_len + s - - -# Clickable hyperlink (OSC-8) -def term_link(text: str, url: str) -> str: - return f"\x1b]8;;{url}\x1b\\{text}\x1b]8;;\x1b\\" - - -# Convert RouterOS version string to Version object +# Converts version string to a comparable Version object def normalize_version(v): if not v: return None try: return Version(v) except InvalidVersion: - cleaned = re.sub(r"(rc|beta|testing|stable)[\d\-]*", "", v, flags=re.IGNORECASE) + # Strip unstable labels like 'rc', 'beta', etc + cleaned = re.sub(r'(rc|beta|testing|stable)[\d\-]*', '', v, flags=re.IGNORECASE) try: return Version(cleaned) except InvalidVersion: return None - -# Extract version ranges from CVE description text +# Extract version ranges from CVE descriptions def extract_ranges_from_description(description): - description = (description or "").lower() + description = description.lower() ranges = [] + # Match version ranges in various formats matches = re.findall(r"(?:from\s+)?v?(\d+\.\d+(?:\.\d+)?)\s+to\s+v?(\d+\.\d+(?:\.\d+)?)", description) for start, end in matches: ranges.append({"versionStartIncluding": start, "versionEndIncluding": end}) @@ -112,7 +52,7 @@ def extract_ranges_from_description(description): matches = re.findall(r"v?(\d+\.\d+)\.x", description) for base in matches: - ranges.append({"versionStartIncluding": f"{base}.0", "versionEndIncluding": f"{base}.999"}) + ranges.append({"versionEndIncluding": f"{base}.999"}) matches = re.findall(r"(?:up to|and below)\s+v?(\d+\.\d+(?:\.\d+)?)", description) for end in matches: @@ -120,43 +60,41 @@ def extract_ranges_from_description(description): return ranges - -# Check if current version falls into a vulnerable range +# Determines if the given version falls within a vulnerable range def is_version_affected(current_v, version_info): def get(v_key): return normalize_version(version_info.get(v_key)) - criteria_raw = version_info.get("criteria", "") or "" - criteria = criteria_raw.lower() + criteria = version_info.get("criteria", "") end_excl_raw = version_info.get("versionEndExcluding", "") - - if criteria and ("mikrotik" not in criteria or "routeros" not in criteria): - return False - + + # RouterOS 6.x vs 7.x false positive prevention if isinstance(end_excl_raw, str) and end_excl_raw.startswith("7") and str(current_v).startswith("6."): return False if isinstance(end_excl_raw, str) and end_excl_raw.startswith("6") and str(current_v).startswith("7."): return False + # Parse version range keys start_incl = get("versionStartIncluding") start_excl = get("versionStartExcluding") - end_incl = get("versionEndIncluding") - end_excl = get("versionEndExcluding") + end_incl = get("versionEndIncluding") + end_excl = get("versionEndExcluding") + # Fallback: match exact version in criteria if no range info is provided if not any([start_incl, start_excl, end_incl, end_excl]): - version_match = re.search(r"routeros:([\w.\-]+)", criteria_raw) + version_match = re.search(r"routeros:([\w.\-]+)", criteria) if version_match: version_exact = normalize_version(version_match.group(1)) return version_exact is not None and current_v == version_exact return False - for raw_key, normed in zip( - ["versionStartIncluding", "versionStartExcluding", "versionEndIncluding", "versionEndExcluding"], - [start_incl, start_excl, end_incl, end_excl], - ): - if version_info.get(raw_key) and normed is None: + # Skip if range is invalid or unparseable + for raw, normed in zip(["versionStartIncluding", "versionStartExcluding", "versionEndIncluding", "versionEndExcluding"], + [start_incl, start_excl, end_incl, end_excl]): + if version_info.get(raw) and normed is None: return False + # Perform actual version comparisons if start_incl and current_v < start_incl: return False if start_excl and current_v <= start_excl: @@ -168,8 +106,7 @@ def is_version_affected(current_v, version_info): return True - -# Download all RouterOS CVEs from NVD and save locally +# Downloads and stores all CVEs from NVD def fetch_all_cves(): all_cves = [] start_index = 0 @@ -179,7 +116,7 @@ def fetch_all_cves(): params = { "keywordSearch": KEYWORD, "startIndex": start_index, - "resultsPerPage": RESULTS_PER_PAGE, + "resultsPerPage": RESULTS_PER_PAGE } try: response = requests.get(NVD_URL, params=params, timeout=30) @@ -195,14 +132,15 @@ def fetch_all_cves(): cve_items = data.get("vulnerabilities", []) total_results = data.get("totalResults", 0) + # Process each CVE entry for item in cve_items: cve = item.get("cve", {}) cve_id = cve.get("id") - description = next((d["value"] for d in cve.get("descriptions", []) if d.get("lang") == "en"), "") + description = next((d["value"] for d in cve.get("descriptions", []) if d["lang"] == "en"), "") severity = "UNKNOWN" score = "N/A" - published = cve.get("published", "") + # Extract CVSS score/severity metrics = cve.get("metrics", {}) if "cvssMetricV31" in metrics: cvss = metrics["cvssMetricV31"][0]["cvssData"] @@ -213,49 +151,62 @@ def fetch_all_cves(): severity = cvss.get("baseSeverity", "UNKNOWN") score = cvss.get("baseScore", "N/A") + # Extract affected version ranges affected_versions = [] for config in cve.get("configurations", []): for node in config.get("nodes", []): for match in node.get("cpeMatch", []): - if not match.get("vulnerable", False): - continue - criteria = match.get("criteria", "") or "" - crit_l = criteria.lower() - if "mikrotik" not in crit_l or "routeros" not in crit_l: - continue - affected_versions.append( - { - "criteria": criteria, - "versionStartIncluding": match.get("versionStartIncluding"), - "versionStartExcluding": match.get("versionStartExcluding"), - "versionEndIncluding": match.get("versionEndIncluding"), - "versionEndExcluding": match.get("versionEndExcluding"), - } - ) + affected_versions.append({ + "criteria": match.get("criteria"), + "versionStartIncluding": match.get("versionStartIncluding"), + "versionStartExcluding": match.get("versionStartExcluding"), + "versionEndIncluding": match.get("versionEndIncluding"), + "versionEndExcluding": match.get("versionEndExcluding") + }) - all_cves.append( - { - "cve_id": cve_id, - "description": description, - "severity": severity, - "cvss_score": score, - "published": published, - "affected_versions": affected_versions, - } - ) + all_cves.append({ + "cve_id": cve_id, + "description": description, + "severity": severity, + "cvss_score": score, + "affected_versions": affected_versions + }) start_index += RESULTS_PER_PAGE if start_index >= total_results: break time.sleep(1.5) + # Write results to file with open(OUTPUT_FILE, "w", encoding="utf-8") as f: json.dump(all_cves, f, indent=2, ensure_ascii=False) print(Fore.GREEN + f"[+] Saved {len(all_cves)} CVEs to {OUTPUT_FILE}") +# Perform local audit of current RouterOS version against cached CVE data +def run_cve_audit(connection): + # Banner + print(Fore.WHITE + "=" * 60) + print(Fore.WHITE + "[!] Checking CVE Vulnerabilities") + print(Fore.MAGENTA + "[!] In any case, validate results manually due to potential false positives.") + print(Fore.WHITE + "=" * 60) -# Load local CVE cache and optionally refresh it -def load_cve_data(): + # Retrieve RouterOS version from device + output = connection.send_command("/system resource print") + match = re.search(r"version:\s*([\w.\-]+)", output) + if not match: + print(Fore.RED + "[-] ERROR: Could not determine RouterOS version.") + return + + current_version = match.group(1) + current_v = normalize_version(current_version) + + if not current_v: + print(Fore.RED + f"[-] ERROR: RouterOS version '{current_version}' is invalid.") + return + + print(Fore.GREEN + f"[+] Detected RouterOS Version: {current_version}") + + # Load or refresh CVE data if not os.path.isfile(OUTPUT_FILE): print(Fore.YELLOW + f"[!] {OUTPUT_FILE} not found.") fetch_all_cves() @@ -265,161 +216,23 @@ def load_cve_data(): if answer == "yes": fetch_all_cves() + # Load local CVE file try: with open(OUTPUT_FILE, "r", encoding="utf-8") as f: - return json.load(f) + cve_data = json.load(f) except Exception as e: print(Fore.RED + f"[-] Failed to load {OUTPUT_FILE}: {e}") - return None - - -# Format CVSS score as 0.1 or N/A -def fmt_cvss(x): - try: - value = float(x) - return f"{value:0.1f}" - except Exception: - return "N/A" - - -# Severity rank for sorting -sev_rank = { - "CRITICAL": 0, - "HIGH": 1, - "MEDIUM": 2, - "LOW": 3, - "UNKNOWN": 4, -} - - -def key_tuple(m): - sev = m.get("severity", "UNKNOWN") or "UNKNOWN" - sev_u = sev.upper() - rank = sev_rank.get(sev_u, 4) - pub = m.get("published") or "" - return (rank, pub) - - -# Severity tag (CRIT/HIGH/MED/LOW/UNK) -def sev_tag(sev: str): - sev_u = (sev or "UNKNOWN").upper() - if sev_u == "CRITICAL": - return paint("crit", "CRIT") - if sev_u == "HIGH": - return paint("fail", "HIGH") - if sev_u == "MEDIUM": - return paint("warn", "MED") - if sev_u == "LOW": - return paint("info", "LOW") - return paint("value", "UNK") - - -def count_seg(label: str, n: int): - role_map = { - "CRIT": "crit", - "HIGH": "fail", - "MED": "warn", - "LOW": "info", - "UNK": "value", - } - role = role_map[label] - return paint(role, label) + ": " + paint("value", str(n)) - - -# Summary header -def render_summary(version: str, matches, counters): - c_crit = counters.get("CRITICAL", 0) - c_high = counters.get("HIGH", 0) - c_med = counters.get("MEDIUM", 0) - c_low = counters.get("LOW", 0) - c_unk = counters.get("UNKNOWN", 0) - - line1 = ( - paint("label", "Target RouterOS Version:") + " " + paint("value", version) - + " " - + paint("label", "Matched CVEs:") + " " + paint("value", str(len(matches))) - ) - - parts = [ - count_seg("CRIT", c_crit), - count_seg("HIGH", c_high), - count_seg("MED", c_med), - count_seg("LOW", c_low), - count_seg("UNK", c_unk), - ] - line2 = " | ".join(parts) - - print(line1) - print(line2) - print() - - -# Table rendering -def render_cve(rows): - max_id_len = 14 - for m in rows: - cve_id = m.get("cve_id", "") or "" - length = len(cve_id) - if length > max_id_len: - max_id_len = length - - id_w = max_id_len + 2 - if id_w < 16: - id_w = 16 - if id_w > 22: - id_w = 22 - - sev_w = 5 - cvss_w = 4 - pub_w = 10 - sep = " " * GUTTER - - head = ( - pad_r(paint("label", "CVE ID"), id_w) - + sep - + pad_r(paint("label", "SEV"), sev_w) - + sep - + pad_r(paint("label", "CVSS"), cvss_w) - + sep - + pad_r(paint("label", "PUBLISHED"), pub_w) - ) - print(head) - - for m in rows: - cve_id = m.get("cve_id", "") or "" - link = term_link(cve_id, "https://nvd.nist.gov/vuln/detail/" + cve_id) - - sev_t = sev_tag(m.get("severity", "UNKNOWN")) - cvss = fmt_cvss(m.get("cvss_score", "N/A")) - - published = m.get("published") or "-" - published = str(published)[:10] - - line = ( - pad_r(paint("info", link), id_w) - + sep - + pad_r(sev_t, sev_w) - + sep - + pad_l(paint("value", cvss), cvss_w) - + sep - + pad_r(paint("value", published), pub_w) - ) - print(line) - - -# Core CVE matching logic for a given RouterOS version -def run_cve_match_for_version(current_v, current_version: str): - cve_data = load_cve_data() - if not cve_data: return + # CVE match logic counters = {"CRITICAL": 0, "HIGH": 0, "MEDIUM": 0, "LOW": 0, "UNKNOWN": 0} - matches = [] + hits = [] for cve in cve_data: matched = False affected_versions = cve.get("affected_versions", []) - + + # Fallback: try parsing version from description if structured data is missing if not affected_versions: affected_versions = extract_ranges_from_description(cve.get("description", "")) @@ -428,59 +241,41 @@ def run_cve_match_for_version(current_v, current_version: str): matched = True break - if not matched: - continue + if matched: + hits.append(cve) + severity = cve.get("severity", "UNKNOWN").upper() + counters[severity] = counters.get(severity, 0) + 1 - sev = (cve.get("severity", "UNKNOWN") or "UNKNOWN").upper() - counters[sev] = counters.get(sev, 0) + 1 + # Display summary + total = len(hits) + print(Fore.WHITE + f"[*] Total matching CVEs: {total}") + for level in ["CRITICAL", "HIGH", "MEDIUM", "LOW", "UNKNOWN"]: + count = counters.get(level, 0) + if count > 0: + color = { + "CRITICAL": Fore.RED + Style.BRIGHT, + "HIGH": Fore.RED, + "MEDIUM": Fore.YELLOW, + "LOW": Fore.CYAN, + "UNKNOWN": Fore.WHITE + }[level] + print(color + f"[*] {level}: {count}") - matches.append( - { - "cve_id": cve.get("cve_id", ""), - "severity": sev, - "cvss_score": cve.get("cvss_score", "N/A"), - "published": cve.get("published", ""), - } - ) + # Print vulnerability details + if total > 0: + print(Fore.WHITE + "[*] Vulnerability details:") + for cve in hits: + severity = cve.get("severity", "UNKNOWN").upper() + description = cve.get("description", "").strip() + score = cve.get("cvss_score", "N/A") + color = { + "CRITICAL": Fore.RED + Style.BRIGHT, + "HIGH": Fore.RED, + "MEDIUM": Fore.YELLOW, + "LOW": Fore.CYAN, + "UNKNOWN": Fore.WHITE + }.get(severity, Fore.WHITE) - print() - render_summary(current_version, matches, counters) - - if not matches: - print(paint("ok", "[*] No known CVEs found for this RouterOS version")) - print() - return - - rows = sorted(matches, key=key_tuple) - render_cve(rows) - - -# Live CVE audit based on device version (kept for potential reuse) -def run_cve_audit(connection): - print(paint("label", "[+] Search for CVEs for a specific version")) - output = connection.send_command("/system resource print") - match = re.search(r"version:\s*([\w.\-]+)", output) - if not match: - print(Fore.RED + "[-] ERROR: Could not determine RouterOS version.") - return - - current_version = match.group(1) - current_v = normalize_version(current_version) - if not current_v: - print(Fore.RED + f"[-] ERROR: RouterOS version '{current_version}' is invalid.") - return - - run_cve_match_for_version(current_v, current_version) - - - -# CVE audit for manually provided RouterOS version string (used by Sara) -def run_cve_audit_for_version(version_str: str): - print(paint("label", "[+] Search for CVEs for a specific version")) - current_version = version_str.strip() - current_v = normalize_version(current_version) - if not current_v: - print(Fore.RED + f"[-] ERROR: RouterOS version '{current_version}' is invalid.") - return - - run_cve_match_for_version(current_v, current_version) + print(color + f"\n→ {cve['cve_id']} [{severity}]") + print(Fore.WHITE + " " + description) + print(Fore.WHITE + f" CVSS Score: {score}") \ No newline at end of file diff --git a/sara.py b/sara.py index f372117..b2bb117 100644 --- a/sara.py +++ b/sara.py @@ -1,662 +1,576 @@ #!/usr/bin/env python3 -# Sara: MikroTik RouterOS Security Inspector -# Copyright (c) 2026 Magama Bazarov +# Copyright (c) 2025 Magama Bazarov # Licensed under the Apache 2.0 License -# This project is not affiliated with or endorsed by SIA Mikrotīkls +# This project is not affiliated with or endorsed by MikroTik -import argparse -import colorama -import re -import sys -import os -from getpass import getpass +import argparse, colorama, time, re, sys from netmiko import ConnectHandler from colorama import Fore, Style from packaging.version import Version -from cve_analyzer import run_cve_audit, run_cve_audit_for_version +from cve_analyzer import run_cve_audit -# init colors +# Initialize colorama for colored console output colorama.init(autoreset=True) -INDENT = " " - - -# print banner def banner(): - banner_art = r""" - _____ - / ___/____ __________ _ - \__ \/ __ `/ ___/ __ `/ - ___/ / /_/ / / / /_/ / - /____/\__,_/_/ \__,_/ + banner_text = r""" + _____ + / ____| + | (___ __ _ _ __ __ _ + \___ \ / _` | '__/ _` | + ____) | (_| | | | (_| | + |_____/ \__,_|_| \__,_| """ - print(INDENT + banner_art) - print(INDENT + "Sara: " + Style.RESET_ALL + "MikroTik RouterOS Security Inspector") - print(INDENT + "Developer: " + Style.RESET_ALL + "Magama Bazarov (Caster)") - print(INDENT + "Contact: " + Style.RESET_ALL + "magamabazarov@mailbox.org") - print(INDENT + "Version: " + Style.RESET_ALL + "1.3.0") - print(INDENT + "Documentation & Usage: " + Style.RESET_ALL + "https://github.com/caster0x00/Sara") + # Display the program banner and metadata + print(banner_text) + print(" " + Fore.YELLOW + "RouterOS Security Inspector. Designed for security engineers") + print(" " + Fore.YELLOW + "Author: " + Style.RESET_ALL + "Magama Bazarov, ") + print(" " + Fore.YELLOW + "Alias: " + Style.RESET_ALL + "Caster") + print(" " + Fore.YELLOW + "Version: " + Style.RESET_ALL + "1.2") + print(" " + Fore.YELLOW + "Documentation & Usage: " + Style.RESET_ALL + "https://github.com/casterbyte/Sara\n") - -# section header -def section(title: str): + # Display a legal disclaimer to emphasize responsible usage + print(" " + Fore.YELLOW + "[!] DISCLAIMER: Use this tool only for auditing your own devices.") + print(" " + Fore.YELLOW + "[!] Unauthorized use on third-party systems is ILLEGAL.") + print(" " + Fore.YELLOW + "[!] The author is not responsible for misuse.") print() - print(Fore.WHITE + f"[+] {title}" + Style.RESET_ALL) - -# info line -def info(msg: str): - print(Fore.WHITE + INDENT + f"[*] {msg}") - - -# ok line -def ok(msg: str): - print(Fore.GREEN + INDENT + f"[✓] {msg}") - - -# warning line -def warn(msg: str): - print(Fore.YELLOW + INDENT + f"[!] {msg}") - - -# high severity line -def alert(msg: str): - print(Fore.RED + INDENT + f"[!] {msg}") - - -# error line -def error(msg: str): - print(Fore.RED + INDENT + f"[-] {msg}") - - -# detailed line -def detail(msg: str): - print(Fore.LIGHTWHITE_EX + INDENT * 2 + f"[*] {msg}" + Style.RESET_ALL) - - -# ssh connection helper -def connect_to_router(ip, user, password=None, port=22, key_file=None, key_passphrase=None): +# Establish SSH connection to the RouterOS device using Netmiko +def connect_to_router(ip, username, password, port, key_file, passphrase): device = { "device_type": "mikrotik_routeros", "host": ip, - "username": user, + "username": username, + "password": password, "port": port, + "key_file": key_file, + "passphrase": passphrase, } - - # key-based auth - if key_file: - key_path = os.path.expanduser(key_file) - if not os.path.exists(key_path): - error(f"SSH key not found: {key_path}") - info("Provide path to the private key (not .pub)") - sys.exit(1) - - device["use_keys"] = True - device["key_file"] = key_path - - if key_passphrase: - device["passphrase"] = key_passphrase - else: - # password auth - if not password: - error("No authentication method provided") - sys.exit(1) - device["password"] = password - device["use_keys"] = False - - # connect try: - conn = ConnectHandler(**device) - ok(f"SSH connection established: {user}@{ip}") - return conn + print(Fore.WHITE + f"[*] Connecting to RouterOS at {ip}:{port}") + connection = ConnectHandler(**device) + print(Fore.WHITE + "[*] Connection successful!") + return connection except Exception as e: - error(f"SSH connection failed: {e}") - sys.exit(1) + print(Fore.RED + f"[-] Connection failed: {e}") + exit(1) +# Print a visual separator for better readability in the output +def separator(title): + print(Fore.WHITE + Style.BRIGHT + '=' * 50) + print(Fore.WHITE + Style.BRIGHT + f"[*] {title}") -# resolve auth method and prompt -def normalize_auth_and_prompt(args): - key_file = args.key - key_passphrase = None - - # key flow - if key_file: - key_file = os.path.expanduser(key_file) - if not os.path.exists(key_file): - error(f"SSH key file not found: {key_file}") - info("Provide path to the private key (not .pub)") - sys.exit(1) - - prompt = f"[?] Passphrase for key {key_file} (leave empty if none): " - entered = getpass(prompt) - if entered: - key_passphrase = entered - - return None, key_file, key_passphrase - - # password flow - password = getpass(f"[?] SSH password for {args.username}@{args.ip}: ") - return password, None, None - - -# simple version wrapper def parse_version(version_str): + # Parses a version string into a comparable Version object. Example: "6.49.7" → Version(6.49.7) return Version(version_str) - -# detect and print RouterOS version +# Retrieves the RouterOS version def check_routeros_version(connection): - # run resource print - output = connection.send_command("/system resource print") + # Separator outlet + separator("Checking RouterOS Version") + command = "/system resource print" + output = connection.send_command(command) + match = re.search(r"version:\s*([\d.]+)", output) if match: routeros_version = parse_version(match.group(1)) - info(f"Detected RouterOS: {Fore.MAGENTA}{routeros_version}{Style.RESET_ALL}") + print(Fore.GREEN + f"[+] Detected RouterOS Version: {routeros_version}") else: - error("Could not determine RouterOS version") + print(Fore.RED + Style.BRIGHT + "[-] ERROR: Could not determine RouterOS version.") - -# SMB service check +# Check if SMB service is enabled (potential security risk) def check_smb(connection): - section("SMB Service") - output = connection.send_command("/ip smb print") + # Separator outlet + separator("Checking SMB Service") + command = "/ip smb print" + output = connection.send_command(command) + if "enabled: yes" in output: - alert("SMB service is enabled! Do you need SMB? Also avoid CVE-2018-7445") + print(Fore.RED + "[*] CAUTION: SMB service is enabled! Did you turn it on? Do you need SMB? Also avoid CVE-2018-7445") else: - ok("SMB is disabled") - ok("No issues detected") - - -# RMI services exposure check + print(Fore.GREEN + "[+] SMB is disabled. No risk detected.") + print("[" + Fore.GREEN + "+" + Fore.WHITE + "] No issues found.") + +# Check for high-risk remote management interfaces (RMI) def check_rmi_services(connection): - section("Remote Management (RMI/MGMT)") - output = connection.send_command("/ip service print") + # Separator outlet + separator("Checking RMI Services") + command = "/ip service print" + output = connection.send_command(command) + high_risk = ["telnet", "ftp", "www"] - moderate_risk = ["api", "api-ssl", "winbox", "www-ssl"] - ssh = ["ssh"] + moderate_risk = ["api", "api-ssl", "winbox", "www-ssl"] + safe = ["ssh"] + risks_found = False - # scan line by line for line in output.splitlines(): line = line.strip() - if not line: - continue - # skip disabled/default - if re.search(r"^\d+\s+X\b", line): - continue - if re.search(r"^\d+\s+D\b", line): - continue + if re.search(r"^\d+\s+X", line): + continue + match = re.search(r"(\S+)\s+\d+", line) + if match: + service_name = match.group(1).lower() + display_name = service_name.upper().replace("WWW", "HTTP").replace("WWW-SSL", "HTTPS") - match = re.search(r"(\S+)\s+\d+", line) - if not match: - continue + if service_name in high_risk: + print(Fore.RED + f"[!] ALERT: {display_name} is ENABLED! This is a high security risk.") + if service_name == "ftp": + print(Fore.RED + " - Are you sure you need FTP?") + if service_name == "telnet": + print(Fore.RED + " - Account passwords can be intercepted") + if service_name == "www": + print(Fore.RED + " - Account passwords can be intercepted") + risks_found = True - service_name = match.group(1).lower() - display_name = service_name.upper().replace("WWW", "HTTP").replace("WWW-SSL", "HTTPS") - - # high risk - if service_name in high_risk: - alert(f"{display_name} is enabled (high risk)") - if service_name == "ftp": - warn("FTP transmits credentials in cleartext") - if service_name == "telnet": - warn("Telnet allows credential interception") - if service_name == "www": - warn("HTTP credentials can be sniffed over the network") - risks_found = True - continue - - # medium risk - if service_name in moderate_risk: - warn(f"{display_name} is enabled") - if service_name in ["api", "api-ssl"]: - info("RouterOS API is a brute-force target; restrict access") - elif service_name == "www-ssl": - info("Ensure HTTPS uses strong ciphers and valid certificates") - elif service_name == "winbox": - warn("Winbox enabled. Winbox 'Keep Password' may store credentials in plaintext. If the PC is compromised, saved passwords may be extracted!") - continue - - # ssh - if service_name in ssh: - ok(f"{display_name} enabled. Use strong passwords or SSH keys for authentication") + elif service_name in moderate_risk: + print(Fore.YELLOW + f"[!] CAUTION: {display_name} is enabled.") + if service_name in ["api", "api-ssl"]: + print(Fore.YELLOW + " - RouterOS API is vulnerable to a bruteforce attack. If you need it, make sure you have access to it.") + elif service_name == "www-ssl": + print(Fore.GREEN + " - HTTPS detected. Ensure it uses a valid certificate and strong encryption.") + elif service_name == "winbox": + print(Fore.RED + "[!] CAUTION: If you're using 'Keep Password' in Winbox, your credentials may be stored in plaintext!") + print(Fore.YELLOW + " - If your PC is compromised, attackers can extract saved credentials.") + print(Fore.YELLOW + " - Consider disabling 'Keep Password' to improve security.") + + elif service_name in safe: + print(Fore.GREEN + f"[+] OK: {display_name} is enabled. Good!") + print(Fore.GREEN + " - Are you using strong passwords and SSH keys for authentication?") if not risks_found: - ok("No high-risk RMI services detected") + print(Fore.GREEN + "[+] No high-risk RMI services enabled.") + print("[" + Fore.GREEN + "+" + Fore.WHITE + "] No issues found.") - -# default usernames check +# Check for default usernames that could be security risks def check_default_users(connection): - section("Default Usernames") - output = connection.send_command("/user print detail") + # Separator outlet + separator("Checking Default Usernames") + command = "/user print detail" + output = connection.send_command(command) + default_users = {"admin", "engineer", "user", "test", "root", "mikrotik", "routeros"} risks_found = False - # split user blocks - for block in output.split("\n\n"): - match = re.search(r'name="([^"]+)"', block) - if not match: - continue - username = match.group(1).lower() - if username in default_users: - warn(f"Default username detected: '{username}'") - info("Change it to a unique value to reduce attack surface") - risks_found = True - + for line in output.split("\n\n"): + match = re.search(r"name=\"?(\w+)\"?", line) + if match: + username = match.group(1).lower() + if username in default_users: + print(Fore.YELLOW + f"[!] CAUTION: Default username '{username}' detected! Change it to a unique one.") + risks_found = True if not risks_found: - ok("No default usernames found") + print(Fore.GREEN + "[+] No default usernames found.") - -# check service address-list on /ip service +# Verify whether critical services have restricted network access def checking_access_to_RMI(connection): - section("RMI/MGMT Access Restrictions") - output = connection.send_command("/ip service print") - lines = output.splitlines() - header_line = None - - # find header - for line in lines: - if line.strip().startswith("#"): - header_line = line - break - - if not header_line: - error("Unable to parse /ip service print header") - return - - try: - idx_name = header_line.index("NAME") - idx_addr = header_line.index("ADDRESS") - idx_cert = header_line.index("CERTIFICATE") if "CERTIFICATE" in header_line else None - except ValueError: - error("Expected columns NAME/ADDRESS not found in /ip service print output") - return + # Separator outlet + separator("Checking network access to RMI") + command = "/ip service print detail" + output = connection.send_command(command) risks_found = False - # parse body - for line in lines: - if not line.strip(): - continue - if line == header_line or line.strip().startswith("Flags:") or line.strip().startswith("Columns:"): - continue + for line in output.split("\n\n"): + service_match = re.search(r'name="([^"]+)"', line) + address_match = re.search(r'address=([\d./,]+)', line) - stripped = line.lstrip() - if not stripped or not stripped[0].isdigit(): - continue + if service_match: + service_name = service_match.group(1) - flags_field = line[:idx_name] - if "X" in flags_field or "D" in flags_field: - continue - - service_name_raw = line[idx_name:idx_addr].strip() - if not service_name_raw: - continue - service_name = service_name_raw.upper() - - if idx_cert is not None and len(line) > idx_addr: - addr_raw = line[idx_addr:idx_cert] - else: - addr_raw = line[idx_addr:] - address = addr_raw.strip() - - # empty address -> no restrictions - if not address: - alert(f"{service_name} has no IP restriction") - risks_found = True - else: - ok(f"{service_name} restricted to: {address}") + if address_match: + address_list = address_match.group(1).split(",") + if not address_list or address_list == [""] or "0.0.0.0/0" in address_list: + print(Fore.YELLOW + f"[!] CAUTION: {service_name.upper()} is exposed to the entire network! Restrict access to trusted IP ranges.") + risks_found = True + else: + print(Fore.GREEN + f"[+] OK! {service_name.upper()} is restricted to: {', '.join(address_list)}") + else: + print(Fore.RED + f"[!] CAUTION: {service_name.upper()} has no IP restriction set! Please restrict access.") + risks_found = True if not risks_found: - ok("All RMI services have proper IP restrictions") + print(Fore.GREEN + "[+] All services have proper IP restrictions.") - -# WiFi / PMKID / WPS check (/interface/wifi/print detail, RouterOS v7+) +# Analyze Wi-Fi security settings, including WPS and PMKID vulnerabilities +# I think this is the most unstable feature of the whole Sara, need more feedback from users to get it perfect def check_wifi_security(connection): - section("WiFi Security") + # Separator outlet + separator("Checking WLAN Security") + risks_found = False try: - output = connection.send_command("/interface/wifi/print detail") - except Exception as e: - error(f"Error while checking WiFi: {e}") - return + # Retrieve RouterOS version to determine supported commands + command = "/system resource print" + output = connection.send_command(command) + version_match = re.search(r"version:\s*([\d.]+)", output) + routeros_version = Version(version_match.group(1)) if version_match else Version("0.0.0") - if not output.strip(): - ok("No WiFi interfaces found") - return + # Wi-Fi (ROS v6/v7) + commands = ["/interface wifi print detail", "/interface wireless print detail"] + found_valid_output = False - interfaces = output.split("\n\n") - risks_found = False + for command in commands: + output = connection.send_command(command) + if "bad command name" not in output.lower() and output.strip(): + found_valid_output = True + interfaces = output.split("\n\n") + for interface in interfaces: + name_match = re.search(r'name="([^"]+)"', interface) + default_name_match = re.search(r'default-name="([^"]+)"', interface) + pmkid_match = re.search(r'disable-pmkid=(\S+)', interface) + wps_match = re.search(r'wps=(\S+)', interface) - # scan interfaces - for iface in interfaces: - if not iface.strip(): - continue + name = name_match.group(1) if name_match else (default_name_match.group(1) if default_name_match else "Unknown") + pmkid = pmkid_match.group(1) if pmkid_match else "unknown" + wps = wps_match.group(1) if wps_match else None # Fix: If WPS is not found, set None - name_match = re.search(r'\bname="([^"]+)"', iface) - if not name_match: - name_match = re.search(r'\bdefault-name="([^"]+)"', iface) + if pmkid == "no": + print(Fore.RED + f"[!] ALERT: Wi-Fi '{name}' has insecure settings!") + print(Fore.RED + " - PMKID attack is possible (disable-pmkid=no)") + risks_found = True - if name_match: - iface_name = name_match.group(1) + # Fix: Do not report WPS if it's completely missing in the output + if wps is not None and wps != "disable": + print(Fore.RED + f"[!] ALERT: Wi-Fi '{name}' has WPS enabled ({wps}), Risk of PIN bruteforcing and Pixie Dust attacks.") + risks_found = True + + if not found_valid_output: + print(Fore.RED + "[-] ERROR: Unable to retrieve Wi-Fi interface settings. Unsupported RouterOS version or missing interface.") + + # Security profiles (ROS v6) + security_profiles_output = connection.send_command("/interface wireless security-profiles print detail") + if security_profiles_output.strip(): + profiles = security_profiles_output.split("\n\n") + for profile in profiles: + profile_name_match = re.search(r'name="([^"]+)"', profile) + pmkid_match = re.search(r'disable-pmkid=(\S+)', profile) + + profile_name = profile_name_match.group(1) if profile_name_match else "Unknown" + pmkid = pmkid_match.group(1) if pmkid_match else "unknown" + + if pmkid == "no": + print(Fore.RED + f"[!] ALERT: Security Profile '{profile_name}' allows PMKID attack! (disable-pmkid=no)") + risks_found = True + + # /interface wifi security print (ROS v7.10+ only) + if routeros_version >= Version("7.10"): + security_output = connection.send_command("/interface wifi security print") + if security_output.strip(): + securities = security_output.split("\n\n") + for security in securities: + sec_name_match = re.search(r'name="([^"]+)"', security) + pmkid_match = re.search(r'disable-pmkid=(\S+)', security) + wps_match = re.search(r'wps=(\S+)', security) + + if sec_name_match and (pmkid_match or wps_match): + sec_name = sec_name_match.group(1) + pmkid = pmkid_match.group(1) if pmkid_match else "unknown" + wps = wps_match.group(1) if wps_match else None # Fix: Avoid "WPS is enabled (unknown)" + + if pmkid == "no": + print(Fore.RED + f"[!] ALERT: Wi-Fi security profile '{sec_name}' has insecure settings!") + print(Fore.RED + " - PMKID attack is possible (disable-pmkid=no)") + risks_found = True + + if wps is not None and wps != "disable": + print(Fore.RED + f"[!] ALERT: Wi-Fi security profile '{sec_name}' has WPS enabled ({wps}), Risk of PIN bruteforcing and Pixie Dust attacks.") + risks_found = True + else: + print(Fore.RED + "[-] ERROR: Unable to retrieve Wi-Fi security settings.") else: - iface_name = "Unknown" + print(Fore.CYAN + "[*] Skipping `/interface wifi security print` (not supported in this version)") - pmkid_enabled = re.search(r'\.disable-pmkid=no\b', iface) - wps_push = re.search(r'\.wps=push-button\b', iface) - - if pmkid_enabled or wps_push: - warn(f"WiFi interface '{iface_name}' has potentially weak security settings") - if pmkid_enabled: - detail("PMKID is enabled (.disable-pmkid=no) - allows offline PMKID-based attacks on WPA/WPA2-PSK") - if wps_push: - detail("WPS push-button is enabled (.wps=push-button) - WPS is a known attack surface; disable it in production") - risks_found = True + except Exception as e: + print(Fore.RED + f"[-] ERROR: Failed to check Wi-Fi settings: {e}") if not risks_found: - ok("No risky WiFi security settings detected") + print(Fore.GREEN + "[+] All Wi-Fi interfaces and security profiles have secure settings.") + print(Fore.YELLOW + "[*] If you use WPA-PSK or WPA2-PSK, take care of password strength. So that the handshake cannot be easily brute-forced.") + print("[" + Fore.GREEN + "+" + Fore.WHITE + "] No issues found.") - -# UPnP check +# Check if UPnP is enabled def check_upnp_status(connection): - section("UPnP Status") - output = connection.send_command("/ip upnp print") + # Separator outlet + separator("Checking UPnP Status") + command = "/ip upnp print" + output = connection.send_command(command) + if "enabled: yes" in output: - alert("UPnP is enabled") - detail("UPnP allows automatic port forwarding to internal hosts") - detail("Can expose devices to the Internet without your awareness") - detail("Ensure this was intentionally enabled") + print(Fore.RED + "[!] ALERT: UPnP is ENABLED! This is a very insecure protocol that automatically pushes internal hosts to the Internet. This protocol is used for automatic port forwarding and may also indicate a potential router compromise. Did you enable UPnP yourself?") else: - ok("UPnP is disabled") + print(Fore.GREEN + "[+] UPnP is disabled. No risk detected.") + print("[" + Fore.GREEN + "+" + Fore.WHITE + "] No issues found.") - -# DNS behavior check +# Check if the router is acting as a DNS server def check_dns_status(connection): - section("DNS Settings") - output = connection.send_command("/ip dns print") + # Separator outlet + separator("Checking DNS Settings") + command = "/ip dns print" + output = connection.send_command(command) + if "allow-remote-requests: yes" in output: - warn("Router is acting as a DNS server") - detail("DNS queries from the network are accepted") - detail("Ensure DNS is not exposed on external interfaces") + print(Fore.YELLOW + "[!] CAUTION: Router is acting as a DNS server! This is just a warning. The DNS port on your RouterOS should not be on the external interface.") else: - ok("Remote DNS requests are disabled") + print(Fore.GREEN + "[+] DNS remote requests are disabled. No risk detected.") + print("[" + Fore.GREEN + "+" + Fore.WHITE + "] No issues found.") - -# DDNS check +# Check DDNS Settings def check_ddns_status(connection): - section("DDNS Settings") - output = connection.send_command("/ip cloud print") + # Separator outlet + separator("Checking DDNS Settings") + command = "/ip cloud print" + output = connection.send_command(command) + if "ddns-enabled: yes" in output: - warn("Dynamic DNS is enabled") - detail("Your router may become reachable via a public hostname") - detail("Ensure this is needed for remote access or VPN setups") + print(Fore.YELLOW + "[!] CAUTION: Dynamic DNS is enabled! Are you sure you need it?") else: - ok("DDNS is disabled") + print(Fore.GREEN + "[+] DDNS is disabled. No risk detected.") + print("[" + Fore.GREEN + "+" + Fore.WHITE + "] No issues found.") - -# PoE check +# Detect active PoE interfaces that might pose a risk to connected devices def check_poe_status(connection): - section("PoE Status") - output = connection.send_command("/interface ethernet print detail") - interfaces = output.split("\n\n") + # Separator outlet + separator("Checking PoE Status") + command = "/interface ethernet print detail" + output = connection.send_command(command) + risks_found = False + interfaces = output.split("\n\n") - # inspect each port - for iface in interfaces: - if not iface.strip(): - continue - - name_match = re.search(r'name="([^"]+)"', iface) - poe_match = re.search(r'poe-out=(\S+)', iface) + for interface in interfaces: + name_match = re.search(r'name="([^"]+)"', interface) + poe_match = re.search(r'poe-out=(\S+)', interface) name = name_match.group(1) if name_match else "Unknown" - poe_mode = poe_match.group(1) if poe_match else "none" + poe = poe_match.group(1) if poe_match else "none" - if poe_mode in ("auto-on", "forced-on"): - warn(f"PoE is enabled on interface '{name}'") - detail("Ensure connected devices support PoE to avoid hardware damage") + if poe in ["auto-on", "forced-on"]: + print(Fore.YELLOW + f"[!] CAUTION: PoE is enabled on {name}. Ensure that connected devices support PoE to prevent damage.") risks_found = True if not risks_found: - ok("No PoE-enabled interfaces detected") + print(Fore.GREEN + "[+] No PoE-enabled interfaces detected.") + print("[" + Fore.GREEN + "+" + Fore.WHITE + "] No issues found.") - -# RouterBOOT protection check +# Checking RouterBOOT def check_routerboot_protection(connection): - section("RouterBOOT Protection") - output = connection.send_command("/system routerboard settings print") + # Separator outlet + separator("Checking RouterBOOT Protection") + command = "/system routerboard settings print" + output = connection.send_command(command) + if "protected-routerboot: disabled" in output: - alert("RouterBOOT protection is disabled") - detail("Device can be reset or reflashed via Netinstall without authentication") - detail("Enable 'protected-routerboot' to prevent unauthorized boot changes") + print(Fore.YELLOW + "[!] CAUTION: RouterBOOT protection is disabled! This can allow unauthorized firmware changes and password resets via Netinstall.") else: - ok("RouterBOOT protection is enabled") + print(Fore.GREEN + "[+] RouterBOOT protection is enabled. No risk detected.") + print("[" + Fore.GREEN + "+" + Fore.WHITE + "] No issues found.") - -# SOCKS proxy check def check_socks_status(connection): - section("SOCKS Proxy Status") - output = connection.send_command("/ip socks print") + separator("Checking SOCKS Proxy Status") + command = "/ip socks print" + output = connection.send_command(command) + if "enabled: yes" in output: - alert("SOCKS proxy is enabled") - detail("SOCKS may indicate unauthorized tunneling or compromise") - detail("Attackers often use SOCKS as a pivot into internal networks") - detail("Disable unless explicitly required for your environment") + print(Fore.RED + "[!] ALERT: SOCKS proxy is enabled! This may indicate a possible compromise of the device, the entry point to the internal network.") else: - ok("SOCKS proxy is disabled") + print(Fore.GREEN + "[+] SOCKS proxy is disabled. No risk detected.") + print("[" + Fore.GREEN + "+" + Fore.WHITE + "] No issues found.") - -# bandwidth-server check +# Verify if RouterBOOT protection is enabled to prevent unauthorized firmware modifications def check_bandwidth_server_status(connection): - section("Bandwidth Server Status") - output = connection.send_command("/tool bandwidth-server print") + # Separator outlet + separator("Checking Bandwidth Server Status") + command = "/tool bandwidth-server print" + output = connection.send_command(command) + if "enabled: yes" in output: - warn("Bandwidth server is enabled") - detail("May generate unwanted test traffic") - detail("Can increase CPU load under active use") + print(Fore.YELLOW + "[!] CAUTION: Bandwidth server is enabled! Possible unwanted traffic, possible CPU load.") else: - ok("Bandwidth server is disabled") + print(Fore.GREEN + "[+] Bandwidth server is disabled. No risk detected.") + print("[" + Fore.GREEN + "+" + Fore.WHITE + "] No issues found.") - -# neighbor discovery config check +# Analyze discovery protocols (CDP, LLDP, MNDP) that might expose network information def check_neighbor_discovery(connection): - section("Neighbor Discovery Protocols") - output = connection.send_command("/ip neighbor discovery-settings print") - risks_found = False + # Separator outlet + separator("Checking Neighbor Discovery Protocols") + command = "/ip neighbor discovery-settings print" + output = connection.send_command(command) if "discover-interface-list: all" in output: - warn("Discovery packets are sent on all interfaces") - detail("This allows attackers to map RouterOS presence on multiple segments") - risks_found = True + print(Fore.YELLOW + "[!] CAUTION: RouterOS sends Discovery protocol packets to all interfaces. This can be used by an attacker to gather data about RouterOS.") protocol_match = re.search(r'protocol: ([\w,]+)', output) if protocol_match: protocols = protocol_match.group(1) - warn(f"Neighbor discovery protocols enabled: {protocols}") - detail("Limit discovery to management or trusted interfaces only") - risks_found = True + print(Fore.YELLOW + f"[!] Neighbor Discovery Protocols enabled: {protocols}") + if "discover-interface-list: all" not in output and not protocol_match: + print(Fore.GREEN + "[+] No security risks found in Neighbor Discovery Protocol settings.") + print("[" + Fore.GREEN + "+" + Fore.WHITE + "] No issues found.") - if not risks_found: - ok("No security risks found in Neighbor Discovery configuration") - - -# password policy check +# Ensure a minimum password length policy is enforced def check_password_length_policy(connection): - section("Password Policy") - output = connection.send_command("/user settings print") + # Separator outlet + separator("Checking Password Policy") + command = "/user settings print" + output = connection.send_command(command) + if "minimum-password-length: 0" in output: - warn("No minimum password length is enforced") - detail("Short passwords significantly reduce brute-force resistance") - detail("Set a minimum length (e.g. 10-12 characters or more)") - else: - ok("Password length policy is enforced") + print(Fore.YELLOW + "[!] CAUTION: No minimum password length is enforced! The length of the created passwords must be taken into account.") + if "minimum-password-length: 0" not in output: + print(Fore.GREEN + "[+] Password policy is enforced. No risk detected.") + print("[" + Fore.GREEN + "+" + Fore.WHITE + "] No issues found.") - -# SSH security check +# Analyze SSH security settings, including strong encryption and port forwarding risks def check_ssh_security(connection): - section("SSH Security") - output = connection.send_command("/ip ssh print") - risks_found = False + # Separator outlet + separator("Checking SSH Security") + command = "/ip ssh print" + output = connection.send_command(command) if "forwarding-enabled: both" in output: - warn("SSH dynamic port forwarding is enabled") - detail("May be used as a tunneling/pivoting channel") - detail("Verify this is required and properly restricted") - risks_found = True - + print(Fore.YELLOW + "[!] CAUTION: SSH Dynamic Port Forwarding is enabled! This could indicate a RouterOS compromise, and SSH DPF could also be used by an attacker as a pivoting technique.") if "strong-crypto: no" in output: - warn("Strong SSH crypto is disabled") - detail("Enable 'strong-crypto' to enforce stronger ciphers and MACs") - detail("Disables weak algorithms (MD5, null encryption, small DH groups)") - risks_found = True + print(Fore.YELLOW + "[!] CAUTION: strong-crypto is disabled! It is recommended to enable it to enhance security. This will:") + print(Fore.YELLOW + " - Use stronger encryption, HMAC algorithms, and larger DH primes;") + print(Fore.YELLOW + " - Prefer 256-bit encryption, disable null encryption, prefer SHA-256;") + print(Fore.YELLOW + " - Disable MD5, use 2048-bit prime for Diffie-Hellman exchange;") + if "forwarding-enabled: both" not in output and "strong-crypto: no" not in output: + print(Fore.GREEN + "[+] SSH security settings are properly configured.") + print("[" + Fore.GREEN + "+" + Fore.WHITE + "] No issues found.") - if not risks_found: - ok("SSH security settings are properly configured") - - -# Connection tracking check +# Check if connection tracking is enabled, which may impact performance def check_connection_tracking(connection): - section("Connection Tracking") - output = connection.send_command("/ip firewall connection tracking print") + # Separator outlet + separator("Checking Connection Tracking") + command = "/ip firewall connection tracking print" + output = connection.send_command(command) if "enabled: auto" in output or "enabled: on" in output: - warn("Connection tracking is enabled") - detail("RouterOS tracks connection states for firewall/NAT") - detail("On pure transit routers without NAT, disabling may reduce CPU load") - else: - ok("Connection tracking is configured appropriately") + print(Fore.YELLOW + "[!] CAUTION: Connection Tracking is enabled! This means RouterOS is tracks connection statuses.") + print(Fore.YELLOW + " - If this device is a transit router and does NOT use NAT, consider disabling connection tracking to reduce CPU load.") + + if "enabled: auto" not in output and "enabled: on" not in output: + print(Fore.GREEN + "[+] Connection Tracking is properly configured.") + print("[" + Fore.GREEN + "+" + Fore.WHITE + "] No issues found.") - -# RoMON check +# Verify if RoMON is enabled, which might expose Layer 2 management access def check_romon_status(connection): - section("RoMON Status") - output = connection.send_command("/tool romon print") + # Separator outlet + separator("Checking RoMON Status") + command = "/tool romon print" + output = connection.send_command(command) + if "enabled: yes" in output: - warn("RoMON is enabled") - detail("Provides Layer 2 management access to RouterOS devices") - detail("Disable RoMON if not explicitly required to reduce attack surface") - else: - ok("RoMON is disabled") + print(Fore.YELLOW + "[!] CAUTION: RoMON is enabled! This allows Layer 2 management access, which may expose the router to unauthorized control.") + print(Fore.YELLOW + " - If RoMON is not required, disable it to reduce attack surface.") + if "enabled: yes" not in output: + print(Fore.GREEN + "[+] RoMON is disabled. No risk detected.") + print("[" + Fore.GREEN + "+" + Fore.WHITE + "] No issues found.") - -# MAC Winbox / MAC Telnet / MAC ping checks +# Analyze MAC-based Winbox access settings def check_mac_winbox_security(connection): - section("Winbox MAC Server Settings") + separator("Checking Winbox MAC Server Settings") - # MAC Winbox + # MAC-Winbox Server try: - output = connection.send_command("/tool mac-server mac-winbox print") + command = "/tool mac-server mac-winbox print" + output = connection.send_command(command) + if "allowed-interface-list" in output: if "allowed-interface-list: all" in output: - warn("MAC Winbox access is allowed on all interfaces") - detail("Limit MAC Winbox to management or trusted segments only") + print(Fore.YELLOW + "[!] CAUTION: MAC Winbox access is enabled on all interfaces.") else: - ok("MAC Winbox is restricted to specific interfaces") + print(Fore.GREEN + "[+] MAC Winbox is properly restricted.") else: - # legacy layout + # Fallback for older versions: look for "INTERFACE" column and value "all" if re.search(r"\bINTERFACE\s*\n.*\ball\b", output, re.DOTALL | re.IGNORECASE): - warn("MAC Winbox access is allowed on all interfaces (legacy format)") - detail("Limit MAC Winbox to management or trusted segments only") + print(Fore.YELLOW + "[!] CAUTION: MAC Winbox access is enabled on all interfaces") else: - ok("MAC Winbox is properly restricted (legacy format)") + print(Fore.GREEN + "[+] MAC Winbox is properly restricted (legacy format).") except Exception as e: - error(f"Error while checking MAC Winbox: {e}") + print(Fore.RED + f"[-] ERROR while checking MAC Winbox: {e}") - # MAC Telnet + # MAC-Server try: - output = connection.send_command("/tool mac-server print") + command = "/tool mac-server print" + output = connection.send_command(command) + if "allowed-interface-list" in output: if "allowed-interface-list: all" in output: - warn("MAC Telnet access is allowed on all interfaces") - detail("Limit MAC Telnet to management or trusted segments only") + print(Fore.YELLOW + "[!] CAUTION: MAC Telnet access is enabled on all interfaces.") else: - ok("MAC Telnet is restricted to specific interfaces") + print(Fore.GREEN + "[+] MAC Telnet is properly restricted.") else: if re.search(r"\bINTERFACE\s*\n.*\ball\b", output, re.DOTALL | re.IGNORECASE): - warn("MAC Telnet access is allowed on all interfaces (legacy format)") - detail("Limit MAC Telnet to management or trusted segments only") + print(Fore.YELLOW + "[!] CAUTION: MAC Telnet access is enabled on all interfaces") else: - ok("MAC Telnet is properly restricted (legacy format)") + print(Fore.GREEN + "[+] MAC Telnet is properly restricted (legacy format).") except Exception as e: - error(f"Error while checking MAC Telnet: {e}") + print(Fore.RED + f"[-] ERROR while checking MAC Telnet: {e}") - # MAC ping + # MAC Ping try: - output = connection.send_command("/tool mac-server ping print") + command = "/tool mac-server ping print" + output = connection.send_command(command) if "enabled: yes" in output: - warn("MAC Ping is enabled") - detail("May generate unnecessary Layer 2 broadcast traffic") + print(Fore.YELLOW + "[!] CAUTION: MAC Ping is enabled. Possible unwanted traffic.") else: - ok("MAC Ping is restricted or disabled") + print(Fore.GREEN + "[+] MAC Ping is properly restricted.") except Exception as e: - error(f"Error while checking MAC Ping: {e}") + print(Fore.RED + f"[-] ERROR while checking MAC Ping: {e}") - -# SNMP communities check +# Check for weak SNMP community strings that could be exploited def check_snmp(connection): - section("SNMP Community Strings") - output = connection.send_command("/snmp community print") - bad_names = {"public", "private", "admin", "mikrotik", "mikrotik_admin", "root", "routeros", "zabbix"} + # Separator outlet + separator("Checking SNMP Community Strings") + command = "/snmp community print" + output = connection.send_command(command) + + bad_names = ["public", "private", "admin", "mikrotik", "mikrotik_admin", "root", "routeros", "zabbix"] risks_found = False - # scan table for line in output.splitlines(): match = re.search(r'^\s*\d+\s+[*X]?\s*([\w-]+)', line) - if not match: - continue - community_name = match.group(1).lower() - if community_name in bad_names: - warn(f"Weak SNMP community string detected: '{community_name}'") - detail("Change it to a long, random value and restrict source IPs") - risks_found = True + if match: + community_name = match.group(1).lower() + if community_name in bad_names: + print(Fore.YELLOW + f"[!] CAUTION: Weak SNMP community string detected: '{community_name}'. Change it to a secure, unique value.") + risks_found = True if not risks_found: - ok("SNMP community strings checked - no weak values detected") + print(Fore.GREEN + "[+] SNMP community strings checked. No weak values detected.") + print("[" + Fore.GREEN + "+" + Fore.WHITE + "] No issues found.") - -# dst-nat / netmap rules check +# Detect and analyze firewall NAT rules that could expose internal services def check_dst_nat_rules(connection): - section("Firewall NAT Rules") - output = connection.send_command("/ip firewall nat print") + # Separator outlet + separator("Checking Firewall NAT Rules") + command = "/ip firewall nat print" + output = connection.send_command(command) dst_nat_rules = [] - for line in output.splitlines(): if "action=dst-nat" in line or "action=netmap" in line: dst_nat_rules.append(line.strip()) - if dst_nat_rules: - warn("Destination NAT (dst-nat/netmap) rules detected") - detail("Exposing internal services to the Internet can be dangerous") - detail("Verify that each rule is intentional and properly restricted") + print(Fore.YELLOW + "[!] CAUTION: Destination NAT (dst-nat/netmap) rules detected! Exposing devices to the internet can be dangerous.") + print(Fore.YELLOW + "[*] Similar rules can also be created by the attacker. Did you really create these rules yourself?") + print(Fore.YELLOW + " - Review the following NAT rules:") for rule in dst_nat_rules: - detail(rule) - else: - ok("No Destination NAT (dst-nat/netmap) rules detected") + print(Fore.YELLOW + f" {rule}") + if not dst_nat_rules: + print(Fore.GREEN + "[+] No Destination NAT (dst-nat/netmap) rules detected. No risks found.") + print("[" + Fore.GREEN + "+" + Fore.WHITE + "] No issues found.") - -# scheduler / persistence check +# Identify potentially malicious scheduled tasks def detect_malicious_schedulers(connection): - section("Schedulers & Persistence") - output = connection.send_command("/system scheduler print detail") + # Separator outlet + separator("Checking for Malicious Schedulers") + command = "/system scheduler print detail" + output = connection.send_command(command) + risks_found = False fetch_files = set() - tasks = output.split("\n\n") - - # first pass: track fetch targets - for task in tasks: - if not task.strip(): - continue - - event_match = re.search(r'on-event="?([^"\n]+)"?', task) - event = event_match.group(1).strip() if event_match else "" - fetch_match = re.search(r'dst-path=([\S]+)', event) - if "fetch" in event and fetch_match: - fetched_file = fetch_match.group(1).strip(";") - fetch_files.add(fetched_file) - - # second pass: analyze schedulers - for task in tasks: - if not task.strip(): - continue + for task in output.split("\n\n"): name_match = re.search(r'name="?([^"]+)"?', task) event_match = re.search(r'on-event="?([^"\n]+)"?', task) policy_match = re.search(r'policy=([\w,]+)', task) @@ -664,223 +578,203 @@ def detect_malicious_schedulers(connection): name = name_match.group(1) if name_match else "Unknown" event = event_match.group(1).strip() if event_match else "" - policies = policy_match.group(1).split(",") if policy_match else [] + policy = policy_match.group(1).split(",") if policy_match else [] interval_value, interval_unit = (int(interval_match.group(1)), interval_match.group(2)) if interval_match else (None, None) - issues = [] + # DEBUG + print(Fore.CYAN + f"[*] Checking: '{name}' → {event}") - # fetch + import chain + # Fetch detection + fetch_match = re.search(r'dst-path=([\S]+)', event) + if "fetch" in event and fetch_match: + fetched_file = fetch_match.group(1).strip(";") + fetch_files.add(fetched_file) + print(Fore.YELLOW + f"[!] Noted fetched file: {fetched_file}") + + # Import detection (checks if imported file was fetched earlier) import_match = re.search(r'import\s+([\S]+)', event) if "import" in event and import_match: imported_file = import_match.group(1).strip(";") if imported_file in fetch_files: - issues.append("Imports a previously fetched script - potential backdoor") - if interval_value and interval_unit: - issues.append(f"Runs every {interval_value}{interval_unit}, ensures persistence") + print(Fore.RED + f"[!] ALERT: '{name}' is a BACKDOOR!") + print(Fore.RED + " - This scheduler imports a previously fetched script.") + print(Fore.RED + " - Attacker can inject any command remotely via this script.") + print(Fore.RED + f" - Interval: {interval_value}{interval_unit}, ensuring persistence.") + risks_found = True - # dangerous policies + # High privileges checking dangerous_policies = {"password", "sensitive", "sniff", "ftp"} - used_dangerous = [p for p in policies if p in dangerous_policies] - if used_dangerous: - issues.append(f"Uses high-privilege policies: {', '.join(used_dangerous)}") + if any(p in dangerous_policies for p in policy): + print(Fore.RED + f"[!] ALERT: '{name}' has HIGH PRIVILEGES!") + print(Fore.RED + f" - It has dangerous permissions: {', '.join(policy)}") + risks_found = True - # reboots + # Reboot detection (Anti-forensics & persistence check) if "reboot" in event: if interval_value and interval_unit in ["s", "m", "h"] and interval_value < 12: - issues.append(f"Frequently reboots router ({interval_value}{interval_unit}) - possible anti-forensics") + print(Fore.RED + f"[!] ALERT: '{name}' reboots router TOO FREQUENTLY ({interval_value}{interval_unit})!") + print(Fore.RED + " - This may be an attempt to prevent log analysis (anti-forensics).") + risks_found = True else: - issues.append("Schedules router reboot - verify it is intentional") + print(Fore.YELLOW + f"[!] CAUTION: '{name}' schedules a reboot.") + print(Fore.YELLOW + " - Ensure this is intentional and not used to hide attacks.") + continue - # tight intervals + # Frequent execution detection if interval_value and interval_unit in ["s", "m", "h"] and interval_value < 25: - issues.append(f"Executes too frequently ({interval_value}{interval_unit}) - may indicate persistence or botnet-like activity") - - if issues: - alert(f"Scheduler '{name}' looks suspicious") - for msg in issues: - detail(msg) + print(Fore.RED + f"[!] ALERT: '{name}' executes TOO FREQUENTLY ({interval_value}{interval_unit})!") + print(Fore.RED + " - This indicates botnet-like persistence.") risks_found = True if not risks_found: - ok("No suspicious schedulers detected") + print(Fore.GREEN + "[+] No malicious schedulers detected.") - -# static DNS entries check +# Checking DNS Static Entries def check_static_dns_entries(connection): - section("Static DNS Entries") - output = connection.send_command("/ip dns static print detail") + # Separator outlet + separator("Checking Static DNS Entries") + command = "/ip dns static print detail" + output = connection.send_command(command) + dns_entries = [] entry_blocks = output.split("\n\n") - # parse entries for entry in entry_blocks: - if not entry.strip(): - continue name_match = re.search(r'name="([^"]+)"', entry) address_match = re.search(r'address=([\d.]+)', entry) + if name_match and address_match: name = name_match.group(1) address = address_match.group(1) dns_entries.append((name, address)) if dns_entries: - warn("Static DNS entries are configured") - detail("Verify that each record is legitimate and expected") + print(Fore.YELLOW + "[!] WARNING: The following static DNS entries exist:") for name, address in dns_entries: - detail(f"{name} → {address}") - detail("Attackers often modify DNS for phishing or traffic redirection during post-exploitation") + print(Fore.CYAN + f" - {name} → {address}") + + print(Fore.YELLOW + "[*] Were you the one who created those static DNS records? Make sure.") + print(Fore.YELLOW + "[*] Attackers during RouterOS post-exploitation like to tamper with DNS record settings, for example, for phishing purposes.") else: - ok("No static DNS entries found") + print(Fore.GREEN + "[+] No static DNS entries found.") -# parse audit profiles -def parse_profiles(profiles_str): - raw = [p.strip().lower() for p in profiles_str.split(",")] - selected = {p for p in raw if p} - valid = {"system", "protocols", "wifi"} +# Require user confirmation before proceeding, emphasizing legal responsibility +def confirm_legal_usage(): + print(" " + "WARNING: This tool is for security auditing of YOUR OWN RouterOS devices.") + print(" " + "Unauthorized use may be illegal. Proceed responsibly.\n") + response = input(" " + "Do you wish to proceed? [yes/no]: ").strip() + + if response.lower() != "yes": + print("\nOperation aborted. Exiting...") + sys.exit(0) - if not selected: - error("No profiles specified") - info("Use at least one profile: system, protocols, wifi") +# Require user confirmation before proceeding, emphasizing legal responsibility +def confirm_legal_usage(): + print(" " + "WARNING: This tool is for security auditing of YOUR OWN RouterOS devices.") + print(" " + "Unauthorized use may be illegal. Proceed responsibly.\n") + +def prompt_legal_usage(): + response = input(" " + "Do you wish to proceed? [yes/no]: ").strip() + + if response.lower() != "yes": + print("\nOperation aborted. Exiting...") + sys.exit(0) + +# Main func +def main(): + banner() + + parser = argparse.ArgumentParser() + parser.add_argument("--ip", help="The address of your MikroTik router") + parser.add_argument("--username", help="SSH username (RO account can be used)") + parser.add_argument("--password", help="SSH password") + parser.add_argument("--ssh-key", help="SSH key") + parser.add_argument("--passphrase", help="SSH key passphrase") + parser.add_argument("--port", type=int, default=22, help="SSH port (default: 22)") + parser.add_argument("--cve", action="store_true", help="Check RouterOS version against known CVEs") + parser.add_argument("--skip-confirmation", action='store_true', help="Skips legal usage confirmation prompt") + + args = parser.parse_args() + + if len(sys.argv) == 2 and sys.argv[1] in ["-h", "--help"]: + parser.print_help() + sys.exit(0) + + if not args.ip or not args.username or (not args.password and not args.ssh_key): + print(Fore.YELLOW + "[!] ERROR: Missing required arguments") + print(Fore.YELLOW + "[!] Use 'sara --help' for more information") sys.exit(1) - invalid = selected - valid - if invalid: - error("Unknown profiles: " + ", ".join(sorted(invalid))) - info("Valid profiles: system, protocols, wifi") + if args.password and args.ssh_key: + print(Fore.YELLOW + "[!] ERROR: Can't use both password & ssh_key authentication") sys.exit(1) - return selected + if args.passphrase and not args.ssh_key: + print(Fore.YELLOW + "[!] ERROR: Passphrase requires --ssh-key") + sys.exit(1) +# Legal warning (interactive only if not skipped) + if not args.skip_confirmation: + # disclaimer text + confirm_legal_usage() + # yes or no + prompt_legal_usage() + else: + confirm_legal_usage() -# main audit dispatcher -def run_sara_audit(args): - section("Sara Audit Mode") - info(f"Target Device: {args.ip}") - info(f"Transport: SSH (port {args.port})") + # Start timer + start_time = time.time() - profiles = parse_profiles(args.profiles) - password, key_file, key_passphrase = normalize_auth_and_prompt(args) + # Connect to RouterOS connection = connect_to_router( args.ip, args.username, - password=password, - port=args.port, - key_file=key_file, - key_passphrase=key_passphrase, + args.password, + args.port, + args.ssh_key, + args.passphrase ) - # system profile - if "system" in profiles: - check_routeros_version(connection) - check_default_users(connection) - check_rmi_services(connection) - checking_access_to_RMI(connection) - check_poe_status(connection) - check_routerboot_protection(connection) - check_bandwidth_server_status(connection) - check_password_length_policy(connection) - check_ssh_security(connection) - check_connection_tracking(connection) - check_romon_status(connection) - check_mac_winbox_security(connection) - check_dst_nat_rules(connection) - detect_malicious_schedulers(connection) - - # protocols profile - if "protocols" in profiles: - check_smb(connection) - check_upnp_status(connection) - check_socks_status(connection) - check_dns_status(connection) - check_static_dns_entries(connection) - check_ddns_status(connection) - check_neighbor_discovery(connection) - check_snmp(connection) - - # wifi profile - if "wifi" in profiles: - check_wifi_security(connection) - - connection.disconnect() - print(f"[*] Disconnected from RouterOS ({args.ip})") - - -# CVE command dispatcher -def run_cve_command(args): - # mode 1: manual version - if args.mode_or_ip.lower() == "version": - version = args.username_or_version.strip() - - section("CVE Search (Manual)") - info(f"RouterOS Version: {version}") - - # always pass string here - run_cve_audit_for_version(version) + # Run only CVE check if --cve is used + if args.cve: + run_cve_audit(connection) + connection.disconnect() return - # mode 2: live device - ip = args.mode_or_ip - username = args.username_or_version + # Run full audit + check_routeros_version(connection) + check_smb(connection) + check_rmi_services(connection) + check_default_users(connection) + checking_access_to_RMI(connection) + check_wifi_security(connection) + check_upnp_status(connection) + check_dns_status(connection) + check_ddns_status(connection) + check_poe_status(connection) + check_routerboot_protection(connection) + check_socks_status(connection) + check_bandwidth_server_status(connection) + check_neighbor_discovery(connection) + check_password_length_policy(connection) + check_ssh_security(connection) + check_connection_tracking(connection) + check_romon_status(connection) + check_mac_winbox_security(connection) + check_snmp(connection) + check_dst_nat_rules(connection) + detect_malicious_schedulers(connection) + check_static_dns_entries(connection) - # reuse auth helper - args.ip = ip - args.username = username - - section("CVE Search (Live)") - info(f"Target Device: {ip}") - info(f"Transport: SSH (port {args.port})") - - password, key_file, key_passphrase = normalize_auth_and_prompt(args) - connection = connect_to_router( - ip, - username, - password=password, - port=args.port, - key_file=key_file, - key_passphrase=key_passphrase, - ) - - # here we pass connection, not version string - run_cve_audit(connection) + print() connection.disconnect() - print(f"[*] Disconnected from RouterOS ({ip})") + print(Fore.WHITE + f"[*] Disconnected from RouterOS ({args.ip}:{args.port})") -def main(): - banner() - parser = argparse.ArgumentParser() - sub = parser.add_subparsers(dest="command", required=True) - - # Audit mode - audit = sub.add_parser("audit", help="Run RouterOS security configuration audit") - audit.add_argument("ip", help="RouterOS IP address") - audit.add_argument("username", help="SSH username") - audit.add_argument("profiles", help="Profiles: system,protocols,wifi (comma-separated)") - audit.add_argument("key", nargs="?", default=None, help="Path to SSH private key (optional)") - audit.add_argument("port", nargs="?", type=int, default=22, help="SSH port (default: 22)") - audit.set_defaults(func=run_sara_audit) - - # CVE mode - cve = sub.add_parser("cve", help="Run RouterOS CVE audit (live or by version)") - cve.add_argument("mode_or_ip", help="'version' or RouterOS IP address") - cve.add_argument("username_or_version", help="SSH username or RouterOS version string") - cve.add_argument("key", nargs="?", default=None, help="Path to SSH private key (optional)") - cve.add_argument("port", nargs="?", type=int, default=22, help="SSH port (default: 22)") - cve.set_defaults(func=run_cve_command) - - # no args = help - if len(sys.argv) == 1: - parser.print_help() - sys.exit(0) - try: - # dispatch - args = parser.parse_args() - args.func(args) - except KeyboardInterrupt: - print() - error("Interrupted by user") - sys.exit(1) + end_time = time.time() + total_time = round(end_time - start_time, 2) + print(Fore.WHITE + f"[*] All checks have been completed. Security inspection completed in {total_time} seconds\n") if __name__ == "__main__": main() \ No newline at end of file diff --git a/setup.py b/setup.py index 36b12dd..b531ac4 100644 --- a/setup.py +++ b/setup.py @@ -2,8 +2,8 @@ from setuptools import setup, find_packages setup( name="sara", - version="1.3", - url="https://github.com/caster0x00/Sara", + version="1.2", + url="https://github.com/casterbyte/Sara", author="Magama Bazarov", author_email="magamabazarov@mailbox.org", scripts=['sara.py'], @@ -11,13 +11,12 @@ setup( long_description=open('README.md', encoding="utf8").read(), long_description_content_type='text/markdown', license="Apache-2.0", - keywords=['mikrotik', 'routeros', 'config analyzer', 'network security', 'cve',], + keywords=['mikrotik', 'routeros', 'config analyzer', 'network security',], packages=find_packages(), install_requires=[ 'colorama', 'netmiko', 'packaging', - 'requests', ], py_modules=['cve_analyzer'], entry_points={