#!/usr/bin/env python3 import argparse import csv from datetime import datetime import pyshark def parse_args(): parser = argparse.ArgumentParser() parser.add_argument('--csv', required=True, help='Input speedtest CSV') parser.add_argument('--pcapng', required=True, help='Kismet-generated .pcapng file') parser.add_argument('--output', required=True, help='Output enriched CSV') return parser.parse_args() def convert_timestamp_to_epoch(ts_string): try: return int(datetime.fromisoformat(ts_string.replace("Z", "+00:00")).timestamp()) except Exception as e: print(f"[!] Failed to parse timestamp: {ts_string}") return None def analyze_pcap(pcapng_path, start_ts, end_ts): # Placeholder: Logic will be added for: # - ClientsOnAP # - ClientsOnChannel # - APsOnChannel # - CongestionScore # - AvgAPSignal # - StrongestAPSignal # - UnlinkedDevices return 0, 0, 0, None, None, None, 0 def main(): args = parse_args() with open(args.csv, newline='') as infile, open(args.output, 'w', newline='', encoding='utf-8') as outfile: reader = csv.DictReader(infile) fieldnames = reader.fieldnames + [ 'ClientsOnAP', 'ClientsOnChannel', 'APsOnChannel', 'CongestionScore', 'AvgAPSignal', 'StrongestAPSignal', 'UnlinkedDevices' ] writer = csv.DictWriter(outfile, fieldnames=fieldnames) writer.writeheader() for row in reader: tstart = convert_timestamp_to_epoch(row.get("StartTimestamp")) tend = convert_timestamp_to_epoch(row.get("EndTimestamp")) if not tstart or not tend: writer.writerow(row) continue clients_ap, clients_chan, aps_chan, congestion, avg_signal, strongest_signal, unlinked = analyze_pcap(args.pcapng, tstart, tend) row.update({ 'ClientsOnAP': clients_ap, 'ClientsOnChannel': clients_chan, 'APsOnChannel': aps_chan, 'CongestionScore': congestion, 'AvgAPSignal': avg_signal, 'StrongestAPSignal': strongest_signal, 'UnlinkedDevices': unlinked }) writer.writerow(row) print(f"[+] Enrichment complete: {args.output}") if __name__ == "__main__": main()