345 lines
12 KiB
Python
Executable file
345 lines
12 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
import csv
|
|
import os
|
|
import re
|
|
import signal
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import threading
|
|
|
|
from argparse import ArgumentParser
|
|
from collections import defaultdict
|
|
from datetime import datetime
|
|
from scapy.all import sniff, Dot11, RadioTap
|
|
|
|
# === Globals ===
|
|
running = True
|
|
packet_count = 0
|
|
clients = defaultdict(int)
|
|
aps = set()
|
|
ap_signals = defaultdict(list) # BSSID -> list of dBm
|
|
ssid_map = {} # BSSID -> SSID
|
|
ssid_signals = defaultdict(list) # SSID -> list of dBm
|
|
ap_clients = defaultdict(lambda: defaultdict(int))
|
|
target_ap_bssid = None
|
|
beacon_counts = defaultdict(int)
|
|
current_channel = None
|
|
include_probes = False
|
|
# deadpoint_candidates = set()
|
|
unlinked_candidates = set()
|
|
bssid_channels = {}
|
|
vendor_cache = {}
|
|
|
|
CHANNEL_LIST = [1, 6, 11, 36, 40, 44, 48, 52, 56, 60, 64, 100, 104, 108, 112, 116, 120, 124, 128, 132, 136, 140, 149, 153, 157, 161, 165] # Channels to hop
|
|
CHANNEL_HOP_INTERVAL = 5 # Seconds per channel
|
|
|
|
def get_channel_from_freq(freq):
|
|
if 2412 <= freq <= 2472:
|
|
return (freq - 2407) // 5
|
|
elif freq == 2484:
|
|
return 14
|
|
elif 5180 <= freq <= 5825:
|
|
return (freq - 5000) // 5
|
|
return None
|
|
|
|
# === Signal handling ===
|
|
def stop_sniff(signum, frame):
|
|
global running
|
|
print("[!] SIGINT received, stopping sniffing and writing metrics.")
|
|
running = False
|
|
|
|
signal.signal(signal.SIGINT, stop_sniff)
|
|
|
|
# === Channel syncing ===
|
|
def get_current_channel(interface):
|
|
try:
|
|
output = subprocess.check_output(["iw", "dev", interface, "info"]).decode()
|
|
for line in output.splitlines():
|
|
if "channel" in line.lower():
|
|
return int(line.strip().split()[1])
|
|
except Exception as e:
|
|
print(f"[!] Failed to get channel for {interface}: {e}")
|
|
return None
|
|
|
|
def set_monitor_channel(interface, channel):
|
|
try:
|
|
subprocess.check_call(["iw", "dev", interface, "set", "channel", str(channel)])
|
|
print(f"[+] Set {interface} to channel {channel}")
|
|
except subprocess.CalledProcessError as e:
|
|
print(f"[!] Failed to set {interface} to channel {channel}: {e}")
|
|
|
|
# === Packet handling ===
|
|
def parse_ssid(pkt):
|
|
try:
|
|
elt = pkt.getlayer(Dot11).payload
|
|
while elt:
|
|
if hasattr(elt, "ID") and elt.ID == 0: # SSID tag
|
|
return elt.info.decode(errors="ignore")
|
|
elt = elt.payload
|
|
except:
|
|
return None
|
|
return None
|
|
|
|
def is_unicast(mac):
|
|
if not mac:
|
|
return False
|
|
return int(mac.split(":")[0], 16) % 2 == 0 and mac.lower() != "ff:ff:ff:ff:ff:ff"
|
|
|
|
def handle_packet(pkt):
|
|
global packet_count
|
|
packet_count += 1
|
|
|
|
if not pkt.haslayer(Dot11) or not pkt.haslayer(RadioTap):
|
|
return
|
|
|
|
dot11 = pkt[Dot11]
|
|
a1 = dot11.addr1.lower() if dot11.addr1 else None
|
|
a2 = dot11.addr2.lower() if dot11.addr2 else None
|
|
|
|
try:
|
|
freq = pkt[RadioTap].ChannelFrequency
|
|
packet_channel = get_channel_from_freq(freq)
|
|
except:
|
|
packet_channel = current_channel or "?"
|
|
|
|
# === Detect APs via beacon (and optionally probe response) frames ===
|
|
if dot11.type == 0 and dot11.subtype in ([8, 5] if include_probes else [8]):
|
|
if a2 and is_unicast(a2):
|
|
beacon_counts[a2] += 1
|
|
if beacon_counts[a2] > 1:
|
|
aps.add(a2)
|
|
ssid = parse_ssid(pkt)
|
|
if ssid:
|
|
ssid_map[a2] = ssid
|
|
# Track channel seen on
|
|
if pkt.haslayer(RadioTap) and hasattr(pkt[RadioTap], 'ChannelFrequency'):
|
|
# Nah, be lazy
|
|
if packet_channel:
|
|
bssid_channels[a2] = packet_channel
|
|
|
|
# === Track all seen clients ===
|
|
if dot11.type == 2:
|
|
sa = dot11.addr2.lower() if dot11.addr2 else None
|
|
da = dot11.addr1.lower() if dot11.addr1 else None
|
|
|
|
for mac in (sa, da):
|
|
if is_unicast(mac) and mac != target_ap_bssid:
|
|
clients[mac] += 1
|
|
if mac not in aps:
|
|
unlinked_candidates.add(mac)
|
|
|
|
if dot11.type == 0 and dot11.subtype in [0, 1]:
|
|
sa = dot11.addr2.lower() if dot11.addr2 else None
|
|
da = dot11.addr1.lower() if dot11.addr1 else None
|
|
for mac in (sa, da):
|
|
if is_unicast(mac) and mac != target_ap_bssid:
|
|
clients[mac] += 1
|
|
if mac not in aps:
|
|
unlinked_candidates.add(mac)
|
|
if da in aps and is_unicast(sa):
|
|
ap_clients[da][sa] += 5
|
|
|
|
# Track clients talking to any known AP
|
|
if a1 in aps and is_unicast(a2):
|
|
ap_clients[a1][a2] += 1
|
|
elif a2 in aps and is_unicast(a1):
|
|
ap_clients[a2][a1] += 1
|
|
|
|
# === Signal strength tracking ===
|
|
try:
|
|
signal_dbm = pkt[RadioTap].dBm_AntSignal
|
|
if a2 in aps:
|
|
ap_signals[a2].append(signal_dbm)
|
|
ssid = ssid_map.get(a2)
|
|
if ssid:
|
|
ssid_signals[ssid].append(signal_dbm)
|
|
except:
|
|
pass
|
|
|
|
# === Metrics dump ===
|
|
def write_csv(outfile):
|
|
timestamp = datetime.utcnow().isoformat()
|
|
row = {
|
|
"Timestamp": timestamp,
|
|
"ClientsOnAP": len([mac for mac, count in ap_clients[target_ap_bssid].items() if count > 3]),
|
|
"ClientsOnChannel": len([
|
|
mac for mac, count in clients.items()
|
|
if count > 3 and mac in ap_clients and current_channel == bssid_channels.get(mac)
|
|
]),
|
|
"APsOnChannel": len(aps),
|
|
"AvgAPSignal": round(sum([sum(v)/len(v) for v in ap_signals.values() if v]) / len(ap_signals) if ap_signals else 0, 2),
|
|
"StrongestAPSignal": max([max(v) for v in ap_signals.values() if v], default=0),
|
|
"UnlinkedDevices": len(unlinked_candidates),
|
|
"NumberofBSSIDsOnSSID": len([
|
|
bssid for bssid, ssid in ssid_map.items()
|
|
if target_ap_bssid in ssid_map and ssid == ssid_map.get(target_ap_bssid)
|
|
]) if target_ap_bssid and target_ap_bssid in ssid_map else 0,
|
|
"AvgSSIDSignal": round(sum([sum(v)/len(v) for v in ssid_signals.values() if v]) / len(ssid_signals) if ssid_signals else 0, 2),
|
|
"MaxSSIDSignal": max([max(v) for v in ssid_signals.values() if v], default=0),
|
|
"NumberofChannelsOnSSID": len(set([
|
|
bssid_channels.get(bssid)
|
|
for bssid, ssid in ssid_map.items()
|
|
if target_ap_bssid in ssid_map and ssid == ssid_map.get(target_ap_bssid)
|
|
])) if target_ap_bssid and target_ap_bssid in ssid_map else 0,
|
|
"PacketCount": packet_count,
|
|
"Deadpoints": len([ap for ap in aps if is_deadpoint(ap)])
|
|
}
|
|
|
|
new_file = not os.path.exists(outfile)
|
|
with open(outfile, "a", newline="") as f:
|
|
writer = csv.DictWriter(f, fieldnames=row.keys())
|
|
if new_file:
|
|
writer.writeheader()
|
|
writer.writerow(row)
|
|
print(f"[+] Metrics written to {outfile}")
|
|
|
|
def reset_interface(interface):
|
|
print(f"[~] Resetting interface {interface} to default state...")
|
|
try:
|
|
subprocess.call(["ip", "link", "set", interface, "down"])
|
|
time.sleep(1)
|
|
subprocess.call(["iw", interface, "set", "type", "monitor"])
|
|
time.sleep(1)
|
|
subprocess.call(["ip", "link", "set", interface, "up"])
|
|
time.sleep(1)
|
|
print(f"[+] Interface {interface} reset complete.")
|
|
except Exception as e:
|
|
print(f"[!] Failed to reset interface {interface}: {e}")
|
|
|
|
def get_connected_bssid(interface):
|
|
try:
|
|
out = subprocess.check_output(["iw", interface, "link"]).decode()
|
|
match = re.search(r"Connected to ([0-9A-Fa-f:]{17})", out, re.MULTILINE)
|
|
if match:
|
|
return match.group(1).lower()
|
|
print("[!] No valid MAC address found in iw link output.")
|
|
return None
|
|
except Exception as e:
|
|
print(f"[!] Failed to get connected BSSID for {interface}: {e}")
|
|
return None
|
|
|
|
def print_suspect_aps():
|
|
print("\n[?] Suspect SSIDs (possibly printers, IoT, weird stuff):")
|
|
keywords = ("setup", "direct-", "hp", "epson", "canon", "brother", "smart", "wifi-", "printer")
|
|
suspects = []
|
|
|
|
for bssid in aps:
|
|
ssid = ssid_map.get(bssid, "<unknown>")
|
|
flags = []
|
|
if any(kw in ssid.lower() for kw in keywords):
|
|
flags.append("Likely non-AP")
|
|
if is_deadpoint(bssid):
|
|
flags.append("Deadpoint")
|
|
if flags:
|
|
suspects.append((bssid, ssid, flags))
|
|
|
|
if suspects:
|
|
for bssid, ssid, flags in suspects:
|
|
ch = bssid_channels.get(bssid, "?")
|
|
print(f" - {bssid} (SSID: {ssid}, Channel: {ch}) <-- {' + '.join(flags)}")
|
|
else:
|
|
print(" None found (yet).")
|
|
|
|
def channel_hopper(interface):
|
|
global running
|
|
i = 0
|
|
while running:
|
|
channel = CHANNEL_LIST[i % len(CHANNEL_LIST)]
|
|
set_monitor_channel(interface, channel)
|
|
i += 1
|
|
time.sleep(CHANNEL_HOP_INTERVAL)
|
|
|
|
def is_deadpoint(ap_bssid):
|
|
return sum(ap_clients[ap_bssid].values()) < 2 # No meaningful client interaction
|
|
|
|
# === Main ===
|
|
def main():
|
|
parser = ArgumentParser()
|
|
parser.add_argument("--main-iface", required=True, help="Active interface (used to determine channel)")
|
|
parser.add_argument("--monitor-iface", required=True, help="Monitor interface to sniff on")
|
|
parser.add_argument("--outfile", required=True, help="CSV file to append metrics row")
|
|
parser.add_argument("--include-probes", action="store_true", help="Include probe responses as valid APs")
|
|
group = parser.add_mutually_exclusive_group()
|
|
group.add_argument("--channel", type=int, help="Channel to lock monitor interface to")
|
|
group.add_argument("--channel-hop", action="store_true", help="Enable channel hopping")
|
|
|
|
args = parser.parse_args()
|
|
|
|
reset_interface(args.monitor_iface)
|
|
|
|
print(f"[+] Starting passive observer.")
|
|
print(f" Main interface: {args.main_iface}")
|
|
print(f" Monitor interface: {args.monitor_iface}")
|
|
print(f" Output file: {args.outfile}")
|
|
if args.channel:
|
|
print(f" Override channel: {args.channel}")
|
|
|
|
global target_ap_bssid
|
|
global current_channel
|
|
|
|
current_channel = args.channel or get_current_channel(args.main_iface)
|
|
if current_channel:
|
|
set_monitor_channel(args.monitor_iface, current_channel)
|
|
else:
|
|
print("[!] Could not determine current channel. Exiting.")
|
|
sys.exit(1)
|
|
|
|
if not args.channel:
|
|
target_ap_bssid = get_connected_bssid(args.main_iface)
|
|
if not target_ap_bssid:
|
|
print("[!] Could not determine connected BSSID. ClientsOnAP will be 0.")
|
|
else:
|
|
print(f"[+] Connected BSSID (target AP): {target_ap_bssid}")
|
|
else:
|
|
target_ap_bssid = None # Can't determine AP if we're not associated
|
|
|
|
print("[+] Sniffing... (waiting for SIGINT to stop)")
|
|
if args.channel_hop:
|
|
hopper_thread = threading.Thread(target=channel_hopper, args=(args.monitor_iface,))
|
|
hopper_thread.daemon = True
|
|
hopper_thread.start()
|
|
|
|
while running:
|
|
sniff(iface=args.monitor_iface, prn=handle_packet, store=False, timeout=5)
|
|
|
|
write_csv(args.outfile)
|
|
print("\n[+] Final APs counted on this channel:")
|
|
|
|
for bssid in sorted(aps):
|
|
ssid = ssid_map.get(bssid, "<unknown>")
|
|
ch = bssid_channels.get(bssid, "?")
|
|
print(f" - {bssid} (SSID: {ssid}, Channel: {ch})")
|
|
print(f"[+] Total APsOnChannel: {len(aps)}")
|
|
print_suspect_aps()
|
|
|
|
print("\n[+] Vendor Summary (known APs):")
|
|
for bssid in sorted(aps):
|
|
vendor = get_mac_vendor(bssid)
|
|
print(f" {bssid} → {vendor}")
|
|
|
|
reset_interface(args.monitor_iface)
|
|
|
|
def get_mac_vendor(mac):
|
|
prefix = mac.upper()[0:8].replace(":", "-")
|
|
if prefix in vendor_cache:
|
|
return vendor_cache[prefix]
|
|
try:
|
|
if not os.path.exists("oui.txt"):
|
|
print("[~] Downloading IEEE OUI list...")
|
|
import urllib.request
|
|
url = "http://standards-oui.ieee.org/oui/oui.txt"
|
|
urllib.request.urlretrieve(url, "oui.txt")
|
|
with open("oui.txt", "r", encoding="utf-8", errors="ignore") as f:
|
|
for line in f:
|
|
if prefix in line:
|
|
vendor = line.strip().split("\t")[-1]
|
|
vendor_cache[prefix] = vendor
|
|
return vendor
|
|
except Exception as e:
|
|
pass
|
|
vendor_cache[prefix] = "Unknown"
|
|
return "Unknown"
|
|
|
|
if __name__ == "__main__":
|
|
main()
|