126 lines
3.5 KiB
Python
126 lines
3.5 KiB
Python
# enrichment/metrics_signals.py
|
|
|
|
from statistics import mean
|
|
from enrichment.utils import get_channel_from_freq
|
|
from enrichment.metrics_clients import get_clients_on_channel # in case you want to consolidate later
|
|
|
|
def get_aps(capture, ap_channel):
    """Collect the BSSIDs of access points observed on one channel.

    A packet contributes a BSSID when it has both a ``radiotap`` and a
    ``wlan`` layer, its radiotap frequency maps (through
    ``get_channel_from_freq``) to ``ap_channel``, and its ``type_subtype``
    field parses (base 16) to 5 or 8.

    Args:
        capture: Iterable of packets. Each packet must support ``in`` checks
            for layer names and expose ``radiotap`` / ``wlan`` attributes.
        ap_channel: Channel number, as an int or any value ``int()`` accepts.

    Returns:
        A set of lower-cased BSSID strings, excluding the broadcast address.
        An empty set is returned when ``ap_channel`` cannot be parsed.
    """
    try:
        ap_channel = int(ap_channel)
    except (TypeError, ValueError):
        # TypeError covers None and other non-numeric objects.
        print(f"[!] Could not parse channel number: {ap_channel}")
        # Always hand back a set so callers can safely use len() and `in`.
        return set()

    aps = set()

    for packet in capture:
        try:
            if 'radiotap' not in packet or 'wlan' not in packet:
                continue

            radio = packet.radiotap
            wlan = packet.wlan

            if not hasattr(radio, 'channel') or not hasattr(radio.channel, 'freq'):
                continue

            packet_freq = int(radio.channel.freq)
            packet_channel = get_channel_from_freq(packet_freq)

            if packet_channel != ap_channel:
                continue

            ts_hex = getattr(wlan, 'type_subtype', None)
            if ts_hex is None:
                continue

            ts = int(ts_hex, 16)
            if ts not in (5, 8):  # Beacon/Probe
                continue

            bssid = getattr(wlan, 'bssid', '').lower()
            if bssid and bssid != 'ff:ff:ff:ff:ff:ff':
                aps.add(bssid)

        except Exception:
            # Best effort: a packet that cannot be inspected is skipped
            # rather than aborting the whole scan.
            continue

    return aps
|
|
|
|
|
|
def get_aps_on_channel(capture, ap_channel):
    """Return how many distinct access points were seen on ``ap_channel``.

    Args:
        capture: Iterable of packets, passed through to ``get_aps``.
        ap_channel: Channel number, passed through to ``get_aps``.

    Returns:
        The number of BSSIDs reported by ``get_aps``, or 0 when it reports
        nothing.
    """
    aps = get_aps(capture, ap_channel)
    # get_aps may signal an unparseable channel with a falsy value that has
    # no len(); treat any empty/falsy result as "no access points".
    return len(aps) if aps else 0
|
|
|
|
|
|
def calculate_signal_strength_stats(capture, ap_channel):
    """Compute mean and maximum signal strength of AP frames on a channel.

    Only packets that have both ``radiotap`` and ``wlan`` layers, whose
    radiotap frequency maps (through ``get_channel_from_freq``) to
    ``ap_channel``, and whose ``type_subtype`` parses (base 16) to 5 or 8
    are considered. Each such packet contributes its radiotap
    ``dbm_antsignal`` value, converted with ``int()``.

    Args:
        capture: Iterable of packets. Each packet must support ``in`` checks
            for layer names and expose ``radiotap`` / ``wlan`` attributes.
        ap_channel: Channel number, as an int or any value ``int()`` accepts.

    Returns:
        A ``(mean, max)`` tuple over the collected signal values, or
        ``(0, 0)`` when the channel cannot be parsed or no values were
        collected.
    """
    try:
        ap_channel = int(ap_channel)
    except (TypeError, ValueError):
        # TypeError covers None and other non-numeric objects, so they take
        # the same fallback path as an unparseable string.
        print(f"[!] Could not parse channel number: {ap_channel}")
        return 0, 0

    ap_signals = []
    for packet in capture:
        try:
            if 'radiotap' not in packet or 'wlan' not in packet:
                continue

            radio = packet.radiotap
            wlan = packet.wlan

            if not hasattr(radio, 'channel') or not hasattr(radio.channel, 'freq'):
                continue

            packet_freq = int(radio.channel.freq)
            packet_channel = get_channel_from_freq(packet_freq)

            if packet_channel != ap_channel:
                continue

            ts_hex = getattr(wlan, 'type_subtype', None)
            if ts_hex is None:
                continue

            ts = int(ts_hex, 16)
            if ts not in (5, 8):  # Beacon or Probe
                continue

            signal_strength = getattr(radio, 'dbm_antsignal', None)
            if signal_strength is not None:
                ap_signals.append(int(signal_strength))

        except Exception:
            # Best effort: a packet that cannot be inspected is skipped
            # rather than aborting the whole scan.
            continue

    # mean()/max() raise on an empty sequence, hence the explicit fallback.
    return (mean(ap_signals), max(ap_signals)) if ap_signals else (0, 0)
|
|
|
|
|
|
def get_unlinked_devices(capture, ap_channel):
    """Count MAC addresses on a channel that are not known access points.

    The set of access points is obtained from ``get_aps``. Every packet on
    ``ap_channel`` then contributes its ``wlan.sa`` and ``wlan.da``
    addresses; those that are non-empty, not the broadcast address, and not
    in the AP set are counted once each.

    Note that ``capture`` is iterated twice (once inside ``get_aps`` and once
    here), so it must be re-iterable.

    Args:
        capture: Re-iterable collection of packets. Each packet must support
            ``in`` checks for layer names and expose ``radiotap`` / ``wlan``
            attributes.
        ap_channel: Channel number, as an int or any value ``int()`` accepts.

    Returns:
        The number of distinct non-AP MAC addresses, or 0 when the channel
        cannot be parsed.
    """
    # Normalise the channel the same way the sibling functions do; without
    # this a string channel such as "6" never equals the numeric channel
    # derived from the packet frequency and the result is always 0.
    try:
        ap_channel = int(ap_channel)
    except (TypeError, ValueError):
        print(f"[!] Could not parse channel number: {ap_channel}")
        return 0

    # Fall back to an empty set if get_aps hands back a falsy non-set value,
    # so the membership test below is always valid.
    aps = get_aps(capture, ap_channel) or set()
    ghost_clients = set()

    for packet in capture:
        try:
            if 'radiotap' not in packet or 'wlan' not in packet:
                continue

            radio = packet.radiotap
            wlan = packet.wlan

            if not hasattr(radio, 'channel') or not hasattr(radio.channel, 'freq'):
                continue

            packet_freq = int(radio.channel.freq)
            packet_channel = get_channel_from_freq(packet_freq)
            if packet_channel != ap_channel:
                continue

            for mac in (getattr(wlan, 'sa', ''), getattr(wlan, 'da', '')):
                mac = mac.lower()
                if mac and mac != 'ff:ff:ff:ff:ff:ff' and mac not in aps:
                    ghost_clients.add(mac)

        except Exception:
            # Best effort: a packet that cannot be inspected is skipped
            # rather than aborting the whole scan.
            continue

    return len(ghost_clients)
|