import json
import uuid
from datetime import datetime
from pathlib import Path
from typing import Optional, Any
from uuid import UUID

from iottb.definitions import ReturnCodes, CAPTURE_METADATA_FILE
from iottb.models.device_metadata_model import DeviceMetadata
from iottb.logger import logger

# Sentinel distinguishing "attribute has no value at all" from a stored None.
_MISSING = object()


class CaptureMetadata:
    """Metadata describing a single packet capture of one device.

    The two positional-only constructor arguments are the device's metadata
    and the directory the capture belongs to; every other field may be passed
    as a keyword argument or filled in later through the setters.

    ``capture_id`` and ``capture_date`` are generated once per instance at
    construction time.  Fields that are annotated without a default
    (``capture_file``, ``start_time``, ``stop_time``, ``packet_count``) do not
    exist on the instance until they are set.
    """

    # Required Fields
    device_metadata: DeviceMetadata
    capture_id: uuid.UUID  # generated per instance in __init__
    capture_dir: Path
    capture_file: str
    capture_date: str  # generated per instance in __init__
    # Statistics
    start_time: str
    stop_time: str
    # tcpdump
    packet_count: Optional[int]
    pcap_filter: str = ""
    tcpdump_command: str = ""
    interface: str = ""
    # Optional Fields
    device_ip_address: Optional[str] = "No IP Address set"
    device_mac_address: Optional[str] = None
    app: Optional[str] = None
    app_version: Optional[str] = None
    firmware_version: Optional[str] = None

    # Names of all metadata fields, in the order they are serialized.
    _FIELDS = (
        "device_metadata",
        "capture_id",
        "capture_dir",
        "capture_file",
        "capture_date",
        "start_time",
        "stop_time",
        "packet_count",
        "pcap_filter",
        "tcpdump_command",
        "interface",
        "device_ip_address",
        "device_mac_address",
        "app",
        "app_version",
        "firmware_version",
    )

    def __init__(self, device_metadata: DeviceMetadata, capture_dir: Path, /, **data: Any):
        """Create the metadata for a new capture.

        Args:
            device_metadata: Metadata of the device being captured.
            capture_dir: Existing directory the capture belongs to.
            **data: Optional initial values for any of the declared fields.

        Raises:
            TypeError: If ``data`` names something that is not a declared field.
            AssertionError: If ``capture_dir`` is not an existing directory.
        """
        logger.info(f"Creating CaptureMetadata model from DeviceMetadata: {device_metadata}")
        unknown = sorted(set(data) - set(self._FIELDS))
        if unknown:
            raise TypeError(f"Unexpected CaptureMetadata field(s): {', '.join(unknown)}")
        super().__init__()
        # Generated here rather than as class-level lambdas: a function stored
        # on the class is bound as a method on instance access, so the old
        # defaults never produced an actual id or date.
        self.capture_id = uuid.uuid4()
        self.capture_date = datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower()
        for name, value in data.items():
            setattr(self, name, value)
        # The positional arguments always win over same-named keyword data.
        self.device_metadata = device_metadata
        self.capture_dir = capture_dir
        assert capture_dir.is_dir(), f"Capture directory {capture_dir} does not exist"

    # Getters
    def get_device_id(self) -> str:
        """Return the device id as reported by the device metadata."""
        return self.device_metadata.get_device_id()

    def get_start_time(self) -> str:
        return self.start_time

    def get_stop_time(self) -> str:
        return self.stop_time

    def get_packet_count(self) -> Optional[int]:
        return self.packet_count

    def get_pcap_filter(self) -> str:
        return self.pcap_filter

    def get_device_ip_address(self) -> Optional[str]:
        return self.device_ip_address

    def get_device_mac_address(self) -> Optional[str]:
        return self.device_mac_address

    def get_app(self) -> Optional[str]:
        return self.app

    def get_app_version(self) -> Optional[str]:
        return self.app_version

    def get_firmware_version(self) -> Optional[str]:
        return self.firmware_version

    def get_capture_id(self) -> UUID:
        return self.capture_id

    def get_capture_date(self) -> str:
        return self.capture_date

    def get_capfile_name(self) -> str:
        return self.capture_file

    def get_device_metadata(self) -> DeviceMetadata:
        return self.device_metadata

    def get_interface(self) -> str:
        return self.interface

    # Setters
    def set_capture_dir(self, capture_dir: Path):
        self.capture_dir = capture_dir

    def set_capture_file(self, capture_file: str):
        self.capture_file = capture_file

    def set_capture_date(self, capture_date: str):
        self.capture_date = capture_date

    def set_start_time(self, start_time: str):
        self.start_time = start_time

    def set_stop_time(self, stop_time: str):
        self.stop_time = stop_time

    def set_packet_count(self, packet_count: int):
        self.packet_count = packet_count

    def set_pcap_filter(self, pcap_filter: str):
        self.pcap_filter = pcap_filter

    def set_device_ip_address(self, device_ip_address: str):
        self.device_ip_address = device_ip_address

    def set_device_mac_address(self, device_mac_address: str):
        self.device_mac_address = device_mac_address

    def set_app(self, app: str):
        self.app = app

    def set_app_version(self, app_version: str):
        self.app_version = app_version

    def set_firmware_version(self, firmware_version: str):
        """Set the firmware version here and on the device metadata."""
        self.firmware_version = firmware_version
        self.device_metadata.set_device_firmware_version(firmware_version)

    def set_interface(self, interface: str):
        self.interface = interface

    def set_tcpdump_command(self, tcpdump_command: str):
        self.tcpdump_command = tcpdump_command

    # Other
    def build_capture_file_name(self):
        """Derive the pcap file name and store it via ``set_capture_file``.

        The name is ``<prefix>_<capture_id>.pcap`` where the prefix is the app
        name when one is set and the device's short name otherwise.

        Raises:
            AssertionError: If an app is set but its name is blank.
        """
        logger.info("Building capture file name")
        if self.app is None:
            logger.debug("No app specified")
            prefix = self.device_metadata.get_device_short_name()
        else:
            logger.debug(f"App specified: {self.app}")
            assert str(self.app).strip(), f"app is not a valid name: {self.app}"
            prefix = self.get_app()
        filename = f"{prefix}_{str(self.capture_id)}.pcap"
        logger.debug(f"Capture file name: {filename}")
        self.set_capture_file(filename)

    def save_capture_metadata_to_json(self, file_path: Path = Path(CAPTURE_METADATA_FILE)):
        """Write the metadata as JSON to ``file_path`` unless it already exists.

        Args:
            file_path: Destination file.  It is used exactly as given, so a
                relative path resolves against the current working directory
                and not against ``capture_dir``.

        Returns:
            ``ReturnCodes.FILE_ALREADY_EXISTS`` when the file is already
            present (nothing is written), otherwise ``ReturnCodes.SUCCESS``.

        Raises:
            AssertionError: If ``capture_dir`` is not an existing directory.
        """
        assert self.capture_dir.is_dir(), f"capture_dir is not a directory: {self.capture_dir}"
        if file_path.is_file():
            print(f"File {file_path} already exists, update instead.")
            return ReturnCodes.FILE_ALREADY_EXISTS
        metadata = self.model_dump_json(indent=2, exclude_unset=True, exclude_none=True)
        # ``metadata`` is already a JSON document; passing it through
        # json.dump would encode it a second time as one quoted string.
        with file_path.open('w', encoding='utf-8') as file:
            file.write(metadata)
        return ReturnCodes.SUCCESS

    def model_dump_json(self, indent=None, exclude_unset=False, exclude_none=False):
        """Serialize the metadata fields to a JSON string.

        Args:
            indent: Indentation passed through to ``json.dumps``.
            exclude_unset: Skip fields that were never assigned on this
                instance, i.e. that still hold their class-level default.
            exclude_none: Skip fields whose value is ``None``.

        Returns:
            The JSON document as a string.  Fields that have no value at all
            are always omitted.
        """
        assigned = vars(self)
        payload = {}
        for name in self._FIELDS:
            if exclude_unset and name not in assigned:
                continue
            value = getattr(self, name, _MISSING)
            if value is _MISSING:
                continue
            if exclude_none and value is None:
                continue
            payload[name] = value
        return json.dumps(payload, indent=indent, default=self._json_default)

    @staticmethod
    def _json_default(value: Any) -> Any:
        """Convert values the ``json`` module cannot encode by itself."""
        if isinstance(value, (Path, uuid.UUID)):
            return str(value)
        # NOTE(review): DeviceMetadata's own serialization API is not visible
        # from this file, so nested objects are dumped via their instance
        # attributes - confirm this matches the intended on-disk format.
        try:
            return dict(vars(value))
        except TypeError:
            raise TypeError(
                f"Object of type {type(value).__name__} is not JSON serializable"
            ) from None