#!/usr/bin/env python3
"""Passive 802.11 observer.

Reads the channel of an active interface via ``iw``, tunes a monitor
interface to that channel, sniffs frames with scapy until SIGINT is
received, and then appends a single row of aggregate metrics to a CSV file.
"""
import csv
import os
import signal
import subprocess
import sys
from argparse import ArgumentParser
from collections import defaultdict
from datetime import datetime, timezone
from typing import Iterable, List, Optional

from scapy.all import sniff, Dot11, RadioTap

# === Globals ===
running = True
packet_count = 0
clients = set()
aps = set()
ap_signals = defaultdict(list)    # BSSID -> list of dBm
ssid_map = {}                     # BSSID -> SSID
ssid_signals = defaultdict(list)  # SSID -> list of dBm
ap_clients = defaultdict(set)     # BSSID -> addresses seen talking to it


# === Signal handling ===
def stop_sniff(signum, frame):
    """SIGINT handler: clear the ``running`` flag polled by ``main()``."""
    global running
    print("[!] SIGINT received, stopping sniffing and writing metrics.")
    running = False


signal.signal(signal.SIGINT, stop_sniff)


# === Channel syncing ===
def _parse_channel(iw_output: str) -> Optional[int]:
    """Return the channel number from ``iw dev <iface> info`` output.

    Only lines whose first token is ``channel`` are considered, so other
    lines that merely contain the word (for example an SSID) cannot be
    mistaken for the channel line.
    """
    for line in iw_output.splitlines():
        fields = line.split()
        if len(fields) >= 2 and fields[0].lower() == "channel":
            try:
                return int(fields[1])
            except ValueError:
                continue
    return None


def get_current_channel(interface: str) -> Optional[int]:
    """Return the channel ``interface`` is on, or None if it cannot be read.

    Runs ``iw dev <interface> info``. A failure to run the command is
    reported on stdout and yields None; so does output without a channel
    line.
    """
    try:
        raw = subprocess.check_output(["iw", "dev", interface, "info"])
    except (subprocess.CalledProcessError, OSError) as e:
        print(f"[!] Failed to get channel for {interface}: {e}")
        return None
    # errors="replace": undecodable bytes must not abort channel detection.
    return _parse_channel(raw.decode(errors="replace"))


def set_monitor_channel(interface: str, channel: int) -> None:
    """Tune ``interface`` to ``channel`` via ``iw``; report the outcome."""
    try:
        subprocess.check_call(
            ["iw", "dev", interface, "set", "channel", str(channel)]
        )
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers the case where the ``iw`` binary cannot be executed.
        print(f"[!] Failed to set {interface} to channel {channel}: {e}")
    else:
        print(f"[+] Set {interface} to channel {channel}")


# === Packet handling ===
def parse_ssid(pkt) -> Optional[str]:
    """Return the SSID carried by ``pkt`` (element with ID 0), or None.

    Walks the payload chain below the Dot11 layer. Parsing is best-effort:
    any error while walking the frame yields None.
    """
    try:
        elt = pkt.getlayer(Dot11).payload
        while elt:
            if getattr(elt, "ID", None) == 0:  # SSID tag
                return elt.info.decode(errors="ignore")
            elt = elt.payload
    except Exception:
        # Deliberately broad, but unlike a bare ``except`` this does not
        # swallow KeyboardInterrupt / SystemExit.
        return None
    return None


def _is_unicast(mac) -> bool:
    """Return True if ``mac`` is a non-empty unicast MAC address string.

    Broadcast and multicast addresses have the least-significant bit of the
    first octet set; those (and None / unparsable values) return False.
    """
    if not mac:
        return False
    try:
        first_octet = int(mac.split(":", 1)[0], 16)
    except (AttributeError, ValueError):
        return False
    return (first_octet & 1) == 0


