Refactor packet handling to improve client tracking and AP relationship mapping; add function to get connected BSSID.

This commit is contained in:
Yaro Kasear 2025-05-01 10:03:39 -05:00
parent 2da759bf39
commit 4517f0cc95

View file

@ -19,6 +19,7 @@ ap_signals = defaultdict(list) # BSSID -> list of dBm
ssid_map = {} # BSSID -> SSID ssid_map = {} # BSSID -> SSID
ssid_signals = defaultdict(list) # SSID -> list of dBm ssid_signals = defaultdict(list) # SSID -> list of dBm
ap_clients = defaultdict(set) ap_clients = defaultdict(set)
target_ap_bssid = None
# === Signal handling === # === Signal handling ===
def stop_sniff(signum, frame): def stop_sniff(signum, frame):
@ -58,6 +59,11 @@ def parse_ssid(pkt):
return None return None
return None return None
def is_unicast(mac):
if not mac:
return False
return int(mac.split(":")[0], 16) % 2 == 0 and mac.lower() != "ff:ff:ff:ff:ff:ff"
def handle_packet(pkt): def handle_packet(pkt):
global packet_count global packet_count
packet_count += 1 packet_count += 1
@ -66,33 +72,44 @@ def handle_packet(pkt):
return return
dot11 = pkt[Dot11] dot11 = pkt[Dot11]
a1 = dot11.addr1.lower()
a2 = dot11.addr2.lower()
# === Detect APs via beacons/probe responses === # === Detect APs via beacons/probe responses ===
if dot11.type == 0 and dot11.subtype in (5, 8): # Probe Response or Beacon if dot11.type == 0 and dot11.subtype in (5, 8): # Probe Response or Beacon
if dot11.addr2: if a2:
aps.add(dot11.addr2) aps.add(a2)
ssid = parse_ssid(pkt) ssid = parse_ssid(pkt)
if ssid: if ssid:
ssid_map[dot11.addr2] = ssid ssid_map[a2] = ssid
# === Track all seen clients === # === Track all seen clients ===
if dot11.addr1: if is_unicast(a1) and a1 not in aps:
clients.add(dot11.addr1) clients.add(a1)
if dot11.addr2:
clients.add(dot11.addr2) if is_unicast(a2) and a2 not in aps:
clients.add(a2)
# === Guess client <-> AP relationships === # === Guess client <-> AP relationships ===
if dot11.addr1 in aps and dot11.addr2: if a1 in aps and a2:
ap_clients[dot11.addr1].add(dot11.addr2) ap_clients[a1].add(a2)
elif dot11.addr2 in aps and dot11.addr1: elif a2 in aps and a1:
ap_clients[dot11.addr2].add(dot11.addr1) ap_clients[a2].add(a1)
# Track clients talking to the same AP we're connected to
if target_ap_bssid:
if a1 and a2:
if target_ap_bssid in (a1.lower(), a2.lower()):
peer = a2 if a1 == target_ap_bssid else a1
if is_unicast(peer) and peer not in aps:
ap_clients[target_ap_bssid].add(peer)
# === Signal strength tracking === # === Signal strength tracking ===
try: try:
signal_dbm = pkt[RadioTap].dBm_AntSignal signal_dbm = pkt[RadioTap].dBm_AntSignal
if dot11.addr2 in aps: if a2 in aps:
ap_signals[dot11.addr2].append(signal_dbm) ap_signals[a2].append(signal_dbm)
ssid = ssid_map.get(dot11.addr2) ssid = ssid_map.get(a2)
if ssid: if ssid:
ssid_signals[ssid].append(signal_dbm) ssid_signals[ssid].append(signal_dbm)
except: except:
@ -138,6 +155,17 @@ def reset_interface(interface):
except Exception as e: except Exception as e:
print(f"[!] Failed to reset interface {interface}: {e}") print(f"[!] Failed to reset interface {interface}: {e}")
def get_connected_bssid(interface):
try:
out = subprocess.check_output(["iw", interface, "link"]).decode()
for line in out.splitlines():
if "Connected to" in line:
return line.strip().split()[-1].lower()
except Exception as e:
print(f"[!] Failed to get connected BSSID for {interface}: {e}")
return None
# === Main === # === Main ===
def main(): def main():
parser = ArgumentParser() parser = ArgumentParser()
@ -160,6 +188,13 @@ def main():
print("[!] Could not determine current channel. Exiting.") print("[!] Could not determine current channel. Exiting.")
sys.exit(1) sys.exit(1)
global target_ap_bssid
target_ap_bssid = get_connected_bssid(args.main_iface)
if not target_ap_bssid:
print("[!] Could not determine connected BSSID. ClientsOnAP will be 0.")
else:
print(f"[+] Connected BSSID (target AP): {target_ap_bssid}")
print("[+] Sniffing... (waiting for SIGINT to stop)") print("[+] Sniffing... (waiting for SIGINT to stop)")
while running: while running: