#!/usr/bin/env python3
"""Enrich a speedtest CSV with Wi-Fi environment counts from a pcapng capture.

For every row of the input CSV (columns ``StartTimestamp``, ``EndTimestamp``,
``BSSID`` and ``Channel`` are read), the packets captured between the two
timestamps are inspected and the row is extended with ``ClientsOnAP``,
``ClientsOnChannel`` and ``APsOnChannel``.  ``CongestionScore``,
``AvgAPSignal``, ``StrongestAPSignal`` and ``UnlinkedDevices`` are emitted as
placeholders until their logic is implemented.
"""
import argparse
import csv
import logging
from datetime import datetime

try:
    import pyshark
except ImportError:
    # Deferred: the error is raised when a capture is actually opened, so the
    # pure helpers in this module stay importable without pyshark/tshark.
    pyshark = None

logger = logging.getLogger(__name__)

# Columns appended to every output row, in output order.
ENRICHMENT_FIELDS = [
    'ClientsOnAP',
    'ClientsOnChannel',
    'APsOnChannel',
    'CongestionScore',
    'AvgAPSignal',
    'StrongestAPSignal',
    'UnlinkedDevices',
]

# United States regulatory domain channel lookup table
CHANNEL_LOOKUP_TABLE = {
    # 2.4 GHz (non-DFS, always allowed)
    1: {"freq": 2412, "dfs": False, "band": "2.4GHz"},
    2: {"freq": 2417, "dfs": False, "band": "2.4GHz"},
    3: {"freq": 2422, "dfs": False, "band": "2.4GHz"},
    4: {"freq": 2427, "dfs": False, "band": "2.4GHz"},
    5: {"freq": 2432, "dfs": False, "band": "2.4GHz"},
    6: {"freq": 2437, "dfs": False, "band": "2.4GHz"},
    7: {"freq": 2442, "dfs": False, "band": "2.4GHz"},
    8: {"freq": 2447, "dfs": False, "band": "2.4GHz"},
    9: {"freq": 2452, "dfs": False, "band": "2.4GHz"},
    10: {"freq": 2457, "dfs": False, "band": "2.4GHz"},
    11: {"freq": 2462, "dfs": False, "band": "2.4GHz"},
    # 5 GHz UNII-1 (indoor only)
    36: {"freq": 5180, "dfs": False, "band": "UNII-1"},
    40: {"freq": 5200, "dfs": False, "band": "UNII-1"},
    44: {"freq": 5220, "dfs": False, "band": "UNII-1"},
    48: {"freq": 5240, "dfs": False, "band": "UNII-1"},
    # 5 GHz UNII-2 (DFS required)
    52: {"freq": 5260, "dfs": True, "band": "UNII-2"},
    56: {"freq": 5280, "dfs": True, "band": "UNII-2"},
    60: {"freq": 5300, "dfs": True, "band": "UNII-2"},
    64: {"freq": 5320, "dfs": True, "band": "UNII-2"},
    # 5 GHz UNII-2e (DFS required)
    100: {"freq": 5500, "dfs": True, "band": "UNII-2e"},
    104: {"freq": 5520, "dfs": True, "band": "UNII-2e"},
    108: {"freq": 5540, "dfs": True, "band": "UNII-2e"},
    112: {"freq": 5560, "dfs": True, "band": "UNII-2e"},
    116: {"freq": 5580, "dfs": True, "band": "UNII-2e"},
    120: {"freq": 5600, "dfs": True, "band": "UNII-2e"},
    124: {"freq": 5620, "dfs": True, "band": "UNII-2e"},
    128: {"freq": 5640, "dfs": True, "band": "UNII-2e"},
    132: {"freq": 5660, "dfs": True, "band": "UNII-2e"},
    136: {"freq": 5680, "dfs": True, "band": "UNII-2e"},
    140: {"freq": 5700, "dfs": True, "band": "UNII-2e"},
    144: {"freq": 5720, "dfs": True, "band": "UNII-2e"},
    # 5 GHz UNII-3 (outdoor/indoor, no DFS)
    149: {"freq": 5745, "dfs": False, "band": "UNII-3"},
    153: {"freq": 5765, "dfs": False, "band": "UNII-3"},
    157: {"freq": 5785, "dfs": False, "band": "UNII-3"},
    161: {"freq": 5805, "dfs": False, "band": "UNII-3"},
    165: {"freq": 5825, "dfs": False, "band": "UNII-3"},
}