def handle_packet(pkt) -> None:
    """Update the module-level counters and maps from one sniffed frame."""
    global packet_count
    packet_count += 1

    if not pkt.haslayer(Dot11) or not pkt.haslayer(RadioTap):
        return

    dot11 = pkt[Dot11]
    receiver = dot11.addr1
    transmitter = dot11.addr2

    # === Detect APs via beacons/probe responses ===
    if dot11.type == 0 and dot11.subtype in (5, 8) and transmitter:
        aps.add(transmitter)
        ssid = parse_ssid(pkt)
        if ssid:
            ssid_map[transmitter] = ssid

    # === Track all seen stations ===
    # Group addresses (e.g. the broadcast destination of beacons) are not
    # stations. APs are filtered out at report time because a BSSID may be
    # seen here before its first beacon identifies it as an AP.
    for mac in (receiver, transmitter):
        if _is_unicast(mac):
            clients.add(mac)

    # === Guess client <-> AP relationships ===
    if receiver in aps and _is_unicast(transmitter):
        ap_clients[receiver].add(transmitter)
    elif transmitter in aps and _is_unicast(receiver):
        ap_clients[transmitter].add(receiver)

    # === Signal strength tracking ===
    if transmitter in aps:
        # The field may be missing or None when the radiotap header carries
        # no antenna-signal value; storing None would make the sums in
        # write_csv() raise TypeError.
        signal_dbm = getattr(pkt[RadioTap], "dBm_AntSignal", None)
        if signal_dbm is not None:
            ap_signals[transmitter].append(signal_dbm)
            ssid = ssid_map.get(transmitter)
            if ssid:
                ssid_signals[ssid].append(signal_dbm)


# === Metrics dump ===
def _mean_of_means(samples: Iterable[List[float]]):
    """Average the per-key averages of ``samples``; 0 when there is no data."""
    means = [sum(values) / len(values) for values in samples if values]
    if not means:
        return 0
    return round(sum(means) / len(means), 2)


def _strongest(samples: Iterable[List[float]]):
    """Return the largest reading across all of ``samples``; 0 when empty."""
    return max((max(values) for values in samples if values), default=0)


def write_csv(outfile: str) -> None:
    """Append one row of aggregate metrics to ``outfile``.

    A header row is written first when the file does not exist yet or is
    empty. Known APs are excluded from the client counts.
    """
    # Naive UTC timestamp, same textual format as the deprecated utcnow().
    timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
    row = {
        "Timestamp": timestamp,
        "ClientsOnChannel": len(clients - aps),
        "APsOnChannel": len(aps),
        "PacketCount": packet_count,
        "AvgAPSignal": _mean_of_means(ap_signals.values()),
        "StrongestAPSignal": _strongest(ap_signals.values()),
        "AvgSSIDSignal": _mean_of_means(ssid_signals.values()),
        "MaxSSIDSignal": _strongest(ssid_signals.values()),
        "ClientsOnAP": sum(len(members - aps) for members in ap_clients.values()),
        "CiscoAvgReportedClients": "N/A",
        "CiscoMaxReportedClients": "N/A",
        "NumberofBSSIDsOnSSID": "N/A",
        "NumberofChannelsOnSSID": "N/A",
    }

    needs_header = not os.path.exists(outfile) or os.path.getsize(outfile) == 0
    with open(outfile, "a", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=list(row))
        if needs_header:
            writer.writeheader()
        writer.writerow(row)
    print(f"[+] Metrics written to {outfile}")


# === Main ===
def main():
    """Parse arguments, sync the monitor channel, sniff, then dump metrics."""
    parser = ArgumentParser()
    parser.add_argument("--main-iface", required=True,
                        help="Active interface (used to determine channel)")
    parser.add_argument("--monitor-iface", required=True,
                        help="Monitor interface to sniff on")
    parser.add_argument("--outfile", required=True,
                        help="CSV file to append metrics row")
    args = parser.parse_args()

    print("[+] Starting passive observer.")
    print(f" Main interface: {args.main_iface}")
    print(f" Monitor interface: {args.monitor_iface}")
    print(f" Output file: {args.outfile}")

    channel = get_current_channel(args.main_iface)
    if not channel:
        print("[!] Could not determine current channel. Exiting.")
        sys.exit(1)
    set_monitor_channel(args.monitor_iface, channel)

    print("[+] Sniffing... (waiting for SIGINT to stop)")
    # Sniff in 5-second slices so the ``running`` flag is re-checked even on
    # a quiet channel; stop_filter ends a slice on the next packet once
    # SIGINT has cleared the flag.
    while running:
        sniff(
            iface=args.monitor_iface,
            prn=handle_packet,
            store=False,
            timeout=5,
            stop_filter=lambda _pkt: not running,
        )

    write_csv(args.outfile)


if __name__ == "__main__":
    main()