#!/usr/bin/env python3
"""Enrich a speedtest CSV with Wi-Fi RF metrics taken from a pcapng capture.

For every CSV row, the capture is filtered to the row's
``StartTimestamp``/``EndTimestamp`` window and per-AP, per-channel and
per-SSID statistics are appended to the row as extra columns. A per-SSID
summary is additionally written to a sidecar CSV.
"""
import argparse
import csv
from collections import defaultdict
from datetime import datetime
from statistics import mean

try:
    import pyshark
except ImportError:
    # Deferred: only opening a capture needs pyshark, so the pure helpers in
    # this module stay importable (and testable) without it. _open_capture()
    # raises a clear ImportError when a capture is actually requested.
    pyshark = None

# United States regulatory domain channel lookup table
CHANNEL_LOOKUP_TABLE = {
    # 2.4 GHz (non-DFS, always allowed)
    1: {"freq": 2412, "dfs": False, "band": "2.4GHz"},
    2: {"freq": 2417, "dfs": False, "band": "2.4GHz"},
    3: {"freq": 2422, "dfs": False, "band": "2.4GHz"},
    4: {"freq": 2427, "dfs": False, "band": "2.4GHz"},
    5: {"freq": 2432, "dfs": False, "band": "2.4GHz"},
    6: {"freq": 2437, "dfs": False, "band": "2.4GHz"},
    7: {"freq": 2442, "dfs": False, "band": "2.4GHz"},
    8: {"freq": 2447, "dfs": False, "band": "2.4GHz"},
    9: {"freq": 2452, "dfs": False, "band": "2.4GHz"},
    10: {"freq": 2457, "dfs": False, "band": "2.4GHz"},
    11: {"freq": 2462, "dfs": False, "band": "2.4GHz"},
    # 5 GHz UNII-1 (indoor only)
    36: {"freq": 5180, "dfs": False, "band": "UNII-1"},
    40: {"freq": 5200, "dfs": False, "band": "UNII-1"},
    44: {"freq": 5220, "dfs": False, "band": "UNII-1"},
    48: {"freq": 5240, "dfs": False, "band": "UNII-1"},
    # 5 GHz UNII-2 (DFS required)
    52: {"freq": 5260, "dfs": True, "band": "UNII-2"},
    56: {"freq": 5280, "dfs": True, "band": "UNII-2"},
    60: {"freq": 5300, "dfs": True, "band": "UNII-2"},
    64: {"freq": 5320, "dfs": True, "band": "UNII-2"},
    # 5 GHz UNII-2e (DFS required)
    100: {"freq": 5500, "dfs": True, "band": "UNII-2e"},
    104: {"freq": 5520, "dfs": True, "band": "UNII-2e"},
    108: {"freq": 5540, "dfs": True, "band": "UNII-2e"},
    112: {"freq": 5560, "dfs": True, "band": "UNII-2e"},
    116: {"freq": 5580, "dfs": True, "band": "UNII-2e"},
    120: {"freq": 5600, "dfs": True, "band": "UNII-2e"},
    124: {"freq": 5620, "dfs": True, "band": "UNII-2e"},
    128: {"freq": 5640, "dfs": True, "band": "UNII-2e"},
    132: {"freq": 5660, "dfs": True, "band": "UNII-2e"},
    136: {"freq": 5680, "dfs": True, "band": "UNII-2e"},
    140: {"freq": 5700, "dfs": True, "band": "UNII-2e"},
    # 5 GHz UNII-3 (outdoor/indoor, no DFS)
    149: {"freq": 5745, "dfs": False, "band": "UNII-3"},
    153: {"freq": 5765, "dfs": False, "band": "UNII-3"},
    157: {"freq": 5785, "dfs": False, "band": "UNII-3"},
    161: {"freq": 5805, "dfs": False, "band": "UNII-3"},
    165: {"freq": 5825, "dfs": False, "band": "UNII-3"},
}