# Reverse index: centre frequency in MHz -> channel number.
FREQ_LOOKUP_TABLE = {v["freq"]: ch for ch, v in CHANNEL_LOOKUP_TABLE.items()}


def get_channel_from_freq(freq):
    """Return the channel number for a centre frequency in MHz, or None."""
    return FREQ_LOOKUP_TABLE.get(freq)


def get_freq_details(channel):
    """Return the ``freq``/``dfs``/``band`` record for *channel*, or None."""
    return CHANNEL_LOOKUP_TABLE.get(channel)


def parse_args():
    """Parse the command line.

    Returns:
        argparse.Namespace with ``csv``, ``pcapng``, ``output`` and ``debug``.
    """
    parser = argparse.ArgumentParser(
        description='Enrich a speedtest CSV with counts taken from a pcapng capture.'
    )
    parser.add_argument('--csv', required=True, help='Input speedtest CSV')
    parser.add_argument('--pcapng', required=True, help='Kismet-generated .pcapng file')
    parser.add_argument('--output', required=True, help='Output enriched CSV')
    parser.add_argument('--debug', action='store_true',
                        help='Log per-packet diagnostics to stderr')
    return parser.parse_args()


def convert_timestamp_to_epoch(ts_string):
    """Convert an ISO-8601 timestamp to whole epoch seconds.

    A trailing ``Z`` is accepted as UTC.  A timestamp without an offset is
    interpreted in the local time zone, which is how ``datetime.timestamp()``
    treats naive values.  Fractional seconds are truncated.

    Returns:
        The epoch time as an int, or None (after printing a warning) when
        *ts_string* is missing or cannot be parsed.
    """
    try:
        parsed = datetime.fromisoformat(ts_string.replace("Z", "+00:00"))
        return int(parsed.timestamp())
    except (AttributeError, TypeError, ValueError, OverflowError, OSError):
        # AttributeError/TypeError: value is None or not a string.
        # ValueError: not ISO-8601.  OverflowError/OSError: out-of-range date.
        print(f"[!] Failed to parse timestamp: {ts_string}")
        return None


def _mac_field(layer, name):
    """Return ``layer.<name>`` as a lower-cased string, or '' when absent."""
    value = getattr(layer, name, None)
    return str(value).lower() if value else ''


def _is_group_address(mac):
    """Return True for broadcast/multicast MACs (I/G bit of first octet set)."""
    try:
        return bool(int(mac[:2], 16) & 0x01)
    except ValueError:
        return False


def _is_station(mac, ap_bssid):
    """Return True if *mac* is a non-empty unicast address other than the AP."""
    return bool(mac) and mac != ap_bssid and not _is_group_address(mac)


def _parse_channel(ap_channel):
    """Return *ap_channel* as an int, or None (with a warning) if invalid."""
    try:
        return int(ap_channel)
    except (TypeError, ValueError):
        print(f"[!] Could not parse channel number: {ap_channel}")
        return None


def _packet_channel(packet):
    """Return the channel of *packet* from its radiotap frequency, or None."""
    radiotap = getattr(packet, 'radiotap', None)
    freq = getattr(getattr(radiotap, 'channel', None), 'freq', None)
    if freq is None:
        return None
    return get_channel_from_freq(int(freq))


def _frame_type_and_subtype(fc_raw):
    """Decode (type, subtype) from a hex frame-control value such as '0x8000'.

    The value is read with the first transmitted octet in the high byte
    (a beacon is 0x8000), so type and subtype are taken from bits 8-15
    rather than bits 0-7.
    """
    first_octet = (int(str(fc_raw), 16) >> 8) & 0xFF
    return (first_octet >> 2) & 0b11, (first_octet >> 4) & 0b1111


