139 lines
4.5 KiB
Python
Executable file
139 lines
4.5 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
import argparse
|
|
import csv
|
|
from datetime import datetime
|
|
import pyshark
|
|
|
|
def parse_args():
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument('--csv', required=True, help='Input speedtest CSV')
|
|
parser.add_argument('--pcapng', required=True, help='Kismet-generated .pcapng file')
|
|
parser.add_argument('--output', required=True, help='Output enriched CSV')
|
|
return parser.parse_args()
|
|
|
|
def convert_timestamp_to_epoch(ts_string):
|
|
try:
|
|
return int(datetime.fromisoformat(ts_string.replace("Z", "+00:00")).timestamp())
|
|
except Exception as e:
|
|
print(f"[!] Failed to parse timestamp: {ts_string}")
|
|
return None
|
|
|
|
def get_clients_on_ap(capture, ap_bssid):
|
|
clients = set()
|
|
ap_bssid = ap_bssid.lower() # Normalize for comparison
|
|
|
|
for packet in capture:
|
|
try:
|
|
if not hasattr(packet, 'wlan'):
|
|
print("No WLAN layer found in packet")
|
|
continue
|
|
|
|
sa = getattr(packet.wlan, 'sa', '').lower()
|
|
da = getattr(packet.wlan, 'da', '').lower()
|
|
bssid = getattr(packet.wlan, 'bssid', '').lower()
|
|
print(f"Processing packet: SA={sa}, DA={da}, BSSID={bssid}")
|
|
|
|
# Count any frame *to or from* a client, if AP is involved
|
|
if bssid == ap_bssid or sa == ap_bssid or da == ap_bssid:
|
|
print("Found packet related to AP")
|
|
# If it's the AP sending, add the destination (client)
|
|
if sa == ap_bssid and da and da != ap_bssid:
|
|
print(f"Adding client: {da}")
|
|
clients.add(da)
|
|
# If it's the client sending, add the source
|
|
elif sa and sa != ap_bssid:
|
|
print(f"Adding client: {sa}")
|
|
clients.add(sa)
|
|
|
|
except AttributeError:
|
|
print("Attribute error in packet processing")
|
|
continue
|
|
|
|
return len(clients)
|
|
|
|
|
|
def analyze_pcap(pcapng_path, start_ts, end_ts, ap_bssid, ap_channel):
|
|
|
|
cap = pyshark.FileCapture(
|
|
pcapng_path,
|
|
display_filter=f'time >= {start_ts} && time <= {end_ts}',
|
|
use_json=True,
|
|
include_raw=False
|
|
)
|
|
|
|
# Show for debug purposes the packets in the filtered capture as well as the filter applied
|
|
print(f"[+] Filter applied: time >= {start_ts} && time <= {end_ts}")
|
|
|
|
clients_on_ap = get_clients_on_ap(cap, ap_bssid.lower())
|
|
|
|
# Placeholder: Logic will be added for:
|
|
# - ClientsOnChannel
|
|
# - APsOnChannel
|
|
# - CongestionScore
|
|
# - AvgAPSignal
|
|
# - StrongestAPSignal
|
|
# - UnlinkedDevices
|
|
|
|
cap.close()
|
|
|
|
return clients_on_ap, 0, 0, None, None, None, 0
|
|
|
|
def main():
|
|
args = parse_args()
|
|
cap = pyshark.FileCapture(
|
|
args.pcapng,
|
|
use_json=True,
|
|
include_raw=False,
|
|
keep_packets=False
|
|
)
|
|
|
|
# Checking if the pcapng file is valid
|
|
count = 0
|
|
try:
|
|
for packet in cap:
|
|
count += 1
|
|
if count > 0:
|
|
break
|
|
except Exception as e:
|
|
print(f"[!] Error reading pcapng file: {e}")
|
|
return
|
|
finally:
|
|
cap.close()
|
|
|
|
with open(args.csv, newline='') as infile, open(args.output, 'w', newline='', encoding='utf-8') as outfile:
|
|
reader = csv.DictReader(infile)
|
|
fieldnames = reader.fieldnames + [
|
|
'ClientsOnAP', 'ClientsOnChannel', 'APsOnChannel', 'CongestionScore',
|
|
'AvgAPSignal', 'StrongestAPSignal', 'UnlinkedDevices'
|
|
]
|
|
writer = csv.DictWriter(outfile, fieldnames=fieldnames)
|
|
writer.writeheader()
|
|
|
|
for row in reader:
|
|
tstart = convert_timestamp_to_epoch(row.get("StartTimestamp"))
|
|
tend = convert_timestamp_to_epoch(row.get("EndTimestamp"))
|
|
ap_bssid = row.get("BSSID", "").strip().lower()
|
|
ap_channel = row.get("Channel")
|
|
|
|
if not tstart or not tend:
|
|
writer.writerow(row)
|
|
continue
|
|
|
|
clients_ap, clients_chan, aps_chan, congestion, avg_signal, strongest_signal, unlinked = analyze_pcap(args.pcapng, tstart, tend, ap_bssid, ap_channel)
|
|
|
|
row.update({
|
|
'ClientsOnAP': clients_ap,
|
|
'ClientsOnChannel': clients_chan,
|
|
'APsOnChannel': aps_chan,
|
|
'CongestionScore': congestion,
|
|
'AvgAPSignal': avg_signal,
|
|
'StrongestAPSignal': strongest_signal,
|
|
'UnlinkedDevices': unlinked
|
|
})
|
|
|
|
writer.writerow(row)
|
|
|
|
print(f"[+] Enrichment complete: {args.output}")
|
|
|
|
if __name__ == "__main__":
|
|
main()
|