wifi_test/listener.py

172 lines
5.6 KiB
Python
Executable file

#!/usr/bin/env python3
import os
import signal
import csv
import sys
import subprocess
import time
from scapy.all import sniff, Dot11, RadioTap
from collections import defaultdict
from datetime import datetime
from argparse import ArgumentParser
# === Globals ===
# Capture state shared by the SIGINT handler, the per-packet callback and the
# CSV writer. Everything accumulates for the whole run; nothing is reset.
# Main-loop flag: main() keeps sniffing while this is True; stop_sniff() clears it.
running = True
# Number of packets handed to handle_packet(), counted before any layer check.
packet_count = 0
# Addresses seen in 802.11 frames (senders and receivers; known APs are not filtered out).
clients = set()
# Sender address (addr2) of every beacon / probe-response frame seen.
aps = set()
ap_signals = defaultdict(list) # BSSID -> list of dBm
ssid_map = {} # BSSID -> SSID
ssid_signals = defaultdict(list) # SSID -> list of dBm
# AP address -> peer addresses seen in frames sent to or from that AP.
ap_clients = defaultdict(set)
# === Signal handling ===
def stop_sniff(signum, frame):
    """Signal handler that asks the capture loop to stop.

    Prints a notice and clears the module-level ``running`` flag. Both
    arguments are supplied by the ``signal`` machinery and are unused here.
    The loop in ``main()`` re-checks the flag only after each ``sniff()``
    call returns, so the stop takes effect once the current sniff window
    (5 second timeout) ends.
    """
    global running
    print("[!] SIGINT received, stopping sniffing and writing metrics.")
    running = False


# Registered at import time, so Ctrl-C sets the flag instead of raising
# KeyboardInterrupt in the middle of a capture.
signal.signal(signal.SIGINT, stop_sniff)
# === Channel syncing ===
def get_current_channel(interface):
    """Return the channel number that ``iw`` reports for *interface*.

    Runs ``iw dev <interface> info`` and reads the line whose first word is
    ``channel``; the second word of that line is returned as an int.

    Matching on the first word (rather than looking for the substring
    anywhere in the line) keeps unrelated lines that merely contain the word
    "channel" -- for example an SSID line whose network name includes it --
    from being mistaken for the channel line.

    Args:
        interface: Name of the wireless interface to query.

    Returns:
        The channel number, or None if ``iw`` could not be run, exited with
        an error, or printed no parseable ``channel`` line.
    """
    try:
        raw = subprocess.check_output(["iw", "dev", interface, "info"])
    except (OSError, subprocess.CalledProcessError) as e:
        # OSError covers a missing or non-executable `iw` binary.
        print(f"[!] Failed to get channel for {interface}: {e}")
        return None

    # errors="replace": a non-UTF-8 byte elsewhere in the output must not
    # prevent the channel line from being read.
    for line in raw.decode(errors="replace").splitlines():
        fields = line.split()
        if len(fields) < 2 or fields[0].lower() != "channel":
            continue
        try:
            return int(fields[1])
        except ValueError:
            # Not a numeric channel; keep scanning the remaining lines.
            continue
    return None
def set_monitor_channel(interface, channel):
    """Tune *interface* to *channel* via ``iw dev <interface> set channel``.

    Best-effort: the outcome is printed and nothing is raised, whether the
    command exits non-zero or ``iw`` cannot be started at all.

    Args:
        interface: Name of the wireless interface to retune.
        channel: Channel number; converted with ``str()`` for the command line.
    """
    command = ["iw", "dev", interface, "set", "channel", str(channel)]
    try:
        subprocess.check_call(command)
    except (OSError, subprocess.CalledProcessError) as e:
        # OSError: `iw` missing or not executable. Previously this escaped
        # as an uncaught exception.
        print(f"[!] Failed to set {interface} to channel {channel}: {e}")
    else:
        print(f"[+] Set {interface} to channel {channel}")
# === Packet handling ===
def parse_ssid(pkt):
    """Return the SSID carried by *pkt* as text, or None if there is none.

    Walks the payload chain below the packet's Dot11 layer and decodes the
    ``info`` bytes of the first element whose ``ID`` is 0 (the SSID tag),
    silently dropping undecodable bytes.

    Args:
        pkt: A packet object exposing ``getlayer()``.

    Returns:
        The decoded SSID string (possibly empty), or None when no element
        with ID 0 is found or the packet cannot be walked.
    """
    try:
        elt = pkt.getlayer(Dot11).payload
        while elt:
            if getattr(elt, "ID", None) == 0:  # SSID tag
                return elt.info.decode(errors="ignore")
            elt = elt.payload
    except Exception:
        # Best-effort parse: a malformed or unexpected packet yields None.
        # `Exception` (not a bare `except`) so KeyboardInterrupt and
        # SystemExit still propagate.
        return None
    return None