def _open_capture(pcapng_path):
    """Open *pcapng_path* with pyshark without caching packets internally.

    Raises:
        ImportError: if pyshark is not installed.
    """
    if pyshark is None:
        raise ImportError("pyshark is required to read capture files")
    return pyshark.FileCapture(
        pcapng_path,
        use_json=True,
        include_raw=False,
        keep_packets=False,
    )


def get_clients_on_ap(capture, ap_bssid):
    """Count distinct stations exchanging frames with the given AP.

    A frame is considered when the AP is its BSSID, source or destination.
    The counted peer is the destination when the AP is the sender, otherwise
    the source.  Broadcast and multicast addresses are never counted.

    Args:
        capture: Iterable of packets exposing a ``wlan`` layer.
        ap_bssid: BSSID of the AP; compared case-insensitively.

    Returns:
        Number of distinct peer MAC addresses.  0 when *ap_bssid* is empty.
    """
    ap_bssid = (ap_bssid or '').lower()  # Normalize for comparison
    if not ap_bssid:
        return 0

    clients = set()
    for packet in capture:
        try:
            wlan = getattr(packet, 'wlan', None)
            if wlan is None:
                continue
            sa = _mac_field(wlan, 'sa')
            da = _mac_field(wlan, 'da')
            bssid = _mac_field(wlan, 'bssid')
        except AttributeError:
            continue

        if ap_bssid not in (bssid, sa, da):
            continue
        # AP transmitting -> the peer is the receiver; otherwise the sender.
        peer = da if sa == ap_bssid else sa
        if _is_station(peer, ap_bssid):
            clients.add(peer)
    return len(clients)


def get_clients_on_channel(capture, ap_channel, ap_bssid):
    """Count distinct unicast MACs seen on a channel, excluding the given AP.

    Every source and destination address of frames whose radiotap frequency
    maps to *ap_channel* is counted, so other APs on the channel are included.

    Args:
        capture: Iterable of packets exposing ``radiotap`` and ``wlan`` layers.
        ap_channel: Channel number (int or numeric string).
        ap_bssid: BSSID to leave out of the count; compared case-insensitively.

    Returns:
        Number of distinct MAC addresses, or 0 if *ap_channel* is not a number.
    """
    channel = _parse_channel(ap_channel)
    if channel is None:
        return 0
    ap_bssid = (ap_bssid or '').lower()

    clients = set()
    for packet in capture:
        try:
            wlan = getattr(packet, 'wlan', None)
            if wlan is None or _packet_channel(packet) != channel:
                continue
            for mac in (_mac_field(wlan, 'sa'), _mac_field(wlan, 'da')):
                if _is_station(mac, ap_bssid):
                    clients.add(mac)
        except AttributeError:
            continue
        except Exception as e:  # best effort: one bad packet must not abort the scan
            print(f"[!] Error parsing packet: {e}")
            continue
    return len(clients)


def get_aps_on_channel(capture, ap_channel):
    """Count distinct BSSIDs that sent a beacon or probe response on a channel.

    Args:
        capture: Iterable of packets exposing ``radiotap`` and ``wlan`` layers.
        ap_channel: Channel number (int or numeric string).

    Returns:
        Number of distinct BSSIDs, or 0 if *ap_channel* is not a number.
    """
    channel = _parse_channel(ap_channel)
    if channel is None:
        return 0

    aps = set()
    for packet in capture:
        try:
            wlan = getattr(packet, 'wlan', None)
            if wlan is None or not hasattr(packet, 'radiotap'):
                continue

            packet_channel = _packet_channel(packet)
            if packet_channel != channel:
                logger.debug("[DEBUG] Skipped packet on channel %s, looking for %s",
                             packet_channel, channel)
                continue

            fc_raw = getattr(wlan, 'fc', None)
            if not fc_raw:
                logger.debug("[DEBUG] No FC field found.")
                continue

            frame_type, subtype = _frame_type_and_subtype(fc_raw)
            logger.debug("[DEBUG] FC: %s, frame_type: %s, subtype: %s",
                         fc_raw, frame_type, subtype)
            # Management frames (type 0): subtype 5 = probe response, 8 = beacon.
            if frame_type != 0 or subtype not in (5, 8):
                logger.debug("[DEBUG] Not a Beacon or Probe Response.")
                continue

            bssid = _mac_field(wlan, 'bssid')
            if bssid:
                logger.debug("[DEBUG] Adding BSSID: %s", bssid)
                aps.add(bssid)
            else:
                logger.debug("[DEBUG] No BSSID found.")
        except Exception as e:  # best effort: one bad packet must not abort the scan
            logger.debug("[DEBUG] Failed packet: %s", e)
            continue

    logger.debug("[DEBUG] Final AP count: %d | APs: %s", len(aps), sorted(aps))
    return len(aps)


