#!/usr/bin/env python3 # Note: ClientsOnAP may exceed ClientsOnChannel due to silent/low-activity clients # receiving traffic from AP but not sending enough to cross channel activity threshold. import csv import os import re import signal import subprocess import sys import time import threading import psutil from argparse import ArgumentParser from collections import defaultdict from datetime import datetime from scapy.all import sniff, Dot11, RadioTap # === Globals === running = True packet_count = 0 clients = defaultdict(int) aps = set() ap_signals = defaultdict(list) # BSSID -> list of dBm ssid_map = {} # BSSID -> SSID ssid_signals = defaultdict(list) # SSID -> list of dBm ap_clients = defaultdict(lambda: defaultdict(int)) target_ap_bssid = None beacon_counts = defaultdict(int) current_channel = None include_probes = False # deadpoint_candidates = set() unlinked_candidates = set() bssid_channels = {} vendor_cache = {} CHANNEL_LIST = [1, 6, 11, 36, 40, 48, 52, 64, 100, 104, 108, 112, 149, 153, 161] # Channels to hop CHANNEL_HOP_INTERVAL = 5 # Seconds per channel def safe_call(cmd): try: subprocess.check_call(cmd) except subprocess.CalledProcessError as e: print(f"[!] Command failed: {' '.join(cmd)}\n{e}") def get_channel_from_freq(freq): if 2412 <= freq <= 2472: return (freq - 2407) // 5 elif freq == 2484: return 14 elif 5180 <= freq <= 5825: return (freq - 5000) // 5 return None # === Signal handling === def stop_sniff(signum, frame): global running print("[!] SIGINT received, stopping sniffing and writing metrics.") running = False signal.signal(signal.SIGINT, stop_sniff) # === Channel syncing === def get_current_channel(interface): try: output = subprocess.check_output(["iw", "dev", interface, "info"]).decode() for line in output.splitlines(): if "channel" in line.lower(): return int(line.strip().split()[1]) except Exception as e: print(f"[!] Failed to get channel for {interface}: {e}") return None def set_monitor_channel(interface, channel): try: subprocess.check_call(["iw", "dev", interface, "set", "channel", str(channel)]) print(f"[+] Set {interface} to channel {channel}") except subprocess.CalledProcessError as e: print(f"[!] Failed to set {interface} to channel {channel}: {e}") # === Packet handling === def parse_ssid(pkt): try: elt = pkt.getlayer(Dot11).payload while elt: if hasattr(elt, "ID") and elt.ID == 0: # SSID tag return elt.info.decode(errors="ignore") elt = elt.payload except: return None return None def is_unicast(mac): if not mac: return False return int(mac.split(":")[0], 16) % 2 == 0 and mac.lower() != "ff:ff:ff:ff:ff:ff" def handle_packet(pkt): global packet_count packet_count += 1 if not pkt.haslayer(Dot11) or not pkt.haslayer(RadioTap): return dot11 = pkt[Dot11] a1 = dot11.addr1.lower() if dot11.addr1 else None a2 = dot11.addr2.lower() if dot11.addr2 else None try: freq = pkt[RadioTap].ChannelFrequency packet_channel = get_channel_from_freq(freq) except: packet_channel = current_channel or "?" # === Detect APs via beacon (and optionally probe response) frames === if dot11.type == 0 and dot11.subtype in ([8, 5] if include_probes else [8]): if a2 and is_unicast(a2): beacon_counts[a2] += 1 if beacon_counts[a2] > 1: aps.add(a2) ssid = parse_ssid(pkt) if ssid: ssid_map[a2] = ssid # Track channel seen on if pkt.haslayer(RadioTap) and hasattr(pkt[RadioTap], 'ChannelFrequency'): # Nah, be lazy if packet_channel: bssid_channels[a2] = packet_channel # === Track all seen clients === if dot11.type == 2: sa = dot11.addr2.lower() if dot11.addr2 else None da = dot11.addr1.lower() if dot11.addr1 else None for mac in (sa, da): if is_unicast(mac) and mac != target_ap_bssid: clients[mac] += 1 if mac not in aps: unlinked_candidates.add(mac) if dot11.type == 0 and dot11.subtype in [0, 1]: sa = dot11.addr2.lower() if dot11.addr2 else None da = dot11.addr1.lower() if dot11.addr1 else None for mac in (sa, da): if is_unicast(mac) and mac != target_ap_bssid: clients[mac] += 1 if mac not in aps: unlinked_candidates.add(mac) if da in aps and is_unicast(sa): ap_clients[da][sa] += 5 # Track clients talking to any known AP if a1 in aps and is_unicast(a2): ap_clients[a1][a2] += 1 elif a2 in aps and is_unicast(a1): ap_clients[a2][a1] += 1 # === Signal strength tracking === try: signal_dbm = pkt[RadioTap].dBm_AntSignal if a2 in aps: ap_signals[a2].append(signal_dbm) ssid = ssid_map.get(a2) if ssid: ssid_signals[ssid].append(signal_dbm) except: pass # === Metrics dump === def write_csv(outfile): timestamp = datetime.utcnow().isoformat() row = { "Timestamp": timestamp, "ClientsOnAP": len([mac for mac, count in ap_clients[target_ap_bssid].items() if count > 3]), "ClientsOnChannel": len([mac for mac, count in clients.items() if count > 3]), "APsOnChannel": len([ bssid for bssid in aps if bssid_channels.get(bssid) == current_channel ]), "AvgAPSignal": round(sum([sum(v)/len(v) for v in ap_signals.values() if v]) / len(ap_signals) if ap_signals else 0, 2), "StrongestAPSignal": max([max(v) for v in ap_signals.values() if v], default=0), "UnlinkedDevices": len(unlinked_candidates), "NumberofBSSIDsOnSSID": len([ bssid for bssid, ssid in ssid_map.items() if target_ap_bssid in ssid_map and ssid == ssid_map.get(target_ap_bssid) ]) if target_ap_bssid and target_ap_bssid in ssid_map else 0, "AvgSSIDSignal": round(sum([sum(v)/len(v) for v in ssid_signals.values() if v]) / len(ssid_signals) if ssid_signals else 0, 2), "MaxSSIDSignal": max([max(v) for v in ssid_signals.values() if v], default=0), "NumberofChannelsOnSSID": len(set([ bssid_channels.get(bssid) for bssid, ssid in ssid_map.items() if target_ap_bssid in ssid_map and ssid == ssid_map.get(target_ap_bssid) ])) if target_ap_bssid and target_ap_bssid in ssid_map else 0, "PacketCount": packet_count, "Deadpoints": len([ap for ap in aps if is_deadpoint(ap)]) } new_file = not os.path.exists(outfile) with open(outfile, "a", newline="") as f: writer = csv.DictWriter(f, fieldnames=row.keys()) if new_file: writer.writeheader() writer.writerow(row) print(f"[+] Metrics written to {outfile}") def reset_interface(interface): print(f"[~] Resetting interface {interface} to default state...") try: safe_call(["ip", "link", "set", interface, "down"]) time.sleep(1) safe_call(["iw", interface, "set", "type", "monitor"]) time.sleep(1) safe_call(["ip", "link", "set", interface, "up"]) time.sleep(1) print(f"[+] Interface {interface} reset complete.") except Exception as e: print(f"[!] Failed to reset interface {interface}: {e}") def get_connected_bssid(interface): try: out = subprocess.check_output(["iw", interface, "link"]).decode() match = re.search(r"Connected to ([0-9A-Fa-f:]{17})", out, re.MULTILINE) if match: return match.group(1).lower() print("[!] No valid MAC address found in iw link output.") return None except Exception as e: print(f"[!] Failed to get connected BSSID for {interface}: {e}") return None def print_suspect_aps(): print("\n[?] Suspect SSIDs (possibly printers, IoT, weird stuff):") keywords = ("setup", "direct-", "hp", "epson", "canon", "brother", "smart", "wifi-", "printer") suspects = [] for bssid in aps: ssid = ssid_map.get(bssid, "") flags = [] if any(kw in ssid.lower() for kw in keywords): flags.append("Likely non-AP") if is_deadpoint(bssid): flags.append("Deadpoint") if flags: suspects.append((bssid, ssid, flags)) if suspects: for bssid, ssid, flags in suspects: ch = bssid_channels.get(bssid, "?") print(f" - {bssid} (SSID: {ssid}, Channel: {ch}) <-- {' + '.join(flags)}") else: print(" None found (yet).") def channel_hopper(interface, hop_interval): global running i = 0 while running: channel = CHANNEL_LIST[i % len(CHANNEL_LIST)] set_monitor_channel(interface, channel) i += 1 time.sleep(hop_interval) def is_deadpoint(ap_bssid): return sum(ap_clients[ap_bssid].values()) < 2 # No meaningful client interaction def wait_for_interface_up(interface, timeout=5): import shutil import time for _ in range(timeout * 5): result = shutil.which("iw") # Just checking iw exists first if not result: print("[!] 'iw' not found.") return False try: info = subprocess.check_output(["iw", "dev"]).decode() if f"Interface {interface}" in info: if "type monitor" in info: return True except subprocess.CalledProcessError: pass time.sleep(0.2) print(f"[!] Timeout waiting for interface {interface} to be up and in monitor mode.") return False # === Main === def main(): parser = ArgumentParser() parser.add_argument("--main-iface", required=True, help="Active interface (used to determine channel)") parser.add_argument("--monitor-iface", required=True, help="Monitor interface to sniff on") parser.add_argument("--outfile", required=True, help="CSV file to append metrics row") parser.add_argument("--include-probes", action="store_true", help="Include probe responses as valid APs") parser.add_argument("--hop-interval", type=int, default=CHANNEL_HOP_INTERVAL, help="Interval for channel hopping (default: 5 seconds)") group = parser.add_mutually_exclusive_group() group.add_argument("--channel", type=int, help="Channel to lock monitor interface to") group.add_argument("--channel-hop", action="store_true", help="Enable channel hopping") args = parser.parse_args() reset_interface(args.monitor_iface) if not wait_for_interface_up(args.monitor_iface): print("[!] Interface failed to become available. Exiting.") sys.exit(1) print(f"[+] Starting passive observer.") print(f" Main interface: {args.main_iface}") print(f" Monitor interface: {args.monitor_iface}") print(f" Output file: {args.outfile}") if args.channel: print(f" Override channel: {args.channel}") global target_ap_bssid global current_channel current_channel = args.channel or get_current_channel(args.main_iface) if current_channel: set_monitor_channel(args.monitor_iface, current_channel) else: print("[!] Could not determine current channel. Exiting.") sys.exit(1) if not args.channel: target_ap_bssid = get_connected_bssid(args.main_iface) if not target_ap_bssid: print("[!] Could not determine connected BSSID. ClientsOnAP will be 0.") else: print(f"[+] Connected BSSID (target AP): {target_ap_bssid}") else: target_ap_bssid = None # Can't determine AP if we're not associated print("[+] Sniffing... (waiting for SIGINT to stop)") if args.channel_hop: hopper_thread = threading.Thread(target=channel_hopper, args=(args.monitor_iface,args.hop_interval)) hopper_thread.daemon = True hopper_thread.start() print("LISTENING_STARTED", flush=True) while running: sniff(iface=args.monitor_iface, prn=handle_packet, store=False, timeout=5) write_csv(args.outfile) print("\n[+] Final APs counted on this channel:") for bssid in sorted(aps): ssid = ssid_map.get(bssid, "") ch = bssid_channels.get(bssid, "?") print(f" - {bssid} (SSID: {ssid}, Channel: {ch})") print(f"[+] Total APsOnChannel: {len(aps)}") print_suspect_aps() print("\n[+] Vendor Summary (known APs):") for bssid in sorted(aps): vendor = get_mac_vendor(bssid) print(f" {bssid} → {vendor}") reset_interface(args.monitor_iface) def get_mac_vendor(mac): prefix = mac.upper()[0:8].replace(":", "-") if prefix in vendor_cache: return vendor_cache[prefix] try: if not os.path.exists("oui.txt"): print("[~] Downloading IEEE OUI list...") import urllib.request url = "http://standards-oui.ieee.org/oui/oui.txt" urllib.request.urlretrieve(url, "oui.txt") with open("oui.txt", "r", encoding="utf-8", errors="ignore") as f: for line in f: if "(hex)" in line and prefix in line: parts = line.strip().split() if len(parts) >= 3 and parts[0] == prefix and parts[1] == "(hex)": vendor = " ".join(parts[2:]).strip() vendor_cache[prefix] = vendor return vendor except Exception as e: print(f"[ERROR] Vendor lookup failed: {e}") vendor_cache[prefix] = "Unknown" return "Unknown" if __name__ == "__main__": main()