#!/usr/bin/env python3
"""Passive 802.11 channel observer.

Tunes a monitor-mode interface to the channel currently used by an active
interface, sniffs frames until SIGINT, and appends one row of aggregate
metrics (AP/client counts, signal statistics) to a CSV file.
"""
import os
import re
import signal
import csv
import sys
import subprocess
import time
from scapy.all import sniff, Dot11, RadioTap
from collections import defaultdict
from datetime import datetime, timezone
from argparse import ArgumentParser

# === Globals ===
running = True
packet_count = 0
clients = set()
aps = set()
ap_signals = defaultdict(list)  # BSSID -> list of dBm
ssid_map = {}  # BSSID -> SSID
ssid_signals = defaultdict(list)  # SSID -> list of dBm
ap_clients = defaultdict(set)

# Seconds each sniff() call blocks before the `running` flag is re-checked.
SNIFF_TIMEOUT_S = 5

# Matches the "channel <n> (...)" line printed by `iw dev <iface> info`.
_CHANNEL_RE = re.compile(r"^\s*channel\s+(\d+)\b", re.IGNORECASE | re.MULTILINE)


# === Signal handling ===
def stop_sniff(signum, frame):
    """SIGINT handler: ask the sniff loop in main() to stop after its current pass."""
    global running
    print("[!] SIGINT received, stopping sniffing and writing metrics.")
    running = False


signal.signal(signal.SIGINT, stop_sniff)


# === Channel syncing ===
def get_current_channel(interface):
    """Return the channel number reported by ``iw dev <interface> info``.

    Returns None when the command fails or its output has no channel line.
    """
    try:
        raw = subprocess.check_output(["iw", "dev", interface, "info"])
    except (OSError, subprocess.CalledProcessError) as e:
        print(f"[!] Failed to get channel for {interface}: {e}")
        return None

    # An anchored pattern avoids tripping over unrelated lines that merely
    # contain the word "channel".
    match = _CHANNEL_RE.search(raw.decode(errors="replace"))
    return int(match.group(1)) if match else None


def set_monitor_channel(interface, channel):
    """Tune *interface* to *channel* via ``iw``.

    Returns True on success and False on failure; failures are reported on
    stdout and never raised.
    """
    try:
        subprocess.check_call(["iw", "dev", interface, "set", "channel", str(channel)])
    except (OSError, subprocess.CalledProcessError) as e:
        # OSError covers a missing/non-executable `iw` binary.
        print(f"[!] Failed to set {interface} to channel {channel}: {e}")
        return False
    print(f"[+] Set {interface} to channel {channel}")
    return True


# === Packet handling ===
def parse_ssid(pkt):
    """Return the SSID from the first element with ID 0 in *pkt*, or None.

    Walks the payload chain below the Dot11 layer. Any parsing error yields
    None so that a malformed frame cannot abort the capture.
    """
    try:
        elt = pkt.getlayer(Dot11).payload
        while elt:
            if hasattr(elt, "ID") and elt.ID == 0:  # SSID tag
                return elt.info.decode(errors="ignore")
            elt = elt.payload
    except Exception:
        # Deliberately best-effort, but do not swallow KeyboardInterrupt/SystemExit.
        return None
    return None


def _is_unicast(mac):
    """Return True when *mac* is a non-empty individual (non-group) MAC address."""
    if not mac:
        return False
    try:
        first_octet = int(mac.split(":", 1)[0], 16)
    except ValueError:
        return False
    # The least-significant bit of the first octet marks broadcast/multicast.
    return not (first_octet & 0x01)


def handle_packet(pkt):
    """Update the global counters and maps from one sniffed frame.

    Frames lacking a Dot11 or RadioTap layer are counted in ``packet_count``
    and otherwise ignored.
    """
    global packet_count
    packet_count += 1

    if not pkt.haslayer(Dot11) or not pkt.haslayer(RadioTap):
        return

    dot11 = pkt[Dot11]
    receiver, transmitter = dot11.addr1, dot11.addr2

    # === Detect APs via beacons/probe responses ===
    if dot11.type == 0 and dot11.subtype in (5, 8) and transmitter:
        aps.add(transmitter)
        ssid = parse_ssid(pkt)
        if ssid:
            ssid_map[transmitter] = ssid

    # === Track all seen stations ===
    # Group addresses (e.g. the broadcast receiver of every beacon) are not
    # stations and would otherwise inflate the client metrics.
    for mac in (receiver, transmitter):
        if _is_unicast(mac):
            clients.add(mac)

    # === Guess client <-> AP relationships ===
    if receiver in aps and _is_unicast(transmitter):
        ap_clients[receiver].add(transmitter)
    elif transmitter in aps and _is_unicast(receiver):
        ap_clients[transmitter].add(receiver)

    # === Signal strength tracking ===
    # The field is absent or None when the capture carries no antenna-signal
    # value; storing None would make the aggregation in write_csv() raise.
    signal_dbm = getattr(pkt[RadioTap], "dBm_AntSignal", None)
    if signal_dbm is None or transmitter not in aps:
        return
    ap_signals[transmitter].append(signal_dbm)
    ssid = ssid_map.get(transmitter)
    if ssid:
        ssid_signals[ssid].append(signal_dbm)


