wifi_test/kismet_enrich_from_pcap.py
2025-04-11 09:33:51 -05:00

156 lines
5 KiB
Python
Executable file

#!/usr/bin/env python3
import argparse
import csv
from datetime import datetime
import pyshark
def parse_args(argv=None):
    """Parse the command-line options for the enrichment script.

    Args:
        argv: Optional list of argument strings. When ``None`` (the default),
            argparse falls back to ``sys.argv[1:]``, which is what the script
            entry point relies on. Passing an explicit list makes the parser
            usable from other code.

    Returns:
        An ``argparse.Namespace`` with the attributes ``csv``, ``pcapng`` and
        ``output``. All three options are required.
    """
    parser = argparse.ArgumentParser(
        description='Enrich a speedtest CSV with data from a Kismet-generated .pcapng file'
    )
    parser.add_argument('--csv', required=True, help='Input speedtest CSV')
    parser.add_argument('--pcapng', required=True, help='Kismet-generated .pcapng file')
    parser.add_argument('--output', required=True, help='Output enriched CSV')
    return parser.parse_args(argv)
def convert_timestamp_to_epoch(ts_string):
    """Convert an ISO-8601 timestamp string to whole seconds since the epoch.

    A trailing ``Z`` (UTC designator) is rewritten to ``+00:00`` so that
    ``datetime.fromisoformat`` accepts it on Python versions that do not
    understand the ``Z`` suffix. Surrounding whitespace is ignored.

    Args:
        ts_string: The timestamp text, e.g. ``"2025-04-11T14:33:51Z"``.
            ``None`` and non-string values are tolerated and reported as
            unparseable.

    Returns:
        The timestamp as an ``int`` (fractional seconds are truncated), or
        ``None`` if the value could not be parsed. A message is printed for
        every value that fails to parse.

    Note:
        A timestamp without a UTC offset is interpreted by
        ``datetime.timestamp()`` as local time.
    """
    try:
        normalized = ts_string.strip()
        # Only the trailing designator means "UTC"; leave the rest untouched.
        if normalized.endswith(("Z", "z")):
            normalized = normalized[:-1] + "+00:00"
        return int(datetime.fromisoformat(normalized).timestamp())
    except (AttributeError, TypeError, ValueError, OverflowError, OSError):
        # AttributeError/TypeError: None or a non-string value.
        # ValueError: not ISO-8601. OverflowError/OSError: out-of-range date.
        print(f"[!] Failed to parse timestamp: {ts_string}")
        return None
def _is_group_mac(mac):
    """Return True if *mac* is a broadcast or multicast (group) address.

    The least-significant bit of the first octet is the IEEE 802 I/G bit.
    Values that cannot be parsed are reported as group addresses so that
    they are never counted as stations.
    """
    try:
        return bool(int(mac[:2], 16) & 0x01)
    except (TypeError, ValueError):
        return True


def get_clients_on_ap(capture, ap_bssid):
    """Count the distinct stations seen exchanging frames with an access point.

    Every packet in *capture* that has a ``wlan`` layer is inspected. A packet
    is considered to involve the AP when its BSSID, source address or
    destination address equals *ap_bssid*. For such a packet the peer is the
    destination when the AP is the source, otherwise the source.

    Broadcast/multicast peers (for example the ``ff:ff:ff:ff:ff:ff``
    destination of beacons) are not stations and are ignored.

    Args:
        capture: Any iterable of packet objects exposing an optional ``wlan``
            attribute with optional ``sa``, ``da`` and ``bssid`` fields.
        ap_bssid: MAC address of the access point; compared case-insensitively.

    Returns:
        The number of distinct peer addresses found. ``0`` when *ap_bssid* is
        empty or ``None``, because an empty BSSID would otherwise match every
        packet that lacks an address field.
    """
    ap_bssid = (ap_bssid or '').strip().lower()  # Normalize for comparison
    if not ap_bssid:
        return 0

    clients = set()
    for packet in capture:
        try:
            if not hasattr(packet, 'wlan'):
                continue
            wlan = packet.wlan
            sa = getattr(wlan, 'sa', '').lower()
            da = getattr(wlan, 'da', '').lower()
            bssid = getattr(wlan, 'bssid', '').lower()
        except AttributeError:
            # A field that is present but not string-like; skip the packet.
            continue

        # Only frames in which the AP is involved are of interest.
        if ap_bssid not in (bssid, sa, da):
            continue

        # If the AP is sending, the client is the destination; otherwise the
        # client is the source.
        peer = da if sa == ap_bssid else sa
        if peer and peer != ap_bssid and not _is_group_mac(peer):
            clients.add(peer)
    return len(clients)
