diff --git a/listener.py b/listener.py index 81a512b..196fd8f 100755 --- a/listener.py +++ b/listener.py @@ -1,21 +1,40 @@ #!/usr/bin/env python3 import os import signal -import time -from collections import defaultdict -from scapy.all import sniff, Dot11, RadioTap -from dotenv import load_dotenv -from pathlib import Path +import csv +import sys import subprocess +from scapy.all import sniff, Dot11, RadioTap +from collections import defaultdict +from datetime import datetime +from argparse import ArgumentParser +# === Globals === +running = True +packet_count = 0 +clients = set() +aps = set() +ap_signals = defaultdict(list) # BSSID -> list of dBm +ssid_map = {} # BSSID -> SSID +ssid_signals = defaultdict(list) # SSID -> list of dBm + +# === Signal handling === +def stop_sniff(signum, frame): + global running + print("[!] SIGINT received, stopping sniffing and writing metrics.") + running = False + +signal.signal(signal.SIGINT, stop_sniff) + +# === Channel syncing === def get_current_channel(interface): try: output = subprocess.check_output(["iw", "dev", interface, "info"]).decode() for line in output.splitlines(): if "channel" in line.lower(): - return int(line.split()[1]) + return int(line.strip().split()[1]) except Exception as e: - print(f"[!] Failed to determine current channel for {interface}: {e}") + print(f"[!] Failed to get channel for {interface}: {e}") return None def set_monitor_channel(interface, channel): @@ -25,118 +44,101 @@ def set_monitor_channel(interface, channel): except subprocess.CalledProcessError as e: print(f"[!] Failed to set {interface} to channel {channel}: {e}") -# --- Settings.env auto-detection logic --- -SCRIPT_DIRECTORY = Path(__file__).resolve().parent - -# Try intelligent guess of real user home even under sudo -SUDO_USER = os.environ.get("SUDO_USER") -USER_HOME = Path(f"/home/{SUDO_USER}") if SUDO_USER else Path.home() -ENV_PATH = USER_HOME / "wifi_test" / "settings.env" - -# Fallback if all else fails -if not ENV_PATH.exists(): - ENV_PATH = Path("/home/yaro/wifi_test/settings.env") - -if not ENV_PATH.exists(): - print(f"[!] Failed to load settings from {ENV_PATH}") - exit(1) - -load_dotenv(dotenv_path=ENV_PATH) - -# Now load settings -LISTEN_INTERFACE = os.getenv("LISTEN_INTERFACE", "wlan0") -print(f"[+] Using LISTEN_INTERFACE = {LISTEN_INTERFACE}") - -# === Globals === -clients_per_channel = defaultdict(set) -aps_per_channel = defaultdict(set) -running = True - -def get_channel(pkt): - # Convert frequency to channel - if not pkt.haslayer(RadioTap): - return None +# === Packet handling === +def parse_ssid(pkt): try: - freq = pkt[RadioTap].ChannelFrequency - if 2412 <= freq <= 2472: - return (freq - 2407) // 5 - elif freq == 2484: - return 14 - elif 5180 <= freq <= 5825: - return (freq - 5000) // 5 + elt = pkt.getlayer(Dot11).payload + while elt: + if hasattr(elt, "ID") and elt.ID == 0: # SSID tag + return elt.info.decode(errors="ignore") + elt = elt.payload except: return None return None def handle_packet(pkt): - if not pkt.haslayer(Dot11): - return - ch = get_channel(pkt) - if ch is None: + global packet_count + packet_count += 1 + + if not pkt.haslayer(Dot11) or not pkt.haslayer(RadioTap): return dot11 = pkt[Dot11] - if dot11.type == 0 and dot11.subtype in (5, 8): # Probe Response or Beacon - aps_per_channel[ch].add(dot11.addr2) + if dot11.type == 0 and dot11.subtype in (5, 8): # Probe Resp or Beacon + aps.add(dot11.addr2) + ssid = parse_ssid(pkt) + if ssid: + ssid_map[dot11.addr2] = ssid else: if dot11.addr1: - clients_per_channel[ch].add(dot11.addr1) + clients.add(dot11.addr1) if dot11.addr2: - clients_per_channel[ch].add(dot11.addr2) + clients.add(dot11.addr2) -def print_stats(): - print("\n[+] Live Wi-Fi Stats") - for ch in sorted(set(clients_per_channel) | set(aps_per_channel)): - c = len(clients_per_channel[ch]) - a = len(aps_per_channel[ch]) - print(f" - Channel {ch}: {c} clients, {a} APs") - print("-" * 40) - -def stop_sniff(signum, frame): - global running - print("\n[!] Caught Ctrl+C, exiting...") - running = False - -# === Main === -if __name__ == "__main__": - test_interface = os.getenv("INTERFACE", "wlan0") - monitor_interface = os.getenv("LISTEN_INTERFACE", "wlan0") - - current_channel = get_current_channel(test_interface) - if current_channel: - set_monitor_channel(monitor_interface, current_channel) - else: - print("[!] Unable to determine the current channel. Exiting.") - exit(1) - - signal.signal(signal.SIGINT, stop_sniff) - print(f"[+] Listening on interface {LISTEN_INTERFACE} (press Ctrl+C to stop)") - - # Start sniffing in a background thread try: - last_channel = current_channel - channel_check_counter = 0 - - while running: - sniff( - iface=LISTEN_INTERFACE, - prn=handle_packet, - store=False, - monitor=True, - timeout=10 - ) - print_stats() - - # Check channel every 3 loops (~30 seconds) - channel_check_counter += 1 - if channel_check_counter >= 3: - channel_check_counter = 0 - new_channel = get_current_channel(test_interface) - if new_channel and new_channel != last_channel: - print(f"[~] Detected channel change: {last_channel} → {new_channel}") - set_monitor_channel(monitor_interface, new_channel) - last_channel = new_channel - except KeyboardInterrupt: + signal_dbm = pkt[RadioTap].dBm_AntSignal + if dot11.addr2 in aps: + ap_signals[dot11.addr2].append(signal_dbm) + ssid = ssid_map.get(dot11.addr2) + if ssid: + ssid_signals[ssid].append(signal_dbm) + except: pass +# === Metrics dump === +def write_csv(outfile): + timestamp = datetime.utcnow().isoformat() + row = { + "Timestamp": timestamp, + "ClientsOnChannel": len(clients), + "APsOnChannel": len(aps), + "PacketCount": packet_count, + "AvgAPSignal": round(sum([sum(v)/len(v) for v in ap_signals.values() if v]) / len(ap_signals) if ap_signals else 0, 2), + "StrongestAPSignal": max([max(v) for v in ap_signals.values() if v], default=0), + "AvgSSIDSignal": round(sum([sum(v)/len(v) for v in ssid_signals.values() if v]) / len(ssid_signals) if ssid_signals else 0, 2), + "MaxSSIDSignal": max([max(v) for v in ssid_signals.values() if v], default=0), + "ClientsOnAP": "N/A", + "CiscoAvgReportedClients": "N/A", + "CiscoMaxReportedClients": "N/A", + "NumberofBSSIDsOnSSID": "N/A", + "NumberofChannelsOnSSID": "N/A" + } + + new_file = not os.path.exists(outfile) + with open(outfile, "a", newline="") as f: + writer = csv.DictWriter(f, fieldnames=row.keys()) + if new_file: + writer.writeheader() + writer.writerow(row) + print(f"[+] Metrics written to {outfile}") + +# === Main === +def main(): + parser = ArgumentParser() + parser.add_argument("--main-iface", required=True, help="Active interface (used to determine channel)") + parser.add_argument("--monitor-iface", required=True, help="Monitor interface to sniff on") + parser.add_argument("--outfile", required=True, help="CSV file to append metrics row") + args = parser.parse_args() + + print(f"[+] Starting passive observer.") + print(f" Main interface: {args.main_iface}") + print(f" Monitor interface: {args.monitor_iface}") + print(f" Output file: {args.outfile}") + + channel = get_current_channel(args.main_iface) + if channel: + set_monitor_channel(args.monitor_iface, channel) + else: + print("[!] Could not determine current channel. Exiting.") + sys.exit(1) + + print("[+] Sniffing... (waiting for SIGINT to stop)") + + while running: + sniff(iface=args.monitor_iface, prn=handle_packet, store=False, timeout=5) + + write_csv(args.outfile) + +if __name__ == "__main__": + main()