Refactor analyze_pcap function to improve SSID encryption status handling and simplify summary output

This commit is contained in:
Yaro Kasear 2025-04-22 09:00:53 -05:00
parent 7be1550278
commit d74aa8ba2c

View file

@ -300,13 +300,13 @@ def analyze_pcap(pcapng_path, start_ts, end_ts, ap_bssid, ap_channel):
ssid_clients = defaultdict(set)
ssid_hidden_status = {}
ssid_encryption_status = {}
cisco_ssid_clients = defaultdict(list)
ssid_signals = defaultdict(list)
ssid_to_bssids = defaultdict(set)
bssid_to_ssid = {}
cisco_reported_clients = []
ssid_packet_counts = defaultdict(int)
ssid_encryption_status = {}
try:
# Filter packets manually by timestamp
@ -323,39 +323,33 @@ def analyze_pcap(pcapng_path, start_ts, end_ts, ap_bssid, ap_channel):
try:
if 'radiotap' not in packet or 'wlan' not in packet:
continue
radio = packet.radiotap
wlan = packet.wlan
if not hasattr(radio.channel, 'freq'):
continue
packet_freq = int(radio.channel.freq)
packet_channel = get_channel_from_freq(packet_freq)
subtype = int(getattr(wlan, 'type_subtype', 0), 16)
if subtype not in (5, 8): # Beacon or Probe Response
continue
try:
mgt = packet.get_multiple_layers('wlan.mgt')[0]
tags = mgt._all_fields.get('wlan.tagged.all', {}).get('wlan.tag', [])
except Exception:
continue
ssid = None
hidden_ssid = False
is_open = True # Assume open until proven encrypted
capabilities = getattr(wlan, 'capabilities', None)
if capabilities:
try:
cap_int = int(capabilities, 16)
if (cap_int & 0x0010): # Privacy bit set = not open
is_open = False
except ValueError:
pass
# Determine encryption from .privacy field
privacy_bit = mgt._all_fields.get('wlan_mgt.fixed.capabilities.privacy')
is_open = (str(privacy_bit) != '1') # 1 = encrypted, 0 = open
for tag in tags:
tag_number = tag.get('wlan.tag.number')
@ -376,35 +370,21 @@ def analyze_pcap(pcapng_path, start_ts, end_ts, ap_bssid, ap_channel):
num_clients = int(tag.get('wlan.cisco.ccx1.clients'))
if ssid:
cisco_ssid_clients[ssid].append(num_clients)
cisco_reported_clients.append(num_clients)
except (TypeError, ValueError):
pass
if ssid:
ssid_hidden_status[ssid] = hidden_ssid
ssid_packet_counts[ssid] += 1
# Cisco Client Count
if tag_number == '133':
try:
num_clients = int(tag.get('wlan.cisco.ccx1.clients'))
cisco_reported_clients.append(num_clients)
except (TypeError, ValueError):
pass
if not ssid:
continue
ssid_hidden_status[ssid] = hidden_ssid
# Store all observed values
if ssid in ssid_encryption_status:
ssid_encryption_status[ssid].append(is_open)
else:
ssid_encryption_status[ssid] = [is_open]
ssid_encryption_status.setdefault(ssid, is_open)
ssid_packet_counts[ssid] += 1
bssid = getattr(wlan, 'bssid', '').lower()
if not bssid or bssid == 'ff:ff:ff:ff:ff:ff':
continue
bssid_to_ssid[bssid] = ssid
ssid_to_bssids[ssid].add(bssid)
@ -415,7 +395,6 @@ def analyze_pcap(pcapng_path, start_ts, end_ts, ap_bssid, ap_channel):
except Exception:
continue
our_ssid = bssid_to_ssid.get(ap_bssid, None)
clients_on_ap = get_clients_on_ap(filtered_packets, ap_bssid)
@ -437,15 +416,15 @@ def analyze_pcap(pcapng_path, start_ts, end_ts, ap_bssid, ap_channel):
ssid_summary.append({
'SSID': ssid,
'Hidden': ssid_hidden_status.get(ssid, False),
'Open': all(ssid_encryption_status.get(ssid, [True])),
'Open': ssid_encryption_status.get(ssid, True),
'BSSID_Count': len(bssids),
'BSSIDs': ";".join(sorted(bssids)),
'Avg_Signal': mean(signals) if signals else 0,
'Max_Signal': max(signals) if signals else 0,
'Min_Signal': min(signals) if signals else 0,
'Clients_Seen': len(ssid_clients.get(ssid, [])),
'CiscoAvgClients': round(mean(cisco_reported_clients), 2) if cisco_reported_clients else 0,
'CiscoMaxClients': max(cisco_reported_clients) if cisco_reported_clients else 0,
'CiscoAvgClients': round(mean(cisco_ssid_clients[ssid]), 2) if ssid in cisco_ssid_clients else 0,
'CiscoMaxClients': max(cisco_ssid_clients[ssid]) if ssid in cisco_ssid_clients else 0,
'PacketCount': ssid_packet_counts[ssid]
})
@ -453,11 +432,13 @@ def analyze_pcap(pcapng_path, start_ts, end_ts, ap_bssid, ap_channel):
packet_count = len(filtered_packets)
cap.close()
return (clients_on_ap, clients_on_channel, aps_on_channel,
avg_ap_signal, max_ap_signal, unlinked_devices,
cisco_avg_reported_clients, cisco_max_reported_clients, num_bssids,
average_signal, max_ssid_signal, num_channels_ssid,
ssid_summary, packet_count)
return (
clients_on_ap, clients_on_channel, aps_on_channel,
avg_ap_signal, max_ap_signal, unlinked_devices,
cisco_avg_reported_clients, cisco_max_reported_clients, num_bssids,
average_signal, max_ssid_signal, num_channels_ssid,
ssid_summary, packet_count
)
def main():
args = parse_args()