Implement get_aps and get_unlinked_devices functions; refactor get_aps_on_channel for improved clarity and functionality

This commit is contained in:
Yaro Kasear 2025-04-15 10:45:38 -05:00
parent 2abcc1e5b4
commit 6f83109579

139
enrich.py
View file

@ -62,6 +62,58 @@ def get_channel_from_freq(freq):
def get_freq_details(channel): def get_freq_details(channel):
return CHANNEL_LOOKUP_TABLE.get(channel, None) return CHANNEL_LOOKUP_TABLE.get(channel, None)
def get_aps(capture, ap_channel):
try:
ap_channel = int(ap_channel)
except ValueError:
print(f"[!] Could not parse channel number: {ap_channel}")
return 0
aps = set()
ghost_clients = set()
for packet in capture:
try:
if 'radiotap' not in packet or 'wlan' not in packet:
continue
radio = packet.radiotap
wlan = packet.wlan
if not hasattr(radio, 'channel') or not hasattr(radio.channel, 'freq'):
continue
packet_freq = int(radio.channel.freq)
packet_channel = get_channel_from_freq(packet_freq)
# For debugging purposes, print the channel and frequency
# print(f"Packet Channel: {packet_channel}, Frequency: {packet_freq} MHz")
if packet_channel != ap_channel:
continue
# Check for beacon or probe response
ts_hex = getattr(wlan, 'type_subtype', None)
if ts_hex is None:
continue
ts = int(ts_hex, 16)
if ts not in (5, 8): # Probe Response or Beacon
continue
# Grab BSSID
bssid = getattr(wlan, 'bssid', '').lower()
if bssid and bssid != 'ff:ff:ff:ff:ff:ff':
aps.add(bssid)
print(aps)
except Exception as e:
print(f"[DEBUG] Packet parse error: {e}")
continue
return aps
def parse_args(): def parse_args():
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
@ -150,54 +202,7 @@ def get_clients_on_channel(capture, ap_channel, ap_bssid):
return len(clients) return len(clients)
def get_aps_on_channel(capture, ap_channel): def get_aps_on_channel(capture, ap_channel):
try: return len(get_aps_on_channel(capture, ap_channel))
ap_channel = int(ap_channel)
except ValueError:
print(f"[!] Could not parse channel number: {ap_channel}")
return 0
aps = set()
for packet in capture:
try:
if 'radiotap' not in packet or 'wlan' not in packet:
continue
radio = packet.radiotap
wlan = packet.wlan
if not hasattr(radio, 'channel') or not hasattr(radio.channel, 'freq'):
continue
packet_freq = int(radio.channel.freq)
packet_channel = get_channel_from_freq(packet_freq)
# For debugging purposes, print the channel and frequency
# print(f"Packet Channel: {packet_channel}, Frequency: {packet_freq} MHz")
if packet_channel != ap_channel:
continue
# Check for beacon or probe response
ts_hex = getattr(wlan, 'type_subtype', None)
if ts_hex is None:
continue
ts = int(ts_hex, 16)
if ts not in (5, 8): # Probe Response or Beacon
continue
# Grab BSSID
bssid = getattr(wlan, 'bssid', '').lower()
if bssid and bssid != 'ff:ff:ff:ff:ff:ff':
aps.add(bssid)
except Exception as e:
print(f"[DEBUG] Packet parse error: {e}")
continue
return len(aps)
def calculate_signal_strength_stats(capture, ap_channel): def calculate_signal_strength_stats(capture, ap_channel):
try: try:
@ -247,6 +252,36 @@ def calculate_signal_strength_stats(capture, ap_channel):
else: else:
return 0, 0 return 0, 0
def get_unlinked_devices(capture, ap_channel):
aps = get_aps(capture, ap_channel)
ghost_clients = set()
for packet in capture:
if 'radiotap' not in packet or 'wlan' not in packet:
continue
radio = packet.radiotap
wlan = packet.wlan
if not hasattr(radio, 'channel') or not hasattr(radio.channel, 'freq'):
continue
packet_freq = int(radio.channel.freq)
packet_channel = get_channel_from_freq(packet_freq)
if packet_channel != ap_channel:
continue
for mac in (getattr(wlan, 'sa', ''), getattr(wlan, 'da', '')):
mac = mac.lower()
if (
mac
and mac != 'ff:ff:ff:ff:ff:ff'
and mac not in aps
):
ghost_clients.add(mac)
return len(ghost_clients)
def analyze_pcap(pcapng_path, start_ts, end_ts, ap_bssid, ap_channel): def analyze_pcap(pcapng_path, start_ts, end_ts, ap_bssid, ap_channel):
cap = pyshark.FileCapture( cap = pyshark.FileCapture(
pcapng_path, pcapng_path,
@ -262,6 +297,8 @@ def analyze_pcap(pcapng_path, start_ts, end_ts, ap_bssid, ap_channel):
avg_ap_signal = 0 avg_ap_signal = 0
max_ap_signal = 0 max_ap_signal = 0
unlinked_devices = 0
try: try:
# Filter packets manually by timestamp # Filter packets manually by timestamp
filtered_packets = [] filtered_packets = []
@ -279,17 +316,15 @@ def analyze_pcap(pcapng_path, start_ts, end_ts, ap_bssid, ap_channel):
# Placeholder: Logic will be added for: # Placeholder: Logic will be added for:
# - CongestionScore # - CongestionScore
# - AvgAPSignal
avg_ap_signal, max_ap_signal = calculate_signal_strength_stats(filtered_packets, ap_channel) avg_ap_signal, max_ap_signal = calculate_signal_strength_stats(filtered_packets, ap_channel)
# - StrongestAPSignal unlinked_devices = get_unlinked_devices(filtered_packets, ap_channel)
# - UnlinkedDevices
finally: finally:
cap.close() cap.close()
return clients_on_ap, clients_on_channel, aps_on_channel, None, avg_ap_signal, max_ap_signal, 0 return clients_on_ap, clients_on_channel, aps_on_channel, None, avg_ap_signal, max_ap_signal, unlinked_devices
def main(): def main():