Refactor client tracking to use defaultdict for improved counting; update metrics to reflect active clients per AP.

This commit is contained in:
Yaro Kasear 2025-05-01 10:15:40 -05:00
parent d24157ba52
commit c12bdb80e3

View file

@ -13,12 +13,12 @@ from argparse import ArgumentParser
# === Globals === # === Globals ===
running = True running = True
packet_count = 0 packet_count = 0
clients = set() clients = defaultdict(int)
aps = set() aps = set()
ap_signals = defaultdict(list) # BSSID -> list of dBm ap_signals = defaultdict(list) # BSSID -> list of dBm
ssid_map = {} # BSSID -> SSID ssid_map = {} # BSSID -> SSID
ssid_signals = defaultdict(list) # SSID -> list of dBm ssid_signals = defaultdict(list) # SSID -> list of dBm
ap_clients = defaultdict(set) ap_clients = defaultdict(int)
target_ap_bssid = None target_ap_bssid = None
# === Signal handling === # === Signal handling ===
@ -72,11 +72,8 @@ def handle_packet(pkt):
return return
dot11 = pkt[Dot11] dot11 = pkt[Dot11]
a1 = dot11.addr1.lower() a1 = dot11.addr1.lower() if dot11.addr1 else None
try: a2 = dot11.addr2.lower() if dot11.addr2 else None
a2 = dot11.addr2.lower()
except:
a2 = None
# === Detect APs via beacons/probe responses === # === Detect APs via beacons/probe responses ===
if dot11.type == 0 and dot11.subtype in (5, 8): # Probe Response or Beacon if dot11.type == 0 and dot11.subtype in (5, 8): # Probe Response or Beacon
@ -88,10 +85,9 @@ def handle_packet(pkt):
# === Track all seen clients === # === Track all seen clients ===
if is_unicast(a1) and a1 not in aps: if is_unicast(a1) and a1 not in aps:
clients.add(a1) clients[a1] += 1
if is_unicast(a2) and a2 not in aps: if is_unicast(a2) and a2 not in aps:
clients.add(a2) clients[a2] += 1
# === Guess client <-> AP relationships === # === Guess client <-> AP relationships ===
if a1 in aps and a2: if a1 in aps and a2:
@ -105,7 +101,7 @@ def handle_packet(pkt):
if target_ap_bssid in (a1.lower(), a2.lower()): if target_ap_bssid in (a1.lower(), a2.lower()):
peer = a2 if a1 == target_ap_bssid else a1 peer = a2 if a1 == target_ap_bssid else a1
if is_unicast(peer) and peer not in aps: if is_unicast(peer) and peer not in aps:
ap_clients[target_ap_bssid].add(peer) ap_clients[target_ap_bssid][peer] += 1
# === Signal strength tracking === # === Signal strength tracking ===
try: try:
@ -123,14 +119,14 @@ def write_csv(outfile):
timestamp = datetime.utcnow().isoformat() timestamp = datetime.utcnow().isoformat()
row = { row = {
"Timestamp": timestamp, "Timestamp": timestamp,
"ClientsOnChannel": len(clients), "ClientsOnChannel": len([mac for mac, count in clients.items() if count > 3]),
"APsOnChannel": len(aps), "APsOnChannel": len(aps),
"PacketCount": packet_count, "PacketCount": packet_count,
"AvgAPSignal": round(sum([sum(v)/len(v) for v in ap_signals.values() if v]) / len(ap_signals) if ap_signals else 0, 2), "AvgAPSignal": round(sum([sum(v)/len(v) for v in ap_signals.values() if v]) / len(ap_signals) if ap_signals else 0, 2),
"StrongestAPSignal": max([max(v) for v in ap_signals.values() if v], default=0), "StrongestAPSignal": max([max(v) for v in ap_signals.values() if v], default=0),
"AvgSSIDSignal": round(sum([sum(v)/len(v) for v in ssid_signals.values() if v]) / len(ssid_signals) if ssid_signals else 0, 2), "AvgSSIDSignal": round(sum([sum(v)/len(v) for v in ssid_signals.values() if v]) / len(ssid_signals) if ssid_signals else 0, 2),
"MaxSSIDSignal": max([max(v) for v in ssid_signals.values() if v], default=0), "MaxSSIDSignal": max([max(v) for v in ssid_signals.values() if v], default=0),
"ClientsOnAP": sum(len(s) for s in ap_clients.values()), "ClientsOnAP": len([mac for mac, count in ap_clients[target_ap_bssid].items() if count > 3]),
"CiscoAvgReportedClients": "N/A", "CiscoAvgReportedClients": "N/A",
"CiscoMaxReportedClients": "N/A", "CiscoMaxReportedClients": "N/A",
"NumberofBSSIDsOnSSID": "N/A", "NumberofBSSIDsOnSSID": "N/A",