292 lines
11 KiB
Python
292 lines
11 KiB
Python
from collections import defaultdict
|
|
from statistics import mean
|
|
from enrichment.utils import get_channel_from_freq
|
|
|
|
class IndexedCapture:
    """Index of an 802.11 capture file, built once when the object is created.

    The constructor reads ``pcap_path`` through pyshark and fills a set of
    lookup tables keyed by SSID, BSSID and channel.  ``query_metrics`` then
    answers per-time-window questions from the retained packets.

    All MAC addresses stored in the tables are lower-cased.
    """

    # Broadcast address; never recorded as a client or as an AP.
    _BROADCAST_MAC = 'ff:ff:ff:ff:ff:ff'
    # Frame subtypes treated as AP advertisements (probe response, beacon).
    _SSID_SUBTYPES = (5, 8)
    # A station counts as a client of an AP only when it appears in strictly
    # more than this many frames exchanged with that AP.
    _CLIENT_FRAME_THRESHOLD = 3

    def __init__(self, pcap_path):
        """Create the index and immediately load ``pcap_path``.

        Args:
            pcap_path: Path of the capture file handed to pyshark.
        """
        self.pcap_path = pcap_path
        self.all_packets = []
        self.time_index = []  # List of (timestamp, packet)
        self.bssid_to_ssid = {}  # BSSID -> SSID
        self.ssid_to_bssids = defaultdict(set)  # SSID -> {BSSIDs}
        self.ssid_hidden_status = {}  # SSID -> hidden True/False
        # SSID -> True when the privacy capability bit was not '1' (open network)
        self.ssid_encryption_status = {}
        self.ssid_signals = defaultdict(list)  # SSID -> list of dBm values
        self.ssid_packet_counts = defaultdict(int)  # SSID -> number of packets
        self.ssid_clients = defaultdict(set)  # SSID -> MAC addresses
        self.channel_to_aps = defaultdict(set)  # Channel -> BSSIDs
        self.channel_to_clients = defaultdict(set)  # Channel -> client MACs
        self.packet_signals_by_channel = defaultdict(list)  # Channel -> dBm signals
        self.packet_timestamps = []  # Packet timestamps, in capture order

        self._load_and_index()

    # ------------------------------------------------------------------
    # Small field-extraction helpers shared by the indexing and query code
    # ------------------------------------------------------------------

    @staticmethod
    def _radio_freq(radio):
        """Return the radiotap channel frequency as an int, or None if absent."""
        if hasattr(radio, 'channel') and hasattr(radio.channel, 'freq'):
            return int(radio.channel.freq)
        return None

    @staticmethod
    def _frame_subtype(wlan):
        """Return the frame's ``type_subtype`` field as an int (0 when missing).

        The field is parsed base 16, which accepts both ``'0x0008'`` and bare
        digits.  The result is only ever compared against 5 and 8, where the
        hexadecimal and decimal readings agree.
        """
        return int(getattr(wlan, 'type_subtype', '0'), 16)

    @staticmethod
    def _mac(layer, field):
        """Return ``layer.<field>`` lower-cased, or '' when the field is missing."""
        return getattr(layer, field, '').lower()

    # ------------------------------------------------------------------
    # Loading
    # ------------------------------------------------------------------

    def _load_and_index(self):
        """Read the capture file and populate every lookup table.

        Packets that raise while being indexed are skipped (best effort); the
        capture handle is closed even if iteration itself fails.
        """
        # Function-scope import: pyshark is only needed when a capture is loaded.
        import pyshark

        capture = pyshark.FileCapture(
            self.pcap_path,
            use_json=True,
            include_raw=False,
            keep_packets=False,
            display_filter="(wlan.fc.type_subtype == 8 || wlan.fc.type_subtype == 5 || wlan.fc.type == 2) && (wlan.bssid || wlan.sa || wlan.da)"
        )

        try:
            for packet in capture:
                try:
                    self._index_packet(packet)
                except Exception:
                    # Deliberate best effort: one malformed packet must not
                    # abort indexing of the rest of the capture.
                    continue
        finally:
            # Always release the capture, even when iteration raises.
            capture.close()

    def _index_packet(self, packet):
        """Record one packet in the time index and the per-channel tables."""
        ts = float(packet.frame_info.time_epoch)
        self.time_index.append((ts, packet))
        self.packet_timestamps.append(ts)

        if 'radiotap' not in packet or 'wlan' not in packet:
            return

        radio = packet.radiotap
        wlan = packet.wlan

        freq = self._radio_freq(radio)
        if freq is None:
            return
        channel = get_channel_from_freq(freq)

        # Management frames: beacon / probe response
        if self._frame_subtype(wlan) in self._SSID_SUBTYPES:
            self._process_management_frame(packet, wlan, radio, channel)

        # Track every non-broadcast address seen on this channel
        for mac in (self._mac(wlan, 'sa'), self._mac(wlan, 'da')):
            if mac and mac != self._BROADCAST_MAC:
                self.channel_to_clients[channel].add(mac)

    def _process_management_frame(self, packet, wlan, radio, channel):
        """Update the SSID/BSSID tables from one beacon or probe response.

        Args:
            packet: The pyshark packet; its first ``wlan.mgt`` layer is read.
            wlan: The packet's ``wlan`` layer (source of the BSSID).
            radio: The packet's ``radiotap`` layer (source of the signal level).
            channel: Channel the frame was captured on.

        Any exception raised while parsing is swallowed; tables updated before
        the failure keep their new values.
        """
        try:
            mgt = packet.get_multiple_layers('wlan.mgt')[0]
            tags = mgt._all_fields.get('wlan.tagged.all', {}).get('wlan.tag', [])
            # A lone tag may arrive as a single mapping rather than a list of
            # mappings; iterating it directly would walk its keys instead.
            if isinstance(tags, dict):
                tags = [tags]

            ssid = None
            hidden_ssid = False

            privacy_bit = mgt._all_fields.get('wlan_mgt.fixed.capabilities.privacy')
            is_open = (str(privacy_bit) != '1')

            # Tag number 0 carries the SSID; if it repeats, the last one wins.
            for tag in tags:
                if tag.get('wlan.tag.number') != '0':
                    continue

                raw_ssid = tag.get('wlan.ssid', '')
                if not raw_ssid:
                    hidden_ssid = True
                    ssid = '<hidden>'
                    continue

                hidden_ssid = False
                try:
                    # The SSID is expected as colon-separated hex bytes.
                    ssid_bytes = bytes.fromhex(raw_ssid.replace(':', ''))
                    ssid = ssid_bytes.decode('utf-8', errors='replace')
                except (ValueError, AttributeError, TypeError):
                    ssid = None

            if ssid:
                self.ssid_hidden_status[ssid] = hidden_ssid
                # First observation wins for the open/encrypted flag.
                self.ssid_encryption_status.setdefault(ssid, is_open)
                self.ssid_packet_counts[ssid] += 1

            bssid = self._mac(wlan, 'bssid')
            if not bssid or bssid == self._BROADCAST_MAC:
                return

            if ssid:
                self.bssid_to_ssid[bssid] = ssid
                self.ssid_to_bssids[ssid].add(bssid)

            signal = getattr(radio, 'dbm_antsignal', None)
            if signal:
                signal = int(signal)
                if ssid:
                    self.ssid_signals[ssid].append(signal)
                self.packet_signals_by_channel[channel].append(signal)

            self.channel_to_aps[channel].add(bssid)

        except Exception:
            # Deliberate best effort: malformed management frames are dropped.
            pass

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------

    def get_packets_in_time_range(self, start_ts, end_ts):
        """Return the packets whose timestamp lies in ``[start_ts, end_ts]``.

        Both bounds are inclusive.  This is a linear scan over the time index;
        packets are returned in the order they were indexed.
        """
        return [
            packet for ts, packet in self.time_index
            if start_ts <= ts <= end_ts
        ]

    def query_metrics(self, start_ts, end_ts, ap_bssid, ap_channel):
        """Compute metrics for one AP over the window ``[start_ts, end_ts]``.

        Args:
            start_ts: Inclusive window start (same units as the packet timestamps).
            end_ts: Inclusive window end.
            ap_bssid: BSSID of the AP of interest; matched case-insensitively.
            ap_channel: Channel of the AP; coerced with ``int()``.

        Returns:
            An 11-tuple, in this order:
            ``clients_on_ap``, ``clients_on_channel``, ``aps_on_channel``,
            ``avg_ap_signal``, ``max_ap_signal``, ``unlinked_devices``,
            ``num_bssids``, ``avg_ssid_signal``, ``max_ssid_signal``,
            ``num_channels_ssid``, ``packet_count``.
            Signal values are 0 when no sample fell inside the window.
        """
        ap_channel = int(ap_channel)
        # The index stores lower-cased MACs, so normalise once for every lookup.
        ap_bssid = ap_bssid.lower() if ap_bssid else ''

        packets = self.get_packets_in_time_range(start_ts, end_ts)

        # Window-local equivalents of the capture-wide tables
        window_clients = defaultdict(set)
        window_aps = defaultdict(set)
        window_signals = defaultdict(list)
        window_ssid_to_bssids = defaultdict(set)
        window_ssid_signals = defaultdict(list)

        for packet in packets:
            try:
                if 'radiotap' not in packet or 'wlan' not in packet:
                    continue

                wlan = packet.wlan
                radio = packet.radiotap

                sa = self._mac(wlan, 'sa')
                da = self._mac(wlan, 'da')
                bssid = self._mac(wlan, 'bssid')

                freq = self._radio_freq(radio)
                if freq is None:
                    continue
                channel = get_channel_from_freq(freq)

                # Track APs (beacons / probe responses only)
                if self._frame_subtype(wlan) in self._SSID_SUBTYPES and bssid:
                    window_aps[channel].add(bssid)

                # Track clients (always)
                for mac in (sa, da):
                    if mac and mac != self._BROADCAST_MAC:
                        window_clients[channel].add(mac)

                # The SSID comes from the capture-wide BSSID table, so any
                # frame of a known BSSID contributes to its SSID's numbers.
                ssid = self.bssid_to_ssid.get(bssid)
                if ssid:
                    window_ssid_to_bssids[ssid].add(bssid)

                signal = getattr(radio, 'dbm_antsignal', None)
                if signal:
                    dbm = int(signal)
                    if ssid:
                        window_ssid_signals[ssid].append(dbm)
                    window_signals[channel].append(dbm)

            except Exception:
                # Deliberate best effort: skip packets that cannot be parsed.
                continue

        clients_on_ap = self._count_clients_on_ap(packets, ap_bssid)
        clients_on_channel = len(window_clients.get(ap_channel, ()))
        aps_on_channel = len(window_aps.get(ap_channel, ()))

        channel_signals = window_signals.get(ap_channel, [])
        avg_ap_signal = safe_mean(channel_signals)
        max_ap_signal = max(channel_signals) if channel_signals else 0

        unlinked_devices = self._count_unlinked_devices(packets, ap_channel)

        our_ssid = self.bssid_to_ssid.get(ap_bssid)
        ssid_bssids = window_ssid_to_bssids.get(our_ssid, ()) if our_ssid else ()
        ssid_signals = window_ssid_signals.get(our_ssid, []) if our_ssid else []

        num_bssids = len(ssid_bssids)
        avg_ssid_signal = safe_mean(ssid_signals)
        max_ssid_signal = max(ssid_signals) if ssid_signals else 0

        ssid_channels = {self._channel_of_bssid(b) for b in ssid_bssids}
        # A BSSID with no recorded channel must not count as an extra channel.
        ssid_channels.discard(None)
        num_channels_ssid = len(ssid_channels)

        packet_count = len(packets)

        return (
            clients_on_ap, clients_on_channel, aps_on_channel,
            avg_ap_signal, max_ap_signal, unlinked_devices,
            num_bssids, avg_ssid_signal, max_ssid_signal,
            num_channels_ssid, packet_count
        )

    def _count_clients_on_ap(self, packets, ap_bssid):
        """Count stations seen exchanging frames with ``ap_bssid``.

        A frame involves the AP when the AP is its BSSID, source or
        destination.  The peer is the destination when the AP is the source,
        otherwise the source.  Only peers appearing in more than
        ``_CLIENT_FRAME_THRESHOLD`` such frames are counted.

        Args:
            packets: Iterable of packets exposing an optional ``wlan`` layer.
            ap_bssid: BSSID of the AP; compared case-insensitively.  ``None``
                or an empty string yields 0.

        Returns:
            Number of distinct peers above the frame threshold.
        """
        if not ap_bssid:
            # An empty BSSID would match every frame with a missing address.
            return 0

        ap_bssid = ap_bssid.lower()
        frames_per_peer = defaultdict(int)

        for packet in packets:
            try:
                if not hasattr(packet, "wlan"):
                    continue
                wlan = packet.wlan
                sa = self._mac(wlan, "sa")
                da = self._mac(wlan, "da")
                bssid = self._mac(wlan, "bssid")

                if ap_bssid not in (bssid, sa, da):
                    continue

                if sa == ap_bssid and da and da != ap_bssid and not da.startswith(self._BROADCAST_MAC):
                    frames_per_peer[da] += 1
                elif sa and sa != ap_bssid and not sa.startswith(self._BROADCAST_MAC):
                    frames_per_peer[sa] += 1
            except AttributeError:
                continue

        return sum(
            1 for count in frames_per_peer.values()
            if count > self._CLIENT_FRAME_THRESHOLD
        )

    def _calc_signal_stats(self, ap_channel):
        """Return ``(mean, max)`` of all signals indexed on ``ap_channel``.

        Uses the capture-wide table; returns ``(0, 0)`` when the channel has
        no samples.
        """
        signals = self.packet_signals_by_channel.get(ap_channel, [])
        return (mean(signals), max(signals)) if signals else (0, 0)

    def _count_unlinked_devices(self, packets, ap_channel):
        """Count addresses on ``ap_channel`` not seen talking to any known AP.

        A packet is ignored when its source, destination or BSSID is an AP
        recorded for ``ap_channel`` in the capture-wide table.  Every other
        packet on that channel contributes its non-broadcast source and
        destination addresses.

        Returns:
            Number of distinct such addresses.
        """
        aps = self.channel_to_aps.get(ap_channel, set())
        ghost_candidates = set()

        for packet in packets:
            try:
                if 'radiotap' not in packet or 'wlan' not in packet:
                    continue

                radio = packet.radiotap
                wlan = packet.wlan

                # Must be on our AP's channel
                freq = self._radio_freq(radio)
                if freq is None or get_channel_from_freq(freq) != ap_channel:
                    continue

                sa = self._mac(wlan, 'sa')
                da = self._mac(wlan, 'da')
                bssid = self._mac(wlan, 'bssid')

                # Traffic involving a known AP is linked, not a ghost
                if sa in aps or da in aps or bssid in aps:
                    continue

                for mac in (sa, da):
                    if mac and mac != self._BROADCAST_MAC:
                        ghost_candidates.add(mac)

            except Exception:
                # Deliberate best effort: skip packets that cannot be parsed.
                continue

        return len(ghost_candidates)

    def _channel_of_bssid(self, bssid):
        """Return the first indexed channel listing ``bssid`` as an AP, else None."""
        for channel, bssids in self.channel_to_aps.items():
            if bssid in bssids:
                return channel
        return None
|
|
|
|
def safe_mean(data):
    """Return the arithmetic mean of *data*, or 0 when there is nothing to average.

    Args:
        data: A sequence or any other iterable of numbers.  ``None`` and empty
            containers yield 0.

    Returns:
        ``statistics.mean`` of the values, or 0 for empty input.
    """
    if not data:
        return 0
    # Iterators and generators are always truthy, even when exhausted, so
    # materialise first and test the actual contents.
    values = list(data)
    return mean(values) if values else 0
|