wifi_test/listener.py

227 lines
7.7 KiB
Python
Executable file

#!/usr/bin/env python3
import csv
import os
import re
import signal
import subprocess
import sys
import time
from argparse import ArgumentParser
from collections import defaultdict
from datetime import datetime
from scapy.all import sniff, Dot11, RadioTap
# === Globals ===
running = True
packet_count = 0
clients = defaultdict(int)
aps = set()
ap_signals = defaultdict(list) # BSSID -> list of dBm
ssid_map = {} # BSSID -> SSID
ssid_signals = defaultdict(list) # SSID -> list of dBm
ap_clients = defaultdict(lambda: defaultdict(int))
target_ap_bssid = None
beacon_counts = defaultdict(int)
current_channel = None
# === Signal handling ===
def stop_sniff(signum, frame):
global running
print("[!] SIGINT received, stopping sniffing and writing metrics.")
running = False
signal.signal(signal.SIGINT, stop_sniff)
# === Channel syncing ===
def get_current_channel(interface):
try:
output = subprocess.check_output(["iw", "dev", interface, "info"]).decode()
for line in output.splitlines():
if "channel" in line.lower():
return int(line.strip().split()[1])
except Exception as e:
print(f"[!] Failed to get channel for {interface}: {e}")
return None
def set_monitor_channel(interface, channel):
try:
subprocess.check_call(["iw", "dev", interface, "set", "channel", str(channel)])
print(f"[+] Set {interface} to channel {channel}")
except subprocess.CalledProcessError as e:
print(f"[!] Failed to set {interface} to channel {channel}: {e}")
# === Packet handling ===
def parse_ssid(pkt):
try:
elt = pkt.getlayer(Dot11).payload
while elt:
if hasattr(elt, "ID") and elt.ID == 0: # SSID tag
return elt.info.decode(errors="ignore")
elt = elt.payload
except:
return None
return None
def is_unicast(mac):
if not mac:
return False
return int(mac.split(":")[0], 16) % 2 == 0 and mac.lower() != "ff:ff:ff:ff:ff:ff"
def handle_packet(pkt):
global packet_count
packet_count += 1
if not pkt.haslayer(Dot11) or not pkt.haslayer(RadioTap):
return
dot11 = pkt[Dot11]
a1 = dot11.addr1.lower() if dot11.addr1 else None
a2 = dot11.addr2.lower() if dot11.addr2 else None
# === Detect APs via beacon frames only ===
if dot11.type == 0 and dot11.subtype == 8:
if a2 and is_unicast(a2):
beacon_counts[a2] += 1
if beacon_counts[a2] > 1:
aps.add(a2)
ssid = parse_ssid(pkt)
if ssid:
ssid_map[a2] = ssid
# === Track all seen clients ===
if dot11.type == 2:
sa = dot11.addr2.lower() if dot11.addr2 else None
da = dot11.addr1.lower() if dot11.addr1 else None
for mac in (sa, da):
if is_unicast(mac) and mac != target_ap_bssid:
clients[mac] += 1
# Track clients talking to the same AP we're connected to
if target_ap_bssid:
if a1 and a2:
if target_ap_bssid in (a1, a2):
peer = a2 if a1 == target_ap_bssid else a1
if is_unicast(peer) and peer not in aps:
ap_clients[target_ap_bssid][peer] += 1
# === Signal strength tracking ===
try:
signal_dbm = pkt[RadioTap].dBm_AntSignal
if a2 in aps:
ap_signals[a2].append(signal_dbm)
ssid = ssid_map.get(a2)
if ssid:
ssid_signals[ssid].append(signal_dbm)
except:
pass
# === Metrics dump ===
def write_csv(outfile):
timestamp = datetime.utcnow().isoformat()
row = {
"Timestamp": timestamp,
"ClientsOnChannel": len([mac for mac, count in clients.items() if count > 3]),
"APsOnChannel": len(aps),
"PacketCount": packet_count,
"AvgAPSignal": round(sum([sum(v)/len(v) for v in ap_signals.values() if v]) / len(ap_signals) if ap_signals else 0, 2),
"StrongestAPSignal": max([max(v) for v in ap_signals.values() if v], default=0),
"AvgSSIDSignal": round(sum([sum(v)/len(v) for v in ssid_signals.values() if v]) / len(ssid_signals) if ssid_signals else 0, 2),
"MaxSSIDSignal": max([max(v) for v in ssid_signals.values() if v], default=0),
"ClientsOnAP": len([mac for mac, count in ap_clients[target_ap_bssid].items() if count > 3]),
"CiscoAvgReportedClients": "N/A",
"CiscoMaxReportedClients": "N/A",
"NumberofBSSIDsOnSSID": "N/A",
"NumberofChannelsOnSSID": "N/A"
}
new_file = not os.path.exists(outfile)
with open(outfile, "a", newline="") as f:
writer = csv.DictWriter(f, fieldnames=row.keys())
if new_file:
writer.writeheader()
writer.writerow(row)
print(f"[+] Metrics written to {outfile}")
def reset_interface(interface):
print(f"[~] Resetting interface {interface} to default state...")
try:
subprocess.call(["ip", "link", "set", interface, "down"])
time.sleep(1)
subprocess.call(["iw", interface, "set", "type", "monitor"])
time.sleep(1)
subprocess.call(["ip", "link", "set", interface, "up"])
time.sleep(1)
print(f"[+] Interface {interface} reset complete.")
except Exception as e:
print(f"[!] Failed to reset interface {interface}: {e}")
def get_connected_bssid(interface):
try:
out = subprocess.check_output(["iw", interface, "link"]).decode()
match = re.search(r"Connected to ([0-9A-Fa-f:]{17})", out, re.MULTILINE)
if match:
return match.group(1).lower()
print("[!] No valid MAC address found in iw link output.")
return None
except Exception as e:
print(f"[!] Failed to get connected BSSID for {interface}: {e}")
return None
# === Main ===
def main():
parser = ArgumentParser()
parser.add_argument("--main-iface", required=True, help="Active interface (used to determine channel)")
parser.add_argument("--monitor-iface", required=True, help="Monitor interface to sniff on")
parser.add_argument("--outfile", required=True, help="CSV file to append metrics row")
parser.add_argument("--channel", type=int, help="Channel to lock monitor interface to (overrides main iface)")
args = parser.parse_args()
reset_interface(args.monitor_iface)
print(f"[+] Starting passive observer.")
print(f" Main interface: {args.main_iface}")
print(f" Monitor interface: {args.monitor_iface}")
print(f" Output file: {args.outfile}")
if args.channel:
print(f" Override channel: {args.channel}")
global target_ap_bssid
global current_channel
current_channel = args.channel or get_current_channel(args.main_iface)
if current_channel:
set_monitor_channel(args.monitor_iface, current_channel)
else:
print("[!] Could not determine current channel. Exiting.")
sys.exit(1)
if not args.channel:
target_ap_bssid = get_connected_bssid(args.main_iface)
if not target_ap_bssid:
print("[!] Could not determine connected BSSID. ClientsOnAP will be 0.")
else:
print(f"[+] Connected BSSID (target AP): {target_ap_bssid}")
else:
target_ap_bssid = None # Can't determine AP if we're not associated
print("[+] Sniffing... (waiting for SIGINT to stop)")
while running:
sniff(iface=args.monitor_iface, prn=handle_packet, store=False, timeout=5)
write_csv(args.outfile)
print("\n[+] Final APs counted on this channel:")
for bssid in sorted(aps):
ssid = ssid_map.get(bssid, "<unknown>")
print(f" - {bssid} (SSID: {ssid})")
print(f"[+] Total APsOnChannel: {len(aps)}")
reset_interface(args.monitor_iface)
if __name__ == "__main__":
main()