#!/usr/bin/env python3
import csv
import os
import re
import signal
import subprocess
import sys
import time

from argparse import ArgumentParser
from collections import defaultdict
from datetime import datetime, timezone

from scapy.all import sniff, Dot11, RadioTap

# === Globals ===
# Shared capture state: filled in by handle_packet() while sniffing and read
# back when the metrics row and the final report are produced.

# -- run control / configuration (assigned by stop_sniff() and main()) --
running = True                      # cleared by the SIGINT handler to end the sniff loop
include_probes = False              # when true, subtype 5 frames also count as AP evidence
current_channel = None              # channel the monitor interface is locked to
target_ap_bssid = None              # BSSID the main interface is associated with, if known

# -- per-frame counters --
packet_count = 0                    # every frame passed to handle_packet()
beacon_counts = defaultdict(int)    # transmitter MAC -> AP-advertising frames seen
clients = defaultdict(int)          # MAC -> data frames it appeared in

# -- access points --
aps = set()                         # MACs accepted as access points
ssid_map = {}  # BSSID -> SSID
ap_signals = defaultdict(list)  # BSSID -> list of dBm
ssid_signals = defaultdict(list)  # SSID -> list of dBm
ap_clients = defaultdict(lambda: defaultdict(int))  # AP BSSID -> peer MAC -> frames

# -- candidate sets evaluated when the metrics are written --
deadpoint_candidates = set()        # APs that may turn out to have no clients
unlinked_candidates = set()         # data-frame MACs not known as APs when seen

# === Signal handling ===
def stop_sniff(signum, frame):
    """Handle SIGINT by asking the capture loop to finish.

    The handler only announces the request and clears the module-level
    ``running`` flag; the sniff loop in ``main`` re-checks that flag between
    sniff batches and the metrics are written once it exits.

    ``signum`` and ``frame`` are supplied by the signal machinery and are
    not used.
    """
    global running
    print("[!] SIGINT received, stopping sniffing and writing metrics.")
    running = False


# Registered at import time so Ctrl+C ends the capture instead of raising
# KeyboardInterrupt in the middle of it.
signal.signal(signal.SIGINT, stop_sniff)

# === Channel syncing ===
def get_current_channel(interface):
    """Return the channel number ``interface`` is tuned to, or None.

    Runs ``iw dev <interface> info`` and reads the number from the line that
    *starts* with the word ``channel``.  Anchoring on the start of the line
    matters: a plain substring test also matches any other line that merely
    contains the word (for example a network name), and trying to turn that
    line's second token into an int made the whole lookup fail.

    Args:
        interface: Name of the wireless interface to query.

    Returns:
        The channel as an int, or None when ``iw`` cannot be run, exits with
        an error, or reports no channel line.
    """
    try:
        raw = subprocess.check_output(["iw", "dev", interface, "info"])
    except (OSError, ValueError, subprocess.SubprocessError) as e:
        # OSError: iw missing / not executable; SubprocessError: non-zero exit.
        print(f"[!] Failed to get channel for {interface}: {e}")
        return None

    # errors="replace" keeps odd bytes in the output from aborting the parse.
    report = raw.decode(errors="replace")
    found = re.search(
        r"^[ \t]*channel[ \t]+(\d+)\b", report, re.MULTILINE | re.IGNORECASE
    )
    if found is None:
        return None
    return int(found.group(1))

def set_monitor_channel(interface, channel):
    """Tune ``interface`` to ``channel`` with ``iw dev <iface> set channel``.

    Args:
        interface: Name of the monitor-mode interface.
        channel: Channel number; converted with ``str()`` for the command line.

    Returns:
        True when ``iw`` exited successfully, False otherwise.  Failures are
        reported on stdout and never raised, so existing callers that ignore
        the result keep working.
    """
    command = ["iw", "dev", interface, "set", "channel", str(channel)]
    try:
        subprocess.check_call(command)
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers a missing or non-executable ``iw`` binary, which
        # would otherwise escape as an unhandled traceback.
        print(f"[!] Failed to set {interface} to channel {channel}: {e}")
        return False

    print(f"[+] Set {interface} to channel {channel}")
    return True

