#!/usr/bin/env python3
"""Passive Wi-Fi channel observer.

Tunes a monitor-mode interface to the channel currently used by an active
("main") interface, sniffs 802.11 frames with scapy until SIGINT, and then
appends a single row of summary metrics to a CSV file.
"""

import csv
import os
import re
import signal
import subprocess
import sys
import time
from argparse import ArgumentParser
from collections import defaultdict
from datetime import datetime, timezone
from typing import Optional

from scapy.all import sniff, Dot11, RadioTap

# === Globals ===
running = True
packet_count = 0
clients = defaultdict(int)
aps = set()
ap_signals = defaultdict(list)  # BSSID -> list of dBm
ssid_map = {}  # BSSID -> SSID
ssid_signals = defaultdict(list)  # SSID -> list of dBm
ap_clients = defaultdict(lambda: defaultdict(int))
target_ap_bssid = None

# A MAC must be seen in MORE than this many frames before it is reported as
# a client (filters out one-off / spurious addresses).
MIN_CLIENT_FRAMES = 3

# Matches the "channel <n> (...)" line of `iw dev <iface> info`.
_CHANNEL_RE = re.compile(r"^\s*channel\s+(\d+)", re.MULTILINE)


# === Signal handling ===
def stop_sniff(signum, frame):
    """SIGINT handler: ask the sniff loop in main() to stop."""
    global running
    print("[!] SIGINT received, stopping sniffing and writing metrics.")
    running = False


signal.signal(signal.SIGINT, stop_sniff)


# === Channel syncing ===
def get_current_channel(interface: str) -> Optional[int]:
    """Return the channel number reported by `iw dev <interface> info`.

    Returns None when the command fails or no channel line is present.
    """
    try:
        output = subprocess.check_output(["iw", "dev", interface, "info"]).decode()
    except (subprocess.SubprocessError, OSError, ValueError) as e:
        print(f"[!] Failed to get channel for {interface}: {e}")
        return None
    # Anchor on a line that *starts* with "channel": a substring test would
    # also hit e.g. an "ssid MyChannelNet" line and fail on int().
    match = _CHANNEL_RE.search(output)
    return int(match.group(1)) if match else None


def set_monitor_channel(interface: str, channel: int) -> None:
    """Tune `interface` to `channel` via iw; failures are reported, not raised."""
    try:
        subprocess.check_call(["iw", "dev", interface, "set", "channel", str(channel)])
        print(f"[+] Set {interface} to channel {channel}")
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers a missing/non-executable `iw` binary.
        print(f"[!] Failed to set {interface} to channel {channel}: {e}")


# === Packet handling ===
def parse_ssid(pkt) -> Optional[str]:
    """Return the SSID carried by a frame's information elements, or None.

    Walks the payload chain below the Dot11 layer looking for the element
    with ID 0 (the SSID tag).
    """
    try:
        elt = pkt.getlayer(Dot11).payload
        while elt:
            if hasattr(elt, "ID") and elt.ID == 0:  # SSID tag
                return elt.info.decode(errors="ignore")
            elt = elt.payload
    except Exception:
        # Deliberately best-effort: a malformed frame must not abort the
        # per-packet callback. (Unlike a bare `except`, this lets
        # SystemExit/KeyboardInterrupt through.)
        return None
    return None


def is_unicast(mac: Optional[str]) -> bool:
    """Return True if `mac` is a unicast address.

    An address is group (multicast/broadcast) when the least-significant bit
    of its first octet is set; ff:ff:ff:ff:ff:ff is covered by that test.
    Empty/None or unparsable input yields False.
    """
    if not mac:
        return False
    try:
        first_octet = int(mac.split(":")[0], 16)
    except ValueError:
        return False
    return first_octet % 2 == 0


def handle_packet(pkt) -> None:
    """Per-packet sniff callback: update the global counters and signal logs."""
    global packet_count
    packet_count += 1

    if not pkt.haslayer(Dot11) or not pkt.haslayer(RadioTap):
        return

    dot11 = pkt[Dot11]
    a1 = dot11.addr1.lower() if dot11.addr1 else None
    a2 = dot11.addr2.lower() if dot11.addr2 else None

    # === Detect APs via beacons/probe responses ===
    if dot11.type == 0 and dot11.subtype in (5, 8):  # Probe Response or Beacon
        if a2:
            aps.add(a2)
            ssid = parse_ssid(pkt)
            if ssid:
                ssid_map[a2] = ssid

    # === Track all seen clients ===
    if dot11.type == 2:
        for mac in (a2, a1):  # source (addr2) first, then destination (addr1)
            if is_unicast(mac) and mac != target_ap_bssid:
                print(f"[D] Counting client: {mac} (frame type: {dot11.type}, subtype: {dot11.subtype})")
                clients[mac] += 1

        # Track clients talking to the same AP we're connected to
        # NOTE(review): nesting under the data-frame branch was reconstructed
        # from a whitespace-collapsed source -- confirm against the original.
        if target_ap_bssid and a1 and a2 and target_ap_bssid in (a1, a2):
            peer = a2 if a1 == target_ap_bssid else a1
            if is_unicast(peer) and peer not in aps:
                ap_clients[target_ap_bssid][peer] += 1

    # === Signal strength tracking ===
    # The RadioTap signal field may be absent (value None); recording None
    # would make the sum()/max() in write_csv raise, so skip such frames.
    signal_dbm = getattr(pkt[RadioTap], "dBm_AntSignal", None)
    if signal_dbm is None:
        return
    if a2 in aps:
        ap_signals[a2].append(signal_dbm)
        ssid = ssid_map.get(a2)
        if ssid:
            ssid_signals[ssid].append(signal_dbm)