def get_clients_on_channel(capture, ap_channel, ap_bssid):
    """Count distinct stations seen on a channel, excluding the AP's own traffic.

    A packet contributes when it has a ``wlan`` layer, its channel equals
    *ap_channel*, and neither its source nor its destination address is
    *ap_bssid*. Both addresses of such a packet are counted, except for
    missing addresses and broadcast/multicast (group) addresses, which do not
    identify a station.

    Args:
        capture: Any iterable of packet objects exposing an optional ``wlan``
            attribute with optional ``channel``, ``sa`` and ``da`` fields.
        ap_channel: Channel to match; compared as a string.
        ap_bssid: MAC address of the access point to exclude; compared
            case-insensitively. When empty or ``None`` nothing is excluded.

    Returns:
        The number of distinct station addresses found.
    """
    ap_bssid = (ap_bssid or '').strip().lower()  # Normalize for comparison
    ap_channel = str(ap_channel).strip()  # Ensure channel is a string for comparison

    def is_station(mac):
        # Empty means the field was absent; an odd first octet (I/G bit set)
        # marks a broadcast/multicast address.
        if not mac:
            return False
        try:
            return not int(mac[:2], 16) & 0x01
        except ValueError:
            return False

    clients = set()
    for packet in capture:
        try:
            if not hasattr(packet, 'wlan'):
                continue
            wlan = packet.wlan
            channel = getattr(wlan, 'channel', None)
            if channel is None:
                # NOTE(review): Wireshark reports the channel as
                # ``wlan_radio.channel`` rather than on the ``wlan`` layer;
                # fall back to it -- confirm against a real capture.
                channel = getattr(getattr(packet, 'wlan_radio', None), 'channel', None)
            sa = getattr(wlan, 'sa', '').lower()
            da = getattr(wlan, 'da', '').lower()
        except AttributeError:
            # A field that is present but not string-like; skip the packet.
            continue

        if channel is None or str(channel).strip() != ap_channel:
            continue
        # Skip anything sent to or from the AP itself.
        if ap_bssid and ap_bssid in (sa, da):
            continue
        clients.update(mac for mac in (sa, da) if is_station(mac))
    return len(clients)
def analyze_pcap(pcapng_path, start_ts, end_ts, ap_bssid, ap_channel):
    """Compute enrichment metrics for one time window of a capture file.

    Opens *pcapng_path* with a display filter restricting packets to
    ``start_ts <= frame.time_epoch <= end_ts`` and counts the clients seen
    talking to *ap_bssid*. The capture is always closed, even if counting
    raises.

    Args:
        pcapng_path: Path to the .pcapng file.
        start_ts: Window start, in epoch seconds.
        end_ts: Window end, in epoch seconds.
        ap_bssid: MAC address of the access point; ``None`` is treated as
            an empty string.
        ap_channel: Channel of the access point. Currently unused; kept for
            the metrics that are still placeholders.

    Returns:
        A 7-tuple ``(clients_on_ap, clients_on_channel, aps_on_channel,
        congestion_score, avg_ap_signal, strongest_ap_signal,
        unlinked_devices)``. Only ``clients_on_ap`` is computed; the other
        positions are fixed placeholders ``(0, 0, None, None, None, 0)``.
    """
    cap = pyshark.FileCapture(
        pcapng_path,
        display_filter=f'frame.time_epoch >= {start_ts} && frame.time_epoch <= {end_ts}',
        use_json=True,
        include_raw=False
    )
    try:
        # Get clients on the specified AP
        clients_on_ap = get_clients_on_ap(cap, (ap_bssid or '').lower())
        # Placeholder: Logic will be added for:
        # - ClientsOnChannel
        # - APsOnChannel
        # - CongestionScore
        # - AvgAPSignal
        # - StrongestAPSignal
        # - UnlinkedDevices
    finally:
        # Release the capture even when packet iteration fails.
        cap.close()
    return clients_on_ap, 0, 0, None, None, None, 0
