import json import uuid from datetime import datetime from pathlib import Path from typing import Optional, Any, OrderedDict from iottb.definitions import ReturnCodes, CAPTURE_METADATA_FILE from iottb.models.device_metadata_model import DeviceMetadata from iottb.logger import logger class CaptureMetadata: # Required Fields device_metadata: DeviceMetadata capture_id: str = lambda: str(uuid.uuid4()) device_id: str capture_dir: Path capture_file: str capture_date: str = lambda: datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower() # Statistics start_time: str stop_time: str # tcpdump packet_count: Optional[int] pcap_filter: str = "" tcpdump_command: str = "" interface: str = "" # Optional Fields device_ip_address: str = "No IP Address set" device_mac_address: Optional[str] = None app: Optional[str] = None app_version: Optional[str] = None firmware_version: Optional[str] = None def __init__(self, device_metadata: DeviceMetadata, capture_dir: Path, /, **data: Any): logger.info(f"Creating CaptureMetadata model from DeviceMetadata: {device_metadata}") self.device_metadata = device_metadata self.capture_dir = capture_dir assert capture_dir.is_dir(), f"Capture directory {capture_dir} does not exist" def build_capture_file_name(self): logger.info(f"Building capture file name") if self.app is None: logger.debug(f"No app specified") prefix = self.device_metadata.device_short_name else: logger.debug(f"App specified: {self.app}") assert str(self.app).strip() not in {"", " "}, f"app is not a valid name: {self.app}" prefix = self.app.lower().replace(" ", "_") # assert self.capture_dir is not None, f"{self.capture_dir} does not exist" filename = f"{prefix}_{str(self.capture_id)}.pcap" logger.debug(f"Capture file name: {filename}") self.capture_file = filename def save_capture_metadata_to_json(self, file_path: Path = Path(CAPTURE_METADATA_FILE)): assert self.capture_dir.is_dir(), f"capture_dir is not a directory: {self.capture_dir}" if file_path.is_file(): print(f"File {file_path} already exists, update instead.") return ReturnCodes.FILE_ALREADY_EXISTS metadata = self.to_json(indent=2) with file_path.open('w') as file: json.dump(metadata, file) return ReturnCodes.SUCCESS def to_json(self, indent=2): # TODO: Where to validate data? logger.info(f"Converting CaptureMetadata to JSON") data = {} # List of fields from CaptureData class, if fields[key]==True, then it is a required field fields = { 'capture_id': True, # 'device_id': True, 'capture_dir': True, 'capture_file': False, 'capture_date': False, 'start_time': True, 'stop_time': True, 'packet_count': False, 'pcap_filter': False, 'tcpdump_command': False, 'interface': False, 'device_ip_address': False, 'device_mac_address': False, 'app': False, 'app_version': False, 'firmware_version': False } for field, is_mandatory in fields.items(): value = getattr(self, field, None) if value not in [None, ""] or is_mandatory: if value in [None, ""] and is_mandatory: raise ValueError(f"Field {field} is required and cannot be empty.") data[field] = str(value) if not isinstance(value, str) else value logger.debug(f"Capture metadata: {data}") return json.dumps(data, indent=indent)