#!/usr/bin/env python3
"""Enrich a speedtest CSV with Wi-Fi metrics read from a Kismet .pcapng file.

For every CSV row, the capture is filtered to the row's
StartTimestamp..EndTimestamp window and the resulting statistics are written
as extra columns to the output CSV.
"""
import argparse
import csv
from datetime import datetime

try:
    import pyshark
except ImportError:
    # Keep the module importable (e.g. for --help or for using the pure
    # helpers) when pyshark is missing; opening a capture reports the problem.
    pyshark = None

# Columns appended to every output row, in the order analyze_pcap() returns
# the corresponding values.
ENRICHMENT_FIELDS = [
    'ClientsOnAP',
    'ClientsOnChannel',
    'APsOnChannel',
    'CongestionScore',
    'AvgAPSignal',
    'StrongestAPSignal',
    'UnlinkedDevices',
]


def parse_args():
    """Parse the command line and return the populated namespace.

    All three options (--csv, --pcapng, --output) are required.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--csv', required=True, help='Input speedtest CSV')
    parser.add_argument('--pcapng', required=True,
                        help='Kismet-generated .pcapng file')
    parser.add_argument('--output', required=True, help='Output enriched CSV')
    return parser.parse_args()


def convert_timestamp_to_epoch(ts_string):
    """Convert an ISO-8601 timestamp string to whole epoch seconds.

    A trailing "Z" is accepted as UTC. Returns None (after printing a
    warning) when the value is missing or cannot be parsed, so callers must
    test the result with ``is None`` -- epoch 0 is a valid return value.
    """
    if isinstance(ts_string, str):
        try:
            parsed = datetime.fromisoformat(ts_string.replace("Z", "+00:00"))
            return int(parsed.timestamp())
        except (ValueError, OverflowError, OSError):
            pass
    print(f"[!] Failed to parse timestamp: {ts_string}")
    return None


def _normalize_mac(value):
    """Return *value* as a stripped, lower-case string, or None if absent."""
    if value is None:
        return None
    return str(value).strip().lower()


def get_clients_on_ap(capture, ap_bssid):
    """Count distinct source addresses that talk to the given access point.

    *capture* is any iterable of packets exposing an optional ``wlan`` layer
    with ``sa``, ``da`` and ``bssid`` attributes. A packet counts when the AP
    is its destination or its BSSID and the AP itself is not the sender.
    Address comparison is case-insensitive. Returns 0 for an empty BSSID.
    """
    target = _normalize_mac(ap_bssid)
    if not target:
        # Without a BSSID nothing can match; in particular a missing BSSID
        # must not "match" packets that merely lack a da/bssid field.
        return 0

    clients = set()
    for packet in capture:
        wlan = getattr(packet, 'wlan', None)
        if wlan is None:
            continue

        sa = _normalize_mac(getattr(wlan, 'sa', None))
        # Skip frames without a sender and frames sent by the AP itself.
        if not sa or sa == target:
            continue

        da = _normalize_mac(getattr(wlan, 'da', None))
        bssid = _normalize_mac(getattr(wlan, 'bssid', None))
        if target in (da, bssid):
            clients.add(sa)
    return len(clients)


def _open_capture(pcapng_path, **options):
    """Open *pcapng_path* with the FileCapture settings shared by this script."""
    if pyshark is None:
        raise RuntimeError(
            "pyshark is not installed; install it to read capture files"
        )
    return pyshark.FileCapture(
        pcapng_path, use_json=True, include_raw=False, **options
    )


def analyze_pcap(pcapng_path, start_ts, end_ts, ap_bssid, ap_channel):
    """Compute the enrichment metrics for one speedtest time window.

    Only packets whose capture time lies in [start_ts, end_ts] (epoch
    seconds) are considered. Returns a 7-tuple matching ENRICHMENT_FIELDS;
    only the first element (clients on the AP) is currently computed, the
    remaining ones are placeholders. *ap_channel* is accepted for the future
    channel-level metrics and is not used yet.
    """
    cap = _open_capture(
        pcapng_path,
        # frame.time_epoch is the Wireshark field for the packet's epoch
        # capture time; a bare "time" is not a valid display-filter field.
        display_filter=(
            f'frame.time_epoch >= {start_ts} && frame.time_epoch <= {end_ts}'
        ),
    )
    try:
        clients_on_ap = get_clients_on_ap(cap, ap_bssid.lower())
        # Placeholder: Logic will be added for:
        # - ClientsOnChannel
        # - APsOnChannel
        # - CongestionScore
        # - AvgAPSignal
        # - StrongestAPSignal
        # - UnlinkedDevices
    finally:
        # Always release the capture, even if packet iteration fails.
        cap.close()
    return clients_on_ap, 0, 0, None, None, None, 0


def _probe_capture(pcapng_path):
    """Read at most one packet to verify that the capture can be opened."""
    cap = _open_capture(pcapng_path, keep_packets=False)
    try:
        for _ in cap:
            break
    finally:
        cap.close()


def main():
    """Read the input CSV, enrich each row from the capture, write the output.

    Rows whose start or end timestamp cannot be parsed are copied through
    without enrichment values.
    """
    args = parse_args()

    # Checking if the pcapng file is valid before touching the output file.
    try:
        _probe_capture(args.pcapng)
    except Exception as e:  # top-level boundary: report and stop
        print(f"[!] Error reading pcapng file: {e}")
        return

    with open(args.csv, newline='') as infile, \
            open(args.output, 'w', newline='', encoding='utf-8') as outfile:
        reader = csv.DictReader(infile)
        # fieldnames is None for an empty input file; also avoid duplicating
        # enrichment columns when the input was already enriched once.
        input_fields = list(reader.fieldnames or [])
        fieldnames = input_fields + [
            name for name in ENRICHMENT_FIELDS if name not in input_fields
        ]
        writer = csv.DictWriter(outfile, fieldnames=fieldnames)
        writer.writeheader()

        for row in reader:
            tstart = convert_timestamp_to_epoch(row.get("StartTimestamp"))
            tend = convert_timestamp_to_epoch(row.get("EndTimestamp"))
            # A short row yields None for BSSID, hence the "or".
            ap_bssid = (row.get("BSSID") or "").strip().lower()
            ap_channel = row.get("Channel")

            # Compare against None explicitly: epoch 0 is a valid timestamp.
            if tstart is None or tend is None:
                writer.writerow(row)
                continue

            metrics = analyze_pcap(
                args.pcapng, tstart, tend, ap_bssid, ap_channel
            )
            row.update(zip(ENRICHMENT_FIELDS, metrics))
            writer.writerow(row)

    print(f"[+] Enrichment complete: {args.output}")


if __name__ == "__main__":
    main()