Remove getters and setters from CaptureMetadata class and refactor dependencies to use field access.

This commit is contained in:
Sebastian Lenzlinger 2024-05-08 00:02:08 +02:00
parent 798a32b23e
commit 6b73530943
2 changed files with 15 additions and 109 deletions

View File

@ -3,8 +3,6 @@ import uuid
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
from typing import Optional, Any from typing import Optional, Any
from uuid import UUID
from iottb.definitions import ReturnCodes, CAPTURE_METADATA_FILE from iottb.definitions import ReturnCodes, CAPTURE_METADATA_FILE
from iottb.models.device_metadata_model import DeviceMetadata from iottb.models.device_metadata_model import DeviceMetadata
@ -14,7 +12,8 @@ from iottb.logger import logger
class CaptureMetadata: class CaptureMetadata:
# Required Fields # Required Fields
device_metadata: DeviceMetadata device_metadata: DeviceMetadata
capture_id: uuid.UUID = lambda: str(uuid.uuid4()) capture_id: str = lambda: str(uuid.uuid4())
device_id: str
capture_dir: Path capture_dir: Path
capture_file: str capture_file: str
capture_date: str = lambda: datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower() capture_date: str = lambda: datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower()
@ -30,7 +29,7 @@ class CaptureMetadata:
interface: str = "" interface: str = ""
# Optional Fields # Optional Fields
device_ip_address: Optional[str] = "No IP Address set" device_ip_address: str = "No IP Address set"
device_mac_address: Optional[str] = None device_mac_address: Optional[str] = None
app: Optional[str] = None app: Optional[str] = None
@ -39,127 +38,34 @@ class CaptureMetadata:
def __init__(self, device_metadata: DeviceMetadata, capture_dir: Path, /, **data: Any): def __init__(self, device_metadata: DeviceMetadata, capture_dir: Path, /, **data: Any):
logger.info(f"Creating CaptureMetadata model from DeviceMetadata: {device_metadata}") logger.info(f"Creating CaptureMetadata model from DeviceMetadata: {device_metadata}")
super().__init__(**data) # Pycharms orders
self.device_metadata = device_metadata self.device_metadata = device_metadata
self.capture_dir = capture_dir self.capture_dir = capture_dir
assert capture_dir.is_dir(), f"Capture directory {capture_dir} does not exist" assert capture_dir.is_dir(), f"Capture directory {capture_dir} does not exist"
# Getters
def get_device_id(self) -> str:
return self.device_metadata.get_device_id()
def get_start_time(self) -> str:
return self.start_time
def get_stop_time(self) -> str:
return self.stop_time
def get_packet_count(self) -> int:
return self.packet_count
def get_pcap_filter(self) -> str:
return self.pcap_filter
def get_device_ip_address(self) -> str:
return self.device_ip_address
def get_device_mac_address(self) -> str:
return self.device_mac_address
def get_app(self) -> str:
return self.app
def get_app_version(self) -> str:
return self.app_version
def get_firmware_version(self) -> str:
return self.firmware_version
def get_capture_id(self) -> UUID:
return self.capture_id
def get_capture_date(self) -> str:
return self.capture_date
def get_capfile_name(self):
return self.capture_file
def get_device_metadata(self) -> DeviceMetadata:
return self.device_metadata
def get_interface(self):
return self.interface
# Setters
def set_capture_dir(self, capture_dir: Path):
self.capture_dir = capture_dir
def set_capture_file(self, capture_file: str):
self.capture_file = capture_file
def set_capture_date(self, capture_date: str):
self.capture_date = capture_date
def set_start_time(self, start_time: str):
self.start_time = start_time
def set_stop_time(self, stop_time: str):
self.stop_time = stop_time
def set_packet_count(self, packet_count: int):
self.packet_count = packet_count
def set_pcap_filter(self, pcap_filter: str):
self.pcap_filter = pcap_filter
def set_device_ip_address(self, device_ip_address: str):
self.device_ip_address = device_ip_address
def set_device_mac_address(self, device_mac_address: str):
self.device_mac_address = device_mac_address
def set_app(self, app: str):
self.app = app
def set_app_version(self, app_version: str):
self.app_version = app_version
def set_firmware_version(self, firmware_version: str):
self.firmware_version = firmware_version
self.device_metadata.set_device_firmware_version(firmware_version)
def set_interface(self, interface: str):
self.interface = interface
def set_tcpdump_command(self, tcpdump_command: str):
self.tcpdump_command = tcpdump_command
# Other
def build_capture_file_name(self): def build_capture_file_name(self):
logger.info(f"Building capture file name") logger.info(f"Building capture file name")
prefix = ""
if self.app is None: if self.app is None:
logger.debug(f"No app specified") logger.debug(f"No app specified")
prefix = self.device_metadata.get_device_short_name() prefix = self.device_metadata.get_device_short_name()
else: else:
logger.debug(f"App specified: {self.app}") logger.debug(f"App specified: {self.app}")
assert str(self.app).strip() not in {"", " "}, f"app is not a valid name: {self.app}" assert str(self.app).strip() not in {"", " "}, f"app is not a valid name: {self.app}"
prefix = self.get_app() prefix = self.app.lower().replace(" ", "_")
# assert self.capture_dir is not None, f"{self.capture_dir} does not exist" # assert self.capture_dir is not None, f"{self.capture_dir} does not exist"
filename = f"{prefix}_{str(self.capture_id)}.pcap" filename = f"{prefix}_{str(self.capture_id)}.pcap"
logger.debug(f"Capture file name: {filename}") logger.debug(f"Capture file name: {filename}")
self.set_capture_file(filename) self.capture_file = filename
def save_capture_metadata_to_json(self, file_path: Path = Path(CAPTURE_METADATA_FILE)): def save_capture_metadata_to_json(self, file_path: Path = Path(CAPTURE_METADATA_FILE)):
assert self.capture_dir.is_dir(), f"capture_dir is not a directory: {self.capture_dir}" assert self.capture_dir.is_dir(), f"capture_dir is not a directory: {self.capture_dir}"
if file_path.is_file(): if file_path.is_file():
print(f"File {file_path} already exists, update instead.") print(f"File {file_path} already exists, update instead.")
return ReturnCodes.FILE_ALREADY_EXISTS return ReturnCodes.FILE_ALREADY_EXISTS
metadata = self.model_dump_json(indent=2, exclude_unset=True, exclude_none=True) metadata = self.to_json(indent=2)
with file_path.open('w') as file: with file_path.open('w') as file:
json.dump(metadata, file) json.dump(metadata, file)
return ReturnCodes.SUCCESS return ReturnCodes.SUCCESS
def model_dump_json(self, indent, exclude_unset, exclude_none): def to_json(self, indent):
pass pass