_BROADCAST_ADDR = "ff:ff:ff:ff:ff:ff"


def _is_trackable(addr):
    """Return True for a non-empty address other than the broadcast address."""
    return bool(addr) and addr.lower() != _BROADCAST_ADDR


def handle_packet(pkt):
    """Per-packet sniff callback: fold one frame into the module-level metrics.

    Always increments ``packet_count``. Frames lacking a Dot11 or RadioTap
    layer are otherwise ignored. For the rest:

    * the sender (addr2) of a beacon / probe response is recorded in ``aps``,
      and its SSID (when ``parse_ssid`` finds one) in ``ssid_map``;
    * sender and receiver addresses are added to ``clients``;
    * if either end is already a known AP, the other end is added to that
      AP's entry in ``ap_clients``;
    * the radiotap signal reading of frames sent by a known AP is appended
      to ``ap_signals`` and, when the AP's SSID is known, ``ssid_signals``.

    The broadcast address is never counted as a client: it is the receiver
    of every beacon and would otherwise show up as a client of every AP.

    Args:
        pkt: Packet delivered by ``sniff()``.
    """
    global packet_count
    packet_count += 1
    if not (pkt.haslayer(Dot11) and pkt.haslayer(RadioTap)):
        return

    dot11 = pkt[Dot11]
    receiver = dot11.addr1
    sender = dot11.addr2

    # === Detect APs via beacons/probe responses ===
    if dot11.type == 0 and dot11.subtype in (5, 8) and sender:  # Probe Response or Beacon
        aps.add(sender)
        ssid = parse_ssid(pkt)
        if ssid:
            ssid_map[sender] = ssid

    # === Track all seen clients ===
    for addr in (receiver, sender):
        if _is_trackable(addr):
            clients.add(addr)

    # === Guess client <-> AP relationships ===
    if receiver in aps and sender:
        if _is_trackable(sender):
            ap_clients[receiver].add(sender)
    elif sender in aps and receiver:
        if _is_trackable(receiver):
            ap_clients[sender].add(receiver)

    # === Signal strength tracking ===
    # The reading may be missing (attribute absent, or None when the radiotap
    # header carries no antenna-signal field). Storing None would later break
    # the sum()/max() aggregation in the CSV writer, so such frames are skipped.
    signal_dbm = getattr(pkt[RadioTap], "dBm_AntSignal", None)
    if signal_dbm is None or sender not in aps:
        return
    ap_signals[sender].append(signal_dbm)
    ssid = ssid_map.get(sender)
    if ssid:
        ssid_signals[ssid].append(signal_dbm)
# === Metrics dump ===
def _usable_readings(signal_map):
    """Return each key's readings with None removed, dropping keys left empty."""
    cleaned = ([value for value in values if value is not None]
               for values in signal_map.values())
    return [readings for readings in cleaned if readings]


def _mean_of_means(groups):
    """Average the per-group means, rounded to 2 places; 0 when there are no groups."""
    if not groups:
        return 0
    per_group = [sum(group) / len(group) for group in groups]
    return round(sum(per_group) / len(per_group), 2)


def _strongest(groups):
    """Return the largest reading across all groups; 0 when there are no groups."""
    return max((max(group) for group in groups), default=0)