# Reverse index: centre frequency (MHz) -> channel number.
FREQ_LOOKUP_TABLE = {v["freq"]: ch for ch, v in CHANNEL_LOOKUP_TABLE.items()}

_BROADCAST_MAC = 'ff:ff:ff:ff:ff:ff'

# wlan.type_subtype values that identify an AP: Probe Response (5), Beacon (8).
_AP_FRAME_SUBTYPES = (5, 8)

# Columns appended to every enriched row, in the order analyze_pcap() returns
# the corresponding values.
_ENRICHED_COLUMNS = (
    'ClientsOnAP',
    'ClientsOnChannel',
    'APsOnChannel',
    'AvgAPSignal',
    'StrongestAPSignal',
    'UnlinkedDevices',
    'CiscoAvgReportedClients',
    'CiscoMaxReportedClients',
    'NumberofBSSIDsOnSSID',
    'AvgSSIDSignal',
    'MaxSSIDSignal',
    'NumberofChannelsOnSSID',
)

_SSID_SUMMARY_COLUMNS = (
    'SSID',
    'Hidden',
    'BSSID_Count',
    'Avg_Signal',
    'Max_Signal',
    'Min_Signal',
    'Clients_Seen',
    'CiscoAvgClients',
    'CiscoMaxClients',
)

# Suffix main() looks for in --output when naming the SSID sidecar file.
_ENRICHED_SUFFIX = '.csv+rf.csv'
_SIDECAR_SUFFIX = '-ssid-metrics.csv'


def get_channel_from_freq(freq):
    """Return the channel number for centre frequency *freq* (MHz), or None."""
    return FREQ_LOOKUP_TABLE.get(freq, None)


def get_freq_details(channel):
    """Return the CHANNEL_LOOKUP_TABLE entry for *channel*, or None if unknown."""
    return CHANNEL_LOOKUP_TABLE.get(channel, None)


def _parse_channel(ap_channel):
    """Return *ap_channel* as an int, or None (after reporting it) if not numeric."""
    try:
        return int(ap_channel)
    except (TypeError, ValueError):
        # TypeError covers None, e.g. a CSV row without a Channel value.
        print(f"[!] Could not parse channel number: {ap_channel}")
        return None


def _packet_freq(packet):
    """Return the radiotap channel frequency of *packet* as an int, or None.

    None means the packet lacks a radiotap or wlan layer, or the radiotap
    layer carries no channel frequency.
    """
    if 'radiotap' not in packet or 'wlan' not in packet:
        return None
    radio = packet.radiotap
    if not hasattr(radio, 'channel') or not hasattr(radio.channel, 'freq'):
        return None
    return int(radio.channel.freq)


def _packet_channel(packet):
    """Return the channel *packet* was captured on, or None when unknown."""
    return get_channel_from_freq(_packet_freq(packet))


def _is_ap_advertisement(wlan):
    """Return True when the wlan layer is a Probe Response or a Beacon."""
    ts_hex = getattr(wlan, 'type_subtype', None)
    if ts_hex is None:
        return False
    return int(ts_hex, 16) in _AP_FRAME_SUBTYPES


def get_aps(capture, ap_channel):
    """Return the set of lower-cased BSSIDs advertising on *ap_channel*.

    Only Beacon and Probe Response frames are considered. An unparsable
    channel yields an empty set so callers can always use set operations.
    """
    channel = _parse_channel(ap_channel)
    if channel is None:
        return set()

    aps = set()
    for packet in capture:
        try:
            if _packet_channel(packet) != channel:
                continue
            wlan = packet.wlan
            if not _is_ap_advertisement(wlan):
                continue
            bssid = getattr(wlan, 'bssid', '').lower()
            if bssid and bssid != _BROADCAST_MAC:
                aps.add(bssid)
        except Exception as e:
            # Best effort: one malformed frame must not abort the whole scan.
            print(f"[DEBUG] Packet parse error: {e}")
            continue
    return aps