# === Packet handling ===
def parse_ssid(pkt):
    """Return the SSID advertised in ``pkt``, or None when there is none.

    Walks the payload chain below the Dot11 layer and decodes the ``info``
    bytes of the first element whose ``ID`` is 0 (the SSID tag).  Bytes that
    cannot be decoded are dropped rather than raising.

    Args:
        pkt: A sniffed packet; it is expected to offer ``getlayer``.

    Returns:
        The SSID as a str (possibly empty), or None if the packet has no
        Dot11 layer, no SSID element, or cannot be walked.
    """
    try:
        elt = pkt.getlayer(Dot11).payload
        while elt:
            if getattr(elt, "ID", None) == 0:  # SSID tag
                return elt.info.decode(errors="ignore")
            elt = elt.payload
    except Exception:
        # Parsing is best-effort per frame, so a malformed packet is simply
        # "no SSID".  Unlike a bare ``except:`` this still lets
        # KeyboardInterrupt and SystemExit propagate.
        return None
    return None

def is_unicast(mac):
    """Return True when ``mac`` is a unicast MAC address.

    An address is treated as unicast when its first octet is even; multicast
    addresses and the broadcast address ``ff:ff:ff:ff:ff:ff`` all have an odd
    first octet, so no separate broadcast comparison is needed.

    Args:
        mac: Colon-separated MAC address string, or a falsy value.

    Returns:
        False for falsy input, for input whose first octet is not valid
        hexadecimal, and for group (multicast/broadcast) addresses; True
        otherwise.
    """
    if not mac:
        return False
    try:
        first_octet = int(mac.split(":")[0], 16)
    except (AttributeError, ValueError):
        # This runs inside the sniff callback; a malformed address must not
        # abort the capture, so it is simply "not unicast".
        return False
    return first_octet % 2 == 0

def handle_packet(pkt):
    """Update the capture statistics from one sniffed frame.

    Every call increments ``packet_count``.  Frames lacking a Dot11 or
    RadioTap layer are otherwise ignored.  For the rest this function:

    * counts AP-advertising management frames (type 0, subtype 8, plus
      subtype 5 when ``include_probes`` is set) per transmitter and records
      the transmitter as an AP from its second such frame onwards;
    * counts the unicast addresses seen in data frames (type 2) as clients;
    * counts peers exchanging frames with ``target_ap_bssid``;
    * stores the RadioTap signal reading for frames sent by known APs.

    NOTE(review): the source this was recovered from had lost its
    indentation; the nesting below is reconstructed from the control flow
    and should be confirmed against the original.

    Args:
        pkt: Packet handed over by ``sniff``.
    """
    global packet_count
    packet_count += 1

    if not (pkt.haslayer(Dot11) and pkt.haslayer(RadioTap)):
        return

    dot11 = pkt[Dot11]
    a1 = dot11.addr1.lower() if dot11.addr1 else None
    a2 = dot11.addr2.lower() if dot11.addr2 else None

    # === Detect APs via beacon (and optionally probe response) frames ===
    ap_subtypes = (8, 5) if include_probes else (8,)
    if dot11.type == 0 and dot11.subtype in ap_subtypes:
        if a2 and is_unicast(a2):
            beacon_counts[a2] += 1
            # A single frame is not trusted; the transmitter is accepted as
            # an AP only once it has been seen more than once.
            if beacon_counts[a2] > 1:
                aps.add(a2)
                deadpoint_candidates.add(a2)
                ssid = parse_ssid(pkt)
                if ssid:
                    ssid_map[a2] = ssid

    # === Track all seen clients ===
    if dot11.type == 2:
        # addr2 then addr1, i.e. the same order the addresses were counted in
        # before; both ends of a data frame are counted.
        for mac in (a2, a1):
            if is_unicast(mac) and mac != target_ap_bssid:
                clients[mac] += 1
                if mac not in aps:
                    unlinked_candidates.add(mac)

    # Track clients talking to the same AP we're connected to
    if target_ap_bssid and a1 and a2 and target_ap_bssid in (a1, a2):
        peer = a2 if a1 == target_ap_bssid else a1
        if is_unicast(peer) and peer not in aps:
            ap_clients[target_ap_bssid][peer] += 1

    # === Signal strength tracking ===
    try:
        signal_dbm = pkt[RadioTap].dBm_AntSignal
    except (AttributeError, IndexError):
        # No usable signal field on this frame: nothing to record.
        return
    # A missing reading comes back as None; storing it would later break the
    # sum()/max() calls made over these lists when the metrics are written.
    if signal_dbm is None or a2 not in aps:
        return
    ap_signals[a2].append(signal_dbm)
    ssid = ssid_map.get(a2)
    if ssid:
        ssid_signals[ssid].append(signal_dbm)

