import json
import uuid
from datetime import datetime
from pathlib import Path
from typing import Optional, Any
from uuid import UUID

from pydantic import BaseModel, Field

from iottb.definitions import ReturnCodes, CAPTURE_METADATA_FILE
from iottb.models.device_metadata_model import DeviceMetadata


class CaptureMetadata(BaseModel):
    """Metadata describing a single packet capture of a device.

    An instance is created from the device's metadata and the directory the
    capture is written to; the remaining fields are filled in through the
    setters as the capture progresses and are finally persisted with
    :meth:`save_capture_metadata_to_json`.
    """

    # Required Fields
    # Excluded from every dump, so it never appears in the serialized output.
    device_metadata: DeviceMetadata = Field(exclude=True)
    # The factory yields a real UUID so the value matches the annotation
    # (it used to yield a ``str``).
    capture_id: uuid.UUID = Field(default_factory=uuid.uuid4)
    capture_dir: Path
    # Filled in by build_capture_file_name() / set_capture_file() after
    # construction, hence the empty default.
    capture_file: str = ""
    capture_date: str = Field(default_factory=lambda: datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower())

    # Statistics
    # Both are set through their setters after construction.
    start_time: str = ""
    stop_time: str = ""

    # tcpdump
    packet_count: Optional[int] = None
    pcap_filter: str = ""
    tcpdump_command: str = ""
    interface: str = ""

    # Optional Fields
    device_ip_address: Optional[str] = "No IP Address set"
    device_mac_address: Optional[str] = None
    app: Optional[str] = None
    app_version: Optional[str] = None
    firmware_version: Optional[str] = None

    def __init__(self, device_metadata: DeviceMetadata, capture_dir: Path, /, **data: Any):
        """Create the metadata record for one capture.

        Args:
            device_metadata: Metadata of the device being captured.
            capture_dir: Existing directory the capture belongs to.
            **data: Values for any of the remaining model fields.

        Raises:
            pydantic.ValidationError: If a supplied value fails validation.
            AssertionError: If ``capture_dir`` is not an existing directory.
        """
        # Both values are required model fields, so they must be part of the
        # validated input. They deliberately override same-named keys in
        # ``data``: the positional arguments have always had the last word.
        super().__init__(**{**data, "device_metadata": device_metadata, "capture_dir": capture_dir})
        # NOTE(review): kept as an assert so the raised exception type does
        # not change; be aware it is skipped when Python runs with -O.
        assert self.capture_dir.is_dir()

    # Getters
    def get_device_id(self) -> str:
        """Return the id of the captured device."""
        # This model has no ``device_id`` field, so reading ``self.device_id``
        # always raised AttributeError.
        # NOTE(review): assumes DeviceMetadata exposes ``device_id`` - confirm.
        return self.device_metadata.device_id

    def get_start_time(self) -> str:
        """Return the capture start time."""
        return self.start_time

    def get_stop_time(self) -> str:
        """Return the capture stop time."""
        return self.stop_time

    def get_packet_count(self) -> Optional[int]:
        """Return the number of captured packets, or ``None`` if not set."""
        return self.packet_count

    def get_pcap_filter(self) -> str:
        """Return the pcap filter expression."""
        return self.pcap_filter

    def get_device_ip_address(self) -> Optional[str]:
        """Return the device IP address (a placeholder text by default)."""
        return self.device_ip_address

    def get_device_mac_address(self) -> Optional[str]:
        """Return the device MAC address, or ``None`` if not set."""
        return self.device_mac_address

    def get_app(self) -> Optional[str]:
        """Return the app name, or ``None`` if not set."""
        return self.app

    def get_app_version(self) -> Optional[str]:
        """Return the app version, or ``None`` if not set."""
        return self.app_version

    def get_firmware_version(self) -> Optional[str]:
        """Return the firmware version, or ``None`` if not set."""
        return self.firmware_version

    def get_capture_id(self) -> UUID:
        """Return the unique id of this capture."""
        return self.capture_id

    def get_capture_date(self) -> str:
        """Return the capture date string."""
        return self.capture_date

    def get_capfile_name(self) -> str:
        """Return the name of the capture file."""
        return self.capture_file

    def get_device_metadata(self) -> DeviceMetadata:
        """Return the metadata of the captured device."""
        return self.device_metadata

    def get_interface(self) -> str:
        """Return the capture interface."""
        return self.interface

    # Setters
    def set_capture_dir(self, capture_dir: Path):
        """Set the capture directory."""
        self.capture_dir = capture_dir

    def set_capture_file(self, capture_file: str):
        """Set the name of the capture file."""
        self.capture_file = capture_file

    def set_capture_date(self, capture_date: str):
        """Set the capture date string."""
        self.capture_date = capture_date

    def set_start_time(self, start_time: str):
        """Set the capture start time."""
        self.start_time = start_time

    def set_stop_time(self, stop_time: str):
        """Set the capture stop time."""
        self.stop_time = stop_time

    def set_packet_count(self, packet_count: int):
        """Set the number of captured packets."""
        self.packet_count = packet_count

    def set_pcap_filter(self, pcap_filter: str):
        """Set the pcap filter expression."""
        self.pcap_filter = pcap_filter

    def set_device_ip_address(self, device_ip_address: str):
        """Set the device IP address."""
        self.device_ip_address = device_ip_address

    def set_device_mac_address(self, device_mac_address: str):
        """Set the device MAC address."""
        self.device_mac_address = device_mac_address

    def set_app(self, app: str):
        """Set the app name."""
        self.app = app

    def set_app_version(self, app_version: str):
        """Set the app version."""
        self.app_version = app_version

    def set_firmware_version(self, firmware_version: str):
        """Set the firmware version here and on the device metadata."""
        self.firmware_version = firmware_version
        # Keep the device record in sync with the capture record.
        self.device_metadata.set_device_firmware_version(firmware_version)

    def set_interface(self, interface: str):
        """Set the capture interface."""
        self.interface = interface

    def set_tcpdump_command(self, tcpdump_command: str):
        """Set the tcpdump command line."""
        self.tcpdump_command = tcpdump_command

    # Other
    def build_capture_file_name(self):
        """Derive the capture file name and store it in ``capture_file``.

        The name is ``<prefix>_<capture_id>.pcap`` where the prefix is the
        app name when one is set and the device's short name otherwise.

        Raises:
            AssertionError: If ``app`` is set but blank.
        """
        if self.app is None:
            prefix = self.device_metadata.get_device_short_name()
        else:
            assert str(self.app).strip() != "", f"app is not a valid name: {self.app}"
            prefix = self.get_app()
        filename = f"{prefix}_{str(self.capture_id)}.pcap"
        self.set_capture_file(filename)

    def save_capture_metadata_to_json(self, file_path: Path = Path(CAPTURE_METADATA_FILE)):
        """Write this record as JSON to ``file_path`` unless the file exists.

        Args:
            file_path: Destination file.
                NOTE(review): a relative path is resolved against the current
                working directory, not ``capture_dir`` - confirm that is
                intended.

        Returns:
            ``ReturnCodes.FILE_ALREADY_EXISTS`` when ``file_path`` is already
            a file (nothing is written), otherwise ``ReturnCodes.SUCCESS``.

        Raises:
            AssertionError: If ``capture_dir`` is not a directory.
        """
        assert self.capture_dir.is_dir(), f"capture_dir is not a directory: {self.capture_dir}"
        if file_path.is_file():
            print(f"File {file_path} already exists, update instead.")
            return ReturnCodes.FILE_ALREADY_EXISTS
        # NOTE(review): exclude_unset drops fields that still hold their
        # default, including the auto-generated capture_id and capture_date -
        # confirm that omission is wanted.
        metadata = self.model_dump_json(indent=2, exclude_unset=True, exclude_none=True)
        # model_dump_json() already returns serialized JSON text. Passing it
        # through json.dump() again stored one quoted, escaped string instead
        # of a JSON object, so the text is written as-is.
        with file_path.open('w', encoding='utf-8') as file:
            file.write(metadata)
        return ReturnCodes.SUCCESS