103 lines
3.7 KiB
Python
103 lines
3.7 KiB
Python
import json
|
|
import uuid
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
from iottb.definitions import ReturnCodes, CAPTURE_METADATA_FILE
|
|
from iottb.models.device_metadata_model import DeviceMetadata
|
|
from iottb.logger import logger
|
|
|
|
|
|
class CaptureMetadata:
    """Metadata describing one packet capture taken for a device.

    An instance is created from the metadata of the captured device and the
    directory of the capture. ``capture_id`` and ``capture_date`` are
    generated when the instance is created; the remaining fields are expected
    to be filled in by the caller before the metadata is serialised with
    :meth:`to_json` or :meth:`save_capture_metadata_to_json`.
    """

    # Required Fields
    device_metadata: DeviceMetadata
    capture_id: str  # random UUID4 string, assigned in __init__
    device_id: str
    capture_dir: Path
    capture_file: str  # assigned by build_capture_file_name()
    capture_date: str  # creation timestamp, assigned in __init__

    # Statistics
    start_time: str
    stop_time: str

    # tcpdump
    packet_count: Optional[int]
    pcap_filter: str = ''
    tcpdump_command: str = ''
    interface: str = ''

    # Optional Fields
    device_ip_address: str = 'No IP Address set'
    device_mac_address: Optional[str] = None

    app: Optional[str] = None
    app_version: Optional[str] = None
    firmware_version: Optional[str] = None

    def __init__(self, device_metadata: DeviceMetadata, capture_dir: Path):
        """Create the capture metadata for a device.

        Args:
            device_metadata: Metadata of the device being captured.
            capture_dir: Existing directory belonging to this capture.

        Raises:
            AssertionError: If ``capture_dir`` is not an existing directory
                (only checked while assertions are enabled).
        """
        logger.info(f'Creating CaptureMetadata model from DeviceMetadata: {device_metadata}')
        self.device_metadata = device_metadata

        self.capture_dir = capture_dir
        assert capture_dir.is_dir(), f'Capture directory {capture_dir} does not exist'

        # Generated per instance. A function stored as a class-level default
        # is exposed on instances as a bound method, not as its return value,
        # so these two values have to be computed here.
        self.capture_id = str(uuid.uuid4())
        # lower() turns the literal 'T' date/time separator into 't'.
        self.capture_date = datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower()

    def build_capture_file_name(self):
        """Derive the pcap file name and store it in ``self.capture_file``.

        The name is ``<prefix>_<capture_id>.pcap``. The prefix is the
        lower-cased app name with spaces replaced by underscores when ``app``
        is set, otherwise ``device_metadata.device_short_name``.

        Raises:
            AssertionError: If ``app`` is set but blank (only checked while
                assertions are enabled).
        """
        logger.info('Building capture file name')
        if self.app is None:
            logger.debug('No app specified')
            prefix = self.device_metadata.device_short_name
        else:
            logger.debug(f'App specified: {self.app}')
            # A name consisting only of whitespace is empty after strip().
            assert str(self.app).strip() != '', f'app is not a valid name: {self.app}'
            prefix = self.app.lower().replace(' ', '_')

        filename = f'{prefix}_{self.capture_id}.pcap'
        logger.debug(f'Capture file name: {filename}')
        self.capture_file = filename

    def save_capture_metadata_to_json(self, file_path: Path = Path(CAPTURE_METADATA_FILE)):
        """Write the capture metadata to ``file_path`` as a JSON object.

        An existing file is never overwritten.

        Args:
            file_path: Destination file. The default is the bare
                ``CAPTURE_METADATA_FILE`` path, which is used as given and is
                not joined with ``capture_dir``.
                NOTE(review): confirm whether the file is meant to live
                inside ``capture_dir``.

        Returns:
            ``ReturnCodes.FILE_ALREADY_EXISTS`` if ``file_path`` already
            exists, otherwise ``ReturnCodes.SUCCESS``.

        Raises:
            AssertionError: If ``capture_dir`` is not a directory (only
                checked while assertions are enabled).
            ValueError: If a required field is empty (raised by ``to_json``).
        """
        assert self.capture_dir.is_dir(), f'capture_dir is not a directory: {self.capture_dir}'
        if file_path.is_file():
            print(f'File {file_path} already exists, update instead.')
            return ReturnCodes.FILE_ALREADY_EXISTS

        # to_json() already returns serialised JSON text. Passing it through
        # json.dump() again would store one quoted, escaped string instead of
        # a JSON object, so the text is written as is.
        metadata = self.to_json(indent=2)
        with file_path.open('w') as file:
            file.write(metadata)
        return ReturnCodes.SUCCESS

    def to_json(self, indent=2):
        """Serialise the capture metadata to a JSON string.

        Required fields must be neither ``None`` nor ``''``. Optional fields
        with such a value are left out. Values that are not already strings
        are converted with ``str()``. ``device_metadata`` is not part of the
        output.

        Args:
            indent: Indentation passed on to ``json.dumps``.

        Returns:
            The JSON text of the collected fields.

        Raises:
            ValueError: If a required field is ``None`` or ``''``.
        """
        # TODO: Where to validate data?
        logger.info('Converting CaptureMetadata to JSON')
        data = {}

        # Fields of CaptureMetadata to serialise; True marks a required field.
        fields = {
            'capture_id': True,
            'device_id': True,
            'capture_dir': True,
            'capture_file': False,
            'capture_date': False,
            'start_time': True,
            'stop_time': True,
            'packet_count': False,
            'pcap_filter': False,
            'tcpdump_command': False,
            'interface': False,
            'device_ip_address': False,
            'device_mac_address': False,
            'app': False,
            'app_version': False,
            'firmware_version': False,
        }

        for field, is_mandatory in fields.items():
            # Fields that were never assigned are treated like None.
            value = getattr(self, field, None)
            if value in (None, ''):
                if is_mandatory:
                    raise ValueError(f'Field {field} is required and cannot be empty.')
                continue
            data[field] = value if isinstance(value, str) else str(value)

        logger.debug(f'Capture metadata: {data}')
        return json.dumps(data, indent=indent)