# === Metrics dump ===
def _usable_series(samples_by_key):
    """Return each key's readings with None entries removed, skipping empties."""
    cleaned = (
        [reading for reading in samples if reading is not None]
        for samples in samples_by_key.values()
    )
    return [series for series in cleaned if series]


def _mean_of_means(samples_by_key):
    """Average the per-key mean reading, rounded to 2 places (0 if no data)."""
    series = _usable_series(samples_by_key)
    if not series:
        return 0
    per_key_means = [sum(s) / len(s) for s in series]
    # Divide by the number of series actually averaged, not by the number of
    # keys, so keys without usable readings cannot skew the result.
    return round(sum(per_key_means) / len(per_key_means), 2)


def _strongest(samples_by_key):
    """Return the highest single reading across all keys (0 if no data)."""
    return max((max(s) for s in _usable_series(samples_by_key)), default=0)


def write_csv(outfile):
    """Append one row summarising the capture so far to ``outfile``.

    The row is built from the module-level capture state.  A MAC counts as a
    client only after appearing in more than 3 frames.  Columns whose value
    is not derived by this script are written as ``"N/A"``.  The header row
    is emitted when the file is missing or empty.

    Args:
        outfile: Path of the CSV file to append to.
    """
    # Naive UTC ISO string, the same format utcnow() produced, without
    # relying on the deprecated call.
    timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    # .get() avoids inserting a (possibly None) key into the ap_clients
    # defaultdict just by reading it.
    target_peers = ap_clients.get(target_ap_bssid, {})

    row = {
        "Timestamp": timestamp,
        "ClientsOnChannel": sum(1 for count in clients.values() if count > 3),
        "APsOnChannel": len(aps),
        "PacketCount": packet_count,
        "AvgAPSignal": _mean_of_means(ap_signals),
        "StrongestAPSignal": _strongest(ap_signals),
        "AvgSSIDSignal": _mean_of_means(ssid_signals),
        "MaxSSIDSignal": _strongest(ssid_signals),
        "ClientsOnAP": sum(1 for count in target_peers.values() if count > 3),
        "CiscoAvgReportedClients": "N/A",
        "CiscoMaxReportedClients": "N/A",
        "NumberofBSSIDsOnSSID": "N/A",
        "NumberofChannelsOnSSID": "N/A",
        # A MAC is only a candidate if it was not yet known as an AP when it
        # was seen; drop the ones identified as APs later in the capture.
        "UnlinkedDevices": len(unlinked_candidates - aps),
        "Deadpoints": sum(1 for ap in deadpoint_candidates if not ap_clients.get(ap)),
    }

    # An existing but empty file still needs its header row.
    new_file = not os.path.exists(outfile) or os.path.getsize(outfile) == 0
    with open(outfile, "a", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=list(row))
        if new_file:
            writer.writeheader()
        writer.writerow(row)
    print(f"[+] Metrics written to {outfile}")

def reset_interface(interface, settle_delay=1):
    """Cycle ``interface`` down, switch it to monitor type, and bring it up.

    Despite the "default state" wording of the progress message, the state
    this leaves the interface in is monitor mode.  The three commands are run
    in order with a pause after each.  A command that exits non-zero does not
    stop the sequence, but it is reported and the final "reset complete"
    message is withheld, so the output no longer claims success after a
    failure.

    Args:
        interface: Name of the wireless interface to reset.
        settle_delay: Seconds to wait after each command (default 1, the
            previously hard-coded pause).

    Returns:
        True when every command exited with status 0, False otherwise.
        Failures are printed, never raised.
    """
    print(f"[~] Resetting interface {interface} to default state...")
    steps = (
        ["ip", "link", "set", interface, "down"],
        ["iw", interface, "set", "type", "monitor"],
        ["ip", "link", "set", interface, "up"],
    )

    failures = []
    for command in steps:
        try:
            status = subprocess.call(command)
        except OSError as e:
            # The tool itself could not be started; as before, give up on the
            # remaining steps.
            print(f"[!] Failed to reset interface {interface}: {e}")
            return False
        if status != 0:
            failures.append(f"'{' '.join(command)}' exited with status {status}")
        time.sleep(settle_delay)

    if failures:
        print(f"[!] Failed to reset interface {interface}: {'; '.join(failures)}")
        return False

    print(f"[+] Interface {interface} reset complete.")
    return True