def parse_args():
    """Parse and return the command-line arguments."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--csv', required=True, help='Input speedtest CSV')
    parser.add_argument('--pcapng', required=True, help='Kismet-generated .pcapng file')
    parser.add_argument('--output', required=True, help='Output enriched CSV')
    return parser.parse_args()


def convert_timestamp_to_epoch(ts_string):
    """Convert an ISO-8601 timestamp string to integer epoch seconds.

    A trailing ``Z`` is treated as UTC. Returns None (after reporting the
    value) when *ts_string* is missing or cannot be parsed.
    """
    try:
        parsed = datetime.fromisoformat(ts_string.replace("Z", "+00:00"))
    except (AttributeError, TypeError, ValueError):
        # AttributeError/TypeError: not a string (e.g. None from a missing column).
        print(f"[!] Failed to parse timestamp: {ts_string}")
        return None
    return int(parsed.timestamp())


def get_clients_on_ap(capture, ap_bssid, *, stable_threshold=3):
    """Count clients that exchanged frames with the AP *ap_bssid*.

    A station is counted only when it appears in more than
    *stable_threshold* frames involving the AP, which filters out stations
    seen only in passing.
    """
    clients = defaultdict(int)
    ap_bssid = ap_bssid.lower()

    for packet in capture:
        try:
            if not hasattr(packet, 'wlan'):
                continue
            wlan = packet.wlan
            sa = getattr(wlan, 'sa', '').lower()
            da = getattr(wlan, 'da', '').lower()
            bssid = getattr(wlan, 'bssid', '').lower()

            # Only frames in which the AP takes part are of interest.
            if ap_bssid not in (bssid, sa, da):
                continue

            if sa == ap_bssid and da and da != ap_bssid and not da.startswith("ff:ff:ff"):
                # AP is the sender: the destination is the client.
                clients[da] += 1
            elif sa and sa != ap_bssid and not sa.startswith("ff:ff:ff"):
                # Otherwise the sender is the client.
                clients[sa] += 1
        except AttributeError:
            continue

    return sum(1 for count in clients.values() if count > stable_threshold)


def get_clients_on_channel(capture, ap_channel, ap_bssid):
    """Count distinct MACs seen on *ap_channel*, excluding *ap_bssid* and broadcast.

    Returns 0 when the channel cannot be parsed.
    """
    channel = _parse_channel(ap_channel)
    if channel is None:
        return 0
    # Frame addresses are lower-cased below, so compare like with like.
    ap_bssid = (ap_bssid or '').lower()

    clients = set()
    for packet in capture:
        try:
            if _packet_channel(packet) != channel:
                continue
            wlan = packet.wlan
            for mac in (getattr(wlan, 'sa', '').lower(), getattr(wlan, 'da', '').lower()):
                if mac and mac != _BROADCAST_MAC and mac != ap_bssid:
                    clients.add(mac)
        except AttributeError:
            continue
        except Exception as e:
            print(f"[!] Error parsing packet: {e}")
            continue
    return len(clients)


def get_aps_on_channel(capture, ap_channel):
    """Return how many distinct APs advertise on *ap_channel*."""
    return len(get_aps(capture, ap_channel))


def calculate_signal_strength_stats(capture, ap_channel):
    """Return ``(mean, max)`` signal strength of AP frames on *ap_channel*.

    Only Beacon and Probe Response frames that carry ``dbm_antsignal`` are
    used. Returns ``(0, 0)`` when there are no samples or the channel cannot
    be parsed, so the result can always be unpacked into two values.
    """
    channel = _parse_channel(ap_channel)
    if channel is None:
        return 0, 0

    ap_signals = []
    for packet in capture:
        try:
            if _packet_channel(packet) != channel:
                continue
            if not _is_ap_advertisement(packet.wlan):
                continue
            signal_strength = getattr(packet.radiotap, 'dbm_antsignal', None)
            if signal_strength is not None:
                ap_signals.append(int(signal_strength))
        except Exception as e:
            # Best effort: skip frames whose fields cannot be interpreted.
            print(f"[DEBUG] Signal strength parse error: {e}")
            continue

    if not ap_signals:
        return 0, 0
    return mean(ap_signals), max(ap_signals)


def get_unlinked_devices(capture, ap_channel):
    """Count MACs on *ap_channel* that are neither a known AP nor broadcast.

    *capture* is iterated twice (once to find the APs, once to find the
    other stations), so it must be re-iterable, e.g. a list of packets.
    Returns 0 when the channel cannot be parsed.
    """
    channel = _parse_channel(ap_channel)
    if channel is None:
        return 0

    aps = get_aps(capture, channel)
    ghost_clients = set()
    for packet in capture:
        try:
            if _packet_channel(packet) != channel:
                continue
            wlan = packet.wlan
            for mac in (getattr(wlan, 'sa', ''), getattr(wlan, 'da', '')):
                mac = mac.lower()
                if mac and mac != _BROADCAST_MAC and mac not in aps:
                    ghost_clients.add(mac)
        except Exception as e:
            # Same best-effort policy as the other per-channel scans.
            print(f"[DEBUG] Packet parse error: {e}")
            continue
    return len(ghost_clients)


def _open_capture(path, **options):
    """Open *path* with pyshark.FileCapture, failing clearly without pyshark."""
    if pyshark is None:
        raise ImportError("pyshark is required to read pcapng captures")
    return pyshark.FileCapture(path, **options)


def _packets_in_window(capture, start_ts, end_ts):
    """Return the packets whose epoch time lies in ``[start_ts, end_ts]``."""
    selected = []
    for packet in capture:
        try:
            frame_time = float(packet.frame_info.time_epoch)
            if start_ts <= frame_time <= end_ts:
                selected.append(packet)
        except Exception:
            # Frames without a usable timestamp cannot be placed in the window.
            continue
    return selected


def _decode_ssid(raw_ssid):
    """Decode a colon-separated hex SSID to text; return None if it is not hex."""
    try:
        return bytes.fromhex(raw_ssid.replace(':', '')).decode('utf-8', errors='replace')
    except (AttributeError, TypeError, ValueError):
        return None


def _parse_tags(tags):
    """Extract ``(ssid, hidden, cisco_client_counts)`` from a frame's tagged parameters.

    *ssid* is '' for an empty SSID element (hidden network) and None when no
    SSID element was found or it could not be decoded.
    """
    ssid = None
    hidden = False
    cisco_counts = []
    for tag in tags:
        tag_number = tag.get('wlan.tag.number')
        if tag_number == '0':
            raw_ssid = tag.get('wlan.ssid', '')
            if raw_ssid:
                ssid = _decode_ssid(raw_ssid)
            else:
                hidden = True
                ssid = ''
        elif tag_number == '133':
            # Cisco client count element; ignore it when the count is absent
            # or not numeric.
            try:
                cisco_counts.append(int(tag.get('wlan.cisco.ccx1.clients')))
            except (TypeError, ValueError):
                pass
    return ssid, hidden, cisco_counts


def _collect_ssid_stats(packets):
    """Aggregate per-SSID data from the Beacon/Probe Response frames in *packets*."""
    stats = {
        'hidden': {},                          # ssid -> hidden flag
        'bssids': defaultdict(set),            # ssid -> BSSIDs advertising it
        'freqs': defaultdict(set),             # ssid -> capture frequencies (MHz)
        'signals': defaultdict(list),          # ssid -> dBm samples
        'cisco_by_ssid': defaultdict(list),    # ssid -> Cisco-reported client counts
        'cisco_all': [],                       # every Cisco-reported client count
        'bssid_to_ssid': {},
    }
    for packet in packets:
        try:
            packet_freq = _packet_freq(packet)
            if packet_freq is None:
                continue
            radio = packet.radiotap
            wlan = packet.wlan
            if not _is_ap_advertisement(wlan):
                continue

            mgt = packet.get_multiple_layers('wlan.mgt')[0]
            tags = mgt._all_fields.get('wlan.tagged.all', {}).get('wlan.tag', [])
            ssid, hidden_ssid, cisco_counts = _parse_tags(tags)

            # The overall Cisco figures include frames whose SSID is unusable.
            stats['cisco_all'].extend(cisco_counts)
            if not ssid:
                # Hidden or undecodable SSIDs cannot be attributed to a network.
                continue
            # NOTE: hidden networks are skipped just above, so this flag is
            # False for every SSID that reaches the summary.
            stats['hidden'][ssid] = hidden_ssid
            if cisco_counts:
                stats['cisco_by_ssid'][ssid].extend(cisco_counts)

            bssid = getattr(wlan, 'bssid', '').lower()
            if not bssid or bssid == _BROADCAST_MAC:
                continue
            stats['bssid_to_ssid'][bssid] = ssid
            stats['bssids'][ssid].add(bssid)
            stats['freqs'][ssid].add(packet_freq)

            signal = getattr(radio, 'dbm_antsignal', None)
            if signal:
                stats['signals'][ssid].append(int(signal))
        except Exception:
            # Best effort: a frame that cannot be dissected is ignored.
            continue
    return stats


def _summarise_ssids(stats):
    """Build the per-SSID sidecar rows from the output of _collect_ssid_stats()."""
    summary = []
    for ssid, bssids in stats['bssids'].items():
        signals = stats['signals'].get(ssid, [])
        cisco_counts = stats['cisco_by_ssid'].get(ssid, [])
        summary.append({
            'SSID': ssid,
            'Hidden': stats['hidden'].get(ssid, False),
            'BSSID_Count': len(bssids),
            'Avg_Signal': mean(signals) if signals else 0,
            'Max_Signal': max(signals) if signals else 0,
            'Min_Signal': min(signals) if signals else 0,
            # NOTE: per-SSID client attribution is not implemented; this
            # column is always 0.
            'Clients_Seen': 0,
            'CiscoAvgClients': round(mean(cisco_counts), 2) if cisco_counts else 0,
            'CiscoMaxClients': max(cisco_counts) if cisco_counts else 0,
        })
    return summary


def analyze_pcap(pcapng_path, start_ts, end_ts, ap_bssid, ap_channel):
    """Compute RF metrics for one test window of the capture.

    Args:
        pcapng_path: Path of the capture file to read.
        start_ts: Window start, epoch seconds (inclusive).
        end_ts: Window end, epoch seconds (inclusive).
        ap_bssid: BSSID of the AP under test.
        ap_channel: Channel of the AP under test; when it cannot be parsed
            the channel-based metrics are reported as 0.

    Returns:
        A 13-tuple: clients on AP, clients on channel, APs on channel,
        average AP signal, strongest AP signal, unlinked devices, average and
        maximum Cisco-reported clients, number of BSSIDs on the AP's SSID,
        average and maximum signal of that SSID, number of distinct capture
        frequencies that SSID was seen on, and the per-SSID summary rows.
    """
    ap_bssid = (ap_bssid or '').lower()
    # Parse before opening the capture so a bad channel cannot leak it.
    channel = _parse_channel(ap_channel)

    cap = _open_capture(pcapng_path, use_json=True, include_raw=False)
    try:
        window = _packets_in_window(cap, start_ts, end_ts)
        stats = _collect_ssid_stats(window)

        clients_on_ap = get_clients_on_ap(window, ap_bssid)
        if channel is None:
            clients_on_channel = aps_on_channel = unlinked_devices = 0
            avg_ap_signal = max_ap_signal = 0
        else:
            clients_on_channel = get_clients_on_channel(window, channel, ap_bssid)
            aps_on_channel = get_aps_on_channel(window, channel)
            avg_ap_signal, max_ap_signal = calculate_signal_strength_stats(window, channel)
            unlinked_devices = get_unlinked_devices(window, channel)

        cisco_all = stats['cisco_all']
        cisco_avg_reported_clients = mean(cisco_all) if cisco_all else 0
        cisco_max_reported_clients = max(cisco_all) if cisco_all else 0

        # Metrics for the SSID the AP under test belongs to (if it was seen).
        our_ssid = stats['bssid_to_ssid'].get(ap_bssid)
        our_signals = stats['signals'].get(our_ssid, [])
        num_bssids = len(stats['bssids'].get(our_ssid, ()))
        average_signal = mean(our_signals) if our_signals else 0
        max_ssid_signal = max(our_signals) if our_signals else 0
        num_channels_ssid = len(stats['freqs'].get(our_ssid, ()))

        ssid_summary = _summarise_ssids(stats)
    finally:
        cap.close()

    return (clients_on_ap, clients_on_channel, aps_on_channel,
            avg_ap_signal, max_ap_signal, unlinked_devices,
            cisco_avg_reported_clients, cisco_max_reported_clients,
            num_bssids, average_signal, max_ssid_signal,
            num_channels_ssid, ssid_summary)


def _capture_is_readable(pcapng_path):
    """Return False (after reporting the error) if reading the first packet fails."""
    cap = _open_capture(pcapng_path, use_json=True, include_raw=False, keep_packets=False)
    try:
        for _ in cap:
            break  # one packet is enough to prove the file can be parsed
    except Exception as e:
        print(f"[!] Error reading pcapng file: {e}")
        return False
    finally:
        cap.close()
    return True


def _sidecar_path(output_path):
    """Return the SSID-metrics sidecar path for *output_path*.

    The result always differs from *output_path*, so the sidecar can never
    overwrite the enriched CSV.
    """
    if _ENRICHED_SUFFIX in output_path:
        return output_path.replace(_ENRICHED_SUFFIX, _SIDECAR_SUFFIX)
    root = output_path[:-4] if output_path.lower().endswith('.csv') else output_path
    return f"{root}{_SIDECAR_SUFFIX}"


def _write_ssid_summary(path, ssid_summary):
    """Write the per-SSID summary rows to *path* as CSV."""
    with open(path, 'w', newline='', encoding='utf-8') as f:
        ssid_writer = csv.DictWriter(f, fieldnames=list(_SSID_SUMMARY_COLUMNS))
        ssid_writer.writeheader()
        ssid_writer.writerows(ssid_summary)


def main():
    """Enrich every row of the input CSV and write the output and sidecar files."""
    args = parse_args()

    # Checking if the pcapng file is valid
    if not _capture_is_readable(args.pcapng):
        return

    ssid_summary = []
    with open(args.csv, newline='') as infile, \
            open(args.output, 'w', newline='', encoding='utf-8') as outfile:
        reader = csv.DictReader(infile)
        # fieldnames is None for an empty input file.
        base_columns = list(reader.fieldnames or [])
        # Skip columns that already exist so re-running does not duplicate them.
        fieldnames = base_columns + [c for c in _ENRICHED_COLUMNS if c not in base_columns]
        writer = csv.DictWriter(outfile, fieldnames=fieldnames)
        writer.writeheader()

        for row in reader:
            tstart = convert_timestamp_to_epoch(row.get("StartTimestamp"))
            tend = convert_timestamp_to_epoch(row.get("EndTimestamp"))
            if tstart is None or tend is None:
                # No usable window: pass the row through unenriched.
                writer.writerow(row)
                continue

            ap_bssid = (row.get("BSSID") or "").strip().lower()
            ap_channel = row.get("Channel")

            *metrics, row_summary = analyze_pcap(args.pcapng, tstart, tend, ap_bssid, ap_channel)
            row.update(zip(_ENRICHED_COLUMNS, metrics))
            writer.writerow(row)

            # Keep the most recent non-empty summary for the sidecar.
            if row_summary:
                ssid_summary = row_summary

    # Dump SSID metrics sidecar
    if ssid_summary:
        _write_ssid_summary(_sidecar_path(args.output), ssid_summary)

    print(f"[+] Enrichment complete: {args.output}")


if __name__ == "__main__":
    main()