# === Metrics dump ===
def _mean_of_means(samples_by_key):
    """Average the per-key means of the non-empty sample lists (0 if none), rounded to 2 places."""
    means = [sum(v) / len(v) for v in samples_by_key.values() if v]
    return round(sum(means) / len(means), 2) if means else 0


def _strongest(samples_by_key):
    """Return the largest sample across all keys, or 0 when there are no samples."""
    return max((max(v) for v in samples_by_key.values() if v), default=0)


def write_csv(outfile):
    """Append one row summarising the global capture state to *outfile*.

    A header row is written first when the file does not exist or is empty.
    Addresses identified as APs are excluded from the client counts.
    """
    # Naive UTC timestamp; same text format that datetime.utcnow() produced.
    timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
    row = {
        "Timestamp": timestamp,
        "ClientsOnChannel": len(clients - aps),
        "APsOnChannel": len(aps),
        "PacketCount": packet_count,
        "AvgAPSignal": _mean_of_means(ap_signals),
        "StrongestAPSignal": _strongest(ap_signals),
        "AvgSSIDSignal": _mean_of_means(ssid_signals),
        "MaxSSIDSignal": _strongest(ssid_signals),
        "ClientsOnAP": sum(len(members - aps) for members in ap_clients.values()),
        "CiscoAvgReportedClients": "N/A",
        "CiscoMaxReportedClients": "N/A",
        "NumberofBSSIDsOnSSID": "N/A",
        "NumberofChannelsOnSSID": "N/A",
    }

    # An existing but empty file still needs its header.
    needs_header = not os.path.exists(outfile) or os.path.getsize(outfile) == 0
    with open(outfile, "a", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=list(row))
        if needs_header:
            writer.writeheader()
        writer.writerow(row)

    print(f"[+] Metrics written to {outfile}")


def reset_interface(interface):
    """Bring *interface* down, set its type to monitor, and bring it back up.

    Best-effort: a failing command is reported and the remaining commands
    still run. Nothing is raised to the caller.
    """
    print(f"[~] Resetting interface {interface} to default state...")
    steps = (
        ["ip", "link", "set", interface, "down"],
        ["iw", interface, "set", "type", "monitor"],
        ["ip", "link", "set", interface, "up"],
    )
    succeeded = True
    try:
        for cmd in steps:
            status = subprocess.call(cmd)
            if status != 0:
                succeeded = False
                print(f"[!] Command {' '.join(cmd)!r} exited with status {status}")
            time.sleep(1)  # pause between commands, as in the original sequence
    except (OSError, subprocess.SubprocessError) as e:
        print(f"[!] Failed to reset interface {interface}: {e}")
        return

    if succeeded:
        print(f"[+] Interface {interface} reset complete.")
    else:
        print(f"[!] Interface {interface} reset finished with errors.")


# === Main ===
def main():
    """Parse arguments, sync the monitor channel, sniff until SIGINT, then dump metrics."""
    parser = ArgumentParser()
    parser.add_argument("--main-iface", required=True, help="Active interface (used to determine channel)")
    parser.add_argument("--monitor-iface", required=True, help="Monitor interface to sniff on")
    parser.add_argument("--outfile", required=True, help="CSV file to append metrics row")
    args = parser.parse_args()

    reset_interface(args.monitor_iface)

    print(f"[+] Starting passive observer.")
    print(f" Main interface: {args.main_iface}")
    print(f" Monitor interface: {args.monitor_iface}")
    print(f" Output file: {args.outfile}")

    channel = get_current_channel(args.main_iface)
    if not channel:
        print("[!] Could not determine current channel. Exiting.")
        sys.exit(1)
    set_monitor_channel(args.monitor_iface, channel)

    print("[+] Sniffing... (waiting for SIGINT to stop)")
    try:
        # Short sniff passes so the `running` flag set by SIGINT is noticed.
        while running:
            sniff(iface=args.monitor_iface, prn=handle_packet, store=False, timeout=SNIFF_TIMEOUT_S)
    except Exception:
        # Keep whatever was captured before the failure, then re-raise.
        if packet_count:
            write_csv(args.outfile)
        raise
    else:
        write_csv(args.outfile)
    finally:
        reset_interface(args.monitor_iface)


if __name__ == "__main__":
    main()