View File

@ -100,10 +100,10 @@ def handle_capture(args):
make_capture_src_folder(capture_dir) make_capture_src_folder(capture_dir)
capture_metadata = CaptureMetadata(device_data, capture_dir) capture_metadata = CaptureMetadata(device_data, capture_dir)
capture_metadata.set_interface(args.capture_interface) capture_metadata.interface = args.capture_interface
cmd = ['sudo', 'tcpdump', '-i', args.capture_interface] cmd = ['sudo', 'tcpdump', '-i', args.capture_interface]
cmd = build_tcpdump_args(args, cmd, capture_metadata) cmd = build_tcpdump_args(args, cmd, capture_metadata)
capture_metadata.set_tcpdump_command(cmd) capture_metadata.tcpdump_command = cmd
print('Executing: ' + ' '.join(cmd)) print('Executing: ' + ' '.join(cmd))
@ -112,8 +112,8 @@ def handle_capture(args):
start_time = datetime.now().strftime('%H:%M:%S') start_time = datetime.now().strftime('%H:%M:%S')
run_tcpdump(cmd) run_tcpdump(cmd)
stop_time = datetime.now().strftime('%H:%M:%S') stop_time = datetime.now().strftime('%H:%M:%S')
capture_metadata.set_start_time(start_time) capture_metadata.start_time = start_time
capture_metadata.set_stop_time(stop_time) capture_metadata.stop_time = stop_time
except KeyboardInterrupt: except KeyboardInterrupt:
print("Received keyboard interrupt.") print("Received keyboard interrupt.")
exit(ReturnCodes.ABORTED) exit(ReturnCodes.ABORTED)
@ -144,15 +144,15 @@ def build_tcpdump_args(args, cmd, capture_metadata: CaptureMetadata):
assert False, "Unimplemented option" assert False, "Unimplemented option"
if args.app_name is not None: if args.app_name is not None:
capture_metadata.set_app_name(args.app_name) capture_metadata.app = args.app_name
capture_metadata.build_capture_file_name() capture_metadata.build_capture_file_name()
cmd.append('-w') cmd.append('-w')
cmd.append(capture_metadata.get_capfile_name()) cmd.append(capture_metadata.capture_file)
if args.safe: if args.safe:
cmd.append(f'host {args.device_ip}') # if not specified, filter 'any' implied by tcpdump cmd.append(f'host {args.device_ip}') # if not specified, filter 'any' implied by tcpdump
capture_metadata.set_device_ip_address(args.device_ip) capture_metadata.device_id = args.device_ip
return cmd return cmd