wifi_test/listener.py

348 lines
12 KiB
Python
Executable file

#!/usr/bin/env python3
# Note: ClientsOnAP may exceed ClientsOnChannel due to silent/low-activity clients
# receiving traffic from AP but not sending enough to cross channel activity threshold.
import csv
import os
import re
import signal
import subprocess
import sys
import time
import threading
from argparse import ArgumentParser
from collections import defaultdict
from datetime import datetime
from scapy.all import sniff, Dot11, RadioTap
# === Globals ===
running = True
packet_count = 0
clients = defaultdict(int)
aps = set()
ap_signals = defaultdict(list) # BSSID -> list of dBm
ssid_map = {} # BSSID -> SSID
ssid_signals = defaultdict(list) # SSID -> list of dBm
ap_clients = defaultdict(lambda: defaultdict(int))
target_ap_bssid = None
beacon_counts = defaultdict(int)
current_channel = None
include_probes = False
# deadpoint_candidates = set()
unlinked_candidates = set()
bssid_channels = {}
vendor_cache = {}
CHANNEL_LIST = [1, 6, 11, 36, 40, 44, 48, 52, 56, 60, 64, 100, 104, 108, 112, 116, 120, 124, 128, 132, 136, 140, 149, 153, 157, 161, 165] # Channels to hop
CHANNEL_HOP_INTERVAL = 5 # Seconds per channel
def get_channel_from_freq(freq):
if 2412 <= freq <= 2472:
return (freq - 2407) // 5
elif freq == 2484:
return 14
elif 5180 <= freq <= 5825:
return (freq - 5000) // 5
return None
# === Signal handling ===
def stop_sniff(signum, frame):
global running
print("[!] SIGINT received, stopping sniffing and writing metrics.")
running = False
signal.signal(signal.SIGINT, stop_sniff)
# === Channel syncing ===
def get_current_channel(interface):
try:
output = subprocess.check_output(["iw", "dev", interface, "info"]).decode()
for line in output.splitlines():
if "channel" in line.lower():
return int(line.strip().split()[1])
except Exception as e:
print(f"[!] Failed to get channel for {interface}: {e}")
return None
def set_monitor_channel(interface, channel):
try:
subprocess.check_call(["iw", "dev", interface, "set", "channel", str(channel)])
print(f"[+] Set {interface} to channel {channel}")
except subprocess.CalledProcessError as e:
print(f"[!] Failed to set {interface} to channel {channel}: {e}")
# === Packet handling ===
def parse_ssid(pkt):
try:
elt = pkt.getlayer(Dot11).payload
while elt:
if hasattr(elt, "ID") and elt.ID == 0: # SSID tag
return elt.info.decode(errors="ignore")
elt = elt.payload
except:
return None
return None
def is_unicast(mac):
if not mac:
return False
return int(mac.split(":")[0], 16) % 2 == 0 and mac.lower() != "ff:ff:ff:ff:ff:ff"
def handle_packet(pkt):
global packet_count
packet_count += 1
if not pkt.haslayer(Dot11) or not pkt.haslayer(RadioTap):
return
dot11 = pkt[Dot11]
a1 = dot11.addr1.lower() if dot11.addr1 else None
a2 = dot11.addr2.lower() if dot11.addr2 else None
try:
freq = pkt[RadioTap].ChannelFrequency
packet_channel = get_channel_from_freq(freq)
except:
packet_channel = current_channel or "?"
# === Detect APs via beacon (and optionally probe response) frames ===
if dot11.type == 0 and dot11.subtype in ([8, 5] if include_probes else [8]):
if a2 and is_unicast(a2):
beacon_counts[a2] += 1
if beacon_counts[a2] > 1:
aps.add(a2)
ssid = parse_ssid(pkt)
if ssid:
ssid_map[a2] = ssid
# Track channel seen on
if pkt.haslayer(RadioTap) and hasattr(pkt[RadioTap], 'ChannelFrequency'):
# Nah, be lazy
if packet_channel:
bssid_channels[a2] = packet_channel
# === Track all seen clients ===
if dot11.type == 2:
sa = dot11.addr2.lower() if dot11.addr2 else None
da = dot11.addr1.lower() if dot11.addr1 else None
for mac in (sa, da):
if is_unicast(mac) and mac != target_ap_bssid:
clients[mac] += 1
if mac not in aps:
unlinked_candidates.add(mac)
if dot11.type == 0 and dot11.subtype in [0, 1]:
sa = dot11.addr2.lower() if dot11.addr2 else None
da = dot11.addr1.lower() if dot11.addr1 else None
for mac in (sa, da):
if is_unicast(mac) and mac != target_ap_bssid:
clients[mac] += 1
if mac not in aps:
unlinked_candidates.add(mac)
if da in aps and is_unicast(sa):
ap_clients[da][sa] += 5
# Track clients talking to any known AP
if a1 in aps and is_unicast(a2):
ap_clients[a1][a2] += 1
elif a2 in aps and is_unicast(a1):
ap_clients[a2][a1] += 1
# === Signal strength tracking ===
try:
signal_dbm = pkt[RadioTap].dBm_AntSignal
if a2 in aps:
ap_signals[a2].append(signal_dbm)
ssid = ssid_map.get(a2)
if ssid:
ssid_signals[ssid].append(signal_dbm)
except:
pass
# === Metrics dump ===
def write_csv(outfile):
timestamp = datetime.utcnow().isoformat()
row = {
"Timestamp": timestamp,
"ClientsOnAP": len([mac for mac, count in ap_clients[target_ap_bssid].items() if count > 3]),
"ClientsOnChannel": len([mac for mac, count in clients.items() if count > 3]),
"APsOnChannel": len(aps),
"AvgAPSignal": round(sum([sum(v)/len(v) for v in ap_signals.values() if v]) / len(ap_signals) if ap_signals else 0, 2),
"StrongestAPSignal": max([max(v) for v in ap_signals.values() if v], default=0),
"UnlinkedDevices": len(unlinked_candidates),
"NumberofBSSIDsOnSSID": len([
bssid for bssid, ssid in ssid_map.items()
if target_ap_bssid in ssid_map and ssid == ssid_map.get(target_ap_bssid)
]) if target_ap_bssid and target_ap_bssid in ssid_map else 0,
"AvgSSIDSignal": round(sum([sum(v)/len(v) for v in ssid_signals.values() if v]) / len(ssid_signals) if ssid_signals else 0, 2),
"MaxSSIDSignal": max([max(v) for v in ssid_signals.values() if v], default=0),
"NumberofChannelsOnSSID": len(set([
bssid_channels.get(bssid)
for bssid, ssid in ssid_map.items()
if target_ap_bssid in ssid_map and ssid == ssid_map.get(target_ap_bssid)
])) if target_ap_bssid and target_ap_bssid in ssid_map else 0,
"PacketCount": packet_count,
"Deadpoints": len([ap for ap in aps if is_deadpoint(ap)])
}
new_file = not os.path.exists(outfile)
with open(outfile, "a", newline="") as f:
writer = csv.DictWriter(f, fieldnames=row.keys())
if new_file:
writer.writeheader()
writer.writerow(row)
print(f"[+] Metrics written to {outfile}")
def reset_interface(interface):
print(f"[~] Resetting interface {interface} to default state...")
try:
subprocess.call(["ip", "link", "set", interface, "down"])
time.sleep(1)
subprocess.call(["iw", interface, "set", "type", "monitor"])
time.sleep(1)
subprocess.call(["ip", "link", "set", interface, "up"])
time.sleep(1)
print(f"[+] Interface {interface} reset complete.")
except Exception as e:
print(f"[!] Failed to reset interface {interface}: {e}")
def get_connected_bssid(interface):
try:
out = subprocess.check_output(["iw", interface, "link"]).decode()
match = re.search(r"Connected to ([0-9A-Fa-f:]{17})", out, re.MULTILINE)
if match:
return match.group(1).lower()
print("[!] No valid MAC address found in iw link output.")
return None
except Exception as e:
print(f"[!] Failed to get connected BSSID for {interface}: {e}")
return None
def print_suspect_aps():
print("\n[?] Suspect SSIDs (possibly printers, IoT, weird stuff):")
keywords = ("setup", "direct-", "hp", "epson", "canon", "brother", "smart", "wifi-", "printer")
suspects = []
for bssid in aps:
ssid = ssid_map.get(bssid, "<unknown>")
flags = []
if any(kw in ssid.lower() for kw in keywords):
flags.append("Likely non-AP")
if is_deadpoint(bssid):
flags.append("Deadpoint")
if flags:
suspects.append((bssid, ssid, flags))
if suspects:
for bssid, ssid, flags in suspects:
ch = bssid_channels.get(bssid, "?")
print(f" - {bssid} (SSID: {ssid}, Channel: {ch}) <-- {' + '.join(flags)}")
else:
print(" None found (yet).")
def channel_hopper(interface):
global running
i = 0
while running:
channel = CHANNEL_LIST[i % len(CHANNEL_LIST)]
set_monitor_channel(interface, channel)
i += 1
time.sleep(CHANNEL_HOP_INTERVAL)
def is_deadpoint(ap_bssid):
return sum(ap_clients[ap_bssid].values()) < 2 # No meaningful client interaction
# === Main ===
def main():
parser = ArgumentParser()
parser.add_argument("--main-iface", required=True, help="Active interface (used to determine channel)")
parser.add_argument("--monitor-iface", required=True, help="Monitor interface to sniff on")
parser.add_argument("--outfile", required=True, help="CSV file to append metrics row")
parser.add_argument("--include-probes", action="store_true", help="Include probe responses as valid APs")
group = parser.add_mutually_exclusive_group()
group.add_argument("--channel", type=int, help="Channel to lock monitor interface to")
group.add_argument("--channel-hop", action="store_true", help="Enable channel hopping")
args = parser.parse_args()
reset_interface(args.monitor_iface)
print(f"[+] Starting passive observer.")
print(f" Main interface: {args.main_iface}")
print(f" Monitor interface: {args.monitor_iface}")
print(f" Output file: {args.outfile}")
if args.channel:
print(f" Override channel: {args.channel}")
global target_ap_bssid
global current_channel
current_channel = args.channel or get_current_channel(args.main_iface)
if current_channel:
set_monitor_channel(args.monitor_iface, current_channel)
else:
print("[!] Could not determine current channel. Exiting.")
sys.exit(1)
if not args.channel:
target_ap_bssid = get_connected_bssid(args.main_iface)
if not target_ap_bssid:
print("[!] Could not determine connected BSSID. ClientsOnAP will be 0.")
else:
print(f"[+] Connected BSSID (target AP): {target_ap_bssid}")
else:
target_ap_bssid = None # Can't determine AP if we're not associated
print("[+] Sniffing... (waiting for SIGINT to stop)")
if args.channel_hop:
hopper_thread = threading.Thread(target=channel_hopper, args=(args.monitor_iface,))
hopper_thread.daemon = True
hopper_thread.start()
while running:
sniff(iface=args.monitor_iface, prn=handle_packet, store=False, timeout=5)
write_csv(args.outfile)
print("\n[+] Final APs counted on this channel:")
for bssid in sorted(aps):
ssid = ssid_map.get(bssid, "<unknown>")
ch = bssid_channels.get(bssid, "?")
print(f" - {bssid} (SSID: {ssid}, Channel: {ch})")
print(f"[+] Total APsOnChannel: {len(aps)}")
print_suspect_aps()
print("\n[+] Vendor Summary (known APs):")
for bssid in sorted(aps):
vendor = get_mac_vendor(bssid)
print(f" {bssid}{vendor}")
reset_interface(args.monitor_iface)
def get_mac_vendor(mac):
prefix = mac.upper()[0:8].replace(":", "-")
if prefix in vendor_cache:
return vendor_cache[prefix]
try:
if not os.path.exists("oui.txt"):
print("[~] Downloading IEEE OUI list...")
import urllib.request
url = "http://standards-oui.ieee.org/oui/oui.txt"
urllib.request.urlretrieve(url, "oui.txt")
with open("oui.txt", "r", encoding="utf-8", errors="ignore") as f:
for line in f:
if "(hex)" in line and prefix in line:
parts = line.strip().split()
if len(parts) >= 3 and parts[0] == prefix and parts[1] == "(hex)":
vendor = " ".join(parts[2:]).strip()
vendor_cache[prefix] = vendor
return vendor
except Exception as e:
print(f"[ERROR] Vendor lookup failed: {e}")
vendor_cache[prefix] = "Unknown"
return "Unknown"
if __name__ == "__main__":
main()