from collections import defaultdict from statistics import mean from enrichment.utils import get_channel_from_freq class IndexedCapture: def __init__(self, pcap_path): self.pcap_path = pcap_path self.all_packets = [] self.time_index = [] # List of (timestamp, packet) self.bssid_to_ssid = {} # BSSID → SSID self.ssid_to_bssids = defaultdict(set) # SSID → {BSSIDs} self.ssid_hidden_status = {} # SSID → hidden True/False self.ssid_encryption_status = {} # SSID → open/encrypted self.ssid_signals = defaultdict(list) # SSID → list of dBm values self.cisco_ssid_clients = defaultdict(list) # SSID → client counts self.cisco_reported_clients = [] # list of all reported counts self.ssid_packet_counts = defaultdict(int) # SSID → number of packets self.ssid_clients = defaultdict(set) # SSID → MAC addresses self.channel_to_aps = defaultdict(set) # Channel → BSSID self.channel_to_clients = defaultdict(set) # Channel → client MACs self.packet_signals_by_channel = defaultdict(list) # Channel → dBm signals self.packet_timestamps = [] # List of packet timestamps (for quick windowing) self._load_and_index() def _load_and_index(self): import pyshark capture = pyshark.FileCapture( self.pcap_path, use_json=True, include_raw=False, keep_packets=False, display_filter="(wlan.fc.type_subtype == 8 || wlan.fc.type_subtype == 5 || wlan.fc.type == 2) && (wlan.bssid || wlan.sa || wlan.da)" ) for packet in capture: try: ts = float(packet.frame_info.time_epoch) self.time_index.append((ts, packet)) self.packet_timestamps.append(ts) if 'radiotap' not in packet or 'wlan' not in packet: continue radio = packet.radiotap wlan = packet.wlan if not hasattr(radio, 'channel') or not hasattr(radio.channel, 'freq'): continue freq = int(radio.channel.freq) channel = get_channel_from_freq(freq) subtype = int(getattr(wlan, 'type_subtype', '0'), 16) # Management Frames: Beacon / Probe Response if subtype in (5, 8): self._process_management_frame(packet, wlan, radio, channel) # Track clients on this channel sa = getattr(wlan, 'sa', '').lower() da = getattr(wlan, 'da', '').lower() bssid = getattr(wlan, 'bssid', '').lower() for mac in (sa, da): if mac and mac != 'ff:ff:ff:ff:ff:ff': self.channel_to_clients[channel].add(mac) except Exception as e: continue capture.close() def _process_management_frame(self, packet, wlan, radio, channel): try: mgt = packet.get_multiple_layers('wlan.mgt')[0] tags = mgt._all_fields.get('wlan.tagged.all', {}).get('wlan.tag', []) ssid = None hidden_ssid = False privacy_bit = mgt._all_fields.get('wlan_mgt.fixed.capabilities.privacy') is_open = (str(privacy_bit) != '1') for tag in tags: if tag.get('wlan.tag.number') == '0': raw_ssid = tag.get('wlan.ssid', '') if not raw_ssid: hidden_ssid = True ssid = '' else: try: ssid_bytes = bytes.fromhex(raw_ssid.replace(':', '')) ssid = ssid_bytes.decode('utf-8', errors='replace') except Exception: ssid = None if tag.get('wlan.tag.number') == '133': try: num_clients = int(tag.get('wlan.cisco.ccx1.clients')) if ssid: self.cisco_ssid_clients[ssid].append(num_clients) self.cisco_reported_clients.append(num_clients) except (TypeError, ValueError): pass if not ssid: return self.ssid_hidden_status[ssid] = hidden_ssid self.ssid_encryption_status.setdefault(ssid, is_open) self.ssid_packet_counts[ssid] += 1 bssid = getattr(wlan, 'bssid', '').lower() if not bssid or bssid == 'ff:ff:ff:ff:ff:ff': return self.bssid_to_ssid[bssid] = ssid self.ssid_to_bssids[ssid].add(bssid) signal = getattr(radio, 'dbm_antsignal', None) if signal: self.ssid_signals[ssid].append(int(signal)) self.channel_to_aps[channel].add(bssid) if signal: self.packet_signals_by_channel[channel].append(int(signal)) except Exception as e: pass def get_packets_in_time_range(self, start_ts, end_ts): # This is fast because packet timestamps were recorded at load return [ packet for ts, packet in self.time_index if start_ts <= ts <= end_ts ] def query_metrics(self, start_ts, end_ts, ap_bssid, ap_channel): ap_channel = int(ap_channel) assert isinstance(ap_channel, int), f"ap_channel must be int, got {type(ap_channel)}" packets = self.get_packets_in_time_range(start_ts, end_ts) # Build local windowed structures window_clients = defaultdict(set) window_aps = defaultdict(set) window_signals = defaultdict(list) window_ssid_to_bssids = defaultdict(set) window_ssid_signals = defaultdict(list) for packet in packets: try: if 'radiotap' not in packet or 'wlan' not in packet: continue wlan = packet.wlan radio = packet.radiotap sa = getattr(wlan, 'sa', '').lower() da = getattr(wlan, 'da', '').lower() bssid = getattr(wlan, 'bssid', '').lower() if hasattr(radio, 'channel') and hasattr(radio.channel, 'freq'): freq = int(radio.channel.freq) channel = get_channel_from_freq(freq) else: continue # Determine subtype subtype = int(getattr(wlan, 'type_subtype', '0'), 16) # Track APs (beacons / probe responses only) if subtype in (5, 8) and bssid: window_aps[channel].add(bssid) # Track clients (always) for mac in (sa, da): if mac and mac != 'ff:ff:ff:ff:ff:ff': window_clients[channel].add(mac) # Only beacon/probe have SSID ssid = self.bssid_to_ssid.get(bssid) if ssid: window_ssid_to_bssids[ssid].add(bssid) signal = getattr(radio, 'dbm_antsignal', None) if signal: window_ssid_signals[ssid].append(int(signal)) if channel == ap_channel: print(f"[DEBUG] [AP] Adding bssid={bssid} to window_aps[{channel}]") # Track signals signal = getattr(radio, 'dbm_antsignal', None) if signal: window_signals[channel].append(int(signal)) except Exception: continue clients_on_ap = self._count_clients_on_ap(packets, ap_bssid) clients_on_channel = len(window_clients.get(ap_channel, [])) aps_on_channel = len(window_aps.get(ap_channel, [])) avg_ap_signal = mean(window_signals[ap_channel]) if window_signals.get(ap_channel) else 0 max_ap_signal = max(window_signals[ap_channel]) if window_signals.get(ap_channel) else 0 unlinked_devices = self._count_unlinked_devices(packets, ap_channel) our_ssid = self.bssid_to_ssid.get(ap_bssid) num_bssids = len(window_ssid_to_bssids.get(our_ssid, [])) if our_ssid else 0 avg_ssid_signal = mean(window_ssid_signals.get(our_ssid, [])) if our_ssid else 0 max_ssid_signal = max(window_ssid_signals.get(our_ssid, [])) if our_ssid else 0 num_channels_ssid = len({self._channel_of_bssid(b) for b in window_ssid_to_bssids.get(our_ssid, [])}) if our_ssid else 0 packet_count = len(packets) return ( clients_on_ap, clients_on_channel, aps_on_channel, avg_ap_signal, max_ap_signal, unlinked_devices, self._cisco_avg_clients(our_ssid), self._cisco_max_clients(our_ssid), num_bssids, avg_ssid_signal, max_ssid_signal, num_channels_ssid, packet_count ) def _count_clients_on_ap(self, packets, ap_bssid): clients = defaultdict(int) ap_bssid = ap_bssid.lower() for packet in packets: try: if not hasattr(packet, "wlan"): continue sa = getattr(packet.wlan, "sa", '').lower() da = getattr(packet.wlan, "da", '').lower() bssid = getattr(packet.wlan, "bssid", '').lower() if bssid == ap_bssid or sa == ap_bssid or da == ap_bssid: if sa == ap_bssid and da and da != ap_bssid and not da.startswith("ff:ff:ff:ff:ff:ff"): clients[da] += 1 elif sa and sa != ap_bssid and not sa.startswith("ff:ff:ff:ff:ff:ff"): clients[sa] += 1 except AttributeError: continue return len([mac for mac, count in clients.items() if count > 3]) def _calc_signal_stats(self, ap_channel): signals = self.packet_signals_by_channel.get(ap_channel, []) return (mean(signals), max(signals)) if signals else (0, 0) def _count_unlinked_devices(self, packets, ap_channel): aps = self.channel_to_aps.get(ap_channel, set()) ghost_candidates = set() for packet in packets: try: if 'radiotap' not in packet or 'wlan' not in packet: continue radio = packet.radiotap wlan = packet.wlan # Must be on our AP's channel if hasattr(radio, 'channel') and hasattr(radio.channel, 'freq'): freq = int(radio.channel.freq) packet_channel = get_channel_from_freq(freq) if packet_channel != ap_channel: continue else: continue sa = getattr(wlan, 'sa', '').lower() da = getattr(wlan, 'da', '').lower() bssid = getattr(wlan, 'bssid', '').lower() # If the packet is *talking to* any known AP, it's **linked**, not ghost if sa in aps or da in aps or bssid in aps: continue # Legit traffic, skip # Otherwise, these are "ghost candidates" for mac in (sa, da): if mac and mac != 'ff:ff:ff:ff:ff:ff': ghost_candidates.add(mac) except Exception: continue return len(ghost_candidates) def _cisco_avg_clients(self, ssid): if ssid in self.cisco_ssid_clients: return round(mean(self.cisco_ssid_clients[ssid]), 2) return 0 def _cisco_max_clients(self, ssid): if ssid in self.cisco_ssid_clients: return max(self.cisco_ssid_clients[ssid]) return 0 def _channel_of_bssid(self, bssid): for channel, bssids in self.channel_to_aps.items(): if bssid in bssids: return channel return None