# === Metrics dump ===
def _signal_summary(signal_map):
    """Return (mean of per-key means rounded to 2 dp, overall max); (0, 0) if empty."""
    series = [values for values in signal_map.values() if values]
    if not series:
        return 0, 0
    means = [sum(values) / len(values) for values in series]
    return round(sum(means) / len(means), 2), max(max(values) for values in series)


def _count_clients(counts) -> int:
    """Count MACs seen in more than MIN_CLIENT_FRAMES frames that are not known APs."""
    return sum(
        1
        for mac, count in counts.items()
        if count > MIN_CLIENT_FRAMES and mac not in aps
    )


def write_csv(outfile: str) -> None:
    """Append one row of the collected metrics to `outfile`.

    A header row is written first when the file does not exist yet or is
    empty. The timestamp is naive UTC in ISO-8601 form.
    """
    # Same text as the deprecated datetime.utcnow().isoformat().
    timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
    avg_ap, max_ap = _signal_summary(ap_signals)
    avg_ssid, max_ssid = _signal_summary(ssid_signals)

    row = {
        "Timestamp": timestamp,
        # APs also appear as addr1/addr2 of data frames; the aps filter in
        # _count_clients keeps them out of the client counts.
        "ClientsOnChannel": _count_clients(clients),
        "APsOnChannel": len(aps),
        "PacketCount": packet_count,
        "AvgAPSignal": avg_ap,
        "StrongestAPSignal": max_ap,
        "AvgSSIDSignal": avg_ssid,
        "MaxSSIDSignal": max_ssid,
        # .get() avoids inserting a None key when no target AP was found.
        "ClientsOnAP": _count_clients(ap_clients.get(target_ap_bssid, {})),
        "CiscoAvgReportedClients": "N/A",
        "CiscoMaxReportedClients": "N/A",
        "NumberofBSSIDsOnSSID": "N/A",
        "NumberofChannelsOnSSID": "N/A",
    }

    new_file = not os.path.exists(outfile) or os.path.getsize(outfile) == 0
    with open(outfile, "a", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=list(row))
        if new_file:
            writer.writeheader()
        writer.writerow(row)
    print(f"[+] Metrics written to {outfile}")


def reset_interface(interface: str) -> None:
    """Bring `interface` down, set it to monitor type, and bring it back up.

    Best-effort: a failing step is reported and the remaining steps still run.
    """
    print(f"[~] Resetting interface {interface} to default state...")
    steps = (
        ["ip", "link", "set", interface, "down"],
        ["iw", interface, "set", "type", "monitor"],
        ["ip", "link", "set", interface, "up"],
    )
    try:
        for cmd in steps:
            status = subprocess.call(cmd)
            if status != 0:
                print(f"[!] Command {' '.join(cmd)} exited with status {status}")
            time.sleep(1)  # pause after each step, as the original sequence did
        print(f"[+] Interface {interface} reset complete.")
    except (OSError, ValueError) as e:
        print(f"[!] Failed to reset interface {interface}: {e}")


def get_connected_bssid(interface: str) -> Optional[str]:
    """Return the lower-cased BSSID from `iw <interface> link`, or None."""
    try:
        out = subprocess.check_output(["iw", interface, "link"]).decode()
    except (subprocess.SubprocessError, OSError, ValueError) as e:
        print(f"[!] Failed to get connected BSSID for {interface}: {e}")
        return None
    match = re.search(r"Connected to ([0-9A-Fa-f:]{17})", out)
    if match:
        return match.group(1).lower()
    print("[!] No valid MAC address found in iw link output.")
    return None


# === Main ===
def main() -> None:
    """Parse arguments, sync the monitor channel, sniff until SIGINT, dump metrics."""
    global target_ap_bssid

    parser = ArgumentParser()
    parser.add_argument("--main-iface", required=True, help="Active interface (used to determine channel)")
    parser.add_argument("--monitor-iface", required=True, help="Monitor interface to sniff on")
    parser.add_argument("--outfile", required=True, help="CSV file to append metrics row")
    args = parser.parse_args()

    reset_interface(args.monitor_iface)

    print("[+] Starting passive observer.")
    print(f" Main interface: {args.main_iface}")
    print(f" Monitor interface: {args.monitor_iface}")
    print(f" Output file: {args.outfile}")

    channel = get_current_channel(args.main_iface)
    if not channel:
        print("[!] Could not determine current channel. Exiting.")
        sys.exit(1)
    set_monitor_channel(args.monitor_iface, channel)

    target_ap_bssid = get_connected_bssid(args.main_iface)
    if not target_ap_bssid:
        print("[!] Could not determine connected BSSID. ClientsOnAP will be 0.")
    else:
        print(f"[+] Connected BSSID (target AP): {target_ap_bssid}")

    print("[+] Sniffing... (waiting for SIGINT to stop)")
    try:
        while running:
            # Short timeout so the `running` flag is re-checked regularly;
            # stop_filter additionally ends the capture at the first packet
            # seen after SIGINT.
            sniff(
                iface=args.monitor_iface,
                prn=handle_packet,
                store=False,
                timeout=5,
                stop_filter=lambda _pkt: not running,
            )
    finally:
        # Even if sniffing fails, keep what was collected and always put the
        # interface back.
        try:
            write_csv(args.outfile)
        finally:
            reset_interface(args.monitor_iface)


if __name__ == "__main__":
    main()