UNTESTED REFACTORING:
Move more functionality into Metadata Model classes to ensure data is available and better passable between functions.
This commit is contained in:
@@ -3,46 +3,155 @@ import uuid
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Optional, Any
|
||||
from uuid import UUID
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from iottb.definitions import ReturnCodes
|
||||
from iottb.definitions import ReturnCodes, CAPTURE_METADATA_FILE
|
||||
from iottb.models.device_metadata_model import DeviceMetadata
|
||||
|
||||
|
||||
class CaptureMetadata(BaseModel):
|
||||
# Required Fields
|
||||
device_id: str
|
||||
device_metadata = Field(DeviceMetadata, exclude=True)
|
||||
capture_id: uuid.UUID = Field(default_factory=lambda: str(uuid.uuid4()))
|
||||
capture_dir: Path
|
||||
capture_file: str
|
||||
capture_date: str = Field(default_factory=lambda: datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower())
|
||||
|
||||
# Statistics
|
||||
start_time: str
|
||||
stop_time: str
|
||||
|
||||
# tcpdump
|
||||
packet_count: Optional[int]
|
||||
pcap_filter: str = ""
|
||||
tcpdump_command: str = ""
|
||||
interface = Field(str, default="")
|
||||
|
||||
# Optional Fields
|
||||
device_ip_address: Optional[str] = None
|
||||
device_ip_address: Optional[str] = "No IP Address set"
|
||||
device_mac_address: Optional[str] = None
|
||||
|
||||
app: Optional[str] = None
|
||||
app_version: Optional[str] = None
|
||||
firmware_version: Optional[str] = None
|
||||
|
||||
def __init__(self, device_id: str, start_time: str, stop_time: str, /, **data: Any):
|
||||
def __init__(self, device_metadata: DeviceMetadata, capture_dir: Path, /, **data: Any):
|
||||
super().__init__(**data) # Pycharms orders
|
||||
assert isinstance(device_id, str)
|
||||
assert isinstance(start_time, str)
|
||||
assert isinstance(stop_time, str)
|
||||
self.device_id = device_id
|
||||
self.device_metadata = device_metadata
|
||||
self.capture_dir = capture_dir
|
||||
assert capture_dir.is_dir()
|
||||
|
||||
# Getters
|
||||
def get_device_id(self) -> str:
|
||||
return self.device_id
|
||||
|
||||
def get_start_time(self) -> str:
|
||||
return self.start_time
|
||||
|
||||
def get_stop_time(self) -> str:
|
||||
return self.stop_time
|
||||
|
||||
def get_packet_count(self) -> int:
|
||||
return self.packet_count
|
||||
|
||||
def get_pcap_filter(self) -> str:
|
||||
return self.pcap_filter
|
||||
|
||||
def get_device_ip_address(self) -> str:
|
||||
return self.device_ip_address
|
||||
|
||||
def get_device_mac_address(self) -> str:
|
||||
return self.device_mac_address
|
||||
|
||||
def get_app(self) -> str:
|
||||
return self.app
|
||||
|
||||
def get_app_version(self) -> str:
|
||||
return self.app_version
|
||||
|
||||
def get_firmware_version(self) -> str:
|
||||
return self.firmware_version
|
||||
|
||||
def get_capture_id(self) -> UUID:
|
||||
return self.capture_id
|
||||
|
||||
def get_capture_date(self) -> str:
|
||||
return self.capture_date
|
||||
|
||||
def get_capfile_name(self):
|
||||
return self.capture_file
|
||||
|
||||
def get_device_metadata(self) -> DeviceMetadata:
|
||||
return self.device_metadata
|
||||
|
||||
def get_interface(self):
|
||||
return self.interface
|
||||
|
||||
# Setters
|
||||
def set_capture_dir(self, capture_dir: Path):
|
||||
self.capture_dir = capture_dir
|
||||
|
||||
def set_capture_file(self, capture_file: str):
|
||||
self.capture_file = capture_file
|
||||
|
||||
def set_capture_date(self, capture_date: str):
|
||||
self.capture_date = capture_date
|
||||
|
||||
def set_start_time(self, start_time: str):
|
||||
self.start_time = start_time
|
||||
|
||||
def set_stop_time(self, stop_time: str):
|
||||
self.stop_time = stop_time
|
||||
|
||||
def save_to_json(self, file_path: Path):
|
||||
def set_packet_count(self, packet_count: int):
|
||||
self.packet_count = packet_count
|
||||
|
||||
def set_pcap_filter(self, pcap_filter: str):
|
||||
self.pcap_filter = pcap_filter
|
||||
|
||||
def set_device_ip_address(self, device_ip_address: str):
|
||||
self.device_ip_address = device_ip_address
|
||||
|
||||
def set_device_mac_address(self, device_mac_address: str):
|
||||
self.device_mac_address = device_mac_address
|
||||
|
||||
def set_app(self, app: str):
|
||||
self.app = app
|
||||
|
||||
def set_app_version(self, app_version: str):
|
||||
self.app_version = app_version
|
||||
|
||||
def set_firmware_version(self, firmware_version: str):
|
||||
self.firmware_version = firmware_version
|
||||
self.device_metadata.set_device_firmware_version(firmware_version)
|
||||
|
||||
def set_interface(self, interface: str):
|
||||
self.interface = interface
|
||||
|
||||
def set_tcpdump_command(self, tcpdump_command: str):
|
||||
self.tcpdump_command = tcpdump_command
|
||||
|
||||
# Other
|
||||
|
||||
def build_capture_file_name(self):
|
||||
prefix = ""
|
||||
if self.app is None:
|
||||
prefix = self.device_metadata.get_device_short_name()
|
||||
else:
|
||||
assert str(self.app).strip() not in {"", " "}, f"app is not a valid name: {self.app}"
|
||||
prefix = self.get_app()
|
||||
# assert self.capture_dir is not None, f"{self.capture_dir} does not exist"
|
||||
filename = f"{prefix}_{str(self.capture_id)}.pcap"
|
||||
self.set_capture_file(filename)
|
||||
|
||||
def save_capture_metadata_to_json(self, file_path: Path = Path(CAPTURE_METADATA_FILE)):
|
||||
assert self.capture_dir.is_dir(), f"capture_dir is not a directory: {self.capture_dir}"
|
||||
if file_path.is_file():
|
||||
print(f"File {file_path} already exists, update instead.")
|
||||
return ReturnCodes.FILE_ALREADY_EXISTS
|
||||
metadata = self.model_dump_json(indent=2)
|
||||
metadata = self.model_dump_json(indent=2, exclude_unset=True, exclude_none=True)
|
||||
with file_path.open('w') as file:
|
||||
json.dump(metadata, file)
|
||||
return ReturnCodes.SUCCESS
|
||||
|
||||
@@ -5,8 +5,7 @@ from pathlib import Path
|
||||
from typing import Optional, List, Any
|
||||
|
||||
# iottb modules
|
||||
from iottb.definitions import ReturnCodes
|
||||
|
||||
from iottb.definitions import ReturnCodes, DEVICE_METADATA_FILE
|
||||
# 3rd party libs
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
@@ -20,6 +19,7 @@ class DeviceMetadata(BaseModel):
|
||||
device_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
|
||||
date_created: str = Field(default_factory=lambda: datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower())
|
||||
|
||||
device_root_path: Path
|
||||
# Optional Fields
|
||||
aliases: List[str] = Field(default_factory=lambda: [])
|
||||
device_type: Optional[str] = None
|
||||
@@ -29,15 +29,65 @@ class DeviceMetadata(BaseModel):
|
||||
|
||||
capture_files: Optional[List[str]] = []
|
||||
|
||||
def __init__(self, device_name: str, /, **data: Any):
|
||||
def __init__(self, device_name: str, device_root_dir: Path, /, **data: Any):
|
||||
super().__init__(**data)
|
||||
self.device_name = device_name
|
||||
self.device_short_name = device_name.lower().replace(" ", "_")
|
||||
assert dir_contains_device_metadata(device_root_dir), \
|
||||
f"Directory {device_root_dir} is missing a {DEVICE_METADATA_FILE} file"
|
||||
self.device_root_dir = device_root_dir
|
||||
|
||||
def get_device_id(self) -> str:
|
||||
return self.device_id
|
||||
|
||||
def get_device_name(self) -> str:
|
||||
return self.device_name
|
||||
|
||||
def get_device_short_name(self) -> str:
|
||||
return self.device_short_name
|
||||
|
||||
def get_device_type(self) -> str:
|
||||
return self.device_type
|
||||
|
||||
def get_device_serial_number(self) -> str:
|
||||
return self.device_serial_number
|
||||
|
||||
def get_device_firmware_version(self) -> str:
|
||||
return self.device_firmware_version
|
||||
|
||||
def get_date_updated(self) -> str:
|
||||
return self.date_updated
|
||||
|
||||
def get_capture_files(self) -> List[str]:
|
||||
return self.capture_files
|
||||
|
||||
def get_aliases(self) -> List[str]:
|
||||
return self.aliases
|
||||
|
||||
def set_device_type(self, device_type: str) -> None:
|
||||
self.device_type = device_type
|
||||
self.date_updated = datetime.now().strftime('%d-%m-%YT%H:%M:%S')
|
||||
|
||||
def set_device_serial_number(self, device_serial_number: str) -> None:
|
||||
self.device_serial_number = device_serial_number
|
||||
self.date_updated = datetime.now().strftime('%d-%m-%YT%H:%M:%S')
|
||||
|
||||
def set_device_firmware_version(self, device_firmware_version: str) -> None:
|
||||
self.device_firmware_version = device_firmware_version
|
||||
self.date_updated = datetime.now().strftime('%d-%m-%YT%H:%M:%S')
|
||||
|
||||
def set_device_name(self, device_name: str) -> None:
|
||||
self.device_name = device_name
|
||||
self.device_short_name = device_name.lower().replace(" ", "_")
|
||||
self.date_updated = datetime.now().strftime('%d-%m-%YT%H:%M:%S')
|
||||
|
||||
@classmethod
|
||||
def load_from_json(cls, file_path: Path):
|
||||
assert file_path.is_file()
|
||||
with file_path.open('r') as file:
|
||||
def load_from_json(cls, root_path: Path):
|
||||
assert root_path.is_file()
|
||||
assert root_path.name == DEVICE_METADATA_FILE
|
||||
assert dir_contains_device_metadata(root_path)
|
||||
device_meta_filename = root_path / DEVICE_METADATA_FILE
|
||||
with device_meta_filename.open('r') as file:
|
||||
metadata_json = json.load(file)
|
||||
metadata_model_obj = cls.model_validate_json(metadata_json)
|
||||
return metadata_model_obj
|
||||
@@ -65,3 +115,14 @@ class DeviceMetadata(BaseModel):
|
||||
setattr(metadata, field, value)
|
||||
metadata.date_updated = datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower()
|
||||
pass
|
||||
|
||||
|
||||
def dir_contains_device_metadata(dir_path: Path):
|
||||
if not dir_path.is_dir():
|
||||
return False
|
||||
else:
|
||||
meta_file_path = dir_path / DEVICE_METADATA_FILE
|
||||
if not meta_file_path.is_file():
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
Reference in New Issue
Block a user