97 lines
3.2 KiB
Python
97 lines
3.2 KiB
Python
from statistics import mean
|
|
from collections import defaultdict
|
|
from enrichment.utils import get_channel_from_freq
|
|
|
|
def extract_ssid_metrics(packets):
|
|
bssid_to_ssid = {}
|
|
ssid_to_bssids = defaultdict(set)
|
|
ssid_hidden_status = {}
|
|
ssid_encryption_status = {}
|
|
cisco_ssid_clients = defaultdict(list)
|
|
cisco_reported_clients = []
|
|
ssid_signals = defaultdict(list)
|
|
ssid_packet_counts = defaultdict(int)
|
|
|
|
for packet in packets:
|
|
try:
|
|
if 'radiotap' not in packet or 'wlan' not in packet:
|
|
continue
|
|
|
|
radio = packet.radiotap
|
|
wlan = packet.wlan
|
|
|
|
if not hasattr(radio.channel, 'freq'):
|
|
continue
|
|
|
|
packet_freq = int(radio.channel.freq)
|
|
get_channel_from_freq(packet_freq) # validate channel, or skip
|
|
|
|
subtype = int(getattr(wlan, 'type_subtype', 0), 16)
|
|
if subtype not in (5, 8): # Beacon or Probe Response
|
|
continue
|
|
|
|
try:
|
|
mgt = packet.get_multiple_layers('wlan.mgt')[0]
|
|
tags = mgt._all_fields.get('wlan.tagged.all', {}).get('wlan.tag', [])
|
|
except Exception:
|
|
continue
|
|
|
|
ssid = None
|
|
hidden_ssid = False
|
|
|
|
privacy_bit = mgt._all_fields.get('wlan_mgt.fixed.capabilities.privacy')
|
|
is_open = (str(privacy_bit) != '1')
|
|
|
|
for tag in tags:
|
|
tag_number = tag.get('wlan.tag.number')
|
|
if tag_number == '0':
|
|
raw_ssid = tag.get('wlan.ssid', '')
|
|
if not raw_ssid:
|
|
hidden_ssid = True
|
|
ssid = '<hidden>'
|
|
else:
|
|
try:
|
|
ssid_bytes = bytes.fromhex(raw_ssid.replace(':', ''))
|
|
ssid = ssid_bytes.decode('utf-8', errors='replace')
|
|
except Exception:
|
|
ssid = None
|
|
if tag_number == '133':
|
|
try:
|
|
num_clients = int(tag.get('wlan.cisco.ccx1.clients'))
|
|
if ssid:
|
|
cisco_ssid_clients[ssid].append(num_clients)
|
|
cisco_reported_clients.append(num_clients)
|
|
except (TypeError, ValueError):
|
|
pass
|
|
|
|
if not ssid:
|
|
continue
|
|
|
|
ssid_hidden_status[ssid] = hidden_ssid
|
|
ssid_encryption_status.setdefault(ssid, is_open)
|
|
ssid_packet_counts[ssid] += 1
|
|
|
|
bssid = getattr(wlan, 'bssid', '').lower()
|
|
if not bssid or bssid == 'ff:ff:ff:ff:ff:ff':
|
|
continue
|
|
|
|
bssid_to_ssid[bssid] = ssid
|
|
ssid_to_bssids[ssid].add(bssid)
|
|
|
|
signal = getattr(radio, 'dbm_antsignal', None)
|
|
if signal:
|
|
ssid_signals[ssid].append(int(signal))
|
|
|
|
except Exception:
|
|
continue
|
|
|
|
return (
|
|
bssid_to_ssid,
|
|
ssid_to_bssids,
|
|
ssid_hidden_status,
|
|
ssid_encryption_status,
|
|
ssid_signals,
|
|
cisco_ssid_clients,
|
|
cisco_reported_clients,
|
|
ssid_packet_counts
|
|
)
|