def write_csv(outfile):
    """Append one row summarising the module-level capture metrics to *outfile*.

    The row holds a UTC timestamp (naive ISO-8601, no offset suffix), the
    sizes of ``clients`` and ``aps``, ``packet_count``, average and strongest
    signal per AP and per SSID, the total size of all ``ap_clients`` sets,
    and "N/A" placeholders for the columns this script does not compute.
    A header line is written first when the file is missing or empty.

    Signal aggregation ignores None readings and keys with no usable
    readings, so a single frame without a signal value cannot abort the dump;
    the average divides by the number of keys that actually contributed.
    With no readings at all the signal columns are 0.

    Args:
        outfile: Path of the CSV file to append to.
    """
    # Local import: the file-level import only brings in the `datetime` class.
    from datetime import timezone

    ap_readings = _usable_readings(ap_signals)
    ssid_readings = _usable_readings(ssid_signals)

    # Same naive-UTC value utcnow() produced, without the deprecated call.
    timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
    row = {
        "Timestamp": timestamp,
        "ClientsOnChannel": len(clients),
        "APsOnChannel": len(aps),
        "PacketCount": packet_count,
        "AvgAPSignal": _mean_of_means(ap_readings),
        "StrongestAPSignal": _strongest(ap_readings),
        "AvgSSIDSignal": _mean_of_means(ssid_readings),
        "MaxSSIDSignal": _strongest(ssid_readings),
        "ClientsOnAP": sum(len(s) for s in ap_clients.values()),
        "CiscoAvgReportedClients": "N/A",
        "CiscoMaxReportedClients": "N/A",
        "NumberofBSSIDsOnSSID": "N/A",
        "NumberofChannelsOnSSID": "N/A",
    }

    # An existing but empty file (e.g. pre-created by a wrapper) still needs
    # its header line.
    needs_header = not os.path.exists(outfile) or os.path.getsize(outfile) == 0
    with open(outfile, "a", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=list(row))
        if needs_header:
            writer.writeheader()
        writer.writerow(row)
    print(f"[+] Metrics written to {outfile}")
def reset_interface(interface):
    """Bring *interface* down, switch it to managed mode, and bring it back up.

    Runs, in order: ``ip link set <interface> down``,
    ``iw <interface> set type managed``, ``ip link set <interface> up``.
    Best-effort: every step is attempted even if an earlier one fails, and
    nothing is raised. The completion message is printed only when all three
    commands exit with status 0; otherwise the failing commands are listed.

    Args:
        interface: Name of the wireless interface to reset.
    """
    print(f"[~] Resetting interface {interface} to default state...")
    steps = (
        ["ip", "link", "set", interface, "down"],
        ["iw", interface, "set", "type", "managed"],
        ["ip", "link", "set", interface, "up"],
    )
    failures = []
    for command in steps:
        try:
            status = subprocess.call(command)
        except OSError as e:
            # Binary missing or not executable.
            failures.append(f"{' '.join(command)} ({e})")
            continue
        if status != 0:
            failures.append(f"{' '.join(command)} (exit status {status})")
            continue
        # Pause after each successful change before issuing the next command.
        time.sleep(1)

    if failures:
        print(f"[!] Failed to reset interface {interface}: {'; '.join(failures)}")
    else:
        print(f"[+] Interface {interface} reset complete.")
# === Main ===
def main():
    """Command-line entry point: capture on one channel until SIGINT, then dump metrics.

    Steps:
      1. Parse ``--main-iface``, ``--monitor-iface`` and ``--outfile`` (all required).
      2. Reset the monitor interface and tune it to the main interface's
         current channel; exit with status 1 if that channel is unknown.
      3. Call ``sniff()`` in 5-second windows while ``running`` is True.
      4. Append one metrics row to the CSV file and reset the monitor
         interface again. This cleanup also runs when sniffing raises, so a
         capture error neither loses the collected metrics nor leaves the
         interface un-reset.
    """
    parser = ArgumentParser()
    parser.add_argument("--main-iface", required=True, help="Active interface (used to determine channel)")
    parser.add_argument("--monitor-iface", required=True, help="Monitor interface to sniff on")
    parser.add_argument("--outfile", required=True, help="CSV file to append metrics row")
    args = parser.parse_args()

    # NOTE(review): reset_interface() switches the monitor interface to
    # managed mode before capture starts -- confirm that monitor mode is
    # (re)enabled somewhere outside this function.
    reset_interface(args.monitor_iface)
    print("[+] Starting passive observer.")
    print(f" Main interface: {args.main_iface}")
    print(f" Monitor interface: {args.monitor_iface}")
    print(f" Output file: {args.outfile}")

    channel = get_current_channel(args.main_iface)
    if not channel:
        print("[!] Could not determine current channel. Exiting.")
        sys.exit(1)
    set_monitor_channel(args.monitor_iface, channel)

    print("[+] Sniffing... (waiting for SIGINT to stop)")
    try:
        # Short windows so the `running` flag cleared by the SIGINT handler
        # is noticed within a few seconds.
        while running:
            sniff(iface=args.monitor_iface, prn=handle_packet, store=False, timeout=5)
    finally:
        try:
            write_csv(args.outfile)
        finally:
            reset_interface(args.monitor_iface)


if __name__ == "__main__":
    main()