From 6b7353094370179ee362dd10d862974776ece976 Mon Sep 17 00:00:00 2001 From: Sebastian Lenzlinger Date: Wed, 8 May 2024 00:02:08 +0200 Subject: [PATCH 01/12] Remove getters and setters from CaptureMetadata class and refactor dependencies to use field access. --- code/iottb/models/capture_metadata_model.py | 110 ++------------------ code/iottb/subcommands/capture.py | 14 +-- 2 files changed, 15 insertions(+), 109 deletions(-) diff --git a/code/iottb/models/capture_metadata_model.py b/code/iottb/models/capture_metadata_model.py index c1dc000..e605bc1 100644 --- a/code/iottb/models/capture_metadata_model.py +++ b/code/iottb/models/capture_metadata_model.py @@ -3,8 +3,6 @@ import uuid from datetime import datetime from pathlib import Path from typing import Optional, Any -from uuid import UUID - from iottb.definitions import ReturnCodes, CAPTURE_METADATA_FILE from iottb.models.device_metadata_model import DeviceMetadata @@ -14,7 +12,8 @@ from iottb.logger import logger class CaptureMetadata: # Required Fields device_metadata: DeviceMetadata - capture_id: uuid.UUID = lambda: str(uuid.uuid4()) + capture_id: str = lambda: str(uuid.uuid4()) + device_id: str capture_dir: Path capture_file: str capture_date: str = lambda: datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower() @@ -30,7 +29,7 @@ class CaptureMetadata: interface: str = "" # Optional Fields - device_ip_address: Optional[str] = "No IP Address set" + device_ip_address: str = "No IP Address set" device_mac_address: Optional[str] = None app: Optional[str] = None @@ -39,127 +38,34 @@ class CaptureMetadata: def __init__(self, device_metadata: DeviceMetadata, capture_dir: Path, /, **data: Any): logger.info(f"Creating CaptureMetadata model from DeviceMetadata: {device_metadata}") - super().__init__(**data) # Pycharms orders self.device_metadata = device_metadata + self.capture_dir = capture_dir assert capture_dir.is_dir(), f"Capture directory {capture_dir} does not exist" - # Getters - def get_device_id(self) -> str: - return self.device_metadata.get_device_id() - - def get_start_time(self) -> str: - return self.start_time - - def get_stop_time(self) -> str: - return self.stop_time - - def get_packet_count(self) -> int: - return self.packet_count - - def get_pcap_filter(self) -> str: - return self.pcap_filter - - def get_device_ip_address(self) -> str: - return self.device_ip_address - - def get_device_mac_address(self) -> str: - return self.device_mac_address - - def get_app(self) -> str: - return self.app - - def get_app_version(self) -> str: - return self.app_version - - def get_firmware_version(self) -> str: - return self.firmware_version - - def get_capture_id(self) -> UUID: - return self.capture_id - - def get_capture_date(self) -> str: - return self.capture_date - - def get_capfile_name(self): - return self.capture_file - - def get_device_metadata(self) -> DeviceMetadata: - return self.device_metadata - - def get_interface(self): - return self.interface - - # Setters - def set_capture_dir(self, capture_dir: Path): - self.capture_dir = capture_dir - - def set_capture_file(self, capture_file: str): - self.capture_file = capture_file - - def set_capture_date(self, capture_date: str): - self.capture_date = capture_date - - def set_start_time(self, start_time: str): - self.start_time = start_time - - def set_stop_time(self, stop_time: str): - self.stop_time = stop_time - - def set_packet_count(self, packet_count: int): - self.packet_count = packet_count - - def set_pcap_filter(self, pcap_filter: str): - self.pcap_filter = pcap_filter - - def set_device_ip_address(self, device_ip_address: str): - self.device_ip_address = device_ip_address - - def set_device_mac_address(self, device_mac_address: str): - self.device_mac_address = device_mac_address - - def set_app(self, app: str): - self.app = app - - def set_app_version(self, app_version: str): - self.app_version = app_version - - def set_firmware_version(self, firmware_version: str): - self.firmware_version = firmware_version - self.device_metadata.set_device_firmware_version(firmware_version) - - def set_interface(self, interface: str): - self.interface = interface - - def set_tcpdump_command(self, tcpdump_command: str): - self.tcpdump_command = tcpdump_command - - # Other - def build_capture_file_name(self): logger.info(f"Building capture file name") - prefix = "" if self.app is None: logger.debug(f"No app specified") prefix = self.device_metadata.get_device_short_name() else: logger.debug(f"App specified: {self.app}") assert str(self.app).strip() not in {"", " "}, f"app is not a valid name: {self.app}" - prefix = self.get_app() + prefix = self.app.lower().replace(" ", "_") # assert self.capture_dir is not None, f"{self.capture_dir} does not exist" filename = f"{prefix}_{str(self.capture_id)}.pcap" logger.debug(f"Capture file name: {filename}") - self.set_capture_file(filename) + self.capture_file = filename def save_capture_metadata_to_json(self, file_path: Path = Path(CAPTURE_METADATA_FILE)): assert self.capture_dir.is_dir(), f"capture_dir is not a directory: {self.capture_dir}" if file_path.is_file(): print(f"File {file_path} already exists, update instead.") return ReturnCodes.FILE_ALREADY_EXISTS - metadata = self.model_dump_json(indent=2, exclude_unset=True, exclude_none=True) + metadata = self.to_json(indent=2) with file_path.open('w') as file: json.dump(metadata, file) return ReturnCodes.SUCCESS - def model_dump_json(self, indent, exclude_unset, exclude_none): + def to_json(self, indent): pass diff --git a/code/iottb/subcommands/capture.py b/code/iottb/subcommands/capture.py index a657493..07c2b48 100644 --- a/code/iottb/subcommands/capture.py +++ b/code/iottb/subcommands/capture.py @@ -100,10 +100,10 @@ def handle_capture(args): make_capture_src_folder(capture_dir) capture_metadata = CaptureMetadata(device_data, capture_dir) - capture_metadata.set_interface(args.capture_interface) + capture_metadata.interface = args.capture_interface cmd = ['sudo', 'tcpdump', '-i', args.capture_interface] cmd = build_tcpdump_args(args, cmd, capture_metadata) - capture_metadata.set_tcpdump_command(cmd) + capture_metadata.tcpdump_command = cmd print('Executing: ' + ' '.join(cmd)) @@ -112,8 +112,8 @@ def handle_capture(args): start_time = datetime.now().strftime('%H:%M:%S') run_tcpdump(cmd) stop_time = datetime.now().strftime('%H:%M:%S') - capture_metadata.set_start_time(start_time) - capture_metadata.set_stop_time(stop_time) + capture_metadata.start_time = start_time + capture_metadata.stop_time = stop_time except KeyboardInterrupt: print("Received keyboard interrupt.") exit(ReturnCodes.ABORTED) @@ -144,15 +144,15 @@ def build_tcpdump_args(args, cmd, capture_metadata: CaptureMetadata): assert False, "Unimplemented option" if args.app_name is not None: - capture_metadata.set_app_name(args.app_name) + capture_metadata.app = args.app_name capture_metadata.build_capture_file_name() cmd.append('-w') - cmd.append(capture_metadata.get_capfile_name()) + cmd.append(capture_metadata.capture_file) if args.safe: cmd.append(f'host {args.device_ip}') # if not specified, filter 'any' implied by tcpdump - capture_metadata.set_device_ip_address(args.device_ip) + capture_metadata.device_id = args.device_ip return cmd From 799414ad39181af1c05fb1838bce8429a514528e Mon Sep 17 00:00:00 2001 From: Sebastian Lenzlinger Date: Wed, 8 May 2024 00:03:18 +0200 Subject: [PATCH 02/12] Remove getters and setters from DeviceMetadata class and change dependencies to use field access. --- code/iottb/models/capture_metadata_model.py | 2 +- code/iottb/models/device_metadata_model.py | 44 --------------------- 2 files changed, 1 insertion(+), 45 deletions(-) diff --git a/code/iottb/models/capture_metadata_model.py b/code/iottb/models/capture_metadata_model.py index e605bc1..4f11325 100644 --- a/code/iottb/models/capture_metadata_model.py +++ b/code/iottb/models/capture_metadata_model.py @@ -47,7 +47,7 @@ class CaptureMetadata: logger.info(f"Building capture file name") if self.app is None: logger.debug(f"No app specified") - prefix = self.device_metadata.get_device_short_name() + prefix = self.device_metadata.device_short_name else: logger.debug(f"App specified: {self.app}") assert str(self.app).strip() not in {"", " "}, f"app is not a valid name: {self.app}" diff --git a/code/iottb/models/device_metadata_model.py b/code/iottb/models/device_metadata_model.py index eabe874..2359e4b 100644 --- a/code/iottb/models/device_metadata_model.py +++ b/code/iottb/models/device_metadata_model.py @@ -40,50 +40,6 @@ class DeviceMetadata: logger.debug(f"Device root dir: {device_root_dir}") logger.info(f"Initialized DeviceMetadata model: {device_name}") - def get_device_id(self) -> str: - return self.device_id - - def get_device_name(self) -> str: - return self.device_name - - def get_device_short_name(self) -> str: - return self.device_short_name - - def get_device_type(self) -> str: - return self.device_type - - def get_device_serial_number(self) -> str: - return self.device_serial_number - - def get_device_firmware_version(self) -> str: - return self.device_firmware_version - - def get_date_updated(self) -> str: - return self.date_updated - - def get_capture_files(self) -> List[str]: - return self.capture_files - - def get_aliases(self) -> List[str]: - return self.aliases - - def set_device_type(self, device_type: str) -> None: - self.device_type = device_type - self.date_updated = datetime.now().strftime('%d-%m-%YT%H:%M:%S') - - def set_device_serial_number(self, device_serial_number: str) -> None: - self.device_serial_number = device_serial_number - self.date_updated = datetime.now().strftime('%d-%m-%YT%H:%M:%S') - - def set_device_firmware_version(self, device_firmware_version: str) -> None: - self.device_firmware_version = device_firmware_version - self.date_updated = datetime.now().strftime('%d-%m-%YT%H:%M:%S') - - def set_device_name(self, device_name: str) -> None: - self.device_name = device_name - self.device_short_name = device_name.lower().replace(" ", "_") - self.date_updated = datetime.now().strftime('%d-%m-%YT%H:%M:%S') - @classmethod def load_from_json(cls, device_file_path: Path): logger.info(f"Loading DeviceMetadata from JSON file: {device_file_path}") From 2681ee9a8e9f24662fc3129e3dd582ec44d275ef Mon Sep 17 00:00:00 2001 From: Sebastian Lenzlinger Date: Wed, 8 May 2024 00:31:25 +0200 Subject: [PATCH 03/12] Refactor function name to reflect that not using pydantic anymore. --- archive/metadata_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/archive/metadata_utils.py b/archive/metadata_utils.py index c0cc93e..48b3d4e 100644 --- a/archive/metadata_utils.py +++ b/archive/metadata_utils.py @@ -48,7 +48,7 @@ def get_device_metadata(file_path: Path) -> DeviceMetadata | None: with file_path.open('r') as f: device_metadata_json = json.load(f) try: - device_metadata = DeviceMetadata.model_validate_json(device_metadata_json) + device_metadata = DeviceMetadata.from_json(device_metadata_json) return device_metadata except ValueError as e: print(f"Validation error for device metadata: {e}") From cb1ad33cae140cea5525d4d7f71b4b3cd0fa2311 Mon Sep 17 00:00:00 2001 From: Sebastian Lenzlinger Date: Wed, 8 May 2024 00:31:57 +0200 Subject: [PATCH 04/12] Implement custom to_json function for CaptureMetadata --- code/iottb/models/capture_metadata_model.py | 37 +++++++++++++++++++-- 1 file changed, 34 insertions(+), 3 deletions(-) diff --git a/code/iottb/models/capture_metadata_model.py b/code/iottb/models/capture_metadata_model.py index 4f11325..90987ef 100644 --- a/code/iottb/models/capture_metadata_model.py +++ b/code/iottb/models/capture_metadata_model.py @@ -2,7 +2,7 @@ import json import uuid from datetime import datetime from pathlib import Path -from typing import Optional, Any +from typing import Optional, Any, OrderedDict from iottb.definitions import ReturnCodes, CAPTURE_METADATA_FILE from iottb.models.device_metadata_model import DeviceMetadata @@ -67,5 +67,36 @@ class CaptureMetadata: json.dump(metadata, file) return ReturnCodes.SUCCESS - def to_json(self, indent): - pass + def to_json(self, indent=2): + # TODO: Where to validate data? + logger.info(f"Converting CaptureMetadata to JSON") + data = {} + + # List of fields from CaptureData class, if fields[key]==True, then it is a required field + fields = { + 'capture_id': True, # + 'device_id': True, + 'capture_dir': True, + 'capture_file': False, + 'capture_date': False, + 'start_time': True, + 'stop_time': True, + 'packet_count': False, + 'pcap_filter': False, + 'tcpdump_command': False, + 'interface': False, + 'device_ip_address': False, + 'device_mac_address': False, + 'app': False, + 'app_version': False, + 'firmware_version': False + } + + for field, is_mandatory in fields.items(): + value = getattr(self, field, None) + if value not in [None, ""] or is_mandatory: + if value in [None, ""] and is_mandatory: + raise ValueError(f"Field {field} is required and cannot be empty.") + data[field] = str(value) if not isinstance(value, str) else value + logger.debug(f"Capture metadata: {data}") + return json.dumps(data, indent=indent) From 27ae736f11a71641d3cd96c38dcc744866eb489c Mon Sep 17 00:00:00 2001 From: Sebastian Lenzlinger Date: Wed, 8 May 2024 00:40:29 +0200 Subject: [PATCH 05/12] Implement custom to_json function for DeviceMetadata --- code/iottb/models/device_metadata_model.py | 34 ++++++++++++++++++---- 1 file changed, 29 insertions(+), 5 deletions(-) diff --git a/code/iottb/models/device_metadata_model.py b/code/iottb/models/device_metadata_model.py index 2359e4b..7e3dd79 100644 --- a/code/iottb/models/device_metadata_model.py +++ b/code/iottb/models/device_metadata_model.py @@ -49,7 +49,7 @@ class DeviceMetadata: with device_meta_filename.open('r') as file: metadata_json = json.load(file) - metadata_model_obj = cls.model_validate_json(metadata_json) + metadata_model_obj = cls.from_json(metadata_json) return metadata_model_obj def save_to_json(self, file_path: Path): @@ -57,18 +57,42 @@ class DeviceMetadata: if file_path.is_file(): print(f"File {file_path} already exists, update instead.") return ReturnCodes.FILE_ALREADY_EXISTS - metadata = self.model_dump_json(indent=2) + metadata = self.to_json(indent=2) with file_path.open('w') as file: json.dump(metadata, file) return ReturnCodes.SUCCESS @classmethod - def model_validate_json(cls, metadata_json): + def from_json(cls, metadata_json): pass - def model_dump_json(self, indent): - pass + def to_json(self, indent=2): + # TODO: atm almost exact copy as in CaptureMetadata + data = {} + + fields = { + "device_name": True, + "device_short_name": True, + "device_id": True, + "date_created": True, + "device_root_path": True, + "aliases": False, + "device_type": False, + "device_serial_number": False, + "device_firmware_version": False, + "date_updated": True, + "capture_files": False, + } + + for field, is_mandatory in fields.items(): + value = getattr(self, field, None) + if value not in [None, ""] or is_mandatory: + if value in [None, ""] and is_mandatory: + raise ValueError(f"Field {field} is required and cannot be empty.") + data[field] = str(value) if not isinstance(value, Path) else value + logger.debug(f"Device metadata: {data}") + return json.dumps(data, indent=indent) def dir_contains_device_metadata(dir_path: Path): From 73771be70d184e50490848e8797d17f04e9fdc6a Mon Sep 17 00:00:00 2001 From: Sebastian Lenzlinger Date: Wed, 8 May 2024 00:52:25 +0200 Subject: [PATCH 06/12] Add preliminary implementation to load DeviceMetadata from data. --- code/iottb/models/device_metadata_model.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/code/iottb/models/device_metadata_model.py b/code/iottb/models/device_metadata_model.py index 7e3dd79..72d063c 100644 --- a/code/iottb/models/device_metadata_model.py +++ b/code/iottb/models/device_metadata_model.py @@ -7,6 +7,7 @@ from typing import Optional, List, Any # iottb modules from iottb.definitions import ReturnCodes, DEVICE_METADATA_FILE from iottb.logger import logger + # 3rd party libs IMMUTABLE_FIELDS = {"device_name", "device_short_name", "device_id", "date_created"} @@ -62,10 +63,10 @@ class DeviceMetadata: json.dump(metadata, file) return ReturnCodes.SUCCESS - @classmethod def from_json(cls, metadata_json): - pass + if isinstance(metadata_json, dict): + return DeviceMetadata(**metadata_json) def to_json(self, indent=2): # TODO: atm almost exact copy as in CaptureMetadata From a21312ee61a781d52a22aa4cbf96337b78056414 Mon Sep 17 00:00:00 2001 From: Sebastian Lenzlinger Date: Wed, 8 May 2024 01:38:53 +0200 Subject: [PATCH 07/12] Corrections --- code/iottb/logger.py | 3 +-- code/iottb/models/device_metadata_model.py | 25 +++++++++++-------- code/iottb/subcommands/add_device.py | 5 ++-- .../test_initialize_device_root_dir.py | 0 code/tests/subcommands/test_sniff.py | 0 5 files changed, 18 insertions(+), 15 deletions(-) delete mode 100644 code/tests/subcommands/test_initialize_device_root_dir.py delete mode 100644 code/tests/subcommands/test_sniff.py diff --git a/code/iottb/logger.py b/code/iottb/logger.py index 243d467..2d60e6d 100644 --- a/code/iottb/logger.py +++ b/code/iottb/logger.py @@ -2,7 +2,6 @@ import logging import sys from logging.handlers import RotatingFileHandler - def setup_logging(): logger_obj = logging.getLogger('iottbLogger') logger_obj.setLevel(logging.DEBUG) @@ -14,7 +13,7 @@ def setup_logging(): console_handler.setLevel(logging.DEBUG) file_fmt = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') - console_fmt = logging.Formatter('%(name)s - %(name)s - %(levelname)s - %(message)s') + console_fmt = logging.Formatter('%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s') file_handler.setFormatter(file_fmt) console_handler.setFormatter(console_fmt) diff --git a/code/iottb/models/device_metadata_model.py b/code/iottb/models/device_metadata_model.py index 72d063c..67bda78 100644 --- a/code/iottb/models/device_metadata_model.py +++ b/code/iottb/models/device_metadata_model.py @@ -2,12 +2,11 @@ import json import uuid from datetime import datetime from pathlib import Path -from typing import Optional, List, Any +from typing import Optional, List # iottb modules from iottb.definitions import ReturnCodes, DEVICE_METADATA_FILE from iottb.logger import logger - # 3rd party libs IMMUTABLE_FIELDS = {"device_name", "device_short_name", "device_id", "date_created"} @@ -17,8 +16,8 @@ class DeviceMetadata: # Required fields device_name: str device_short_name: str - device_id: str = lambda: str(uuid.uuid4()) - date_created: str = lambda: datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower() + device_id: str + date_created: str device_root_path: Path # Optional Fields @@ -30,15 +29,18 @@ class DeviceMetadata: capture_files: Optional[List[str]] = [] - def __init__(self, device_name: str, device_root_dir: Path): + def __init__(self, device_name: str, device_root_path: Path): self.device_name = device_name self.device_short_name = device_name.lower().replace(" ", "_") - # assert dir_contains_device_metadata(device_root_dir), \ - # f"Directory {device_root_dir} is missing a {DEVICE_METADATA_FILE} file" - self.device_root_dir = device_root_dir + self.device_id = str(uuid.uuid4()) + self.date_created = datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower() + self.device_root_path = device_root_path + if not self.device_root_path or not self.device_root_path.is_dir(): + logger.error(f"Invalid device root path: {device_root_path}") + raise ValueError(f"Invalid device root path: {device_root_path}") logger.debug(f"Device name: {device_name}") logger.debug(f"Device short_name: {self.device_short_name}") - logger.debug(f"Device root dir: {device_root_dir}") + logger.debug(f"Device root dir: {device_root_path}") logger.info(f"Initialized DeviceMetadata model: {device_name}") @classmethod @@ -82,7 +84,7 @@ class DeviceMetadata: "device_type": False, "device_serial_number": False, "device_firmware_version": False, - "date_updated": True, + "date_updated": False, "capture_files": False, } @@ -90,8 +92,9 @@ class DeviceMetadata: value = getattr(self, field, None) if value not in [None, ""] or is_mandatory: if value in [None, ""] and is_mandatory: + logger.debug(f"Mandatory field {field}: {value}") raise ValueError(f"Field {field} is required and cannot be empty.") - data[field] = str(value) if not isinstance(value, Path) else value + data[field] = str(value) if not isinstance(value, str) else value logger.debug(f"Device metadata: {data}") return json.dumps(data, indent=indent) diff --git a/code/iottb/subcommands/add_device.py b/code/iottb/subcommands/add_device.py index 4dd189d..3cf1b4a 100644 --- a/code/iottb/subcommands/add_device.py +++ b/code/iottb/subcommands/add_device.py @@ -29,11 +29,12 @@ def handle_add(args): metadata = DeviceMetadata(device_name, args.root_dir) file_path = args.root_dir / DEVICE_METADATA_FILE - response = input(f"Confirm device metadata: {metadata.model_dump()} [y/N]") + + response = input(f"Confirm device metadata: {metadata.to_json()} [y/N]") if response.lower() not in definitions.AFFIRMATIVE_USER_RESPONSE.add(""): configure_metadata() assert False, "TODO implement dynamic setup" - + args.root_dir.mkdir(parents=True, exist_ok=True) # else metadata.save_to_file will fail TODO: unclear if metadata.save_to_json(file_path) == ReturnCodes.FILE_ALREADY_EXISTS: print("Directory already contains a device metadata file. Aborting operation.") return ReturnCodes.ABORTED diff --git a/code/tests/subcommands/test_initialize_device_root_dir.py b/code/tests/subcommands/test_initialize_device_root_dir.py deleted file mode 100644 index e69de29..0000000 diff --git a/code/tests/subcommands/test_sniff.py b/code/tests/subcommands/test_sniff.py deleted file mode 100644 index e69de29..0000000 From 266a669e5e4b2147ea187edd494ad75cf2efd57d Mon Sep 17 00:00:00 2001 From: Sebastian Lenzlinger Date: Wed, 8 May 2024 02:36:07 +0200 Subject: [PATCH 08/12] Add test for device metadata file creation and fixes until test passed. --- code/iottb/logger.py | 1 + code/iottb/subcommands/add_device.py | 36 ++++++++++++++-------- code/tests/subcommands/test_add_device.py | 37 +++++++++++++++++++++++ 3 files changed, 61 insertions(+), 13 deletions(-) create mode 100644 code/tests/subcommands/test_add_device.py diff --git a/code/iottb/logger.py b/code/iottb/logger.py index 2d60e6d..ea6add2 100644 --- a/code/iottb/logger.py +++ b/code/iottb/logger.py @@ -2,6 +2,7 @@ import logging import sys from logging.handlers import RotatingFileHandler + def setup_logging(): logger_obj = logging.getLogger('iottbLogger') logger_obj.setLevel(logging.DEBUG) diff --git a/code/iottb/subcommands/add_device.py b/code/iottb/subcommands/add_device.py index 3cf1b4a..62c74bf 100644 --- a/code/iottb/subcommands/add_device.py +++ b/code/iottb/subcommands/add_device.py @@ -19,26 +19,36 @@ def setup_init_device_root_parser(subparsers): def handle_add(args): logger.info(f"Add device handler called with args {args}") + args.root_dir.mkdir(parents=True, exist_ok=True) # else metadata.save_to_file will fail TODO: unclear what to assume + if args.guided: - logger.debug("Guided setup") + logger.debug("begin guided setup") metadata = guided_setup(args.root_dir) + logger.debug("guided setup complete") else: logger.debug("Setup through passed args: setup") - device_name = args.name - args.root_dir.mkdir(parents=True, exist_ok=True) - metadata = DeviceMetadata(device_name, args.root_dir) + if not args.name: + logger.error("No device name specified with unguided setup.") + return ReturnCodes.ERROR + metadata = DeviceMetadata(args.name, args.root_dir) file_path = args.root_dir / DEVICE_METADATA_FILE - - response = input(f"Confirm device metadata: {metadata.to_json()} [y/N]") - if response.lower() not in definitions.AFFIRMATIVE_USER_RESPONSE.add(""): - configure_metadata() - assert False, "TODO implement dynamic setup" - args.root_dir.mkdir(parents=True, exist_ok=True) # else metadata.save_to_file will fail TODO: unclear - if metadata.save_to_json(file_path) == ReturnCodes.FILE_ALREADY_EXISTS: - print("Directory already contains a device metadata file. Aborting operation.") + if file_path.exists(): + print("Directory already contains a metadata file. Aborting.") return ReturnCodes.ABORTED - assert Path(file_path).exists(), f"{file_path} does not exist" + serialized_metadata = metadata.to_json() + response = input(f"Confirm device metadata: {serialized_metadata} [y/N]") + logger.debug(f"response: {response}") + if response not in definitions.AFFIRMATIVE_USER_RESPONSE: + print("Adding device aborted by user.") + return ReturnCodes.ABORTED + + logger.debug(f"Device metadata file {file_path}") + if metadata.save_to_json(file_path) == ReturnCodes.FILE_ALREADY_EXISTS: + logger.error("File exists after checking, which should not happen.") + return ReturnCodes.ABORTED + + print("Device metadata successfully created.") return ReturnCodes.SUCCESS diff --git a/code/tests/subcommands/test_add_device.py b/code/tests/subcommands/test_add_device.py new file mode 100644 index 0000000..a7fedcf --- /dev/null +++ b/code/tests/subcommands/test_add_device.py @@ -0,0 +1,37 @@ +import sys +import unittest +from io import StringIO +from unittest.mock import patch, MagicMock +from pathlib import Path +from iottb.definitions import DEVICE_METADATA_FILE +import shutil +from iottb.__main__ import main + + +class TestDeviceSetup(unittest.TestCase): + def setUp(self): + self.test_dir = Path("/tmp/iottbtest/test_add_device") + self.test_dir.mkdir(parents=True, exist_ok=True) + # self.captured_output = StringIO() + # sys.stdout = self.captured_output + + def tearDown(self): + # shutil.rmtree(str(self.test_dir)) + for item in self.test_dir.iterdir(): + if item.is_dir(): + item.rmdir() + else: + item.unlink() + self.test_dir.rmdir() + # sys.stdout = sys.__stdout__ + + @patch("builtins.input", side_effect=["iPhone 14", "y", "y"]) + def test_guided_device_setup(self, mock_input): + sys.argv = ['__main__.py', 'add', '--root_dir', str(self.test_dir), '--guided'] + main() + expected_file = self.test_dir / DEVICE_METADATA_FILE + self.assertTrue(expected_file.exists()), f"Expected file not created: {expected_file}" + + +if __name__ == '__main__': + unittest.main() From e569eb3e5b62771eb93c6bebabe2751cfe11f0d8 Mon Sep 17 00:00:00 2001 From: Sebastian Lenzlinger Date: Wed, 8 May 2024 02:46:14 +0200 Subject: [PATCH 09/12] Replace all double quotes strings with single quoted strings. --- archive/functions_dump.py | 36 +++++----- archive/metadata.py | 4 +- archive/metadata_utils.py | 24 +++---- code/iottb/__main__.py | 10 +-- code/iottb/definitions.py | 16 ++--- code/iottb/models/capture_metadata_model.py | 42 ++++++------ code/iottb/models/device_metadata_model.py | 60 ++++++++--------- code/iottb/subcommands/add_device.py | 54 +++++++-------- code/iottb/subcommands/capture.py | 74 ++++++++++----------- code/iottb/utils/capture_metadata_utils.py | 20 +++--- code/iottb/utils/capture_utils.py | 12 ++-- code/iottb/utils/tcpdump_utils.py | 12 ++-- code/iottb/utils/utils.py | 2 +- code/tests/subcommands/test_add_device.py | 6 +- 14 files changed, 186 insertions(+), 186 deletions(-) diff --git a/archive/functions_dump.py b/archive/functions_dump.py index 856146a..4d94ad9 100644 --- a/archive/functions_dump.py +++ b/archive/functions_dump.py @@ -2,31 +2,31 @@ def setup_sniff_tcpdump_parser(parser_sniff): # arguments which will be passed to tcpdump parser_sniff_tcpdump = parser_sniff.add_argument_group('tcpdump arguments') # TODO: tcpdump_parser.add_argument('-c', '--count', re) - parser_sniff_tcpdump.add_argument("-a", "--ip-address=", help="IP address of the device to sniff", dest="device_ip") - parser_sniff_tcpdump.add_argument("-i", "--interface=", help="Interface of the capture device.", dest="capture_interface",default="") - parser_sniff_tcpdump.add_argument("-I", "--monitor-mode", help="Put interface into monitor mode", - action="store_true") - parser_sniff_tcpdump.add_argument("-n", help="Deactivate name resolution. Option is set by default.", - action="store_true") - parser_sniff_tcpdump.add_argument("-#", "--number", - help="Print packet number at beginning of line. Set by default.", - action="store_true") - parser_sniff_tcpdump.add_argument("-e", help="Print link layer headers. Option is set by default.", - action="store_true") - parser_sniff_tcpdump.add_argument("-t", action="count", default=0, - help="Please see tcpdump manual for details. Unused by default.") + parser_sniff_tcpdump.add_argument('-a', '--ip-address=', help='IP address of the device to sniff', dest='device_ip') + parser_sniff_tcpdump.add_argument('-i', '--interface=', help='Interface of the capture device.', dest='capture_interface',default='') + parser_sniff_tcpdump.add_argument('-I', '--monitor-mode', help='Put interface into monitor mode', + action='store_true') + parser_sniff_tcpdump.add_argument('-n', help='Deactivate name resolution. Option is set by default.', + action='store_true') + parser_sniff_tcpdump.add_argument('-#', '--number', + help='Print packet number at beginning of line. Set by default.', + action='store_true') + parser_sniff_tcpdump.add_argument('-e', help='Print link layer headers. Option is set by default.', + action='store_true') + parser_sniff_tcpdump.add_argument('-t', action='count', default=0, + help='Please see tcpdump manual for details. Unused by default.') def setup_sniff_parser(subparsers): - # create parser for "sniff" command - parser_sniff = subparsers.add_parser("sniff", help="Start tcpdump capture.") + # create parser for 'sniff' command + parser_sniff = subparsers.add_parser('sniff', help='Start tcpdump capture.') setup_sniff_tcpdump_parser(parser_sniff) setup_pcap_filter_parser(parser_sniff) cap_size_group = parser_sniff.add_mutually_exclusive_group(required=True) - cap_size_group.add_argument("-c", "--count", type=int, help="Number of packets to capture.", default=0) - cap_size_group.add_argument("--mins", type=int, help="Time in minutes to capture.", default=60) + cap_size_group.add_argument('-c', '--count', type=int, help='Number of packets to capture.', default=0) + cap_size_group.add_argument('--mins', type=int, help='Time in minutes to capture.', default=60) def setup_pcap_filter_parser(parser_sniff): - parser_pcap_filter = parser_sniff.add_argument_parser("pcap-filter expression") + parser_pcap_filter = parser_sniff.add_argument_parser('pcap-filter expression') pass diff --git a/archive/metadata.py b/archive/metadata.py index eeb2888..b537017 100644 --- a/archive/metadata.py +++ b/archive/metadata.py @@ -15,5 +15,5 @@ class Metadata: def create_metadata(filename, unique_id, device_details): - date_string = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S") - meta_filename = f"meta_{date_string}_{unique_id}.json" + date_string = datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S') + meta_filename = f'meta_{date_string}_{unique_id}.json' diff --git a/archive/metadata_utils.py b/archive/metadata_utils.py index 48b3d4e..25d5ca0 100644 --- a/archive/metadata_utils.py +++ b/archive/metadata_utils.py @@ -8,33 +8,33 @@ from iottb.definitions import DEVICE_METADATA_FILE def write_device_metadata_to_file(metadata: DeviceMetadata, device_path: Path): - """Write the device metadata to a JSON file in the specified directory.""" - meta_file_path = device_path / "meta.json" + '''Write the device metadata to a JSON file in the specified directory.''' + meta_file_path = device_path / 'meta.json' meta_file_path.write_text(metadata.json(indent=2)) def confirm_device_metadata(metadata: DeviceMetadata) -> bool: - """Display device metadata for user confirmation.""" + '''Display device metadata for user confirmation.''' print(metadata.json(indent=2)) - return input("Confirm device metadata? (y/n): ").strip().lower() == 'y' + return input('Confirm device metadata? (y/n): ').strip().lower() == 'y' def get_device_metadata_from_user() -> DeviceMetadata: - """Prompt the user to enter device details and return a populated DeviceMetadata object.""" - device_name = input("Device name: ") - device_short_name = device_name.lower().replace(" ", "-") + '''Prompt the user to enter device details and return a populated DeviceMetadata object.''' + device_name = input('Device name: ') + device_short_name = device_name.lower().replace(' ', '-') return DeviceMetadata(device_name=device_name, device_short_name=device_short_name) def initialize_device_root_dir(device_name: str) -> Path: - """Create and return the path for the device directory.""" + '''Create and return the path for the device directory.''' device_path = Path.cwd() / device_name device_path.mkdir(exist_ok=True) return device_path def write_metadata(metadata: BaseModel, device_name: str): - """Write device metadata to a JSON file.""" + '''Write device metadata to a JSON file.''' meta_path = Path.cwd() / device_name / DEVICE_METADATA_FILE meta_path.parent.mkdir(parents=True, exist_ok=True) with meta_path.open('w') as f: @@ -42,7 +42,7 @@ def write_metadata(metadata: BaseModel, device_name: str): def get_device_metadata(file_path: Path) -> DeviceMetadata | None: - """Fetch device metadata from a JSON file.""" + '''Fetch device metadata from a JSON file.''' if dev_metadata_exists(file_path): with file_path.open('r') as f: @@ -51,10 +51,10 @@ def get_device_metadata(file_path: Path) -> DeviceMetadata | None: device_metadata = DeviceMetadata.from_json(device_metadata_json) return device_metadata except ValueError as e: - print(f"Validation error for device metadata: {e}") + print(f'Validation error for device metadata: {e}') else: # TODO Decide what to do (e.g. search for file etc) - print(f"No device metadata at {file_path}") + print(f'No device metadata at {file_path}') return None diff --git a/code/iottb/__main__.py b/code/iottb/__main__.py index 70fe0d6..d647cb0 100644 --- a/code/iottb/__main__.py +++ b/code/iottb/__main__.py @@ -10,8 +10,8 @@ from iottb.subcommands.add_device import setup_init_device_root_parser ###################### def setup_argparse(): # create top level parser - root_parser = argparse.ArgumentParser(prog="iottb") - subparsers = root_parser.add_subparsers(title="subcommands", required=True, dest="command") + root_parser = argparse.ArgumentParser(prog='iottb') + subparsers = root_parser.add_subparsers(title='subcommands', required=True, dest='command') setup_capture_parser(subparsers) setup_init_device_root_parser(subparsers) @@ -27,12 +27,12 @@ def main(): try: args.func(args) except KeyboardInterrupt: - print("Received keyboard interrupt. Exiting...") + print('Received keyboard interrupt. Exiting...') exit(1) except Exception as e: - print(f"Error: {e}") + print(f'Error: {e}') # create_capture_directory(args.device_name) -if __name__ == "__main__": +if __name__ == '__main__': main() diff --git a/code/iottb/definitions.py b/code/iottb/definitions.py index 507e1a0..3e6e6a9 100644 --- a/code/iottb/definitions.py +++ b/code/iottb/definitions.py @@ -1,16 +1,16 @@ from datetime import datetime from enum import Flag, unique, global_enum -DEVICE_METADATA_FILE = "device_metadata.json" -CAPTURE_METADATA_FILE = "capture_metadata.json" -TODAY_DATE_STRING = datetime.now().strftime("%d%b%Y").lower() # TODO convert to function in utils or so +DEVICE_METADATA_FILE = 'device_metadata.json' +CAPTURE_METADATA_FILE = 'capture_metadata.json' +TODAY_DATE_STRING = datetime.now().strftime('%d%b%Y').lower() # TODO convert to function in utils or so -CAPTURE_FOLDER_BASENAME = "capture_###" +CAPTURE_FOLDER_BASENAME = 'capture_###' -AFFIRMATIVE_USER_RESPONSE = {"yes", "y", "true", "Y", "Yes", "YES"} -NEGATIVE_USER_RESPONSE = {"no", "n", "N", "No"} -YES_DEFAULT = AFFIRMATIVE_USER_RESPONSE.union({"", " "}) -NO_DEFAULT = NEGATIVE_USER_RESPONSE.union({"", " "}) +AFFIRMATIVE_USER_RESPONSE = {'yes', 'y', 'true', 'Y', 'Yes', 'YES'} +NEGATIVE_USER_RESPONSE = {'no', 'n', 'N', 'No'} +YES_DEFAULT = AFFIRMATIVE_USER_RESPONSE.union({'', ' '}) +NO_DEFAULT = NEGATIVE_USER_RESPONSE.union({'', ' '}) @unique diff --git a/code/iottb/models/capture_metadata_model.py b/code/iottb/models/capture_metadata_model.py index 90987ef..ba2fcd1 100644 --- a/code/iottb/models/capture_metadata_model.py +++ b/code/iottb/models/capture_metadata_model.py @@ -24,12 +24,12 @@ class CaptureMetadata: # tcpdump packet_count: Optional[int] - pcap_filter: str = "" - tcpdump_command: str = "" - interface: str = "" + pcap_filter: str = '' + tcpdump_command: str = '' + interface: str = '' # Optional Fields - device_ip_address: str = "No IP Address set" + device_ip_address: str = 'No IP Address set' device_mac_address: Optional[str] = None app: Optional[str] = None @@ -37,30 +37,30 @@ class CaptureMetadata: firmware_version: Optional[str] = None def __init__(self, device_metadata: DeviceMetadata, capture_dir: Path, /, **data: Any): - logger.info(f"Creating CaptureMetadata model from DeviceMetadata: {device_metadata}") + logger.info(f'Creating CaptureMetadata model from DeviceMetadata: {device_metadata}') self.device_metadata = device_metadata self.capture_dir = capture_dir - assert capture_dir.is_dir(), f"Capture directory {capture_dir} does not exist" + assert capture_dir.is_dir(), f'Capture directory {capture_dir} does not exist' def build_capture_file_name(self): - logger.info(f"Building capture file name") + logger.info(f'Building capture file name') if self.app is None: - logger.debug(f"No app specified") + logger.debug(f'No app specified') prefix = self.device_metadata.device_short_name else: - logger.debug(f"App specified: {self.app}") - assert str(self.app).strip() not in {"", " "}, f"app is not a valid name: {self.app}" - prefix = self.app.lower().replace(" ", "_") - # assert self.capture_dir is not None, f"{self.capture_dir} does not exist" - filename = f"{prefix}_{str(self.capture_id)}.pcap" - logger.debug(f"Capture file name: {filename}") + logger.debug(f'App specified: {self.app}') + assert str(self.app).strip() not in {'', ' '}, f'app is not a valid name: {self.app}' + prefix = self.app.lower().replace(' ', '_') + # assert self.capture_dir is not None, f'{self.capture_dir} does not exist' + filename = f'{prefix}_{str(self.capture_id)}.pcap' + logger.debug(f'Capture file name: {filename}') self.capture_file = filename def save_capture_metadata_to_json(self, file_path: Path = Path(CAPTURE_METADATA_FILE)): - assert self.capture_dir.is_dir(), f"capture_dir is not a directory: {self.capture_dir}" + assert self.capture_dir.is_dir(), f'capture_dir is not a directory: {self.capture_dir}' if file_path.is_file(): - print(f"File {file_path} already exists, update instead.") + print(f'File {file_path} already exists, update instead.') return ReturnCodes.FILE_ALREADY_EXISTS metadata = self.to_json(indent=2) with file_path.open('w') as file: @@ -69,7 +69,7 @@ class CaptureMetadata: def to_json(self, indent=2): # TODO: Where to validate data? - logger.info(f"Converting CaptureMetadata to JSON") + logger.info(f'Converting CaptureMetadata to JSON') data = {} # List of fields from CaptureData class, if fields[key]==True, then it is a required field @@ -94,9 +94,9 @@ class CaptureMetadata: for field, is_mandatory in fields.items(): value = getattr(self, field, None) - if value not in [None, ""] or is_mandatory: - if value in [None, ""] and is_mandatory: - raise ValueError(f"Field {field} is required and cannot be empty.") + if value not in [None, ''] or is_mandatory: + if value in [None, ''] and is_mandatory: + raise ValueError(f'Field {field} is required and cannot be empty.') data[field] = str(value) if not isinstance(value, str) else value - logger.debug(f"Capture metadata: {data}") + logger.debug(f'Capture metadata: {data}') return json.dumps(data, indent=indent) diff --git a/code/iottb/models/device_metadata_model.py b/code/iottb/models/device_metadata_model.py index 67bda78..359aa96 100644 --- a/code/iottb/models/device_metadata_model.py +++ b/code/iottb/models/device_metadata_model.py @@ -9,7 +9,7 @@ from iottb.definitions import ReturnCodes, DEVICE_METADATA_FILE from iottb.logger import logger # 3rd party libs -IMMUTABLE_FIELDS = {"device_name", "device_short_name", "device_id", "date_created"} +IMMUTABLE_FIELDS = {'device_name', 'device_short_name', 'device_id', 'date_created'} class DeviceMetadata: @@ -31,23 +31,23 @@ class DeviceMetadata: def __init__(self, device_name: str, device_root_path: Path): self.device_name = device_name - self.device_short_name = device_name.lower().replace(" ", "_") + self.device_short_name = device_name.lower().replace(' ', '_') self.device_id = str(uuid.uuid4()) self.date_created = datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower() self.device_root_path = device_root_path if not self.device_root_path or not self.device_root_path.is_dir(): - logger.error(f"Invalid device root path: {device_root_path}") - raise ValueError(f"Invalid device root path: {device_root_path}") - logger.debug(f"Device name: {device_name}") - logger.debug(f"Device short_name: {self.device_short_name}") - logger.debug(f"Device root dir: {device_root_path}") - logger.info(f"Initialized DeviceMetadata model: {device_name}") + logger.error(f'Invalid device root path: {device_root_path}') + raise ValueError(f'Invalid device root path: {device_root_path}') + logger.debug(f'Device name: {device_name}') + logger.debug(f'Device short_name: {self.device_short_name}') + logger.debug(f'Device root dir: {device_root_path}') + logger.info(f'Initialized DeviceMetadata model: {device_name}') @classmethod def load_from_json(cls, device_file_path: Path): - logger.info(f"Loading DeviceMetadata from JSON file: {device_file_path}") - assert device_file_path.is_file(), f"{device_file_path} is not a file" - assert device_file_path.name == DEVICE_METADATA_FILE, f"{device_file_path} is not a {DEVICE_METADATA_FILE}" + logger.info(f'Loading DeviceMetadata from JSON file: {device_file_path}') + assert device_file_path.is_file(), f'{device_file_path} is not a file' + assert device_file_path.name == DEVICE_METADATA_FILE, f'{device_file_path} is not a {DEVICE_METADATA_FILE}' device_meta_filename = device_file_path with device_meta_filename.open('r') as file: @@ -56,9 +56,9 @@ class DeviceMetadata: return metadata_model_obj def save_to_json(self, file_path: Path): - logger.info(f"Saving DeviceMetadata to JSON file: {file_path}") + logger.info(f'Saving DeviceMetadata to JSON file: {file_path}') if file_path.is_file(): - print(f"File {file_path} already exists, update instead.") + print(f'File {file_path} already exists, update instead.') return ReturnCodes.FILE_ALREADY_EXISTS metadata = self.to_json(indent=2) with file_path.open('w') as file: @@ -75,27 +75,27 @@ class DeviceMetadata: data = {} fields = { - "device_name": True, - "device_short_name": True, - "device_id": True, - "date_created": True, - "device_root_path": True, - "aliases": False, - "device_type": False, - "device_serial_number": False, - "device_firmware_version": False, - "date_updated": False, - "capture_files": False, + 'device_name': True, + 'device_short_name': True, + 'device_id': True, + 'date_created': True, + 'device_root_path': True, + 'aliases': False, + 'device_type': False, + 'device_serial_number': False, + 'device_firmware_version': False, + 'date_updated': False, + 'capture_files': False, } for field, is_mandatory in fields.items(): value = getattr(self, field, None) - if value not in [None, ""] or is_mandatory: - if value in [None, ""] and is_mandatory: - logger.debug(f"Mandatory field {field}: {value}") - raise ValueError(f"Field {field} is required and cannot be empty.") + if value not in [None, ''] or is_mandatory: + if value in [None, ''] and is_mandatory: + logger.debug(f'Mandatory field {field}: {value}') + raise ValueError(f'Field {field} is required and cannot be empty.') data[field] = str(value) if not isinstance(value, str) else value - logger.debug(f"Device metadata: {data}") + logger.debug(f'Device metadata: {data}') return json.dumps(data, indent=indent) @@ -104,7 +104,7 @@ def dir_contains_device_metadata(dir_path: Path): return False else: meta_file_path = dir_path / DEVICE_METADATA_FILE - print(f"Device metadata file path {str(meta_file_path)}") + print(f'Device metadata file path {str(meta_file_path)}') if not meta_file_path.is_file(): return False else: diff --git a/code/iottb/subcommands/add_device.py b/code/iottb/subcommands/add_device.py index 62c74bf..ea9f7b7 100644 --- a/code/iottb/subcommands/add_device.py +++ b/code/iottb/subcommands/add_device.py @@ -8,47 +8,47 @@ from iottb.utils.device_metadata_utils import * def setup_init_device_root_parser(subparsers): - parser = subparsers.add_parser("add-device", aliases=["add-device-root", "add"]) - parser.add_argument("--root_dir", type=pathlib.Path, default=pathlib.Path.cwd()) + parser = subparsers.add_parser('add-device', aliases=['add-device-root', 'add']) + parser.add_argument('--root_dir', type=pathlib.Path, default=pathlib.Path.cwd()) group = parser.add_mutually_exclusive_group() - group.add_argument("--guided", action="store_true", help="Guided setup", default=False) - group.add_argument("--name", action="store", type=str, help="name of device") + group.add_argument('--guided', action='store_true', help='Guided setup', default=False) + group.add_argument('--name', action='store', type=str, help='name of device') parser.set_defaults(func=handle_add) def handle_add(args): - logger.info(f"Add device handler called with args {args}") + logger.info(f'Add device handler called with args {args}') args.root_dir.mkdir(parents=True, exist_ok=True) # else metadata.save_to_file will fail TODO: unclear what to assume if args.guided: - logger.debug("begin guided setup") + logger.debug('begin guided setup') metadata = guided_setup(args.root_dir) - logger.debug("guided setup complete") + logger.debug('guided setup complete') else: - logger.debug("Setup through passed args: setup") + logger.debug('Setup through passed args: setup') if not args.name: - logger.error("No device name specified with unguided setup.") + logger.error('No device name specified with unguided setup.') return ReturnCodes.ERROR metadata = DeviceMetadata(args.name, args.root_dir) file_path = args.root_dir / DEVICE_METADATA_FILE if file_path.exists(): - print("Directory already contains a metadata file. Aborting.") + print('Directory already contains a metadata file. Aborting.') return ReturnCodes.ABORTED serialized_metadata = metadata.to_json() - response = input(f"Confirm device metadata: {serialized_metadata} [y/N]") - logger.debug(f"response: {response}") + response = input(f'Confirm device metadata: {serialized_metadata} [y/N]') + logger.debug(f'response: {response}') if response not in definitions.AFFIRMATIVE_USER_RESPONSE: - print("Adding device aborted by user.") + print('Adding device aborted by user.') return ReturnCodes.ABORTED - logger.debug(f"Device metadata file {file_path}") + logger.debug(f'Device metadata file {file_path}') if metadata.save_to_json(file_path) == ReturnCodes.FILE_ALREADY_EXISTS: - logger.error("File exists after checking, which should not happen.") + logger.error('File exists after checking, which should not happen.') return ReturnCodes.ABORTED - print("Device metadata successfully created.") + print('Device metadata successfully created.') return ReturnCodes.SUCCESS @@ -57,17 +57,17 @@ def configure_metadata(): def guided_setup(device_root) -> DeviceMetadata: - logger.info("Guided setup") - response = "N" - device_name = "" - while response.upper() == "N": - device_name = input("Please enter name of device: ") - response = input(f"Confirm device name: {device_name} [y/N] ") - if device_name == "" or device_name is None: - print("Name cannot be empty") - logger.warning("Name cannot be empty") - logger.debug(f"Response is {response}") - logger.debug(f"Device name is {device_name}") + logger.info('Guided setup') + response = 'N' + device_name = '' + while response.upper() == 'N': + device_name = input('Please enter name of device: ') + response = input(f'Confirm device name: {device_name} [y/N] ') + if device_name == '' or device_name is None: + print('Name cannot be empty') + logger.warning('Name cannot be empty') + logger.debug(f'Response is {response}') + logger.debug(f'Device name is {device_name}') return DeviceMetadata(device_name, device_root) diff --git a/code/iottb/subcommands/capture.py b/code/iottb/subcommands/capture.py index 07c2b48..a2020f7 100644 --- a/code/iottb/subcommands/capture.py +++ b/code/iottb/subcommands/capture.py @@ -10,31 +10,31 @@ from iottb.utils.capture_utils import get_capture_src_folder, make_capture_src_f def setup_capture_parser(subparsers): parser = subparsers.add_parser('sniff', help='Sniff packets with tcpdump') # metadata args - parser.add_argument("-a", "--ip-address", help="IP address of the device to sniff", dest="device_ip") + parser.add_argument('-a', '--ip-address', help='IP address of the device to sniff', dest='device_ip') # tcpdump args - parser.add_argument("device_root", help="Root folder for device to sniff", + parser.add_argument('device_root', help='Root folder for device to sniff', type=Path, default=Path.cwd()) - parser.add_argument("-s", "--safe", help="Ensure correct device root folder before sniffing", action="store_true") - parser.add_argument("--app", help="Application name to sniff", dest="app_name", default=None) + parser.add_argument('-s', '--safe', help='Ensure correct device root folder before sniffing', action='store_true') + parser.add_argument('--app', help='Application name to sniff', dest='app_name', default=None) parser_sniff_tcpdump = parser.add_argument_group('tcpdump arguments') - parser_sniff_tcpdump.add_argument("-i", "--interface", help="Interface to capture on.", dest="capture_interface", + parser_sniff_tcpdump.add_argument('-i', '--interface', help='Interface to capture on.', dest='capture_interface', required=True) - parser_sniff_tcpdump.add_argument("-I", "--monitor-mode", help="Put interface into monitor mode", - action="store_true") - parser_sniff_tcpdump.add_argument("-n", help="Deactivate name resolution. True by default.", - action="store_true", dest="no_name_resolution") - parser_sniff_tcpdump.add_argument("-#", "--number", - help="Print packet number at beginning of line. True by default.", - action="store_true") - parser_sniff_tcpdump.add_argument("-e", help="Print link layer headers. True by default.", - action="store_true", dest="print_link_layer") - parser_sniff_tcpdump.add_argument("-t", action="count", default=0, - help="Please see tcpdump manual for details. Unused by default.") + parser_sniff_tcpdump.add_argument('-I', '--monitor-mode', help='Put interface into monitor mode', + action='store_true') + parser_sniff_tcpdump.add_argument('-n', help='Deactivate name resolution. True by default.', + action='store_true', dest='no_name_resolution') + parser_sniff_tcpdump.add_argument('-#', '--number', + help='Print packet number at beginning of line. True by default.', + action='store_true') + parser_sniff_tcpdump.add_argument('-e', help='Print link layer headers. True by default.', + action='store_true', dest='print_link_layer') + parser_sniff_tcpdump.add_argument('-t', action='count', default=0, + help='Please see tcpdump manual for details. Unused by default.') cap_size_group = parser.add_mutually_exclusive_group(required=False) - cap_size_group.add_argument("-c", "--count", type=int, help="Number of packets to capture.", default=1000) - cap_size_group.add_argument("--mins", type=int, help="Time in minutes to capture.", default=1) + cap_size_group.add_argument('-c', '--count', type=int, help='Number of packets to capture.', default=1000) + cap_size_group.add_argument('--mins', type=int, help='Time in minutes to capture.', default=1) parser.set_defaults(func=handle_capture) @@ -45,25 +45,25 @@ def cwd_is_device_root_dir() -> bool: def start_guided_device_root_dir_setup(): - assert False, "Not implemented" + assert False, 'Not implemented' def handle_metadata(): assert not cwd_is_device_root_dir() - print(f"Unable to find {DEVICE_METADATA_FILE} in current working directory") - print("You need to setup a device root directory before using this command") - response = input("Would you like to be guided through the setup? [y/n]") - if response.lower() == "y": + print(f'Unable to find {DEVICE_METADATA_FILE} in current working directory') + print('You need to setup a device root directory before using this command') + response = input('Would you like to be guided through the setup? [y/n]') + if response.lower() == 'y': start_guided_device_root_dir_setup() else: - print("'iottb init-device-root --help' for more information.") + print(''iottb init-device-root --help' for more information.') exit(ReturnCodes.ABORTED) # device_id = handle_capture_metadata() return ReturnCodes.SUCCESS def get_device_metadata_from_file(device_metadata_filename: Path) -> str: - assert device_metadata_filename.is_file(), f"Device metadata file '{device_metadata_filename} does not exist" + assert device_metadata_filename.is_file(), f'Device metadata file '{device_metadata_filename} does not exist' device_metadata = DeviceMetadata.load_from_json(device_metadata_filename) return device_metadata @@ -73,26 +73,26 @@ def run_tcpdump(cmd): try: p = subprocess.run(cmd, capture_output=True, text=True, check=True) if p.returncode != 0: - print(f"Error running tcpdump {p.stderr}") + print(f'Error running tcpdump {p.stderr}') else: - print(f"tcpdump run successfully\n: {p.stdout}") + print(f'tcpdump run successfully\n: {p.stdout}') except KeyboardInterrupt: pass def handle_capture(args): - assert args.device_root is not None, f"Device root directory is required" - assert dir_contains_device_metadata(args.device_root), f"Device metadata file '{args.device_root}' does not exist" + assert args.device_root is not None, f'Device root directory is required' + assert dir_contains_device_metadata(args.device_root), f'Device metadata file '{args.device_root}' does not exist' # get device metadata if args.safe and not dir_contains_device_metadata(args.device_root): - print(f"Supplied folder contains no device metadata. " - f"Please setup a device root directory before using this command") + print(f'Supplied folder contains no device metadata. ' + f'Please setup a device root directory before using this command') exit(ReturnCodes.ABORTED) elif dir_contains_device_metadata(args.device_root): device_metadata_filename = args.device_root / DEVICE_METADATA_FILE device_data = DeviceMetadata.load_from_json(device_metadata_filename) else: - name = input("Please enter a device name: ") + name = input('Please enter a device name: ') args.device_root.mkdir(parents=True, exist_ok=True) device_data = DeviceMetadata(name, args.device_root) # start constructing environment for capture @@ -115,13 +115,13 @@ def handle_capture(args): capture_metadata.start_time = start_time capture_metadata.stop_time = stop_time except KeyboardInterrupt: - print("Received keyboard interrupt.") + print('Received keyboard interrupt.') exit(ReturnCodes.ABORTED) except subprocess.CalledProcessError as e: - print(f"Failed to capture packet: {e}") + print(f'Failed to capture packet: {e}') exit(ReturnCodes.FAILURE) except Exception as e: - print(f"Failed to capture packet: {e}") + print(f'Failed to capture packet: {e}') exit(ReturnCodes.FAILURE) return ReturnCodes.SUCCESS @@ -141,7 +141,7 @@ def build_tcpdump_args(args, cmd, capture_metadata: CaptureMetadata): cmd.append('-c') cmd.append(str(args.count)) elif args.mins: - assert False, "Unimplemented option" + assert False, 'Unimplemented option' if args.app_name is not None: capture_metadata.app = args.app_name @@ -162,7 +162,7 @@ def build_tcpdump_args(args, cmd, capture_metadata: CaptureMetadata): # if args.app_name is not None: # capture_file_prefix = args.app_name # capture_metadata.set_app(args.app_name) -# capfile_name = capture_file_prefix + "_" + str(capture_metadata.get_capture_id()) + ".pcap" +# capfile_name = capture_file_prefix + '_' + str(capture_metadata.get_capture_id()) + '.pcap' # capture_metadata.set_capture_file(capfile_name) # capfile_abs_path = capture_dir / capfile_name # capture_metadata.set_capture_file(capfile_name) diff --git a/code/iottb/utils/capture_metadata_utils.py b/code/iottb/utils/capture_metadata_utils.py index 2332127..2c9eba6 100644 --- a/code/iottb/utils/capture_metadata_utils.py +++ b/code/iottb/utils/capture_metadata_utils.py @@ -9,12 +9,12 @@ def set_device_ip_address(ip_addr: str, file_path: Path): assert file_path.is_file() with file_path.open('r') as f: data = json.load(f) - current_ip = data["device_ip_address"] + current_ip = data['device_ip_address'] if current_ip is not None: - print(f"Device IP Address is set to {current_ip}") - response = input(f"Do you want to change the recorded IP address to {ip_addr}? [Y/N] ") - if response.upper() == "N": - print("Aborting change to device IP address") + print(f'Device IP Address is set to {current_ip}') + response = input(f'Do you want to change the recorded IP address to {ip_addr}? [Y/N] ') + if response.upper() == 'N': + print('Aborting change to device IP address') return ReturnCodes.ABORTED with file_path.open('w') as f: json.dump(data, f) @@ -26,12 +26,12 @@ def set_device_mac_address(mac_addr: str, file_path: Path): assert file_path.is_file() with file_path.open('r') as f: data = json.load(f) - current_mac = data["device_mac_address"] + current_mac = data['device_mac_address'] if current_mac is not None: - print(f"Device MAC Address is set to {current_mac}") - response = input(f"Do you want to change the recorded MAC address to {mac_addr}? [Y/N] ") - if response.upper() == "N": - print("Aborting change to device MAC address") + print(f'Device MAC Address is set to {current_mac}') + response = input(f'Do you want to change the recorded MAC address to {mac_addr}? [Y/N] ') + if response.upper() == 'N': + print('Aborting change to device MAC address') return ReturnCodes.ABORTED with file_path.open('w') as f: json.dump(data, f) diff --git a/code/iottb/utils/capture_utils.py b/code/iottb/utils/capture_utils.py index 03b3c2f..8c4d60f 100644 --- a/code/iottb/utils/capture_utils.py +++ b/code/iottb/utils/capture_utils.py @@ -16,29 +16,29 @@ def get_capture_date_folder(device_root: Path): try: today_folder.mkdir() except FileExistsError: - print(f"Folder {today_folder} already exists") + print(f'Folder {today_folder} already exists') return today_folder - raise FileNotFoundError(f"Given path {device_root} is not a device root directory") + raise FileNotFoundError(f'Given path {device_root} is not a device root directory') def get_capture_src_folder(device_folder: Path): - assert device_folder.is_dir(), f"Given path {device_folder} is not a folder" + assert device_folder.is_dir(), f'Given path {device_folder} is not a folder' today_iso = get_iso_date() max_sequence_number = 1 for d in device_folder.iterdir(): if d.is_dir() and d.name.startswith(f'{today_iso}_capture_'): name = d.name - num = int(name.split("_")[2]) + num = int(name.split('_')[2]) max_sequence_number = max(max_sequence_number, num) next_sequence_number = max_sequence_number + 1 - return device_folder.joinpath(f"{today_iso}_capture_{next_sequence_number:03}") + return device_folder.joinpath(f'{today_iso}_capture_{next_sequence_number:03}') def make_capture_src_folder(capture_src_folder: Path): try: capture_src_folder.mkdir() except FileExistsError: - print(f"Folder {capture_src_folder} already exists") + print(f'Folder {capture_src_folder} already exists') finally: return capture_src_folder diff --git a/code/iottb/utils/tcpdump_utils.py b/code/iottb/utils/tcpdump_utils.py index c83be66..ff28cf6 100644 --- a/code/iottb/utils/tcpdump_utils.py +++ b/code/iottb/utils/tcpdump_utils.py @@ -5,25 +5,25 @@ from typing import Optional def check_installed() -> bool: - """Check if tcpdump is installed and available on the system path.""" + '''Check if tcpdump is installed and available on the system path.''' return shutil.which('tcpdump') is not None def ensure_installed(): - """Ensure that tcpdump is installed, raise an error if not.""" + '''Ensure that tcpdump is installed, raise an error if not.''' if not check_installed(): - raise RuntimeError("tcpdump is not installed. Please install it to continue.") + raise RuntimeError('tcpdump is not installed. Please install it to continue.') def list_interfaces() -> str: - """List available network interfaces using tcpdump.""" + '''List available network interfaces using tcpdump.''' ensure_installed() try: result = subprocess.run(['tcpdump', '--list-interfaces'], capture_output=True, text=True, check=True) return result.stdout except subprocess.CalledProcessError as e: - print(f"Failed to list interfaces: {e}") - return "" + print(f'Failed to list interfaces: {e}') + return '' def is_valid_ipv4(ip: str) -> bool: diff --git a/code/iottb/utils/utils.py b/code/iottb/utils/utils.py index 0b0770d..f28fa5b 100644 --- a/code/iottb/utils/utils.py +++ b/code/iottb/utils/utils.py @@ -13,6 +13,6 @@ def subfolder_exists(parent: Path, child: str): def generate_unique_string_with_prefix(prefix: str): - return prefix + "_" + str(uuid.uuid4()) + return prefix + '_' + str(uuid.uuid4()) diff --git a/code/tests/subcommands/test_add_device.py b/code/tests/subcommands/test_add_device.py index a7fedcf..206d157 100644 --- a/code/tests/subcommands/test_add_device.py +++ b/code/tests/subcommands/test_add_device.py @@ -10,7 +10,7 @@ from iottb.__main__ import main class TestDeviceSetup(unittest.TestCase): def setUp(self): - self.test_dir = Path("/tmp/iottbtest/test_add_device") + self.test_dir = Path('/tmp/iottbtest/test_add_device') self.test_dir.mkdir(parents=True, exist_ok=True) # self.captured_output = StringIO() # sys.stdout = self.captured_output @@ -25,12 +25,12 @@ class TestDeviceSetup(unittest.TestCase): self.test_dir.rmdir() # sys.stdout = sys.__stdout__ - @patch("builtins.input", side_effect=["iPhone 14", "y", "y"]) + @patch('builtins.input', side_effect=['iPhone 14', 'y', 'y']) def test_guided_device_setup(self, mock_input): sys.argv = ['__main__.py', 'add', '--root_dir', str(self.test_dir), '--guided'] main() expected_file = self.test_dir / DEVICE_METADATA_FILE - self.assertTrue(expected_file.exists()), f"Expected file not created: {expected_file}" + self.assertTrue(expected_file.exists()), f'Expected file not created: {expected_file}' if __name__ == '__main__': From 2e95bd2fd2e502a1558eef41ac336e6c0f246838 Mon Sep 17 00:00:00 2001 From: Sebastian Lenzlinger Date: Wed, 8 May 2024 02:52:37 +0200 Subject: [PATCH 10/12] Fix places where quote replacement lead to issues. --- code/iottb/models/capture_metadata_model.py | 4 ++-- code/iottb/subcommands/capture.py | 6 +++--- code/iottb/utils/tcpdump_utils.py | 6 +++--- code/tests/subcommands/test_add_device.py | 6 ++++++ 4 files changed, 14 insertions(+), 8 deletions(-) diff --git a/code/iottb/models/capture_metadata_model.py b/code/iottb/models/capture_metadata_model.py index ba2fcd1..4213212 100644 --- a/code/iottb/models/capture_metadata_model.py +++ b/code/iottb/models/capture_metadata_model.py @@ -2,7 +2,7 @@ import json import uuid from datetime import datetime from pathlib import Path -from typing import Optional, Any, OrderedDict +from typing import Optional from iottb.definitions import ReturnCodes, CAPTURE_METADATA_FILE from iottb.models.device_metadata_model import DeviceMetadata @@ -36,7 +36,7 @@ class CaptureMetadata: app_version: Optional[str] = None firmware_version: Optional[str] = None - def __init__(self, device_metadata: DeviceMetadata, capture_dir: Path, /, **data: Any): + def __init__(self, device_metadata: DeviceMetadata, capture_dir: Path): logger.info(f'Creating CaptureMetadata model from DeviceMetadata: {device_metadata}') self.device_metadata = device_metadata diff --git a/code/iottb/subcommands/capture.py b/code/iottb/subcommands/capture.py index a2020f7..b1d76a9 100644 --- a/code/iottb/subcommands/capture.py +++ b/code/iottb/subcommands/capture.py @@ -56,14 +56,14 @@ def handle_metadata(): if response.lower() == 'y': start_guided_device_root_dir_setup() else: - print(''iottb init-device-root --help' for more information.') + print('\'iottb init-device-root --help\' for more information.') exit(ReturnCodes.ABORTED) # device_id = handle_capture_metadata() return ReturnCodes.SUCCESS def get_device_metadata_from_file(device_metadata_filename: Path) -> str: - assert device_metadata_filename.is_file(), f'Device metadata file '{device_metadata_filename} does not exist' + assert device_metadata_filename.is_file(), f'Device metadata file f"{device_metadata_filename}" does not exist' device_metadata = DeviceMetadata.load_from_json(device_metadata_filename) return device_metadata @@ -82,7 +82,7 @@ def run_tcpdump(cmd): def handle_capture(args): assert args.device_root is not None, f'Device root directory is required' - assert dir_contains_device_metadata(args.device_root), f'Device metadata file '{args.device_root}' does not exist' + assert dir_contains_device_metadata(args.device_root), f'Device metadata file \'{args.device_root}\' does not exist' # get device metadata if args.safe and not dir_contains_device_metadata(args.device_root): print(f'Supplied folder contains no device metadata. ' diff --git a/code/iottb/utils/tcpdump_utils.py b/code/iottb/utils/tcpdump_utils.py index ff28cf6..d9df1d5 100644 --- a/code/iottb/utils/tcpdump_utils.py +++ b/code/iottb/utils/tcpdump_utils.py @@ -5,18 +5,18 @@ from typing import Optional def check_installed() -> bool: - '''Check if tcpdump is installed and available on the system path.''' + """Check if tcpdump is installed and available on the system path.""" return shutil.which('tcpdump') is not None def ensure_installed(): - '''Ensure that tcpdump is installed, raise an error if not.''' + """Ensure that tcpdump is installed, raise an error if not.""" if not check_installed(): raise RuntimeError('tcpdump is not installed. Please install it to continue.') def list_interfaces() -> str: - '''List available network interfaces using tcpdump.''' + """List available network interfaces using tcpdump.""" ensure_installed() try: result = subprocess.run(['tcpdump', '--list-interfaces'], capture_output=True, text=True, check=True) diff --git a/code/tests/subcommands/test_add_device.py b/code/tests/subcommands/test_add_device.py index 206d157..1c82ec9 100644 --- a/code/tests/subcommands/test_add_device.py +++ b/code/tests/subcommands/test_add_device.py @@ -32,6 +32,12 @@ class TestDeviceSetup(unittest.TestCase): expected_file = self.test_dir / DEVICE_METADATA_FILE self.assertTrue(expected_file.exists()), f'Expected file not created: {expected_file}' + def test_device_setup(self): + sys.argv = ['__main__.py', 'add', '--root_dir', str(self.test_dir), '--name', 'iPhone 14'] + main() + expected_file = self.test_dir / DEVICE_METADATA_FILE + self.assertTrue(expected_file.exists()), f'Expected file not created: {expected_file}' + if __name__ == '__main__': unittest.main() From 7ffbdda7ea105782a5e4d603838c247df07d707e Mon Sep 17 00:00:00 2001 From: Sebastian Lenzlinger Date: Wed, 8 May 2024 02:56:49 +0200 Subject: [PATCH 11/12] Add test case to add-device subcommand --- code/tests/subcommands/test_add_device.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/code/tests/subcommands/test_add_device.py b/code/tests/subcommands/test_add_device.py index 1c82ec9..d2b4044 100644 --- a/code/tests/subcommands/test_add_device.py +++ b/code/tests/subcommands/test_add_device.py @@ -32,7 +32,8 @@ class TestDeviceSetup(unittest.TestCase): expected_file = self.test_dir / DEVICE_METADATA_FILE self.assertTrue(expected_file.exists()), f'Expected file not created: {expected_file}' - def test_device_setup(self): + @patch('builtins.input', side_effect=['y']) # need mock_input else wont work + def test_device_setup(self, mock_input): sys.argv = ['__main__.py', 'add', '--root_dir', str(self.test_dir), '--name', 'iPhone 14'] main() expected_file = self.test_dir / DEVICE_METADATA_FILE From 64788a1997d839fa1f9e8b76451559a300b717dc Mon Sep 17 00:00:00 2001 From: Sebastian Lenzlinger Date: Wed, 8 May 2024 03:06:20 +0200 Subject: [PATCH 12/12] Move unused modules into archive. --- {code/iottb/utils => archive}/capture_metadata_utils.py | 0 {code/iottb/utils => archive}/device_metadata_utils.py | 0 code/iottb/subcommands/add_device.py | 6 ++++-- code/tests/subcommands/test_add_device.py | 2 +- code/tests/utils/test_capture_metadata_utils.py | 6 +----- 5 files changed, 6 insertions(+), 8 deletions(-) rename {code/iottb/utils => archive}/capture_metadata_utils.py (100%) rename {code/iottb/utils => archive}/device_metadata_utils.py (100%) diff --git a/code/iottb/utils/capture_metadata_utils.py b/archive/capture_metadata_utils.py similarity index 100% rename from code/iottb/utils/capture_metadata_utils.py rename to archive/capture_metadata_utils.py diff --git a/code/iottb/utils/device_metadata_utils.py b/archive/device_metadata_utils.py similarity index 100% rename from code/iottb/utils/device_metadata_utils.py rename to archive/device_metadata_utils.py diff --git a/code/iottb/subcommands/add_device.py b/code/iottb/subcommands/add_device.py index ea9f7b7..b919215 100644 --- a/code/iottb/subcommands/add_device.py +++ b/code/iottb/subcommands/add_device.py @@ -1,11 +1,13 @@ +import logging import pathlib from iottb import definitions -from iottb.definitions import DEVICE_METADATA_FILE, ReturnCodes +from iottb.definitions import DEVICE_METADATA_FILE from iottb.logger import logger from iottb.models.device_metadata_model import DeviceMetadata -from iottb.utils.device_metadata_utils import * +from archive.device_metadata_utils import * +logger.setLevel(logging.INFO) # Since module currently passes all tests def setup_init_device_root_parser(subparsers): parser = subparsers.add_parser('add-device', aliases=['add-device-root', 'add']) diff --git a/code/tests/subcommands/test_add_device.py b/code/tests/subcommands/test_add_device.py index d2b4044..0d78f82 100644 --- a/code/tests/subcommands/test_add_device.py +++ b/code/tests/subcommands/test_add_device.py @@ -8,7 +8,7 @@ import shutil from iottb.__main__ import main -class TestDeviceSetup(unittest.TestCase): +class TestDeviceMetadataFileCreation(unittest.TestCase): def setUp(self): self.test_dir = Path('/tmp/iottbtest/test_add_device') self.test_dir.mkdir(parents=True, exist_ok=True) diff --git a/code/tests/utils/test_capture_metadata_utils.py b/code/tests/utils/test_capture_metadata_utils.py index 71278c3..139597f 100644 --- a/code/tests/utils/test_capture_metadata_utils.py +++ b/code/tests/utils/test_capture_metadata_utils.py @@ -1,6 +1,2 @@ -import json -from pathlib import Path -from unittest.mock import mock_open, patch -import pytest -from iottb.utils.capture_metadata_utils import set_device_ip_address \ No newline at end of file +