#!/usr/bin/env python3 import argparse import csv import pyshark from statistics import mean from collections import defaultdict from enrichment.utils import get_channel_from_freq, convert_timestamp_to_epoch from enrichment.filters import filter_by_time from enrichment.metrics_clients import get_clients_on_ap, get_clients_on_channel from enrichment.metrics_signals import ( get_aps_on_channel, calculate_signal_strength_stats, get_unlinked_devices ) from enrichment.metrics_ssid import extract_ssid_metrics from enrichment.csv_handler import ( read_csv_input, write_enriched_csv, write_ssid_sidecar ) from enrichment.merge_ssid_summaries import merge_ssid_summaries import time def parse_args(): parser = argparse.ArgumentParser() parser.add_argument('--csv', required=True, help='Input speedtest CSV') parser.add_argument('--pcapng', required=True, help='Kismet-generated .pcapng file') parser.add_argument('--output', required=True, help='Output enriched CSV') return parser.parse_args() def analyze_pcap(pcapng_path, start_ts, end_ts, ap_bssid, ap_channel): display_filter = ( "(wlan.fx.type_subtype == 8 || wlan.fx.type_subtype == 5 || wlan.fc.type == 2) && " "(wlan.bssid || wlan.sa || wlan.da)" ) cap = pyshark.FileCapture( pcapng_path, use_json=True, include_raw=False, keep_packets=False, display_filter=display_filter ) ap_channel = int(ap_channel) clients_on_ap = 0 clients_on_channel = 0 aps_on_channel = 0 avg_ap_signal = 0 max_ap_signal = 0 unlinked_devices = 0 ssid_clients = defaultdict(set) ssid_hidden_status = {} ssid_encryption_status = {} cisco_ssid_clients = defaultdict(list) ssid_signals = defaultdict(list) ssid_to_bssids = defaultdict(set) bssid_to_ssid = {} cisco_reported_clients = [] ssid_packet_counts = defaultdict(int) try: filtered_packets = filter_by_time(cap, start_ts, end_ts) ( bssid_to_ssid, ssid_to_bssids, ssid_hidden_status, ssid_encryption_status, ssid_signals, cisco_ssid_clients, cisco_reported_clients, ssid_packet_counts, ssid_clients ) = extract_ssid_metrics(filtered_packets) our_ssid = bssid_to_ssid.get(ap_bssid, None) clients_on_ap = get_clients_on_ap(filtered_packets, ap_bssid) clients_on_channel = get_clients_on_channel(filtered_packets, ap_channel, ap_bssid) aps_on_channel = get_aps_on_channel(filtered_packets, ap_channel) avg_ap_signal, max_ap_signal = calculate_signal_strength_stats(filtered_packets, ap_channel) unlinked_devices = get_unlinked_devices(filtered_packets, ap_channel) cisco_avg_reported_clients = mean(cisco_reported_clients) if cisco_reported_clients else 0 cisco_max_reported_clients = max(cisco_reported_clients) if cisco_reported_clients else 0 num_bssids = len(ssid_to_bssids[our_ssid]) if our_ssid in ssid_to_bssids else 0 average_signal = mean(ssid_signals[our_ssid]) if our_ssid in ssid_signals else 0 max_ssid_signal = max(ssid_signals[our_ssid]) if our_ssid in ssid_signals else 0 num_channels_ssid = len(ssid_to_bssids[our_ssid]) if our_ssid in ssid_to_bssids else 0 # Generate SSID summary sidecar ssid_summary = [] for ssid, bssids in ssid_to_bssids.items(): signals = ssid_signals.get(ssid, []) ssid_summary.append({ 'SSID': ssid, 'Hidden': ssid_hidden_status.get(ssid, False), 'Open': ssid_encryption_status.get(ssid, True), 'BSSID_Count': len(bssids), 'BSSIDs': ";".join(sorted(bssids)), 'Avg_Signal': mean(signals) if signals else 0, 'Max_Signal': max(signals) if signals else 0, 'Min_Signal': min(signals) if signals else 0, 'Clients_Seen': len(ssid_clients.get(ssid, [])), 'CiscoAvgClients': round(mean(cisco_ssid_clients[ssid]), 2) if ssid in cisco_ssid_clients else 0, 'CiscoMaxClients': max(cisco_ssid_clients[ssid]) if ssid in cisco_ssid_clients else 0, 'PacketCount': ssid_packet_counts[ssid] }) finally: packet_count = len(filtered_packets) cap.close() return ( clients_on_ap, clients_on_channel, aps_on_channel, avg_ap_signal, max_ap_signal, unlinked_devices, cisco_avg_reported_clients, cisco_max_reported_clients, num_bssids, average_signal, max_ssid_signal, num_channels_ssid, ssid_summary, packet_count ) def main(): total_start_time = time.perf_counter() args = parse_args() cap = pyshark.FileCapture( args.pcapng, use_json=True, include_raw=False, keep_packets=False ) # Checking if the pcapng file is valid count = 0 try: for packet in cap: count += 1 if count > 0: break except Exception as e: print(f"[!] Error reading pcapng file: {e}") return finally: cap.close() rows, original_fields = read_csv_input(args.csv) fieldnames = original_fields + [ 'ClientsOnAP', 'ClientsOnChannel', 'APsOnChannel', 'AvgAPSignal', 'StrongestAPSignal', 'UnlinkedDevices', 'CiscoAvgReportedClients', 'CiscoMaxReportedClients', 'NumberofBSSIDsOnSSID', 'AvgSSIDSignal', 'MaxSSIDSignal', 'NumberofChannelsOnSSID', 'PacketCount' ] enriched_rows = [] ssid_summary = None all_ssid_summaries = [] for row in rows: tstart = convert_timestamp_to_epoch(row.get("StartTimestamp")) tend = convert_timestamp_to_epoch(row.get("EndTimestamp")) ap_bssid = row.get("BSSID", "").strip().lower() ap_channel = row.get("Channel") if not tstart or not tend: enriched_rows.append(row) continue start_time = time.perf_counter() result = analyze_pcap(args.pcapng, tstart, tend, ap_bssid, ap_channel) ( clients_ap, clients_chan, aps_chan, avg_signal, strongest_signal, unlinked, cisco_avg_reported_clients, cisco_max_reported_clients, num_bssids, average_signal, max_ssid_signal, num_channels_ssid, ssid_summary, packet_count ) = result elapsed_time = time.perf_counter() - start_time print(f"[+] Analyzed {ap_bssid} in {elapsed_time:.2f} seconds") row.update({ 'ClientsOnAP': clients_ap, 'ClientsOnChannel': clients_chan, 'APsOnChannel': aps_chan, 'AvgAPSignal': avg_signal, 'StrongestAPSignal': strongest_signal, 'UnlinkedDevices': unlinked, 'CiscoAvgReportedClients': cisco_avg_reported_clients, 'CiscoMaxReportedClients': cisco_max_reported_clients, 'NumberofBSSIDsOnSSID': num_bssids, 'AvgSSIDSignal': average_signal, 'MaxSSIDSignal': max_ssid_signal, 'NumberofChannelsOnSSID': num_channels_ssid, 'PacketCount': packet_count }) enriched_rows.append(row) ssid_summary = result[-2] all_ssid_summaries.append(ssid_summary) write_enriched_csv(args.output, fieldnames, enriched_rows) merged_ssid_summary = merge_ssid_summaries(all_ssid_summaries) write_ssid_sidecar(args.output, merged_ssid_summary) print(f"[+] Enrichment complete: {args.output}") total_elapsed_time = time.perf_counter() - total_start_time print(f"[+] Total time taken: {total_elapsed_time:.2f} seconds") if __name__ == "__main__": main()