def get_connected_bssid(interface):
    """Return the BSSID ``interface`` is associated with, or None.

    Runs ``iw <interface> link`` and extracts the 17-character address that
    follows the words ``Connected to``.  The address is returned in lower
    case.  Any failure is printed and reported as None rather than raised.
    """
    try:
        link_report = subprocess.check_output(["iw", interface, "link"]).decode()
        found = re.search(
            r"Connected to ([0-9A-Fa-f:]{17})", link_report, re.MULTILINE
        )
        if found is None:
            print("[!] No valid MAC address found in iw link output.")
            return None
        return found.group(1).lower()
    except Exception as e:
        print(f"[!] Failed to get connected BSSID for {interface}: {e}")
        return None

def print_suspect_aps():
    """Print the access points that look like non-infrastructure devices.

    An AP from ``ssid_map`` is listed when its SSID contains one of the
    keyword fragments below (case-insensitive), or when it is a deadpoint
    candidate with no entry in ``ap_clients``.  A keyword match takes
    precedence when choosing the reason shown next to the AP.
    """
    print("\n[?] Suspect SSIDs (possibly printers, IoT, weird stuff):")
    keywords = ("setup", "direct-", "hp", "epson", "canon", "brother", "smart", "wifi-", "printer")

    suspects = []
    for bssid, ssid in ssid_map.items():
        lowered = ssid.lower()
        if any(fragment in lowered for fragment in keywords):
            suspects.append((bssid, ssid, "Non-Infrastructure"))
        elif bssid in deadpoint_candidates and not ap_clients.get(bssid):
            suspects.append((bssid, ssid, "Deadpoint"))

    if not suspects:
        print(" None found (yet).")
        return

    for bssid, ssid, reason in suspects:
        print(f" - {bssid} (SSID: {ssid}) [{reason}]")

# === Main ===
def main():
    """Parse the command line, capture until SIGINT, then report.

    Steps, in order: reset the monitor interface, lock it to the channel
    given by ``--channel`` or read from the main interface, look up the
    associated BSSID (only when no channel override is given), sniff in
    5-second batches until ``running`` is cleared, append the metrics row,
    print the APs and suspect APs, and reset the monitor interface again.

    Exits with status 1 when no channel can be determined.
    """
    global target_ap_bssid, current_channel, include_probes

    parser = ArgumentParser()
    parser.add_argument("--main-iface", required=True, help="Active interface (used to determine channel)")
    parser.add_argument("--monitor-iface", required=True, help="Monitor interface to sniff on")
    parser.add_argument("--outfile", required=True, help="CSV file to append metrics row")
    parser.add_argument("--channel", type=int, help="Channel to lock monitor interface to (overrides main iface)")
    parser.add_argument("--include-probes", action="store_true", help="Include probe responses as valid APs")

    args = parser.parse_args()

    # handle_packet() reads the module-level flag, so the parsed option has to
    # be copied into it; without this --include-probes had no effect.
    include_probes = args.include_probes

    reset_interface(args.monitor_iface)

    print("[+] Starting passive observer.")
    print(f" Main interface: {args.main_iface}")
    print(f" Monitor interface: {args.monitor_iface}")
    print(f" Output file: {args.outfile}")
    if args.channel:
        print(f" Override channel: {args.channel}")

    current_channel = args.channel or get_current_channel(args.main_iface)
    if current_channel:
        set_monitor_channel(args.monitor_iface, current_channel)
    else:
        print("[!] Could not determine current channel. Exiting.")
        sys.exit(1)

    if not args.channel:
        target_ap_bssid = get_connected_bssid(args.main_iface)
        if not target_ap_bssid:
            print("[!] Could not determine connected BSSID. ClientsOnAP will be 0.")
        else:
            print(f"[+] Connected BSSID (target AP): {target_ap_bssid}")
    else:
        target_ap_bssid = None  # Can't determine AP if we're not associated

    print("[+] Sniffing... (waiting for SIGINT to stop)")

    # Short batches so the loop re-checks ``running`` at least every 5 seconds
    # after the SIGINT handler clears it.
    while running:
        sniff(iface=args.monitor_iface, prn=handle_packet, store=False, timeout=5)

    write_csv(args.outfile)
    print("\n[+] Final APs counted on this channel:")

    for bssid in sorted(aps):
        ssid = ssid_map.get(bssid, "<unknown>")
        print(f" - {bssid} (SSID: {ssid})")
    print(f"[+] Total APsOnChannel: {len(aps)}")
    print_suspect_aps()

    reset_interface(args.monitor_iface)


if __name__ == "__main__":
    main()