72 lines
2.5 KiB
Python
72 lines
2.5 KiB
Python
import json
|
|
import uuid
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Optional, Any
|
|
|
|
from iottb.definitions import ReturnCodes, CAPTURE_METADATA_FILE
|
|
from iottb.models.device_metadata_model import DeviceMetadata
|
|
from iottb.logger import logger
|
|
|
|
|
|
class CaptureMetadata:
|
|
# Required Fields
|
|
device_metadata: DeviceMetadata
|
|
capture_id: str = lambda: str(uuid.uuid4())
|
|
device_id: str
|
|
capture_dir: Path
|
|
capture_file: str
|
|
capture_date: str = lambda: datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower()
|
|
|
|
# Statistics
|
|
start_time: str
|
|
stop_time: str
|
|
|
|
# tcpdump
|
|
packet_count: Optional[int]
|
|
pcap_filter: str = ""
|
|
tcpdump_command: str = ""
|
|
interface: str = ""
|
|
|
|
# Optional Fields
|
|
device_ip_address: str = "No IP Address set"
|
|
device_mac_address: Optional[str] = None
|
|
|
|
app: Optional[str] = None
|
|
app_version: Optional[str] = None
|
|
firmware_version: Optional[str] = None
|
|
|
|
def __init__(self, device_metadata: DeviceMetadata, capture_dir: Path, /, **data: Any):
|
|
logger.info(f"Creating CaptureMetadata model from DeviceMetadata: {device_metadata}")
|
|
self.device_metadata = device_metadata
|
|
|
|
self.capture_dir = capture_dir
|
|
assert capture_dir.is_dir(), f"Capture directory {capture_dir} does not exist"
|
|
|
|
def build_capture_file_name(self):
|
|
logger.info(f"Building capture file name")
|
|
if self.app is None:
|
|
logger.debug(f"No app specified")
|
|
prefix = self.device_metadata.device_short_name
|
|
else:
|
|
logger.debug(f"App specified: {self.app}")
|
|
assert str(self.app).strip() not in {"", " "}, f"app is not a valid name: {self.app}"
|
|
prefix = self.app.lower().replace(" ", "_")
|
|
# assert self.capture_dir is not None, f"{self.capture_dir} does not exist"
|
|
filename = f"{prefix}_{str(self.capture_id)}.pcap"
|
|
logger.debug(f"Capture file name: {filename}")
|
|
self.capture_file = filename
|
|
|
|
def save_capture_metadata_to_json(self, file_path: Path = Path(CAPTURE_METADATA_FILE)):
|
|
assert self.capture_dir.is_dir(), f"capture_dir is not a directory: {self.capture_dir}"
|
|
if file_path.is_file():
|
|
print(f"File {file_path} already exists, update instead.")
|
|
return ReturnCodes.FILE_ALREADY_EXISTS
|
|
metadata = self.to_json(indent=2)
|
|
with file_path.open('w') as file:
|
|
json.dump(metadata, file)
|
|
return ReturnCodes.SUCCESS
|
|
|
|
def to_json(self, indent):
|
|
pass
|