129 lines
3.8 KiB
Python
Executable file
129 lines
3.8 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
import os
|
|
import signal
|
|
import time
|
|
from collections import defaultdict
|
|
from scapy.all import sniff, Dot11, RadioTap
|
|
from dotenv import load_dotenv
|
|
from pathlib import Path
|
|
import subprocess
|
|
|
|
def get_current_channel(interface):
|
|
try:
|
|
output = subprocess.check_output(["iw", "dev", interface, "info"]).decode()
|
|
for line in output.splitlines():
|
|
if "channel" in line.lower():
|
|
return int(line.split()[1])
|
|
except Exception as e:
|
|
print(f"[!] Failed to determine current channel for {interface}: {e}")
|
|
return None
|
|
|
|
def set_monitor_channel(interface, channel):
|
|
try:
|
|
subprocess.check_call(["iw", "dev", interface, "set", "channel", str(channel)])
|
|
print(f"[+] Set {interface} to channel {channel}")
|
|
except subprocess.CalledProcessError as e:
|
|
print(f"[!] Failed to set {interface} to channel {channel}: {e}")
|
|
|
|
# --- Settings.env auto-detection logic ---
|
|
SCRIPT_DIRECTORY = Path(__file__).resolve().parent
|
|
|
|
# Try intelligent guess of real user home even under sudo
|
|
SUDO_USER = os.environ.get("SUDO_USER")
|
|
USER_HOME = Path(f"/home/{SUDO_USER}") if SUDO_USER else Path.home()
|
|
ENV_PATH = USER_HOME / "wifi_test" / "settings.env"
|
|
|
|
# Fallback if all else fails
|
|
if not ENV_PATH.exists():
|
|
ENV_PATH = Path("/home/yaro/wifi_test/settings.env")
|
|
|
|
if not ENV_PATH.exists():
|
|
print(f"[!] Failed to load settings from {ENV_PATH}")
|
|
exit(1)
|
|
|
|
load_dotenv(dotenv_path=ENV_PATH)
|
|
|
|
# Now load settings
|
|
LISTEN_INTERFACE = os.getenv("LISTEN_INTERFACE", "wlan0")
|
|
print(f"[+] Using LISTEN_INTERFACE = {LISTEN_INTERFACE}")
|
|
|
|
# === Globals ===
|
|
clients_per_channel = defaultdict(set)
|
|
aps_per_channel = defaultdict(set)
|
|
running = True
|
|
|
|
def get_channel(pkt):
|
|
# Convert frequency to channel
|
|
if not pkt.haslayer(RadioTap):
|
|
return None
|
|
try:
|
|
freq = pkt[RadioTap].ChannelFrequency
|
|
if 2412 <= freq <= 2472:
|
|
return (freq - 2407) // 5
|
|
elif freq == 2484:
|
|
return 14
|
|
elif 5180 <= freq <= 5825:
|
|
return (freq - 5000) // 5
|
|
except:
|
|
return None
|
|
return None
|
|
|
|
def handle_packet(pkt):
|
|
if not pkt.haslayer(Dot11):
|
|
return
|
|
ch = get_channel(pkt)
|
|
if ch is None:
|
|
return
|
|
|
|
dot11 = pkt[Dot11]
|
|
|
|
if dot11.type == 0 and dot11.subtype in (5, 8): # Probe Response or Beacon
|
|
aps_per_channel[ch].add(dot11.addr2)
|
|
else:
|
|
if dot11.addr1:
|
|
clients_per_channel[ch].add(dot11.addr1)
|
|
if dot11.addr2:
|
|
clients_per_channel[ch].add(dot11.addr2)
|
|
|
|
def print_stats():
|
|
print("\n[+] Live Wi-Fi Stats")
|
|
for ch in sorted(set(clients_per_channel) | set(aps_per_channel)):
|
|
c = len(clients_per_channel[ch])
|
|
a = len(aps_per_channel[ch])
|
|
print(f" - Channel {ch}: {c} clients, {a} APs")
|
|
print("-" * 40)
|
|
|
|
def stop_sniff(signum, frame):
|
|
global running
|
|
print("\n[!] Caught Ctrl+C, exiting...")
|
|
running = False
|
|
|
|
# === Main ===
|
|
if __name__ == "__main__":
|
|
test_interface = os.getenv("INTERFACE", "wlan0")
|
|
monitor_interface = os.getenv("LISTEN_INTERFACE", "wlan0")
|
|
|
|
current_channel = get_current_channel(test_interface)
|
|
if current_channel:
|
|
set_monitor_channel(monitor_interface, current_channel)
|
|
else:
|
|
print("[!] Unable to determine the current channel. Exiting.")
|
|
exit(1)
|
|
|
|
signal.signal(signal.SIGINT, stop_sniff)
|
|
print(f"[+] Listening on interface {LISTEN_INTERFACE} (press Ctrl+C to stop)")
|
|
|
|
# Start sniffing in a background thread
|
|
try:
|
|
while running:
|
|
sniff(
|
|
iface=LISTEN_INTERFACE,
|
|
prn=handle_packet,
|
|
store=False,
|
|
monitor=True,
|
|
timeout=10
|
|
)
|
|
print_stats()
|
|
except KeyboardInterrupt:
|
|
pass
|
|
|