diff --git a/code/iottb/models/capture_metadata_model.py b/code/iottb/models/capture_metadata_model.py index c1dc000..e605bc1 100644 --- a/code/iottb/models/capture_metadata_model.py +++ b/code/iottb/models/capture_metadata_model.py @@ -3,8 +3,6 @@ import uuid from datetime import datetime from pathlib import Path from typing import Optional, Any -from uuid import UUID - from iottb.definitions import ReturnCodes, CAPTURE_METADATA_FILE from iottb.models.device_metadata_model import DeviceMetadata @@ -14,7 +12,8 @@ from iottb.logger import logger class CaptureMetadata: # Required Fields device_metadata: DeviceMetadata - capture_id: uuid.UUID = lambda: str(uuid.uuid4()) + capture_id: str = lambda: str(uuid.uuid4()) + device_id: str capture_dir: Path capture_file: str capture_date: str = lambda: datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower() @@ -30,7 +29,7 @@ class CaptureMetadata: interface: str = "" # Optional Fields - device_ip_address: Optional[str] = "No IP Address set" + device_ip_address: str = "No IP Address set" device_mac_address: Optional[str] = None app: Optional[str] = None @@ -39,127 +38,34 @@ class CaptureMetadata: def __init__(self, device_metadata: DeviceMetadata, capture_dir: Path, /, **data: Any): logger.info(f"Creating CaptureMetadata model from DeviceMetadata: {device_metadata}") - super().__init__(**data) # Pycharms orders self.device_metadata = device_metadata + self.capture_dir = capture_dir assert capture_dir.is_dir(), f"Capture directory {capture_dir} does not exist" - # Getters - def get_device_id(self) -> str: - return self.device_metadata.get_device_id() - - def get_start_time(self) -> str: - return self.start_time - - def get_stop_time(self) -> str: - return self.stop_time - - def get_packet_count(self) -> int: - return self.packet_count - - def get_pcap_filter(self) -> str: - return self.pcap_filter - - def get_device_ip_address(self) -> str: - return self.device_ip_address - - def get_device_mac_address(self) -> str: - return self.device_mac_address - - def get_app(self) -> str: - return self.app - - def get_app_version(self) -> str: - return self.app_version - - def get_firmware_version(self) -> str: - return self.firmware_version - - def get_capture_id(self) -> UUID: - return self.capture_id - - def get_capture_date(self) -> str: - return self.capture_date - - def get_capfile_name(self): - return self.capture_file - - def get_device_metadata(self) -> DeviceMetadata: - return self.device_metadata - - def get_interface(self): - return self.interface - - # Setters - def set_capture_dir(self, capture_dir: Path): - self.capture_dir = capture_dir - - def set_capture_file(self, capture_file: str): - self.capture_file = capture_file - - def set_capture_date(self, capture_date: str): - self.capture_date = capture_date - - def set_start_time(self, start_time: str): - self.start_time = start_time - - def set_stop_time(self, stop_time: str): - self.stop_time = stop_time - - def set_packet_count(self, packet_count: int): - self.packet_count = packet_count - - def set_pcap_filter(self, pcap_filter: str): - self.pcap_filter = pcap_filter - - def set_device_ip_address(self, device_ip_address: str): - self.device_ip_address = device_ip_address - - def set_device_mac_address(self, device_mac_address: str): - self.device_mac_address = device_mac_address - - def set_app(self, app: str): - self.app = app - - def set_app_version(self, app_version: str): - self.app_version = app_version - - def set_firmware_version(self, firmware_version: str): - self.firmware_version = firmware_version - self.device_metadata.set_device_firmware_version(firmware_version) - - def set_interface(self, interface: str): - self.interface = interface - - def set_tcpdump_command(self, tcpdump_command: str): - self.tcpdump_command = tcpdump_command - - # Other - def build_capture_file_name(self): logger.info(f"Building capture file name") - prefix = "" if self.app is None: logger.debug(f"No app specified") prefix = self.device_metadata.get_device_short_name() else: logger.debug(f"App specified: {self.app}") assert str(self.app).strip() not in {"", " "}, f"app is not a valid name: {self.app}" - prefix = self.get_app() + prefix = self.app.lower().replace(" ", "_") # assert self.capture_dir is not None, f"{self.capture_dir} does not exist" filename = f"{prefix}_{str(self.capture_id)}.pcap" logger.debug(f"Capture file name: {filename}") - self.set_capture_file(filename) + self.capture_file = filename def save_capture_metadata_to_json(self, file_path: Path = Path(CAPTURE_METADATA_FILE)): assert self.capture_dir.is_dir(), f"capture_dir is not a directory: {self.capture_dir}" if file_path.is_file(): print(f"File {file_path} already exists, update instead.") return ReturnCodes.FILE_ALREADY_EXISTS - metadata = self.model_dump_json(indent=2, exclude_unset=True, exclude_none=True) + metadata = self.to_json(indent=2) with file_path.open('w') as file: json.dump(metadata, file) return ReturnCodes.SUCCESS - def model_dump_json(self, indent, exclude_unset, exclude_none): + def to_json(self, indent): pass diff --git a/code/iottb/subcommands/capture.py b/code/iottb/subcommands/capture.py index a657493..07c2b48 100644 --- a/code/iottb/subcommands/capture.py +++ b/code/iottb/subcommands/capture.py @@ -100,10 +100,10 @@ def handle_capture(args): make_capture_src_folder(capture_dir) capture_metadata = CaptureMetadata(device_data, capture_dir) - capture_metadata.set_interface(args.capture_interface) + capture_metadata.interface = args.capture_interface cmd = ['sudo', 'tcpdump', '-i', args.capture_interface] cmd = build_tcpdump_args(args, cmd, capture_metadata) - capture_metadata.set_tcpdump_command(cmd) + capture_metadata.tcpdump_command = cmd print('Executing: ' + ' '.join(cmd)) @@ -112,8 +112,8 @@ def handle_capture(args): start_time = datetime.now().strftime('%H:%M:%S') run_tcpdump(cmd) stop_time = datetime.now().strftime('%H:%M:%S') - capture_metadata.set_start_time(start_time) - capture_metadata.set_stop_time(stop_time) + capture_metadata.start_time = start_time + capture_metadata.stop_time = stop_time except KeyboardInterrupt: print("Received keyboard interrupt.") exit(ReturnCodes.ABORTED) @@ -144,15 +144,15 @@ def build_tcpdump_args(args, cmd, capture_metadata: CaptureMetadata): assert False, "Unimplemented option" if args.app_name is not None: - capture_metadata.set_app_name(args.app_name) + capture_metadata.app = args.app_name capture_metadata.build_capture_file_name() cmd.append('-w') - cmd.append(capture_metadata.get_capfile_name()) + cmd.append(capture_metadata.capture_file) if args.safe: cmd.append(f'host {args.device_ip}') # if not specified, filter 'any' implied by tcpdump - capture_metadata.set_device_ip_address(args.device_ip) + capture_metadata.device_id = args.device_ip return cmd