Enhance pcap analysis by loading and indexing capture file once, improving efficiency and adding dummy SSID summary generation for future compatibility

This commit is contained in:
Yaro Kasear 2025-04-29 09:43:44 -05:00
parent f9837f01e8
commit deedca8d2f

View file

@ -1,5 +1,6 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import argparse import argparse
import csv
import pyshark import pyshark
from statistics import mean from statistics import mean
from collections import defaultdict from collections import defaultdict
@ -127,23 +128,26 @@ def main():
total_start_time = time.perf_counter() total_start_time = time.perf_counter()
args = parse_args() args = parse_args()
# Step 1: Build indexed capture ONCE # Step 1: Load and index the capture file once
print(f"[+] Loading and indexing capture file: {args.pcapng}") print(f"[+] Loading and indexing capture file: {args.pcapng}")
indexed_cap = IndexedCapture(args.pcapng) indexed_cap = IndexedCapture(args.pcapng)
print(f"[+] Capture loaded and indexed successfully.")
# Step 2: Process CSV # Step 2: Read input CSV
rows, original_fields = read_csv_input(args.csv) rows, original_fields = read_csv_input(args.csv)
fieldnames = original_fields + [ fieldnames = original_fields + [
'ClientsOnAP', 'ClientsOnChannel', 'APsOnChannel', 'ClientsOnAP', 'ClientsOnChannel', 'APsOnChannel',
'AvgAPSignal', 'StrongestAPSignal', 'UnlinkedDevices', 'AvgAPSignal', 'StrongestAPSignal', 'UnlinkedDevices',
'CiscoAvgReportedClients', 'CiscoMaxReportedClients', 'NumberofBSSIDsOnSSID', 'CiscoAvgReportedClients', 'CiscoMaxReportedClients',
'AvgSSIDSignal', 'MaxSSIDSignal', 'NumberofChannelsOnSSID', 'PacketCount' 'NumberofBSSIDsOnSSID', 'AvgSSIDSignal', 'MaxSSIDSignal',
'NumberofChannelsOnSSID', 'PacketCount'
] ]
enriched_rows = [] enriched_rows = []
all_ssid_summaries = [] all_ssid_summaries = [] # Dummy collection for sidecar compatibility
for row in rows: # Step 3: Enrich each row
for idx, row in enumerate(rows):
tstart = convert_timestamp_to_epoch(row.get("StartTimestamp")) tstart = convert_timestamp_to_epoch(row.get("StartTimestamp"))
tend = convert_timestamp_to_epoch(row.get("EndTimestamp")) tend = convert_timestamp_to_epoch(row.get("EndTimestamp"))
ap_bssid = row.get("BSSID", "").strip().lower() ap_bssid = row.get("BSSID", "").strip().lower()
@ -155,19 +159,17 @@ def main():
start_time = time.perf_counter() start_time = time.perf_counter()
# STEP 3: Query preloaded capture instead of reloading PCAP
result = indexed_cap.query_metrics(tstart, tend, ap_bssid, ap_channel) result = indexed_cap.query_metrics(tstart, tend, ap_bssid, ap_channel)
( (
clients_ap, clients_chan, aps_chan, clients_ap, clients_chan, aps_chan,
avg_signal, strongest_signal, unlinked, avg_signal, strongest_signal, unlinked,
cisco_avg_reported_clients, cisco_max_reported_clients, num_bssids, cisco_avg_reported_clients, cisco_max_reported_clients,
average_signal, max_ssid_signal, num_channels_ssid, num_bssids, average_signal, max_ssid_signal,
packet_count num_channels_ssid, packet_count
) = result ) = result
elapsed_time = time.perf_counter() - start_time elapsed_time = time.perf_counter() - start_time
print(f"[+] Queried {ap_bssid} in {elapsed_time:.2f} seconds") print(f"[+] Row {idx+1}/{len(rows)} ({ap_bssid}): Queried in {elapsed_time:.2f} seconds")
row.update({ row.update({
'ClientsOnAP': clients_ap, 'ClientsOnAP': clients_ap,
@ -184,15 +186,21 @@ def main():
'NumberofChannelsOnSSID': num_channels_ssid, 'NumberofChannelsOnSSID': num_channels_ssid,
'PacketCount': packet_count 'PacketCount': packet_count
}) })
enriched_rows.append(row) enriched_rows.append(row)
# Step 4: Save outputs # Dummy SSID summary for now
ssid_summary = []
all_ssid_summaries.append(ssid_summary)
# Step 4: Save enriched output CSV
write_enriched_csv(args.output, fieldnames, enriched_rows) write_enriched_csv(args.output, fieldnames, enriched_rows)
print(f"[+] Enriched CSV written to {args.output}")
# NOTE: SSID summary generation could ALSO come from IndexedCapture later... # Step 5: Dummy sidecar generation (merging empties)
# but for now, use your merge_ssid_summaries method if needed. merged_ssid_summary = merge_ssid_summaries(all_ssid_summaries)
write_ssid_sidecar(args.output, merged_ssid_summary)
print(f"[+] Enrichment complete: {args.output}") print(f"[+] Dummy SSID sidecar written (empty for now)")
total_elapsed_time = time.perf_counter() - total_start_time total_elapsed_time = time.perf_counter() - total_start_time
print(f"[+] Total time taken: {total_elapsed_time:.2f} seconds") print(f"[+] Total time taken: {total_elapsed_time:.2f} seconds")