Refactor listener.py to enhance channel syncing and metrics logging; add command-line arguments for interface selection and output file.
This commit is contained in:
parent
3b03e5c048
commit
50aad9ef15
1 changed files with 107 additions and 105 deletions
212
listener.py
212
listener.py
|
@ -1,21 +1,40 @@
|
||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
import os
|
import os
|
||||||
import signal
|
import signal
|
||||||
import time
|
import csv
|
||||||
from collections import defaultdict
|
import sys
|
||||||
from scapy.all import sniff, Dot11, RadioTap
|
|
||||||
from dotenv import load_dotenv
|
|
||||||
from pathlib import Path
|
|
||||||
import subprocess
|
import subprocess
|
||||||
|
from scapy.all import sniff, Dot11, RadioTap
|
||||||
|
from collections import defaultdict
|
||||||
|
from datetime import datetime
|
||||||
|
from argparse import ArgumentParser
|
||||||
|
|
||||||
|
# === Globals ===
|
||||||
|
running = True
|
||||||
|
packet_count = 0
|
||||||
|
clients = set()
|
||||||
|
aps = set()
|
||||||
|
ap_signals = defaultdict(list) # BSSID -> list of dBm
|
||||||
|
ssid_map = {} # BSSID -> SSID
|
||||||
|
ssid_signals = defaultdict(list) # SSID -> list of dBm
|
||||||
|
|
||||||
|
# === Signal handling ===
|
||||||
|
def stop_sniff(signum, frame):
|
||||||
|
global running
|
||||||
|
print("[!] SIGINT received, stopping sniffing and writing metrics.")
|
||||||
|
running = False
|
||||||
|
|
||||||
|
signal.signal(signal.SIGINT, stop_sniff)
|
||||||
|
|
||||||
|
# === Channel syncing ===
|
||||||
def get_current_channel(interface):
|
def get_current_channel(interface):
|
||||||
try:
|
try:
|
||||||
output = subprocess.check_output(["iw", "dev", interface, "info"]).decode()
|
output = subprocess.check_output(["iw", "dev", interface, "info"]).decode()
|
||||||
for line in output.splitlines():
|
for line in output.splitlines():
|
||||||
if "channel" in line.lower():
|
if "channel" in line.lower():
|
||||||
return int(line.split()[1])
|
return int(line.strip().split()[1])
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"[!] Failed to determine current channel for {interface}: {e}")
|
print(f"[!] Failed to get channel for {interface}: {e}")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def set_monitor_channel(interface, channel):
|
def set_monitor_channel(interface, channel):
|
||||||
|
@ -25,118 +44,101 @@ def set_monitor_channel(interface, channel):
|
||||||
except subprocess.CalledProcessError as e:
|
except subprocess.CalledProcessError as e:
|
||||||
print(f"[!] Failed to set {interface} to channel {channel}: {e}")
|
print(f"[!] Failed to set {interface} to channel {channel}: {e}")
|
||||||
|
|
||||||
# --- Settings.env auto-detection logic ---
|
# === Packet handling ===
|
||||||
SCRIPT_DIRECTORY = Path(__file__).resolve().parent
|
def parse_ssid(pkt):
|
||||||
|
|
||||||
# Try intelligent guess of real user home even under sudo
|
|
||||||
SUDO_USER = os.environ.get("SUDO_USER")
|
|
||||||
USER_HOME = Path(f"/home/{SUDO_USER}") if SUDO_USER else Path.home()
|
|
||||||
ENV_PATH = USER_HOME / "wifi_test" / "settings.env"
|
|
||||||
|
|
||||||
# Fallback if all else fails
|
|
||||||
if not ENV_PATH.exists():
|
|
||||||
ENV_PATH = Path("/home/yaro/wifi_test/settings.env")
|
|
||||||
|
|
||||||
if not ENV_PATH.exists():
|
|
||||||
print(f"[!] Failed to load settings from {ENV_PATH}")
|
|
||||||
exit(1)
|
|
||||||
|
|
||||||
load_dotenv(dotenv_path=ENV_PATH)
|
|
||||||
|
|
||||||
# Now load settings
|
|
||||||
LISTEN_INTERFACE = os.getenv("LISTEN_INTERFACE", "wlan0")
|
|
||||||
print(f"[+] Using LISTEN_INTERFACE = {LISTEN_INTERFACE}")
|
|
||||||
|
|
||||||
# === Globals ===
|
|
||||||
clients_per_channel = defaultdict(set)
|
|
||||||
aps_per_channel = defaultdict(set)
|
|
||||||
running = True
|
|
||||||
|
|
||||||
def get_channel(pkt):
|
|
||||||
# Convert frequency to channel
|
|
||||||
if not pkt.haslayer(RadioTap):
|
|
||||||
return None
|
|
||||||
try:
|
try:
|
||||||
freq = pkt[RadioTap].ChannelFrequency
|
elt = pkt.getlayer(Dot11).payload
|
||||||
if 2412 <= freq <= 2472:
|
while elt:
|
||||||
return (freq - 2407) // 5
|
if hasattr(elt, "ID") and elt.ID == 0: # SSID tag
|
||||||
elif freq == 2484:
|
return elt.info.decode(errors="ignore")
|
||||||
return 14
|
elt = elt.payload
|
||||||
elif 5180 <= freq <= 5825:
|
|
||||||
return (freq - 5000) // 5
|
|
||||||
except:
|
except:
|
||||||
return None
|
return None
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def handle_packet(pkt):
|
def handle_packet(pkt):
|
||||||
if not pkt.haslayer(Dot11):
|
global packet_count
|
||||||
return
|
packet_count += 1
|
||||||
ch = get_channel(pkt)
|
|
||||||
if ch is None:
|
if not pkt.haslayer(Dot11) or not pkt.haslayer(RadioTap):
|
||||||
return
|
return
|
||||||
|
|
||||||
dot11 = pkt[Dot11]
|
dot11 = pkt[Dot11]
|
||||||
|
|
||||||
if dot11.type == 0 and dot11.subtype in (5, 8): # Probe Response or Beacon
|
if dot11.type == 0 and dot11.subtype in (5, 8): # Probe Resp or Beacon
|
||||||
aps_per_channel[ch].add(dot11.addr2)
|
aps.add(dot11.addr2)
|
||||||
|
ssid = parse_ssid(pkt)
|
||||||
|
if ssid:
|
||||||
|
ssid_map[dot11.addr2] = ssid
|
||||||
else:
|
else:
|
||||||
if dot11.addr1:
|
if dot11.addr1:
|
||||||
clients_per_channel[ch].add(dot11.addr1)
|
clients.add(dot11.addr1)
|
||||||
if dot11.addr2:
|
if dot11.addr2:
|
||||||
clients_per_channel[ch].add(dot11.addr2)
|
clients.add(dot11.addr2)
|
||||||
|
|
||||||
def print_stats():
|
|
||||||
print("\n[+] Live Wi-Fi Stats")
|
|
||||||
for ch in sorted(set(clients_per_channel) | set(aps_per_channel)):
|
|
||||||
c = len(clients_per_channel[ch])
|
|
||||||
a = len(aps_per_channel[ch])
|
|
||||||
print(f" - Channel {ch}: {c} clients, {a} APs")
|
|
||||||
print("-" * 40)
|
|
||||||
|
|
||||||
def stop_sniff(signum, frame):
|
|
||||||
global running
|
|
||||||
print("\n[!] Caught Ctrl+C, exiting...")
|
|
||||||
running = False
|
|
||||||
|
|
||||||
# === Main ===
|
|
||||||
if __name__ == "__main__":
|
|
||||||
test_interface = os.getenv("INTERFACE", "wlan0")
|
|
||||||
monitor_interface = os.getenv("LISTEN_INTERFACE", "wlan0")
|
|
||||||
|
|
||||||
current_channel = get_current_channel(test_interface)
|
|
||||||
if current_channel:
|
|
||||||
set_monitor_channel(monitor_interface, current_channel)
|
|
||||||
else:
|
|
||||||
print("[!] Unable to determine the current channel. Exiting.")
|
|
||||||
exit(1)
|
|
||||||
|
|
||||||
signal.signal(signal.SIGINT, stop_sniff)
|
|
||||||
print(f"[+] Listening on interface {LISTEN_INTERFACE} (press Ctrl+C to stop)")
|
|
||||||
|
|
||||||
# Start sniffing in a background thread
|
|
||||||
try:
|
try:
|
||||||
last_channel = current_channel
|
signal_dbm = pkt[RadioTap].dBm_AntSignal
|
||||||
channel_check_counter = 0
|
if dot11.addr2 in aps:
|
||||||
|
ap_signals[dot11.addr2].append(signal_dbm)
|
||||||
while running:
|
ssid = ssid_map.get(dot11.addr2)
|
||||||
sniff(
|
if ssid:
|
||||||
iface=LISTEN_INTERFACE,
|
ssid_signals[ssid].append(signal_dbm)
|
||||||
prn=handle_packet,
|
except:
|
||||||
store=False,
|
|
||||||
monitor=True,
|
|
||||||
timeout=10
|
|
||||||
)
|
|
||||||
print_stats()
|
|
||||||
|
|
||||||
# Check channel every 3 loops (~30 seconds)
|
|
||||||
channel_check_counter += 1
|
|
||||||
if channel_check_counter >= 3:
|
|
||||||
channel_check_counter = 0
|
|
||||||
new_channel = get_current_channel(test_interface)
|
|
||||||
if new_channel and new_channel != last_channel:
|
|
||||||
print(f"[~] Detected channel change: {last_channel} → {new_channel}")
|
|
||||||
set_monitor_channel(monitor_interface, new_channel)
|
|
||||||
last_channel = new_channel
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
# === Metrics dump ===
|
||||||
|
def write_csv(outfile):
|
||||||
|
timestamp = datetime.utcnow().isoformat()
|
||||||
|
row = {
|
||||||
|
"Timestamp": timestamp,
|
||||||
|
"ClientsOnChannel": len(clients),
|
||||||
|
"APsOnChannel": len(aps),
|
||||||
|
"PacketCount": packet_count,
|
||||||
|
"AvgAPSignal": round(sum([sum(v)/len(v) for v in ap_signals.values() if v]) / len(ap_signals) if ap_signals else 0, 2),
|
||||||
|
"StrongestAPSignal": max([max(v) for v in ap_signals.values() if v], default=0),
|
||||||
|
"AvgSSIDSignal": round(sum([sum(v)/len(v) for v in ssid_signals.values() if v]) / len(ssid_signals) if ssid_signals else 0, 2),
|
||||||
|
"MaxSSIDSignal": max([max(v) for v in ssid_signals.values() if v], default=0),
|
||||||
|
"ClientsOnAP": "N/A",
|
||||||
|
"CiscoAvgReportedClients": "N/A",
|
||||||
|
"CiscoMaxReportedClients": "N/A",
|
||||||
|
"NumberofBSSIDsOnSSID": "N/A",
|
||||||
|
"NumberofChannelsOnSSID": "N/A"
|
||||||
|
}
|
||||||
|
|
||||||
|
new_file = not os.path.exists(outfile)
|
||||||
|
with open(outfile, "a", newline="") as f:
|
||||||
|
writer = csv.DictWriter(f, fieldnames=row.keys())
|
||||||
|
if new_file:
|
||||||
|
writer.writeheader()
|
||||||
|
writer.writerow(row)
|
||||||
|
print(f"[+] Metrics written to {outfile}")
|
||||||
|
|
||||||
|
# === Main ===
|
||||||
|
def main():
|
||||||
|
parser = ArgumentParser()
|
||||||
|
parser.add_argument("--main-iface", required=True, help="Active interface (used to determine channel)")
|
||||||
|
parser.add_argument("--monitor-iface", required=True, help="Monitor interface to sniff on")
|
||||||
|
parser.add_argument("--outfile", required=True, help="CSV file to append metrics row")
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
print(f"[+] Starting passive observer.")
|
||||||
|
print(f" Main interface: {args.main_iface}")
|
||||||
|
print(f" Monitor interface: {args.monitor_iface}")
|
||||||
|
print(f" Output file: {args.outfile}")
|
||||||
|
|
||||||
|
channel = get_current_channel(args.main_iface)
|
||||||
|
if channel:
|
||||||
|
set_monitor_channel(args.monitor_iface, channel)
|
||||||
|
else:
|
||||||
|
print("[!] Could not determine current channel. Exiting.")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
print("[+] Sniffing... (waiting for SIGINT to stop)")
|
||||||
|
|
||||||
|
while running:
|
||||||
|
sniff(iface=args.monitor_iface, prn=handle_packet, store=False, timeout=5)
|
||||||
|
|
||||||
|
write_csv(args.outfile)
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue