502 lines
18 KiB
Python
Executable file
502 lines
18 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
import argparse
|
|
import csv
|
|
from datetime import datetime
|
|
import pyshark
|
|
from statistics import mean
|
|
from collections import defaultdict
|
|
|
|
# United States regulatory domain channel lookup table
|
|
|
|
CHANNEL_LOOKUP_TABLE = {
|
|
# 2.4 GHz (non-DFS, always allowed)
|
|
1: {"freq": 2412, "dfs": False, "band": "2.4GHz"},
|
|
2: {"freq": 2417, "dfs": False, "band": "2.4GHz"},
|
|
3: {"freq": 2422, "dfs": False, "band": "2.4GHz"},
|
|
4: {"freq": 2427, "dfs": False, "band": "2.4GHz"},
|
|
5: {"freq": 2432, "dfs": False, "band": "2.4GHz"},
|
|
6: {"freq": 2437, "dfs": False, "band": "2.4GHz"},
|
|
7: {"freq": 2442, "dfs": False, "band": "2.4GHz"},
|
|
8: {"freq": 2447, "dfs": False, "band": "2.4GHz"},
|
|
9: {"freq": 2452, "dfs": False, "band": "2.4GHz"},
|
|
10: {"freq": 2457, "dfs": False, "band": "2.4GHz"},
|
|
11: {"freq": 2462, "dfs": False, "band": "2.4GHz"},
|
|
|
|
# 5 GHz UNII-1 (indoor only)
|
|
36: {"freq": 5180, "dfs": False, "band": "UNII-1"},
|
|
40: {"freq": 5200, "dfs": False, "band": "UNII-1"},
|
|
44: {"freq": 5220, "dfs": False, "band": "UNII-1"},
|
|
48: {"freq": 5240, "dfs": False, "band": "UNII-1"},
|
|
|
|
# 5 GHz UNII-2 (DFS required)
|
|
52: {"freq": 5260, "dfs": True, "band": "UNII-2"},
|
|
56: {"freq": 5280, "dfs": True, "band": "UNII-2"},
|
|
60: {"freq": 5300, "dfs": True, "band": "UNII-2"},
|
|
64: {"freq": 5320, "dfs": True, "band": "UNII-2"},
|
|
|
|
# 5 GHz UNII-2e (DFS required)
|
|
100: {"freq": 5500, "dfs": True, "band": "UNII-2e"},
|
|
104: {"freq": 5520, "dfs": True, "band": "UNII-2e"},
|
|
108: {"freq": 5540, "dfs": True, "band": "UNII-2e"},
|
|
112: {"freq": 5560, "dfs": True, "band": "UNII-2e"},
|
|
116: {"freq": 5580, "dfs": True, "band": "UNII-2e"},
|
|
120: {"freq": 5600, "dfs": True, "band": "UNII-2e"},
|
|
124: {"freq": 5620, "dfs": True, "band": "UNII-2e"},
|
|
128: {"freq": 5640, "dfs": True, "band": "UNII-2e"},
|
|
132: {"freq": 5660, "dfs": True, "band": "UNII-2e"},
|
|
136: {"freq": 5680, "dfs": True, "band": "UNII-2e"},
|
|
140: {"freq": 5700, "dfs": True, "band": "UNII-2e"},
|
|
|
|
# 5 GHz UNII-3 (outdoor/indoor, no DFS)
|
|
149: {"freq": 5745, "dfs": False, "band": "UNII-3"},
|
|
153: {"freq": 5765, "dfs": False, "band": "UNII-3"},
|
|
157: {"freq": 5785, "dfs": False, "band": "UNII-3"},
|
|
161: {"freq": 5805, "dfs": False, "band": "UNII-3"},
|
|
165: {"freq": 5825, "dfs": False, "band": "UNII-3"},
|
|
}
|
|
|
|
FREQ_LOOKUP_TABLE = {v["freq"]: ch for ch, v in CHANNEL_LOOKUP_TABLE.items()}
|
|
|
|
def get_channel_from_freq(freq):
|
|
return FREQ_LOOKUP_TABLE.get(freq, None)
|
|
|
|
def get_freq_details(channel):
|
|
return CHANNEL_LOOKUP_TABLE.get(channel, None)
|
|
|
|
def get_aps(capture, ap_channel):
|
|
try:
|
|
ap_channel = int(ap_channel)
|
|
except ValueError:
|
|
print(f"[!] Could not parse channel number: {ap_channel}")
|
|
return 0
|
|
|
|
aps = set()
|
|
ghost_clients = set()
|
|
|
|
for packet in capture:
|
|
try:
|
|
if 'radiotap' not in packet or 'wlan' not in packet:
|
|
continue
|
|
|
|
radio = packet.radiotap
|
|
wlan = packet.wlan
|
|
|
|
if not hasattr(radio, 'channel') or not hasattr(radio.channel, 'freq'):
|
|
continue
|
|
|
|
packet_freq = int(radio.channel.freq)
|
|
|
|
packet_channel = get_channel_from_freq(packet_freq)
|
|
|
|
# For debugging purposes, print the channel and frequency
|
|
# print(f"Packet Channel: {packet_channel}, Frequency: {packet_freq} MHz")
|
|
|
|
if packet_channel != ap_channel:
|
|
continue
|
|
|
|
# Check for beacon or probe response
|
|
ts_hex = getattr(wlan, 'type_subtype', None)
|
|
if ts_hex is None:
|
|
continue
|
|
|
|
ts = int(ts_hex, 16)
|
|
if ts not in (5, 8): # Probe Response or Beacon
|
|
continue
|
|
|
|
# Grab BSSID
|
|
bssid = getattr(wlan, 'bssid', '').lower()
|
|
if bssid and bssid != 'ff:ff:ff:ff:ff:ff':
|
|
aps.add(bssid)
|
|
|
|
except Exception as e:
|
|
print(f"[DEBUG] Packet parse error: {e}")
|
|
continue
|
|
|
|
return aps
|
|
|
|
def parse_args():
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument('--csv', required=True, help='Input speedtest CSV')
|
|
parser.add_argument('--pcapng', required=True, help='Kismet-generated .pcapng file')
|
|
parser.add_argument('--output', required=True, help='Output enriched CSV')
|
|
return parser.parse_args()
|
|
|
|
def convert_timestamp_to_epoch(ts_string):
|
|
try:
|
|
return int(datetime.fromisoformat(ts_string.replace("Z", "+00:00")).timestamp())
|
|
except Exception as e:
|
|
print(f"[!] Failed to parse timestamp: {ts_string}")
|
|
return None
|
|
|
|
def get_clients_on_ap(capture, ap_bssid):
|
|
clients = defaultdict(int)
|
|
ap_bssid = ap_bssid.lower()
|
|
|
|
for packet in capture:
|
|
try:
|
|
if not hasattr(packet, 'wlan'):
|
|
continue
|
|
|
|
sa = getattr(packet.wlan, 'sa', '').lower()
|
|
da = getattr(packet.wlan, 'da', '').lower()
|
|
bssid = getattr(packet.wlan, 'bssid', '').lower()
|
|
|
|
# Count any frame *to or from* a client, if AP is involved
|
|
if bssid == ap_bssid or sa == ap_bssid or da == ap_bssid:
|
|
# If it's the AP sending, add the destination (client)
|
|
if sa == ap_bssid and da and da != ap_bssid and not da.startswith("ff:ff:ff"):
|
|
clients[da] += 1
|
|
# If it's the client sending, add the source
|
|
elif sa and sa != ap_bssid and not sa.startswith("ff:ff:ff"):
|
|
clients[sa] += 1
|
|
except AttributeError:
|
|
continue
|
|
|
|
# Only count clients that show up more than 3 times — tweak as needed
|
|
stable_clients = [mac for mac, count in clients.items() if count > 3]
|
|
return len(stable_clients)
|
|
|
|
def get_clients_on_channel(capture, ap_channel, ap_bssid):
|
|
try:
|
|
ap_channel = int(ap_channel)
|
|
except ValueError:
|
|
print(f"[!] Could not parse channel number: {ap_channel}")
|
|
return 0
|
|
|
|
clients = set()
|
|
|
|
for packet in capture:
|
|
try:
|
|
if 'radiotap' not in packet or 'wlan' not in packet:
|
|
continue
|
|
|
|
radio = packet.radiotap
|
|
wlan = packet.wlan
|
|
|
|
if not hasattr(radio, 'channel') or not hasattr(radio.channel, 'freq'):
|
|
continue
|
|
|
|
packet_freq = int(radio.channel.freq)
|
|
|
|
packet_channel = get_channel_from_freq(packet_freq)
|
|
|
|
# For debugging purposes, print the channel and frequency
|
|
# print(f"Packet Channel: {packet_channel}, Frequency: {packet_freq} MHz")
|
|
|
|
if packet_channel != ap_channel:
|
|
continue
|
|
|
|
sa = getattr(wlan, 'sa', '').lower()
|
|
da = getattr(wlan, 'da', '').lower()
|
|
|
|
for mac in (sa, da):
|
|
if mac and mac != 'ff:ff:ff:ff:ff:ff' and mac != ap_bssid:
|
|
clients.add(mac)
|
|
|
|
except AttributeError:
|
|
continue
|
|
except Exception as e:
|
|
print(f"[!] Error parsing packet: {e}")
|
|
continue
|
|
|
|
return len(clients)
|
|
|
|
def get_aps_on_channel(capture, ap_channel):
|
|
return len(get_aps(capture, ap_channel))
|
|
|
|
def calculate_signal_strength_stats(capture, ap_channel):
|
|
try:
|
|
ap_channel = int(ap_channel)
|
|
except ValueError:
|
|
print(f"[!] Could not parse channel number: {ap_channel}")
|
|
return 0
|
|
|
|
ap_signals = []
|
|
for packet in capture:
|
|
try:
|
|
if 'radiotap' not in packet or 'wlan' not in packet:
|
|
continue
|
|
|
|
radio = packet.radiotap
|
|
wlan = packet.wlan
|
|
|
|
if not hasattr(radio, 'channel') or not hasattr(radio.channel, 'freq'):
|
|
continue
|
|
|
|
packet_freq = int(radio.channel.freq)
|
|
packet_channel = get_channel_from_freq(packet_freq)
|
|
|
|
if packet_channel != ap_channel:
|
|
continue
|
|
|
|
# Check for beacon or probe response
|
|
ts_hex = getattr(wlan, 'type_subtype', None)
|
|
if ts_hex is None:
|
|
continue
|
|
|
|
ts = int(ts_hex, 16)
|
|
if ts not in (5, 8): # Probe Response or Beacon
|
|
continue
|
|
|
|
# Get signal strength
|
|
signal_strength = getattr(radio, 'dbm_antsignal', None)
|
|
if signal_strength is not None:
|
|
ap_signals.append(int(signal_strength))
|
|
|
|
except Exception as e:
|
|
print(f"[DEBUG] Signal strength parse error: {e}")
|
|
continue
|
|
|
|
if ap_signals:
|
|
return mean(ap_signals), max(ap_signals)
|
|
else:
|
|
return 0, 0
|
|
|
|
def get_unlinked_devices(capture, ap_channel):
|
|
aps = get_aps(capture, ap_channel)
|
|
ghost_clients = set()
|
|
|
|
for packet in capture:
|
|
if 'radiotap' not in packet or 'wlan' not in packet:
|
|
continue
|
|
|
|
radio = packet.radiotap
|
|
wlan = packet.wlan
|
|
|
|
if not hasattr(radio, 'channel') or not hasattr(radio.channel, 'freq'):
|
|
continue
|
|
|
|
packet_freq = int(radio.channel.freq)
|
|
packet_channel = get_channel_from_freq(packet_freq)
|
|
if packet_channel != ap_channel:
|
|
continue
|
|
|
|
for mac in (getattr(wlan, 'sa', ''), getattr(wlan, 'da', '')):
|
|
mac = mac.lower()
|
|
if (
|
|
mac
|
|
and mac != 'ff:ff:ff:ff:ff:ff'
|
|
and mac not in aps
|
|
):
|
|
ghost_clients.add(mac)
|
|
|
|
return len(ghost_clients)
|
|
|
|
def analyze_pcap(pcapng_path, start_ts, end_ts, ap_bssid, ap_channel):
|
|
cap = pyshark.FileCapture(
|
|
pcapng_path,
|
|
use_json=True,
|
|
include_raw=False
|
|
)
|
|
|
|
ap_channel = int(ap_channel)
|
|
|
|
clients_on_ap = 0
|
|
clients_on_channel = 0
|
|
aps_on_channel = 0
|
|
avg_ap_signal = 0
|
|
max_ap_signal = 0
|
|
unlinked_devices = 0
|
|
|
|
ssid_clients = defaultdict(set)
|
|
ssid_signals = defaultdict(list)
|
|
ssid_to_bssids = defaultdict(set)
|
|
bssid_to_ssid = {}
|
|
cisco_reported_clients = []
|
|
|
|
try:
|
|
# Filter packets manually by timestamp
|
|
filtered_packets = []
|
|
for packet in cap:
|
|
try:
|
|
frame_time = float(packet.frame_info.time_epoch)
|
|
if start_ts <= frame_time <= end_ts:
|
|
filtered_packets.append(packet)
|
|
except Exception:
|
|
continue
|
|
|
|
for packet in filtered_packets:
|
|
try:
|
|
if 'radiotap' not in packet or 'wlan' not in packet:
|
|
continue
|
|
|
|
radio = packet.radiotap
|
|
wlan = packet.wlan
|
|
|
|
if not hasattr(radio.channel, 'freq'):
|
|
continue
|
|
|
|
packet_freq = int(radio.channel.freq)
|
|
packet_channel = get_channel_from_freq(packet_freq)
|
|
|
|
subtype = int(getattr(wlan, 'type_subtype', 0), 16)
|
|
if subtype not in (5, 8): # Probe Response or Beacon
|
|
continue
|
|
|
|
# Grab management layer once
|
|
try:
|
|
mgt = packet.get_multiple_layers('wlan.mgt')[0]
|
|
tags = mgt._all_fields.get('wlan.tagged.all', {}).get('wlan.tag', [])
|
|
except Exception as e:
|
|
continue
|
|
|
|
ssid = None
|
|
for tag in tags:
|
|
tag_number = tag.get('wlan.tag.number')
|
|
|
|
# SSID
|
|
if tag_number == '0' and 'wlan.ssid' in tag:
|
|
try:
|
|
raw_ssid = tag['wlan.ssid']
|
|
ssid_bytes = bytes.fromhex(raw_ssid.replace(':', ''))
|
|
ssid = ssid_bytes.decode('utf-8', errors='replace')
|
|
except Exception as e:
|
|
ssid = None
|
|
|
|
# Cisco Client Count
|
|
if tag_number == '133':
|
|
try:
|
|
num_clients = int(tag.get('wlan.cisco.ccx1.clients'))
|
|
cisco_reported_clients.append(num_clients)
|
|
except (TypeError, ValueError):
|
|
pass
|
|
|
|
if not ssid:
|
|
continue
|
|
|
|
bssid = getattr(wlan, 'bssid', '').lower()
|
|
if not bssid or bssid == 'ff:ff:ff:ff:ff:ff':
|
|
continue
|
|
|
|
bssid_to_ssid[bssid] = ssid
|
|
ssid_to_bssids[ssid].add(bssid)
|
|
|
|
signal = getattr(radio, 'dbm_antsignal', None)
|
|
if signal:
|
|
ssid_signals[ssid].append(int(signal))
|
|
|
|
except Exception as e:
|
|
continue
|
|
|
|
our_ssid = bssid_to_ssid.get(ap_bssid, None)
|
|
|
|
clients_on_ap = get_clients_on_ap(filtered_packets, ap_bssid)
|
|
clients_on_channel = get_clients_on_channel(filtered_packets, ap_channel, ap_bssid)
|
|
aps_on_channel = get_aps_on_channel(filtered_packets, ap_channel)
|
|
avg_ap_signal, max_ap_signal = calculate_signal_strength_stats(filtered_packets, ap_channel)
|
|
unlinked_devices = get_unlinked_devices(filtered_packets, ap_channel)
|
|
cisco_avg_reported_clients = mean(cisco_reported_clients) if cisco_reported_clients else 0
|
|
cisco_max_reported_clients = max(cisco_reported_clients) if cisco_reported_clients else 0
|
|
num_bssids = len(ssid_to_bssids[our_ssid]) if our_ssid in ssid_to_bssids else 0
|
|
average_signal = mean(ssid_signals[our_ssid]) if our_ssid in ssid_signals else 0
|
|
max_ssid_signal = max(ssid_signals[our_ssid]) if our_ssid in ssid_signals else 0
|
|
num_channels_ssid = len(ssid_to_bssids[our_ssid]) if our_ssid in ssid_to_bssids else 0
|
|
|
|
# Generate SSID summary sidecar
|
|
ssid_summary = []
|
|
for ssid, bssids in ssid_to_bssids.items():
|
|
signals = ssid_signals.get(ssid, [])
|
|
ssid_summary.append({
|
|
'SSID': ssid,
|
|
'BSSID_Count': len(bssids),
|
|
'Avg_Signal': mean(signals) if signals else 0,
|
|
'Max_Signal': max(signals) if signals else 0,
|
|
'Min_Signal': min(signals) if signals else 0,
|
|
'Clients_Seen': len(ssid_clients.get(ssid, [])),
|
|
'CiscoAvgClients': round(mean(cisco_reported_clients), 2) if cisco_reported_clients else 0,
|
|
'CiscoMaxClients': max(cisco_reported_clients) if cisco_reported_clients else 0
|
|
})
|
|
|
|
|
|
finally:
|
|
cap.close()
|
|
|
|
return (clients_on_ap, clients_on_channel, aps_on_channel,
|
|
avg_ap_signal, max_ap_signal, unlinked_devices,
|
|
cisco_avg_reported_clients, cisco_max_reported_clients, num_bssids,
|
|
average_signal, max_ssid_signal, num_channels_ssid,
|
|
ssid_summary)
|
|
|
|
def main():
|
|
args = parse_args()
|
|
cap = pyshark.FileCapture(
|
|
args.pcapng,
|
|
use_json=True,
|
|
include_raw=False,
|
|
keep_packets=False
|
|
)
|
|
|
|
# Checking if the pcapng file is valid
|
|
count = 0
|
|
try:
|
|
for packet in cap:
|
|
count += 1
|
|
if count > 0:
|
|
break
|
|
except Exception as e:
|
|
print(f"[!] Error reading pcapng file: {e}")
|
|
return
|
|
finally:
|
|
cap.close()
|
|
|
|
with open(args.csv, newline='') as infile, open(args.output, 'w', newline='', encoding='utf-8') as outfile:
|
|
reader = csv.DictReader(infile)
|
|
fieldnames = reader.fieldnames + [
|
|
'ClientsOnAP', 'ClientsOnChannel', 'APsOnChannel',
|
|
'AvgAPSignal', 'StrongestAPSignal', 'UnlinkedDevices',
|
|
'CiscoAvgReportedClients', 'CiscoMaxReportedClients', 'NumberofBSSIDsOnSSID',
|
|
'AvgSSIDSignal', 'MaxSSIDSignal', 'NumberofChannelsOnSSID'
|
|
]
|
|
writer = csv.DictWriter(outfile, fieldnames=fieldnames)
|
|
writer.writeheader()
|
|
|
|
for row in reader:
|
|
tstart = convert_timestamp_to_epoch(row.get("StartTimestamp"))
|
|
tend = convert_timestamp_to_epoch(row.get("EndTimestamp"))
|
|
ap_bssid = row.get("BSSID", "").strip().lower()
|
|
ap_channel = row.get("Channel")
|
|
|
|
if not tstart or not tend:
|
|
writer.writerow(row)
|
|
continue
|
|
|
|
clients_ap, clients_chan, aps_chan, \
|
|
avg_signal, strongest_signal, unlinked, \
|
|
cisco_avg_reported_clients, cisco_max_reported_clients, num_bssids, \
|
|
average_signal, max_ssid_signal, num_channels_ssid, \
|
|
ssid_summary = analyze_pcap(args.pcapng, tstart, tend, ap_bssid, ap_channel)
|
|
|
|
row.update({
|
|
'ClientsOnAP': clients_ap,
|
|
'ClientsOnChannel': clients_chan,
|
|
'APsOnChannel': aps_chan,
|
|
'AvgAPSignal': avg_signal,
|
|
'StrongestAPSignal': strongest_signal,
|
|
'UnlinkedDevices': unlinked,
|
|
'CiscoAvgReportedClients': cisco_avg_reported_clients,
|
|
'CiscoMaxReportedClients': cisco_max_reported_clients,
|
|
'NumberofBSSIDsOnSSID': num_bssids,
|
|
'AvgSSIDSignal': average_signal,
|
|
'MaxSSIDSignal': max_ssid_signal,
|
|
'NumberofChannelsOnSSID': num_channels_ssid
|
|
})
|
|
|
|
writer.writerow(row)
|
|
|
|
# Dump SSID metrics sidecar
|
|
if ssid_summary:
|
|
ssid_outfile = args.output.replace('.csv+rf.csv', '-ssid-metrics.csv')
|
|
with open(ssid_outfile, 'w', newline='', encoding='utf-8') as f:
|
|
fieldnames = [
|
|
'SSID', 'BSSID_Count', 'Avg_Signal', 'Max_Signal',
|
|
'Min_Signal', 'Clients_Seen', 'CiscoAvgClients', 'CiscoMaxClients'
|
|
]
|
|
ssid_writer = csv.DictWriter(f, fieldnames=fieldnames)
|
|
ssid_writer.writeheader()
|
|
for row in ssid_summary:
|
|
ssid_writer.writerow(row)
|
|
|
|
print(f"[+] Enrichment complete: {args.output}")
|
|
|
|
if __name__ == "__main__":
|
|
main()
|