#!/usr/bin/env python3 import csv import os import re import signal import subprocess import sys import time import threading from argparse import ArgumentParser from collections import defaultdict from datetime import datetime from scapy.all import sniff, Dot11, RadioTap # === Globals === running = True packet_count = 0 clients = defaultdict(int) aps = set() ap_signals = defaultdict(list) # BSSID -> list of dBm ssid_map = {} # BSSID -> SSID ssid_signals = defaultdict(list) # SSID -> list of dBm ap_clients = defaultdict(lambda: defaultdict(int)) target_ap_bssid = None beacon_counts = defaultdict(int) current_channel = None include_probes = False deadpoint_candidates = set() unlinked_candidates = set() bssid_channels = {} CHANNEL_LIST = [1, 6, 11, 36, 40, 44, 48, 52, 56, 60, 64, 100, 104, 108, 112, 116, 120, 124, 128, 132, 136, 140, 149, 153, 157, 161, 165] # Channels to hop CHANNEL_HOP_INTERVAL = 5 # Seconds per channel # === Signal handling === def stop_sniff(signum, frame): global running print("[!] SIGINT received, stopping sniffing and writing metrics.") running = False signal.signal(signal.SIGINT, stop_sniff) # === Channel syncing === def get_current_channel(interface): try: output = subprocess.check_output(["iw", "dev", interface, "info"]).decode() for line in output.splitlines(): if "channel" in line.lower(): return int(line.strip().split()[1]) except Exception as e: print(f"[!] Failed to get channel for {interface}: {e}") return None def set_monitor_channel(interface, channel): try: subprocess.check_call(["iw", "dev", interface, "set", "channel", str(channel)]) print(f"[+] Set {interface} to channel {channel}") except subprocess.CalledProcessError as e: print(f"[!] Failed to set {interface} to channel {channel}: {e}") # === Packet handling === def parse_ssid(pkt): try: elt = pkt.getlayer(Dot11).payload while elt: if hasattr(elt, "ID") and elt.ID == 0: # SSID tag return elt.info.decode(errors="ignore") elt = elt.payload except: return None return None def is_unicast(mac): if not mac: return False return int(mac.split(":")[0], 16) % 2 == 0 and mac.lower() != "ff:ff:ff:ff:ff:ff" def handle_packet(pkt): global packet_count packet_count += 1 if not pkt.haslayer(Dot11) or not pkt.haslayer(RadioTap): return dot11 = pkt[Dot11] a1 = dot11.addr1.lower() if dot11.addr1 else None a2 = dot11.addr2.lower() if dot11.addr2 else None # === Detect APs via beacon (and optionally probe response) frames === if dot11.type == 0 and dot11.subtype in ([8, 5] if include_probes else [8]): if a2 and is_unicast(a2): beacon_counts[a2] += 1 if beacon_counts[a2] > 1: aps.add(a2) deadpoint_candidates.add(a2) ssid = parse_ssid(pkt) if ssid: ssid_map[a2] = ssid # Track channel seen on if pkt.haslayer(RadioTap) and hasattr(pkt[RadioTap], 'ChannelFrequency'): # Nah, be lazy bssid_channels[a2] = current_channel # === Track all seen clients === if dot11.type == 2: sa = dot11.addr2.lower() if dot11.addr2 else None da = dot11.addr1.lower() if dot11.addr1 else None for mac in (sa, da): if is_unicast(mac) and mac != target_ap_bssid: clients[mac] += 1 if mac not in aps: unlinked_candidates.add(mac) # Track clients talking to the same AP we're connected to if target_ap_bssid: if a1 and a2: if target_ap_bssid in (a1, a2): peer = a2 if a1 == target_ap_bssid else a1 if is_unicast(peer) and peer not in aps: ap_clients[target_ap_bssid][peer] += 1 # === Signal strength tracking === try: signal_dbm = pkt[RadioTap].dBm_AntSignal if a2 in aps: ap_signals[a2].append(signal_dbm) ssid = ssid_map.get(a2) if ssid: ssid_signals[ssid].append(signal_dbm) except: pass # === Metrics dump === def write_csv(outfile): timestamp = datetime.utcnow().isoformat() row = { "Timestamp": timestamp, "ClientsOnChannel": len([mac for mac, count in clients.items() if count > 3]), "APsOnChannel": len(aps), "PacketCount": packet_count, "AvgAPSignal": round(sum([sum(v)/len(v) for v in ap_signals.values() if v]) / len(ap_signals) if ap_signals else 0, 2), "StrongestAPSignal": max([max(v) for v in ap_signals.values() if v], default=0), "AvgSSIDSignal": round(sum([sum(v)/len(v) for v in ssid_signals.values() if v]) / len(ssid_signals) if ssid_signals else 0, 2), "MaxSSIDSignal": max([max(v) for v in ssid_signals.values() if v], default=0), "ClientsOnAP": len([mac for mac, count in ap_clients[target_ap_bssid].items() if count > 3]), "CiscoAvgReportedClients": "N/A", "CiscoMaxReportedClients": "N/A", "NumberofBSSIDsOnSSID": "N/A", "NumberofChannelsOnSSID": "N/A", "UnlinkedDevices": len(unlinked_candidates), "Deadpoints": len([ap for ap in deadpoint_candidates if not ap_clients.get(ap)]) } new_file = not os.path.exists(outfile) with open(outfile, "a", newline="") as f: writer = csv.DictWriter(f, fieldnames=row.keys()) if new_file: writer.writeheader() writer.writerow(row) print(f"[+] Metrics written to {outfile}") def reset_interface(interface): print(f"[~] Resetting interface {interface} to default state...") try: subprocess.call(["ip", "link", "set", interface, "down"]) time.sleep(1) subprocess.call(["iw", interface, "set", "type", "monitor"]) time.sleep(1) subprocess.call(["ip", "link", "set", interface, "up"]) time.sleep(1) print(f"[+] Interface {interface} reset complete.") except Exception as e: print(f"[!] Failed to reset interface {interface}: {e}") def get_connected_bssid(interface): try: out = subprocess.check_output(["iw", interface, "link"]).decode() match = re.search(r"Connected to ([0-9A-Fa-f:]{17})", out, re.MULTILINE) if match: return match.group(1).lower() print("[!] No valid MAC address found in iw link output.") return None except Exception as e: print(f"[!] Failed to get connected BSSID for {interface}: {e}") return None def print_suspect_aps(): print("\n[?] Suspect SSIDs (possibly printers, IoT, weird stuff):") keywords = ("setup", "direct-", "hp", "epson", "canon", "brother", "smart", "wifi-", "printer") suspects = [] for bssid, ssid in ssid_map.items(): flags = [] if any(kw in ssid.lower() for kw in keywords): flags.append("Suspicious SSID") if bssid in deadpoint_candidates and not ap_clients.get(bssid): flags.append("Deadpoint") if flags: suspects.append((bssid, ssid, flags)) if suspects: for bssid, ssid, flags in suspects: ch = bssid_channels.get(bssid, "?") print(f" - {bssid} (SSID: {ssid}, Channel: {ch}) <-- {' + '.join(flags)}") else: print(" None found (yet).") def channel_hopper(interface): global running i = 0 while running: channel = CHANNEL_LIST[i % len(CHANNEL_LIST)] set_monitor_channel(interface, channel) i += 1 time.sleep(CHANNEL_HOP_INTERVAL) # === Main === def main(): parser = ArgumentParser() parser.add_argument("--main-iface", required=True, help="Active interface (used to determine channel)") parser.add_argument("--monitor-iface", required=True, help="Monitor interface to sniff on") parser.add_argument("--outfile", required=True, help="CSV file to append metrics row") parser.add_argument("--include-probes", action="store_true", help="Include probe responses as valid APs") group = parser.add_mutually_exclusive_group() group.add_argument("--channel", type=int, help="Channel to lock monitor interface to") group.add_argument("--channel-hop", action="store_true", help="Enable channel hopping") args = parser.parse_args() reset_interface(args.monitor_iface) print(f"[+] Starting passive observer.") print(f" Main interface: {args.main_iface}") print(f" Monitor interface: {args.monitor_iface}") print(f" Output file: {args.outfile}") if args.channel: print(f" Override channel: {args.channel}") global target_ap_bssid global current_channel current_channel = args.channel or get_current_channel(args.main_iface) if current_channel: set_monitor_channel(args.monitor_iface, current_channel) else: print("[!] Could not determine current channel. Exiting.") sys.exit(1) if not args.channel: target_ap_bssid = get_connected_bssid(args.main_iface) if not target_ap_bssid: print("[!] Could not determine connected BSSID. ClientsOnAP will be 0.") else: print(f"[+] Connected BSSID (target AP): {target_ap_bssid}") else: target_ap_bssid = None # Can't determine AP if we're not associated print("[+] Sniffing... (waiting for SIGINT to stop)") if args.channel_hop: hopper_thread = threading.Thread(target=channel_hopper, args=(args.monitor_iface,)) hopper_thread.daemon = True hopper_thread.start() while running: sniff(iface=args.monitor_iface, prn=handle_packet, store=False, timeout=5) write_csv(args.outfile) print("\n[+] Final APs counted on this channel:") for bssid in sorted(aps): ssid = ssid_map.get(bssid, "") ch = bssid_channels.get(bssid, "?") print(f" - {bssid} (SSID: {ssid}, Channel: {ch})") print(f"[+] Total APsOnChannel: {len(aps)}") print_suspect_aps() reset_interface(args.monitor_iface) if __name__ == "__main__": main()