def analyze_pcap(pcapng_path, start_ts, end_ts, ap_bssid, ap_channel):
    """Compute the enrichment values for one measurement window.

    The capture file is re-read from the start on every call.

    Args:
        pcapng_path: Path of the capture file.
        start_ts: Window start, epoch seconds (inclusive).
        end_ts: Window end, epoch seconds (inclusive).
        ap_bssid: BSSID of the AP under test.
        ap_channel: Channel of the AP under test.  An unparsable value leaves
            both per-channel counts at 0 instead of aborting.

    Returns:
        Tuple ``(clients_on_ap, clients_on_channel, aps_on_channel,
        congestion_score, avg_ap_signal, strongest_ap_signal,
        unlinked_devices)``.  The last four are placeholders
        (``None, None, None, 0``).
    """
    channel = _parse_channel(ap_channel)
    clients_on_ap = 0
    clients_on_channel = 0
    aps_on_channel = 0

    cap = _open_capture(pcapng_path)
    try:
        # Filter packets manually by timestamp
        filtered_packets = []
        for packet in cap:
            try:
                frame_time = float(packet.frame_info.time_epoch)
            except (AttributeError, TypeError, ValueError):
                continue
            if start_ts <= frame_time <= end_ts:
                filtered_packets.append(packet)

        clients_on_ap = get_clients_on_ap(filtered_packets, ap_bssid)
        if channel is not None:
            clients_on_channel = get_clients_on_channel(filtered_packets, channel, ap_bssid)
            aps_on_channel = get_aps_on_channel(filtered_packets, channel)

        # Placeholder: Logic will be added for:
        # - CongestionScore
        # - AvgAPSignal
        # - StrongestAPSignal
        # - UnlinkedDevices
    finally:
        cap.close()

    return clients_on_ap, clients_on_channel, aps_on_channel, None, None, None, 0


def main():
    """Script entry point: validate the capture, then enrich the CSV row by row."""
    args = parse_args()
    logging.basicConfig(
        level=logging.DEBUG if args.debug else logging.WARNING,
        format="%(message)s",
    )

    # Checking if the pcapng file is valid: open it and pull a single packet.
    cap = None
    try:
        cap = _open_capture(args.pcapng)
        next(iter(cap), None)
    except Exception as e:  # top-level boundary: report and stop
        print(f"[!] Error reading pcapng file: {e}")
        return
    finally:
        if cap is not None:
            cap.close()

    with open(args.csv, newline='') as infile, \
            open(args.output, 'w', newline='', encoding='utf-8') as outfile:
        reader = csv.DictReader(infile)
        # fieldnames is None for an empty file; also avoid duplicating columns
        # when the input has already been enriched once.
        fieldnames = list(reader.fieldnames or [])
        fieldnames += [name for name in ENRICHMENT_FIELDS if name not in fieldnames]
        writer = csv.DictWriter(outfile, fieldnames=fieldnames)
        writer.writeheader()

        for row in reader:
            tstart = convert_timestamp_to_epoch(row.get("StartTimestamp"))
            tend = convert_timestamp_to_epoch(row.get("EndTimestamp"))
            ap_bssid = (row.get("BSSID") or "").strip().lower()
            ap_channel = row.get("Channel")

            # Rows without a usable time window are copied through unenriched.
            if tstart is None or tend is None:
                writer.writerow(row)
                continue

            row.update(zip(
                ENRICHMENT_FIELDS,
                analyze_pcap(args.pcapng, tstart, tend, ap_bssid, ap_channel),
            ))
            writer.writerow(row)

    print(f"[+] Enrichment complete: {args.output}")


if __name__ == "__main__":
    main()