import json
import logging
import uuid
from datetime import datetime
from pathlib import Path
from typing import Optional

from iottb.definitions import ReturnCodes, CAPTURE_METADATA_FILE
from iottb.models.device_metadata_model import DeviceMetadata

logger = logging.getLogger('iottbLogger.capture_metadata_model')
logger.setLevel(logging.DEBUG)


class CaptureMetadata:
    """Metadata describing one capture taken for a device.

    An instance is created from a ``DeviceMetadata`` object and an existing
    capture directory. The constructor assigns a fresh UUID (``capture_id``)
    and a creation timestamp (``capture_date``); the remaining attributes are
    expected to be filled in by the caller before the metadata is serialized
    with :meth:`to_json` / :meth:`save_capture_metadata_to_json`.
    """

    # Required Fields
    device_metadata: DeviceMetadata
    device_id: str  # declared, but never assigned inside this class
    capture_dir: Path
    capture_file: str  # set by build_capture_file_name()
    capture_id: str  # set by __init__
    capture_date: str  # set by __init__

    # Statistics
    start_time: str
    stop_time: str

    # tcpdump
    packet_count: Optional[int]
    pcap_filter: str = ''
    tcpdump_command: str = ''
    interface: str = ''

    # Optional Fields
    device_ip_address: str = 'No IP Address set'
    device_mac_address: Optional[str] = None
    app: Optional[str] = None
    app_version: Optional[str] = None
    firmware_version: Optional[str] = None

    # Fields written by to_json(), in output order.
    # True  -> required: to_json() raises ValueError if the value is None/''.
    # False -> optional: silently omitted if the value is None/''.
    _JSON_FIELDS = {
        'capture_id': True,
        # 'device_id': True,
        'capture_dir': True,
        'capture_file': False,
        'capture_date': False,
        'start_time': True,
        'stop_time': True,
        'packet_count': False,
        'pcap_filter': False,
        'tcpdump_command': False,
        'interface': False,
        'device_ip_address': False,
        'device_mac_address': False,
        'app': False,
        'app_version': False,
        'firmware_version': False,
    }

    def __init__(self, device_metadata: DeviceMetadata, capture_dir: Path):
        """Create capture metadata bound to a device and a capture directory.

        Args:
            device_metadata: Metadata of the device this capture belongs to.
            capture_dir: Directory associated with the capture; must exist.

        Raises:
            AssertionError: If ``capture_dir`` is not an existing directory.
        """
        logger.info(f'Creating CaptureMetadata model from DeviceMetadata: {device_metadata}')
        # Validate before initialising anything else.
        assert capture_dir.is_dir(), f'Capture directory {capture_dir} does not exist'
        self.device_metadata = device_metadata
        self.capture_id = str(uuid.uuid4())
        # NOTE(review): .lower() also lower-cases the literal 'T' separator,
        # so the stored value looks like '31-12-2024t23:59:59'. Kept as-is to
        # preserve the existing data format.
        self.capture_date = datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower()
        self.capture_dir = capture_dir

    def build_capture_file_name(self):
        """Compute the pcap file name and store it in ``self.capture_file``.

        The name has the form ``<prefix>_<capture_id>.pcap``. When ``app`` is
        set, the prefix is the lower-cased app name with spaces replaced by
        underscores. Otherwise the prefix is the device's
        ``device_short_name`` if the device metadata exposes a non-empty one,
        falling back to the historical placeholder ``'iphone-14'``.

        Raises:
            AssertionError: If ``app`` is set but blank.
        """
        logger.info('Building capture file name')
        if self.app is None:
            logger.debug('No app specified')
            # The placeholder used to be hard-coded unconditionally, which
            # mislabelled captures of every other device.
            device = getattr(self, 'device_metadata', None)
            prefix = getattr(device, 'device_short_name', None) or 'iphone-14'
        else:
            logger.debug(f'App specified: {self.app}')
            assert str(self.app).strip(), f'app is not a valid name: {self.app}'
            prefix = self.app.lower().replace(' ', '_')
        filename = f'{prefix}_{self.capture_id}.pcap'
        logger.debug(f'Capture file name: {filename}')
        self.capture_file = filename

    def save_capture_metadata_to_json(self, file_path: Path = Path(CAPTURE_METADATA_FILE)):
        """Write the metadata as a JSON document to ``file_path``.

        An existing file is never overwritten.

        Args:
            file_path: Destination file. The default is resolved relative to
                the current working directory, not to ``capture_dir``.

        Returns:
            ``ReturnCodes.FILE_ALREADY_EXISTS`` if ``file_path`` already is a
            file, ``ReturnCodes.SUCCESS`` after a successful write.

        Raises:
            AssertionError: If ``capture_dir`` is not a directory.
            ValueError: Propagated from :meth:`to_json` when a required field
                is empty.
        """
        assert self.capture_dir.is_dir(), f'capture_dir is not a directory: {self.capture_dir}'
        if file_path.is_file():
            print(f'File {file_path} already exists, update instead.')
            return ReturnCodes.FILE_ALREADY_EXISTS
        metadata = self.to_json(indent=2)
        with file_path.open('w', encoding='utf-8') as file:
            # to_json() already returns serialized JSON text. Passing it
            # through json.dump() again would store a single escaped JSON
            # string instead of a JSON object.
            file.write(metadata)
        return ReturnCodes.SUCCESS

    def to_json(self, indent=2):
        """Serialize the populated fields to a JSON string.

        Walks ``_JSON_FIELDS`` in order. Values that are ``None`` or ``''``
        are skipped when optional and rejected when required. Every non-string
        value (e.g. the ``Path`` in ``capture_dir``) is converted with
        ``str()``.

        Args:
            indent: Indentation passed to ``json.dumps``.

        Returns:
            The JSON document as a string.

        Raises:
            ValueError: If a required field is ``None`` or ``''``.
        """
        # TODO: Where to validate data?
        logger.info('Converting CaptureMetadata to JSON')
        data = {}
        for field, is_mandatory in self._JSON_FIELDS.items():
            value = getattr(self, field, None)
            if value is None or value == '':
                if is_mandatory:
                    raise ValueError(f'Field {field} is required and cannot be empty.')
                continue
            data[field] = value if isinstance(value, str) else str(value)
        logger.debug(f'Capture metadata: {data}')
        return json.dumps(data, indent=indent)