wifi_test/enrich.py

206 lines
7.4 KiB
Python
Executable file

#!/usr/bin/env python3
import argparse
import csv
import pyshark
from statistics import mean
from collections import defaultdict
from enrichment.utils import get_channel_from_freq, convert_timestamp_to_epoch
from enrichment.filters import filter_by_time
from enrichment.metrics_clients import get_clients_on_ap, get_clients_on_channel
from enrichment.metrics_signals import (
get_aps_on_channel,
calculate_signal_strength_stats,
get_unlinked_devices
)
from enrichment.metrics_ssid import extract_ssid_metrics
from enrichment.csv_handler import (
read_csv_input,
write_enriched_csv,
write_ssid_sidecar
)
from enrichment.merge_ssid_summaries import merge_ssid_summaries
import time
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument('--csv', required=True, help='Input speedtest CSV')
parser.add_argument('--pcapng', required=True, help='Kismet-generated .pcapng file')
parser.add_argument('--output', required=True, help='Output enriched CSV')
return parser.parse_args()
def analyze_pcap(pcapng_path, start_ts, end_ts, ap_bssid, ap_channel):
cap = pyshark.FileCapture(
pcapng_path,
use_json=True,
include_raw=False
)
ap_channel = int(ap_channel)
clients_on_ap = 0
clients_on_channel = 0
aps_on_channel = 0
avg_ap_signal = 0
max_ap_signal = 0
unlinked_devices = 0
ssid_clients = defaultdict(set)
ssid_hidden_status = {}
ssid_encryption_status = {}
cisco_ssid_clients = defaultdict(list)
ssid_signals = defaultdict(list)
ssid_to_bssids = defaultdict(set)
bssid_to_ssid = {}
cisco_reported_clients = []
ssid_packet_counts = defaultdict(int)
try:
filtered_packets = filter_by_time(cap, start_ts, end_ts)
(
bssid_to_ssid,
ssid_to_bssids,
ssid_hidden_status,
ssid_encryption_status,
ssid_signals,
cisco_ssid_clients,
cisco_reported_clients,
ssid_packet_counts,
ssid_clients
) = extract_ssid_metrics(filtered_packets)
our_ssid = bssid_to_ssid.get(ap_bssid, None)
clients_on_ap = get_clients_on_ap(filtered_packets, ap_bssid)
clients_on_channel = get_clients_on_channel(filtered_packets, ap_channel, ap_bssid)
aps_on_channel = get_aps_on_channel(filtered_packets, ap_channel)
avg_ap_signal, max_ap_signal = calculate_signal_strength_stats(filtered_packets, ap_channel)
unlinked_devices = get_unlinked_devices(filtered_packets, ap_channel)
cisco_avg_reported_clients = mean(cisco_reported_clients) if cisco_reported_clients else 0
cisco_max_reported_clients = max(cisco_reported_clients) if cisco_reported_clients else 0
num_bssids = len(ssid_to_bssids[our_ssid]) if our_ssid in ssid_to_bssids else 0
average_signal = mean(ssid_signals[our_ssid]) if our_ssid in ssid_signals else 0
max_ssid_signal = max(ssid_signals[our_ssid]) if our_ssid in ssid_signals else 0
num_channels_ssid = len(ssid_to_bssids[our_ssid]) if our_ssid in ssid_to_bssids else 0
# Generate SSID summary sidecar
ssid_summary = []
for ssid, bssids in ssid_to_bssids.items():
signals = ssid_signals.get(ssid, [])
ssid_summary.append({
'SSID': ssid,
'Hidden': ssid_hidden_status.get(ssid, False),
'Open': ssid_encryption_status.get(ssid, True),
'BSSID_Count': len(bssids),
'BSSIDs': ";".join(sorted(bssids)),
'Avg_Signal': mean(signals) if signals else 0,
'Max_Signal': max(signals) if signals else 0,
'Min_Signal': min(signals) if signals else 0,
'Clients_Seen': len(ssid_clients.get(ssid, [])),
'CiscoAvgClients': round(mean(cisco_ssid_clients[ssid]), 2) if ssid in cisco_ssid_clients else 0,
'CiscoMaxClients': max(cisco_ssid_clients[ssid]) if ssid in cisco_ssid_clients else 0,
'PacketCount': ssid_packet_counts[ssid]
})
finally:
packet_count = len(filtered_packets)
cap.close()
return (
clients_on_ap, clients_on_channel, aps_on_channel,
avg_ap_signal, max_ap_signal, unlinked_devices,
cisco_avg_reported_clients, cisco_max_reported_clients, num_bssids,
average_signal, max_ssid_signal, num_channels_ssid,
ssid_summary, packet_count
)
def main():
total_start_time = time.perf_counter()
args = parse_args()
cap = pyshark.FileCapture(
args.pcapng,
use_json=True,
include_raw=False,
keep_packets=False
)
# Checking if the pcapng file is valid
count = 0
try:
for packet in cap:
count += 1
if count > 0:
break
except Exception as e:
print(f"[!] Error reading pcapng file: {e}")
return
finally:
cap.close()
rows, original_fields = read_csv_input(args.csv)
fieldnames = original_fields + [
'ClientsOnAP', 'ClientsOnChannel', 'APsOnChannel',
'AvgAPSignal', 'StrongestAPSignal', 'UnlinkedDevices',
'CiscoAvgReportedClients', 'CiscoMaxReportedClients', 'NumberofBSSIDsOnSSID',
'AvgSSIDSignal', 'MaxSSIDSignal', 'NumberofChannelsOnSSID', 'PacketCount'
]
enriched_rows = []
ssid_summary = None
all_ssid_summaries = []
for row in rows:
tstart = convert_timestamp_to_epoch(row.get("StartTimestamp"))
tend = convert_timestamp_to_epoch(row.get("EndTimestamp"))
ap_bssid = row.get("BSSID", "").strip().lower()
ap_channel = row.get("Channel")
if not tstart or not tend:
enriched_rows.append(row)
continue
start_time = time.perf_counter()
result = analyze_pcap(args.pcapng, tstart, tend, ap_bssid, ap_channel)
(
clients_ap, clients_chan, aps_chan,
avg_signal, strongest_signal, unlinked,
cisco_avg_reported_clients, cisco_max_reported_clients, num_bssids,
average_signal, max_ssid_signal, num_channels_ssid,
ssid_summary, packet_count
) = result
elapsed_time = time.perf_counter() - start_time
print(f"[+] Analyzed {ap_bssid} in {elapsed_time:.2f} seconds")
row.update({
'ClientsOnAP': clients_ap,
'ClientsOnChannel': clients_chan,
'APsOnChannel': aps_chan,
'AvgAPSignal': avg_signal,
'StrongestAPSignal': strongest_signal,
'UnlinkedDevices': unlinked,
'CiscoAvgReportedClients': cisco_avg_reported_clients,
'CiscoMaxReportedClients': cisco_max_reported_clients,
'NumberofBSSIDsOnSSID': num_bssids,
'AvgSSIDSignal': average_signal,
'MaxSSIDSignal': max_ssid_signal,
'NumberofChannelsOnSSID': num_channels_ssid,
'PacketCount': packet_count
})
enriched_rows.append(row)
ssid_summary = result[-2]
all_ssid_summaries.append(ssid_summary)
write_enriched_csv(args.output, fieldnames, enriched_rows)
merged_ssid_summary = merge_ssid_summaries(all_ssid_summaries)
write_ssid_sidecar(args.output, merged_ssid_summary)
print(f"[+] Enrichment complete: {args.output}")
total_elapsed_time = time.perf_counter() - total_start_time
print(f"[+] Total time taken: {total_elapsed_time:.2f} seconds")
if __name__ == "__main__":
main()