from statistics import mean from collections import defaultdict from enrichment.utils import get_channel_from_freq def extract_ssid_metrics(packets): ssid_clients = defaultdict(set) bssid_to_ssid = {} ssid_to_bssids = defaultdict(set) ssid_hidden_status = {} ssid_encryption_status = {} ssid_signals = defaultdict(list) ssid_packet_counts = defaultdict(int) for packet in packets: try: if 'radiotap' not in packet or 'wlan' not in packet: continue radio = packet.radiotap wlan = packet.wlan if not hasattr(radio.channel, 'freq'): continue packet_freq = int(radio.channel.freq) get_channel_from_freq(packet_freq) # Validate channel or skip subtype = int(getattr(wlan, 'type_subtype', 0), 16) # --- EXTRACT SSID from Management Frames --- if subtype in (5, 8): # Beacon or Probe Response try: mgt = packet.get_multiple_layers('wlan.mgt')[0] tags = mgt._all_fields.get('wlan.tagged.all', {}).get('wlan.tag', []) except Exception: continue ssid = None hidden_ssid = False privacy_bit = mgt._all_fields.get('wlan_mgt.fixed.capabilities.privacy') is_open = (str(privacy_bit) != '1') for tag in tags: tag_number = tag.get('wlan.tag.number') if tag_number == '0': raw_ssid = tag.get('wlan.ssid', '') if not raw_ssid: hidden_ssid = True ssid = '' else: try: ssid_bytes = bytes.fromhex(raw_ssid.replace(':', '')) ssid = ssid_bytes.decode('utf-8', errors='replace') except Exception: ssid = None if not ssid: continue ssid_hidden_status[ssid] = hidden_ssid ssid_encryption_status.setdefault(ssid, is_open) ssid_packet_counts[ssid] += 1 bssid = getattr(wlan, 'bssid', '').lower() if not bssid or bssid == 'ff:ff:ff:ff:ff:ff': continue bssid_to_ssid[bssid] = ssid ssid_to_bssids[ssid].add(bssid) signal = getattr(radio, 'dbm_antsignal', None) if signal: ssid_signals[ssid].append(int(signal)) # --- CAPTURE CLIENTS on any frames --- # This runs even on non-beacon frames bssid = getattr(wlan, 'bssid', '').lower() sa = getattr(wlan, 'sa', '').lower() da = getattr(wlan, 'da', '').lower() if bssid in bssid_to_ssid: ssid = bssid_to_ssid[bssid] for mac in (sa, da): if mac and mac != "ff:ff:ff:ff:ff:ff" and mac != bssid: ssid_clients[ssid].add(mac) except Exception: continue return ( bssid_to_ssid, ssid_to_bssids, ssid_hidden_status, ssid_encryption_status, ssid_signals, ssid_packet_counts, ssid_clients )