Compare commits

...

2 commits

Author SHA1 Message Date
Yaro Kasear
32488ccbe2 Add debug print statements to IndexedCapture for packet indexing insights 2025-04-29 10:32:43 -05:00
Yaro Kasear
dca27cf034 Revert a fix that did not work. 2025-04-29 10:28:32 -05:00

View file

@ -72,6 +72,27 @@ class IndexedCapture:
capture.close()
# Debugging indexing
# 1. Inspect self.channel_to_aps and self.channel_to_clients
# Are they empty?
print(f"Indexed {len(self.time_index)} packets.")
# Are they missing just specific channels?
print(f"Channels with APs: {len(self.channel_to_aps)}")
# Are they populated, but wrong?
print(f"Channels with clients: {len(self.channel_to_clients)}")
# How many channels were seen?
print(f"Total channels: {len(set(self.channel_to_aps.keys()).union(set(self.channel_to_clients.keys())))}")
# How many APS were put into channel_to_aps?
print(f"Total APs: {sum(len(aps) for aps in self.channel_to_aps.values())}")
# How many clients were put into channel_to_clients?
print(f"Total clients: {sum(len(clients) for clients in self.channel_to_clients.values())}")
# Print out a few entries from channel_to_clients.
for channel, clients in list(self.channel_to_clients.items())[:5]:
print(f"Channel {channel}: {len(clients)} clients")
# Print out a few entries from channel_to_aps.
for channel, aps in list(self.channel_to_aps.items())[:5]:
print(f"Channel {channel}: {len(aps)} APs")
def _process_management_frame(self, packet, wlan, radio, channel):
try:
mgt = packet.get_multiple_layers('wlan.mgt')[0]
@ -138,13 +159,13 @@ class IndexedCapture:
def query_metrics(self, start_ts, end_ts, ap_bssid, ap_channel):
packets = self.get_packets_in_time_range(start_ts, end_ts)
print(f"[DEBUG] Packets in window: {len(packets)} between {start_ts} and {end_ts}")
# Instead of global stats, base everything off `packets`
# Use indexed data instead of recalculating
clients_on_ap = self._count_clients_on_ap(packets, ap_bssid)
clients_on_channel = self._count_clients_on_channel(packets, ap_channel)
aps_on_channel = self._count_aps_on_channel(packets, ap_channel)
avg_ap_signal, max_ap_signal = self._calc_signal_stats_window(packets, ap_channel)
clients_on_channel = len(self.channel_to_clients.get(ap_channel, []))
aps_on_channel = len(self.channel_to_aps.get(ap_channel, []))
avg_ap_signal, max_ap_signal = self._calc_signal_stats(ap_channel)
unlinked_devices = self._count_unlinked_devices(packets, ap_channel)
our_ssid = self.bssid_to_ssid.get(ap_bssid)
@ -162,7 +183,6 @@ class IndexedCapture:
num_channels_ssid, packet_count
)
def _count_clients_on_ap(self, packets, ap_bssid):
clients = defaultdict(int)
ap_bssid = ap_bssid.lower()
@ -222,92 +242,3 @@ class IndexedCapture:
if ssid in self.cisco_ssid_clients:
return max(self.cisco_ssid_clients[ssid])
return 0
def _count_clients_on_channel(self, packets, ap_channel):
clients = set()
for packet in packets:
try:
if 'radiotap' not in packet or 'wlan' not in packet:
continue
radio = packet.radiotap
wlan = packet.wlan
if not hasattr(radio, 'channel') or not hasattr(radio.channel, 'freq'):
continue
freq = int(radio.channel.freq)
packet_channel = get_channel_from_freq(freq)
if packet_channel != ap_channel:
continue
sa = getattr(wlan, 'sa', '').lower()
da = getattr(wlan, 'da', '').lower()
for mac in (sa, da):
if mac and mac != 'ff:ff:ff:ff:ff:ff':
clients.add(mac)
except Exception:
continue
return len(clients)
def _count_aps_on_channel(self, packets, ap_channel):
aps = set()
for packet in packets:
try:
if 'radiotap' not in packet or 'wlan' not in packet:
continue
radio = packet.radiotap
wlan = packet.wlan
if not hasattr(radio, 'channel') or not hasattr(radio.channel, 'freq'):
continue
freq = int(radio.channel.freq)
packet_channel = get_channel_from_freq(freq)
if packet_channel != ap_channel:
continue
subtype = int(getattr(wlan, 'type_subtype', '0'), 16)
if subtype not in (5, 8): # Beacon or Probe Response
continue
bssid = getattr(wlan, 'bssid', '').lower()
if bssid and bssid != 'ff:ff:ff:ff:ff:ff':
aps.add(bssid)
except Exception:
continue
return len(aps)
def _calc_signal_stats_window(self, packets, ap_channel):
signals = []
for packet in packets:
try:
if 'radiotap' not in packet or 'wlan' not in packet:
continue
radio = packet.radiotap
wlan = packet.wlan
if not hasattr(radio, 'channel') or not hasattr(radio.channel, 'freq'):
continue
freq = int(radio.channel.freq)
packet_channel = get_channel_from_freq(freq)
if packet_channel != ap_channel:
continue
signal = getattr(radio, 'dbm_antsignal', None)
if signal is not None:
signals.append(int(signal))
except Exception:
continue
return (mean(signals), max(signals)) if signals else (0, 0)