2024-bsc-sebastian-lenzlinger/code/iottb/models/capture_metadata_model.py
2024-05-07 19:24:56 +00:00

158 lines
4.8 KiB
Python

import json
import uuid
from datetime import datetime
from pathlib import Path
from typing import Optional, Any
from uuid import UUID
from pydantic import BaseModel, Field
from iottb.definitions import ReturnCodes, CAPTURE_METADATA_FILE
from iottb.models.device_metadata_model import DeviceMetadata
class CaptureMetadata(BaseModel):
    """Pydantic model holding the metadata of a single packet capture.

    The model groups the capture's identifiers and location, its start/stop
    times, the tcpdump settings that were used, and optional details about
    the device and app under test. The owning ``DeviceMetadata`` is kept on
    the instance but excluded from serialization.
    """

    # Required Fields
    # Owning device; ``exclude=True`` keeps it out of the serialized output.
    device_metadata: DeviceMetadata = Field(exclude=True)
    # The factory must return a real UUID: pydantic does not validate default
    # values, so a str-returning factory would contradict the annotation.
    capture_id: uuid.UUID = Field(default_factory=uuid.uuid4)
    capture_dir: Path
    capture_file: str
    capture_date: str = Field(default_factory=lambda: datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower())
    # Statistics
    start_time: str
    stop_time: str
    # tcpdump
    # Defaults to None so the model can be built before packets are counted.
    packet_count: Optional[int] = None
    pcap_filter: str = ""
    tcpdump_command: str = ""
    interface: str = ""
    # Optional Fields
    device_ip_address: Optional[str] = "No IP Address set"
    device_mac_address: Optional[str] = None
    app: Optional[str] = None
    app_version: Optional[str] = None
    firmware_version: Optional[str] = None

    def __init__(self, device_metadata: DeviceMetadata, capture_dir: Path, /, **data: Any):
        """Create the capture metadata for ``device_metadata`` in ``capture_dir``.

        Args:
            device_metadata: Metadata of the device being captured.
            capture_dir: Directory of this capture; must already exist.
            **data: Values for the remaining model fields.

        Raises:
            pydantic.ValidationError: If a required field is missing or invalid.
            AssertionError: If ``capture_dir`` is not an existing directory.
        """
        # Both positional values are required model fields, so they have to
        # take part in validation. They override same-named keys in ``data``,
        # matching the precedence of the former post-init assignments.
        fields = {**data, 'device_metadata': device_metadata, 'capture_dir': capture_dir}
        super().__init__(**fields)
        assert self.capture_dir.is_dir(), f"capture_dir is not a directory: {self.capture_dir}"

    # Getters
    def get_device_id(self) -> str:
        """Return the id of the device this capture belongs to."""
        # This model has no ``device_id`` field; the id lives on the device.
        # NOTE(review): assumes DeviceMetadata exposes get_device_id(),
        # mirroring get_device_short_name() -- confirm against that model.
        return self.device_metadata.get_device_id()

    def get_start_time(self) -> str:
        """Return the capture start time."""
        return self.start_time

    def get_stop_time(self) -> str:
        """Return the capture stop time."""
        return self.stop_time

    def get_packet_count(self) -> Optional[int]:
        """Return the packet count, or None if it has not been set."""
        return self.packet_count

    def get_pcap_filter(self) -> str:
        """Return the pcap filter expression."""
        return self.pcap_filter

    def get_device_ip_address(self) -> Optional[str]:
        """Return the device IP address (a placeholder text by default)."""
        return self.device_ip_address

    def get_device_mac_address(self) -> Optional[str]:
        """Return the device MAC address, or None if unset."""
        return self.device_mac_address

    def get_app(self) -> Optional[str]:
        """Return the app name, or None if unset."""
        return self.app

    def get_app_version(self) -> Optional[str]:
        """Return the app version, or None if unset."""
        return self.app_version

    def get_firmware_version(self) -> Optional[str]:
        """Return the firmware version, or None if unset."""
        return self.firmware_version

    def get_capture_id(self) -> UUID:
        """Return the unique id of this capture."""
        return self.capture_id

    def get_capture_date(self) -> str:
        """Return the capture date string."""
        return self.capture_date

    def get_capfile_name(self) -> str:
        """Return the name of the capture file."""
        return self.capture_file

    def get_device_metadata(self) -> DeviceMetadata:
        """Return the metadata of the captured device."""
        return self.device_metadata

    def get_interface(self) -> str:
        """Return the network interface name."""
        return self.interface

    # Setters
    def set_capture_dir(self, capture_dir: Path):
        """Set the capture directory."""
        self.capture_dir = capture_dir

    def set_capture_file(self, capture_file: str):
        """Set the capture file name."""
        self.capture_file = capture_file

    def set_capture_date(self, capture_date: str):
        """Set the capture date string."""
        self.capture_date = capture_date

    def set_start_time(self, start_time: str):
        """Set the capture start time."""
        self.start_time = start_time

    def set_stop_time(self, stop_time: str):
        """Set the capture stop time."""
        self.stop_time = stop_time

    def set_packet_count(self, packet_count: int):
        """Set the packet count."""
        self.packet_count = packet_count

    def set_pcap_filter(self, pcap_filter: str):
        """Set the pcap filter expression."""
        self.pcap_filter = pcap_filter

    def set_device_ip_address(self, device_ip_address: str):
        """Set the device IP address."""
        self.device_ip_address = device_ip_address

    def set_device_mac_address(self, device_mac_address: str):
        """Set the device MAC address."""
        self.device_mac_address = device_mac_address

    def set_app(self, app: str):
        """Set the app name."""
        self.app = app

    def set_app_version(self, app_version: str):
        """Set the app version."""
        self.app_version = app_version

    def set_firmware_version(self, firmware_version: str):
        """Set the firmware version here and on the device metadata."""
        self.firmware_version = firmware_version
        # Forward the value so the device record carries the same version.
        self.device_metadata.set_device_firmware_version(firmware_version)

    def set_interface(self, interface: str):
        """Set the network interface name."""
        self.interface = interface

    def set_tcpdump_command(self, tcpdump_command: str):
        """Set the tcpdump command line."""
        self.tcpdump_command = tcpdump_command

    # Other
    def build_capture_file_name(self):
        """Build ``<prefix>_<capture_id>.pcap`` and store it as the capture file.

        The prefix is the app name when one is set, otherwise the short name
        of the device.

        Raises:
            AssertionError: If an app name is set but blank.
        """
        if self.app is None:
            prefix = self.device_metadata.get_device_short_name()
        else:
            # A stripped string is empty exactly when the name is blank.
            assert str(self.app).strip(), f"app is not a valid name: {self.app}"
            prefix = self.get_app()
        filename = f"{prefix}_{str(self.capture_id)}.pcap"
        self.set_capture_file(filename)

    def save_capture_metadata_to_json(self, file_path: Path = Path(CAPTURE_METADATA_FILE)):
        """Write this metadata as JSON to ``file_path`` unless it already exists.

        Args:
            file_path: Target file; defaults to ``CAPTURE_METADATA_FILE``.

        Returns:
            ``ReturnCodes.FILE_ALREADY_EXISTS`` if the file exists (nothing is
            written), otherwise ``ReturnCodes.SUCCESS``.

        Raises:
            AssertionError: If ``capture_dir`` is not an existing directory.
        """
        assert self.capture_dir.is_dir(), f"capture_dir is not a directory: {self.capture_dir}"
        # NOTE(review): file_path is used as given and is not resolved against
        # capture_dir -- confirm that callers pass the intended location.
        if file_path.is_file():
            print(f"File {file_path} already exists, update instead.")
            return ReturnCodes.FILE_ALREADY_EXISTS
        # NOTE(review): exclude_unset also omits fields filled by a default or
        # default factory (e.g. capture_id) -- confirm that this is wanted.
        metadata = self.model_dump_json(indent=2, exclude_unset=True, exclude_none=True)
        with file_path.open('w', encoding='utf-8') as file:
            # model_dump_json already returns a JSON document; passing it to
            # json.dump would encode it again as one quoted string literal.
            file.write(metadata)
        return ReturnCodes.SUCCESS