def main():
    """Enrich every row of the input CSV with metrics from the capture file.

    Steps:
      1. Parse the command-line arguments.
      2. Probe the capture by opening it and reading at most one packet, so
         that an unreadable file is reported once instead of per row.
      3. Copy the input CSV to the output CSV, appending the columns
         ``ClientsOnAP``, ``ClientsOnChannel``, ``APsOnChannel``,
         ``CongestionScore``, ``AvgAPSignal``, ``StrongestAPSignal`` and
         ``UnlinkedDevices``. Rows whose ``StartTimestamp`` or
         ``EndTimestamp`` cannot be parsed are written through unchanged,
         leaving the new columns empty.

    Errors while probing the capture, and an input CSV without a header row,
    are reported on stdout and end the run without raising.
    """
    args = parse_args()

    # Checking if the pcapng file is valid. The capture is constructed inside
    # the try block so that a failure to open it is reported like a failure to
    # read it.
    try:
        probe = pyshark.FileCapture(
            args.pcapng,
            use_json=True,
            include_raw=False,
            keep_packets=False
        )
        try:
            for _ in probe:
                break  # One packet is enough to prove the file is readable.
        finally:
            probe.close()
    except Exception as e:  # Top-level boundary: report any failure and stop.
        print(f"[!] Error reading pcapng file: {e}")
        return

    enrichment_fields = [
        'ClientsOnAP', 'ClientsOnChannel', 'APsOnChannel', 'CongestionScore',
        'AvgAPSignal', 'StrongestAPSignal', 'UnlinkedDevices'
    ]

    with open(args.csv, newline='') as infile:
        reader = csv.DictReader(infile)
        # fieldnames is None when the file has no header row; check before
        # the output file is created so nothing is left half-written.
        if reader.fieldnames is None:
            print(f"[!] Input CSV is empty or has no header row: {args.csv}")
            return

        with open(args.output, 'w', newline='', encoding='utf-8') as outfile:
            writer = csv.DictWriter(outfile, fieldnames=list(reader.fieldnames) + enrichment_fields)
            writer.writeheader()

            for row in reader:
                tstart = convert_timestamp_to_epoch(row.get("StartTimestamp"))
                tend = convert_timestamp_to_epoch(row.get("EndTimestamp"))
                # DictReader fills missing trailing cells with None, so the
                # .get() default alone does not protect against None here.
                ap_bssid = (row.get("BSSID") or "").strip().lower()
                ap_channel = row.get("Channel")

                if tstart is None or tend is None:
                    writer.writerow(row)
                    continue

                # NOTE: analyze_pcap re-opens the capture for every row.
                (clients_ap, clients_chan, aps_chan, congestion,
                 avg_signal, strongest_signal, unlinked) = analyze_pcap(
                    args.pcapng, tstart, tend, ap_bssid, ap_channel
                )

                row.update({
                    'ClientsOnAP': clients_ap,
                    'ClientsOnChannel': clients_chan,
                    'APsOnChannel': aps_chan,
                    'CongestionScore': congestion,
                    'AvgAPSignal': avg_signal,
                    'StrongestAPSignal': strongest_signal,
                    'UnlinkedDevices': unlinked
                })
                writer.writerow(row)

    print(f"[+] Enrichment complete: {args.output}")
# Script entry point: run the enrichment only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()