Merge branch 'refs/heads/dev-logger'

# Conflicts:
#	code/iottb/logger.py
#	code/iottb/models/capture_metadata_model.py
#	code/iottb/models/device_metadata_model.py
#	code/iottb/subcommands/add_device.py
This commit is contained in:
Sebastian Lenzlinger 2024-05-08 03:11:23 +02:00
commit 5c031d8157
19 changed files with 326 additions and 335 deletions

View File

@ -9,12 +9,12 @@ def set_device_ip_address(ip_addr: str, file_path: Path):
assert file_path.is_file()
with file_path.open('r') as f:
data = json.load(f)
current_ip = data["device_ip_address"]
current_ip = data['device_ip_address']
if current_ip is not None:
print(f"Device IP Address is set to {current_ip}")
response = input(f"Do you want to change the recorded IP address to {ip_addr}? [Y/N] ")
if response.upper() == "N":
print("Aborting change to device IP address")
print(f'Device IP Address is set to {current_ip}')
response = input(f'Do you want to change the recorded IP address to {ip_addr}? [Y/N] ')
if response.upper() == 'N':
print('Aborting change to device IP address')
return ReturnCodes.ABORTED
with file_path.open('w') as f:
json.dump(data, f)
@ -26,12 +26,12 @@ def set_device_mac_address(mac_addr: str, file_path: Path):
assert file_path.is_file()
with file_path.open('r') as f:
data = json.load(f)
current_mac = data["device_mac_address"]
current_mac = data['device_mac_address']
if current_mac is not None:
print(f"Device MAC Address is set to {current_mac}")
response = input(f"Do you want to change the recorded MAC address to {mac_addr}? [Y/N] ")
if response.upper() == "N":
print("Aborting change to device MAC address")
print(f'Device MAC Address is set to {current_mac}')
response = input(f'Do you want to change the recorded MAC address to {mac_addr}? [Y/N] ')
if response.upper() == 'N':
print('Aborting change to device MAC address')
return ReturnCodes.ABORTED
with file_path.open('w') as f:
json.dump(data, f)

View File

@ -2,31 +2,31 @@ def setup_sniff_tcpdump_parser(parser_sniff):
# arguments which will be passed to tcpdump
parser_sniff_tcpdump = parser_sniff.add_argument_group('tcpdump arguments')
# TODO: tcpdump_parser.add_argument('-c', '--count', re)
parser_sniff_tcpdump.add_argument("-a", "--ip-address=", help="IP address of the device to sniff", dest="device_ip")
parser_sniff_tcpdump.add_argument("-i", "--interface=", help="Interface of the capture device.", dest="capture_interface",default="")
parser_sniff_tcpdump.add_argument("-I", "--monitor-mode", help="Put interface into monitor mode",
action="store_true")
parser_sniff_tcpdump.add_argument("-n", help="Deactivate name resolution. Option is set by default.",
action="store_true")
parser_sniff_tcpdump.add_argument("-#", "--number",
help="Print packet number at beginning of line. Set by default.",
action="store_true")
parser_sniff_tcpdump.add_argument("-e", help="Print link layer headers. Option is set by default.",
action="store_true")
parser_sniff_tcpdump.add_argument("-t", action="count", default=0,
help="Please see tcpdump manual for details. Unused by default.")
parser_sniff_tcpdump.add_argument('-a', '--ip-address=', help='IP address of the device to sniff', dest='device_ip')
parser_sniff_tcpdump.add_argument('-i', '--interface=', help='Interface of the capture device.', dest='capture_interface',default='')
parser_sniff_tcpdump.add_argument('-I', '--monitor-mode', help='Put interface into monitor mode',
action='store_true')
parser_sniff_tcpdump.add_argument('-n', help='Deactivate name resolution. Option is set by default.',
action='store_true')
parser_sniff_tcpdump.add_argument('-#', '--number',
help='Print packet number at beginning of line. Set by default.',
action='store_true')
parser_sniff_tcpdump.add_argument('-e', help='Print link layer headers. Option is set by default.',
action='store_true')
parser_sniff_tcpdump.add_argument('-t', action='count', default=0,
help='Please see tcpdump manual for details. Unused by default.')
def setup_sniff_parser(subparsers):
# create parser for "sniff" command
parser_sniff = subparsers.add_parser("sniff", help="Start tcpdump capture.")
# create parser for 'sniff' command
parser_sniff = subparsers.add_parser('sniff', help='Start tcpdump capture.')
setup_sniff_tcpdump_parser(parser_sniff)
setup_pcap_filter_parser(parser_sniff)
cap_size_group = parser_sniff.add_mutually_exclusive_group(required=True)
cap_size_group.add_argument("-c", "--count", type=int, help="Number of packets to capture.", default=0)
cap_size_group.add_argument("--mins", type=int, help="Time in minutes to capture.", default=60)
cap_size_group.add_argument('-c', '--count', type=int, help='Number of packets to capture.', default=0)
cap_size_group.add_argument('--mins', type=int, help='Time in minutes to capture.', default=60)
def setup_pcap_filter_parser(parser_sniff):
parser_pcap_filter = parser_sniff.add_argument_parser("pcap-filter expression")
parser_pcap_filter = parser_sniff.add_argument_parser('pcap-filter expression')
pass

View File

@ -15,5 +15,5 @@ class Metadata:
def create_metadata(filename, unique_id, device_details):
date_string = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
meta_filename = f"meta_{date_string}_{unique_id}.json"
date_string = datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
meta_filename = f'meta_{date_string}_{unique_id}.json'

View File

@ -8,33 +8,33 @@ from iottb.definitions import DEVICE_METADATA_FILE
def write_device_metadata_to_file(metadata: DeviceMetadata, device_path: Path):
"""Write the device metadata to a JSON file in the specified directory."""
meta_file_path = device_path / "meta.json"
'''Write the device metadata to a JSON file in the specified directory.'''
meta_file_path = device_path / 'meta.json'
meta_file_path.write_text(metadata.json(indent=2))
def confirm_device_metadata(metadata: DeviceMetadata) -> bool:
"""Display device metadata for user confirmation."""
'''Display device metadata for user confirmation.'''
print(metadata.json(indent=2))
return input("Confirm device metadata? (y/n): ").strip().lower() == 'y'
return input('Confirm device metadata? (y/n): ').strip().lower() == 'y'
def get_device_metadata_from_user() -> DeviceMetadata:
"""Prompt the user to enter device details and return a populated DeviceMetadata object."""
device_name = input("Device name: ")
device_short_name = device_name.lower().replace(" ", "-")
'''Prompt the user to enter device details and return a populated DeviceMetadata object.'''
device_name = input('Device name: ')
device_short_name = device_name.lower().replace(' ', '-')
return DeviceMetadata(device_name=device_name, device_short_name=device_short_name)
def initialize_device_root_dir(device_name: str) -> Path:
"""Create and return the path for the device directory."""
'''Create and return the path for the device directory.'''
device_path = Path.cwd() / device_name
device_path.mkdir(exist_ok=True)
return device_path
def write_metadata(metadata: BaseModel, device_name: str):
"""Write device metadata to a JSON file."""
'''Write device metadata to a JSON file.'''
meta_path = Path.cwd() / device_name / DEVICE_METADATA_FILE
meta_path.parent.mkdir(parents=True, exist_ok=True)
with meta_path.open('w') as f:
@ -42,19 +42,19 @@ def write_metadata(metadata: BaseModel, device_name: str):
def get_device_metadata(file_path: Path) -> DeviceMetadata | None:
"""Fetch device metadata from a JSON file."""
'''Fetch device metadata from a JSON file.'''
if dev_metadata_exists(file_path):
with file_path.open('r') as f:
device_metadata_json = json.load(f)
try:
device_metadata = DeviceMetadata.model_validate_json(device_metadata_json)
device_metadata = DeviceMetadata.from_json(device_metadata_json)
return device_metadata
except ValueError as e:
print(f"Validation error for device metadata: {e}")
print(f'Validation error for device metadata: {e}')
else:
# TODO Decide what to do (e.g. search for file etc)
print(f"No device metadata at {file_path}")
print(f'No device metadata at {file_path}')
return None

View File

@ -10,8 +10,8 @@ from iottb.subcommands.add_device import setup_init_device_root_parser
######################
def setup_argparse():
# create top level parser
root_parser = argparse.ArgumentParser(prog="iottb")
subparsers = root_parser.add_subparsers(title="subcommands", required=True, dest="command")
root_parser = argparse.ArgumentParser(prog='iottb')
subparsers = root_parser.add_subparsers(title='subcommands', required=True, dest='command')
setup_capture_parser(subparsers)
setup_init_device_root_parser(subparsers)
@ -27,12 +27,12 @@ def main():
try:
args.func(args)
except KeyboardInterrupt:
print("Received keyboard interrupt. Exiting...")
print('Received keyboard interrupt. Exiting...')
exit(1)
except Exception as e:
print(f"Error: {e}")
print(f'Error: {e}')
# create_capture_directory(args.device_name)
if __name__ == "__main__":
if __name__ == '__main__':
main()

View File

@ -1,16 +1,16 @@
from datetime import datetime
from enum import Flag, unique, global_enum
DEVICE_METADATA_FILE = "device_metadata.json"
CAPTURE_METADATA_FILE = "capture_metadata.json"
TODAY_DATE_STRING = datetime.now().strftime("%d%b%Y").lower() # TODO convert to function in utils or so
DEVICE_METADATA_FILE = 'device_metadata.json'
CAPTURE_METADATA_FILE = 'capture_metadata.json'
TODAY_DATE_STRING = datetime.now().strftime('%d%b%Y').lower() # TODO convert to function in utils or so
CAPTURE_FOLDER_BASENAME = "capture_###"
CAPTURE_FOLDER_BASENAME = 'capture_###'
AFFIRMATIVE_USER_RESPONSE = {"yes", "y", "true", "Y", "Yes", "YES"}
NEGATIVE_USER_RESPONSE = {"no", "n", "N", "No"}
YES_DEFAULT = AFFIRMATIVE_USER_RESPONSE.union({"", " "})
NO_DEFAULT = NEGATIVE_USER_RESPONSE.union({"", " "})
AFFIRMATIVE_USER_RESPONSE = {'yes', 'y', 'true', 'Y', 'Yes', 'YES'}
NEGATIVE_USER_RESPONSE = {'no', 'n', 'N', 'No'}
YES_DEFAULT = AFFIRMATIVE_USER_RESPONSE.union({'', ' '})
NO_DEFAULT = NEGATIVE_USER_RESPONSE.union({'', ' '})
@unique

View File

@ -5,16 +5,16 @@ from logging.handlers import RotatingFileHandler
def setup_logging():
logger_obj = logging.getLogger('iottbLogger')
logger_obj.setLevel(logging.INFO)
logger_obj.setLevel(logging.DEBUG)
file_handler = RotatingFileHandler('iottb.log')
console_handler = logging.StreamHandler(sys.stdout)
file_handler.setLevel(logging.DEBUG)
console_handler.setLevel(logging.INFO)
file_handler.setLevel(logging.INFO)
console_handler.setLevel(logging.DEBUG)
file_fmt = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
console_fmt = logging.Formatter('%(name)s - %(levelname)s - %(message)s')
file_fmt = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
console_fmt = logging.Formatter('%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s')
file_handler.setFormatter(file_fmt)
console_handler.setFormatter(console_fmt)

View File

@ -2,22 +2,21 @@ import json
import uuid
from datetime import datetime
from pathlib import Path
from typing import Optional, Any
from uuid import UUID
from pydantic import BaseModel, Field
from typing import Optional
from iottb.definitions import ReturnCodes, CAPTURE_METADATA_FILE
from iottb.models.device_metadata_model import DeviceMetadata
from iottb.logger import logger
class CaptureMetadata(BaseModel):
class CaptureMetadata:
# Required Fields
device_metadata: DeviceMetadata = Field(exclude=True)
capture_id: uuid.UUID = Field(default_factory=lambda: str(uuid.uuid4()))
device_metadata: DeviceMetadata
capture_id: str = lambda: str(uuid.uuid4())
device_id: str
capture_dir: Path
capture_file: str
capture_date: str = Field(default_factory=lambda: datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower())
capture_date: str = lambda: datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower()
# Statistics
start_time: str
@ -25,133 +24,79 @@ class CaptureMetadata(BaseModel):
# tcpdump
packet_count: Optional[int]
pcap_filter: str = ""
tcpdump_command: str = ""
interface: str = ""
pcap_filter: str = ''
tcpdump_command: str = ''
interface: str = ''
# Optional Fields
device_ip_address: Optional[str] = "No IP Address set"
device_ip_address: str = 'No IP Address set'
device_mac_address: Optional[str] = None
app: Optional[str] = None
app_version: Optional[str] = None
firmware_version: Optional[str] = None
def __init__(self, device_metadata: DeviceMetadata, capture_dir: Path, /, **data: Any):
super().__init__(**data) # Pycharms orders
def __init__(self, device_metadata: DeviceMetadata, capture_dir: Path):
logger.info(f'Creating CaptureMetadata model from DeviceMetadata: {device_metadata}')
self.device_metadata = device_metadata
self.capture_dir = capture_dir
assert capture_dir.is_dir()
# Getters
def get_device_id(self) -> str:
return self.device_id
def get_start_time(self) -> str:
return self.start_time
def get_stop_time(self) -> str:
return self.stop_time
def get_packet_count(self) -> int:
return self.packet_count
def get_pcap_filter(self) -> str:
return self.pcap_filter
def get_device_ip_address(self) -> str:
return self.device_ip_address
def get_device_mac_address(self) -> str:
return self.device_mac_address
def get_app(self) -> str:
return self.app
def get_app_version(self) -> str:
return self.app_version
def get_firmware_version(self) -> str:
return self.firmware_version
def get_capture_id(self) -> UUID:
return self.capture_id
def get_capture_date(self) -> str:
return self.capture_date
def get_capfile_name(self):
return self.capture_file
def get_device_metadata(self) -> DeviceMetadata:
return self.device_metadata
def get_interface(self):
return self.interface
# Setters
def set_capture_dir(self, capture_dir: Path):
self.capture_dir = capture_dir
def set_capture_file(self, capture_file: str):
self.capture_file = capture_file
def set_capture_date(self, capture_date: str):
self.capture_date = capture_date
def set_start_time(self, start_time: str):
self.start_time = start_time
def set_stop_time(self, stop_time: str):
self.stop_time = stop_time
def set_packet_count(self, packet_count: int):
self.packet_count = packet_count
def set_pcap_filter(self, pcap_filter: str):
self.pcap_filter = pcap_filter
def set_device_ip_address(self, device_ip_address: str):
self.device_ip_address = device_ip_address
def set_device_mac_address(self, device_mac_address: str):
self.device_mac_address = device_mac_address
def set_app(self, app: str):
self.app = app
def set_app_version(self, app_version: str):
self.app_version = app_version
def set_firmware_version(self, firmware_version: str):
self.firmware_version = firmware_version
self.device_metadata.set_device_firmware_version(firmware_version)
def set_interface(self, interface: str):
self.interface = interface
def set_tcpdump_command(self, tcpdump_command: str):
self.tcpdump_command = tcpdump_command
# Other
assert capture_dir.is_dir(), f'Capture directory {capture_dir} does not exist'
def build_capture_file_name(self):
prefix = ""
logger.info(f'Building capture file name')
if self.app is None:
prefix = self.device_metadata.get_device_short_name()
logger.debug(f'No app specified')
prefix = self.device_metadata.device_short_name
else:
assert str(self.app).strip() not in {"", " "}, f"app is not a valid name: {self.app}"
prefix = self.get_app()
# assert self.capture_dir is not None, f"{self.capture_dir} does not exist"
filename = f"{prefix}_{str(self.capture_id)}.pcap"
self.set_capture_file(filename)
logger.debug(f'App specified: {self.app}')
assert str(self.app).strip() not in {'', ' '}, f'app is not a valid name: {self.app}'
prefix = self.app.lower().replace(' ', '_')
# assert self.capture_dir is not None, f'{self.capture_dir} does not exist'
filename = f'{prefix}_{str(self.capture_id)}.pcap'
logger.debug(f'Capture file name: {filename}')
self.capture_file = filename
def save_capture_metadata_to_json(self, file_path: Path = Path(CAPTURE_METADATA_FILE)):
assert self.capture_dir.is_dir(), f"capture_dir is not a directory: {self.capture_dir}"
assert self.capture_dir.is_dir(), f'capture_dir is not a directory: {self.capture_dir}'
if file_path.is_file():
print(f"File {file_path} already exists, update instead.")
print(f'File {file_path} already exists, update instead.')
return ReturnCodes.FILE_ALREADY_EXISTS
metadata = self.model_dump_json(indent=2, exclude_unset=True, exclude_none=True)
metadata = self.to_json(indent=2)
with file_path.open('w') as file:
json.dump(metadata, file)
return ReturnCodes.SUCCESS
def to_json(self, indent=2):
# TODO: Where to validate data?
logger.info(f'Converting CaptureMetadata to JSON')
data = {}
# List of fields from CaptureData class, if fields[key]==True, then it is a required field
fields = {
'capture_id': True, #
'device_id': True,
'capture_dir': True,
'capture_file': False,
'capture_date': False,
'start_time': True,
'stop_time': True,
'packet_count': False,
'pcap_filter': False,
'tcpdump_command': False,
'interface': False,
'device_ip_address': False,
'device_mac_address': False,
'app': False,
'app_version': False,
'firmware_version': False
}
for field, is_mandatory in fields.items():
value = getattr(self, field, None)
if value not in [None, ''] or is_mandatory:
if value in [None, ''] and is_mandatory:
raise ValueError(f'Field {field} is required and cannot be empty.')
data[field] = str(value) if not isinstance(value, str) else value
logger.debug(f'Capture metadata: {data}')
return json.dumps(data, indent=indent)

View File

@ -2,26 +2,26 @@ import json
import uuid
from datetime import datetime
from pathlib import Path
from typing import Optional, List, Any
from typing import Optional, List
# iottb modules
from iottb.definitions import ReturnCodes, DEVICE_METADATA_FILE
from iottb.logger import logger
# 3rd party libs
from pydantic import BaseModel, Field
IMMUTABLE_FIELDS = {"device_name", "device_short_name", "device_id", "date_created"}
IMMUTABLE_FIELDS = {'device_name', 'device_short_name', 'device_id', 'date_created'}
class DeviceMetadata(BaseModel):
class DeviceMetadata:
# Required fields
device_name: str
device_short_name: str
device_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
date_created: str = Field(default_factory=lambda: datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower())
device_id: str
date_created: str
device_root_path: Path
# Optional Fields
aliases: List[str] = Field(default_factory=lambda: [])
aliases: Optional[List[str]] = None
device_type: Optional[str] = None
device_serial_number: Optional[str] = None
device_firmware_version: Optional[str] = None
@ -29,91 +29,74 @@ class DeviceMetadata(BaseModel):
capture_files: Optional[List[str]] = []
def __init__(self, device_name: str, device_root_dir: Path, /, **data: Any):
super().__init__(**data)
def __init__(self, device_name: str, device_root_path: Path):
self.device_name = device_name
self.device_short_name = device_name.lower().replace(" ", "_")
# assert dir_contains_device_metadata(device_root_dir), \
# f"Directory {device_root_dir} is missing a {DEVICE_METADATA_FILE} file"
self.device_root_dir = device_root_dir
def get_device_id(self) -> str:
return self.device_id
def get_device_name(self) -> str:
return self.device_name
def get_device_short_name(self) -> str:
return self.device_short_name
def get_device_type(self) -> str:
return self.device_type
def get_device_serial_number(self) -> str:
return self.device_serial_number
def get_device_firmware_version(self) -> str:
return self.device_firmware_version
def get_date_updated(self) -> str:
return self.date_updated
def get_capture_files(self) -> List[str]:
return self.capture_files
def get_aliases(self) -> List[str]:
return self.aliases
def set_device_type(self, device_type: str) -> None:
self.device_type = device_type
self.date_updated = datetime.now().strftime('%d-%m-%YT%H:%M:%S')
def set_device_serial_number(self, device_serial_number: str) -> None:
self.device_serial_number = device_serial_number
self.date_updated = datetime.now().strftime('%d-%m-%YT%H:%M:%S')
def set_device_firmware_version(self, device_firmware_version: str) -> None:
self.device_firmware_version = device_firmware_version
self.date_updated = datetime.now().strftime('%d-%m-%YT%H:%M:%S')
def set_device_name(self, device_name: str) -> None:
self.device_name = device_name
self.device_short_name = device_name.lower().replace(" ", "_")
self.date_updated = datetime.now().strftime('%d-%m-%YT%H:%M:%S')
self.device_short_name = device_name.lower().replace(' ', '_')
self.device_id = str(uuid.uuid4())
self.date_created = datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower()
self.device_root_path = device_root_path
if not self.device_root_path or not self.device_root_path.is_dir():
logger.error(f'Invalid device root path: {device_root_path}')
raise ValueError(f'Invalid device root path: {device_root_path}')
logger.debug(f'Device name: {device_name}')
logger.debug(f'Device short_name: {self.device_short_name}')
logger.debug(f'Device root dir: {device_root_path}')
logger.info(f'Initialized DeviceMetadata model: {device_name}')
@classmethod
def load_from_json(cls, device_file_path: Path):
assert device_file_path.is_file(), f"{device_file_path} is not a file"
assert device_file_path.name == DEVICE_METADATA_FILE, f"{device_file_path} is not a {DEVICE_METADATA_FILE}"
logger.info(f'Loading DeviceMetadata from JSON file: {device_file_path}')
assert device_file_path.is_file(), f'{device_file_path} is not a file'
assert device_file_path.name == DEVICE_METADATA_FILE, f'{device_file_path} is not a {DEVICE_METADATA_FILE}'
device_meta_filename = device_file_path
with device_meta_filename.open('r') as file:
metadata_json = json.load(file)
metadata_model_obj = cls.model_validate_json(metadata_json)
metadata_model_obj = cls.from_json(metadata_json)
return metadata_model_obj
def save_to_json(self, file_path: Path):
logger.info(f'Saving DeviceMetadata to JSON file: {file_path}')
if file_path.is_file():
print(f"File {file_path} already exists, update instead.")
print(f'File {file_path} already exists, update instead.')
return ReturnCodes.FILE_ALREADY_EXISTS
metadata = self.model_dump_json(indent=2)
metadata = self.to_json(indent=2)
with file_path.open('w') as file:
json.dump(metadata, file)
return ReturnCodes.SUCCESS
@classmethod
def update_metadata_in_json(cls, file_path: Path, **kwargs):
# TODO Maybe not needed at all.
assert file_path.is_file()
for field in IMMUTABLE_FIELDS:
if field in kwargs:
print(f"Field {field} is immutable")
return ReturnCodes.IMMUTABLE
metadata = cls.load_from_json(file_path)
for field, value in kwargs.items():
if field in metadata.model_fields_set:
setattr(metadata, field, value)
metadata.date_updated = datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower()
pass
def from_json(cls, metadata_json):
if isinstance(metadata_json, dict):
return DeviceMetadata(**metadata_json)
def to_json(self, indent=2):
# TODO: atm almost exact copy as in CaptureMetadata
data = {}
fields = {
'device_name': True,
'device_short_name': True,
'device_id': True,
'date_created': True,
'device_root_path': True,
'aliases': False,
'device_type': False,
'device_serial_number': False,
'device_firmware_version': False,
'date_updated': False,
'capture_files': False,
}
for field, is_mandatory in fields.items():
value = getattr(self, field, None)
if value not in [None, ''] or is_mandatory:
if value in [None, ''] and is_mandatory:
logger.debug(f'Mandatory field {field}: {value}')
raise ValueError(f'Field {field} is required and cannot be empty.')
data[field] = str(value) if not isinstance(value, str) else value
logger.debug(f'Device metadata: {data}')
return json.dumps(data, indent=indent)
def dir_contains_device_metadata(dir_path: Path):
@ -121,7 +104,7 @@ def dir_contains_device_metadata(dir_path: Path):
return False
else:
meta_file_path = dir_path / DEVICE_METADATA_FILE
print(f"Device metadata file path {str(meta_file_path)}")
print(f'Device metadata file path {str(meta_file_path)}')
if not meta_file_path.is_file():
return False
else:

View File

@ -1,38 +1,56 @@
import logging
import pathlib
from iottb import definitions
from iottb.definitions import DEVICE_METADATA_FILE, ReturnCodes
from iottb.definitions import DEVICE_METADATA_FILE
from iottb.logger import logger
from iottb.models.device_metadata_model import DeviceMetadata
from iottb.utils.device_metadata_utils import *
from archive.device_metadata_utils import *
logger.setLevel(logging.INFO) # Since module currently passes all tests
def setup_init_device_root_parser(subparsers):
parser = subparsers.add_parser("add-device", aliases=["add-device-root", "add"])
parser.add_argument("--root_dir", type=pathlib.Path, default=pathlib.Path.cwd())
parser = subparsers.add_parser('add-device', aliases=['add-device-root', 'add'])
parser.add_argument('--root_dir', type=pathlib.Path, default=pathlib.Path.cwd())
group = parser.add_mutually_exclusive_group()
group.add_argument("--guided", action="store_true", help="Guided setup", default=False)
group.add_argument("--name", action="store", type=str, help="name of device")
group.add_argument('--guided', action='store_true', help='Guided setup', default=False)
group.add_argument('--name', action='store', type=str, help='name of device')
parser.set_defaults(func=handle_add)
def handle_add(args):
logger.info(f'Add device handler called with args {args}')
args.root_dir.mkdir(parents=True, exist_ok=True) # else metadata.save_to_file will fail TODO: unclear what to assume
if args.guided:
logger.debug('begin guided setup')
metadata = guided_setup(args.root_dir)
logger.debug('guided setup complete')
else:
device_name = args.name
args.root_dir.mkdir(parents=True, exist_ok=True)
metadata = DeviceMetadata(device_name, args.root_dir)
logger.debug('Setup through passed args: setup')
if not args.name:
logger.error('No device name specified with unguided setup.')
return ReturnCodes.ERROR
metadata = DeviceMetadata(args.name, args.root_dir)
file_path = args.root_dir / DEVICE_METADATA_FILE
response = input(f"Confirm device metadata: {metadata.model_dump()} [y/N]")
if response.lower() not in definitions.AFFIRMATIVE_USER_RESPONSE.add(""):
configure_metadata()
assert False, "TODO implement dynamic setup"
if metadata.save_to_json(file_path) == ReturnCodes.FILE_ALREADY_EXISTS:
print("Directory already contains a device metadata file. Aborting operation.")
if file_path.exists():
print('Directory already contains a metadata file. Aborting.')
return ReturnCodes.ABORTED
assert Path(file_path).exists(), f"{file_path} does not exist"
serialized_metadata = metadata.to_json()
response = input(f'Confirm device metadata: {serialized_metadata} [y/N]')
logger.debug(f'response: {response}')
if response not in definitions.AFFIRMATIVE_USER_RESPONSE:
print('Adding device aborted by user.')
return ReturnCodes.ABORTED
logger.debug(f'Device metadata file {file_path}')
if metadata.save_to_json(file_path) == ReturnCodes.FILE_ALREADY_EXISTS:
logger.error('File exists after checking, which should not happen.')
return ReturnCodes.ABORTED
print('Device metadata successfully created.')
return ReturnCodes.SUCCESS
@ -41,12 +59,17 @@ def configure_metadata():
def guided_setup(device_root) -> DeviceMetadata:
response = "N"
device_name = ""
while response.upper() == "N":
device_name = input("Please enter name of device: ")
if device_name == "" or device_name is None:
print("Name cannot be empty")
logger.info('Guided setup')
response = 'N'
device_name = ''
while response.upper() == 'N':
device_name = input('Please enter name of device: ')
response = input(f'Confirm device name: {device_name} [y/N] ')
if device_name == '' or device_name is None:
print('Name cannot be empty')
logger.warning('Name cannot be empty')
logger.debug(f'Response is {response}')
logger.debug(f'Device name is {device_name}')
return DeviceMetadata(device_name, device_root)

View File

@ -10,31 +10,31 @@ from iottb.utils.capture_utils import get_capture_src_folder, make_capture_src_f
def setup_capture_parser(subparsers):
parser = subparsers.add_parser('sniff', help='Sniff packets with tcpdump')
# metadata args
parser.add_argument("-a", "--ip-address", help="IP address of the device to sniff", dest="device_ip")
parser.add_argument('-a', '--ip-address', help='IP address of the device to sniff', dest='device_ip')
# tcpdump args
parser.add_argument("device_root", help="Root folder for device to sniff",
parser.add_argument('device_root', help='Root folder for device to sniff',
type=Path, default=Path.cwd())
parser.add_argument("-s", "--safe", help="Ensure correct device root folder before sniffing", action="store_true")
parser.add_argument("--app", help="Application name to sniff", dest="app_name", default=None)
parser.add_argument('-s', '--safe', help='Ensure correct device root folder before sniffing', action='store_true')
parser.add_argument('--app', help='Application name to sniff', dest='app_name', default=None)
parser_sniff_tcpdump = parser.add_argument_group('tcpdump arguments')
parser_sniff_tcpdump.add_argument("-i", "--interface", help="Interface to capture on.", dest="capture_interface",
parser_sniff_tcpdump.add_argument('-i', '--interface', help='Interface to capture on.', dest='capture_interface',
required=True)
parser_sniff_tcpdump.add_argument("-I", "--monitor-mode", help="Put interface into monitor mode",
action="store_true")
parser_sniff_tcpdump.add_argument("-n", help="Deactivate name resolution. True by default.",
action="store_true", dest="no_name_resolution")
parser_sniff_tcpdump.add_argument("-#", "--number",
help="Print packet number at beginning of line. True by default.",
action="store_true")
parser_sniff_tcpdump.add_argument("-e", help="Print link layer headers. True by default.",
action="store_true", dest="print_link_layer")
parser_sniff_tcpdump.add_argument("-t", action="count", default=0,
help="Please see tcpdump manual for details. Unused by default.")
parser_sniff_tcpdump.add_argument('-I', '--monitor-mode', help='Put interface into monitor mode',
action='store_true')
parser_sniff_tcpdump.add_argument('-n', help='Deactivate name resolution. True by default.',
action='store_true', dest='no_name_resolution')
parser_sniff_tcpdump.add_argument('-#', '--number',
help='Print packet number at beginning of line. True by default.',
action='store_true')
parser_sniff_tcpdump.add_argument('-e', help='Print link layer headers. True by default.',
action='store_true', dest='print_link_layer')
parser_sniff_tcpdump.add_argument('-t', action='count', default=0,
help='Please see tcpdump manual for details. Unused by default.')
cap_size_group = parser.add_mutually_exclusive_group(required=False)
cap_size_group.add_argument("-c", "--count", type=int, help="Number of packets to capture.", default=1000)
cap_size_group.add_argument("--mins", type=int, help="Time in minutes to capture.", default=1)
cap_size_group.add_argument('-c', '--count', type=int, help='Number of packets to capture.', default=1000)
cap_size_group.add_argument('--mins', type=int, help='Time in minutes to capture.', default=1)
parser.set_defaults(func=handle_capture)
@ -45,25 +45,25 @@ def cwd_is_device_root_dir() -> bool:
def start_guided_device_root_dir_setup():
assert False, "Not implemented"
assert False, 'Not implemented'
def handle_metadata():
assert not cwd_is_device_root_dir()
print(f"Unable to find {DEVICE_METADATA_FILE} in current working directory")
print("You need to setup a device root directory before using this command")
response = input("Would you like to be guided through the setup? [y/n]")
if response.lower() == "y":
print(f'Unable to find {DEVICE_METADATA_FILE} in current working directory')
print('You need to setup a device root directory before using this command')
response = input('Would you like to be guided through the setup? [y/n]')
if response.lower() == 'y':
start_guided_device_root_dir_setup()
else:
print("'iottb init-device-root --help' for more information.")
print('\'iottb init-device-root --help\' for more information.')
exit(ReturnCodes.ABORTED)
# device_id = handle_capture_metadata()
return ReturnCodes.SUCCESS
def get_device_metadata_from_file(device_metadata_filename: Path) -> str:
assert device_metadata_filename.is_file(), f"Device metadata file '{device_metadata_filename} does not exist"
assert device_metadata_filename.is_file(), f'Device metadata file f"{device_metadata_filename}" does not exist'
device_metadata = DeviceMetadata.load_from_json(device_metadata_filename)
return device_metadata
@ -73,26 +73,26 @@ def run_tcpdump(cmd):
try:
p = subprocess.run(cmd, capture_output=True, text=True, check=True)
if p.returncode != 0:
print(f"Error running tcpdump {p.stderr}")
print(f'Error running tcpdump {p.stderr}')
else:
print(f"tcpdump run successfully\n: {p.stdout}")
print(f'tcpdump run successfully\n: {p.stdout}')
except KeyboardInterrupt:
pass
def handle_capture(args):
assert args.device_root is not None, f"Device root directory is required"
assert dir_contains_device_metadata(args.device_root), f"Device metadata file '{args.device_root}' does not exist"
assert args.device_root is not None, f'Device root directory is required'
assert dir_contains_device_metadata(args.device_root), f'Device metadata file \'{args.device_root}\' does not exist'
# get device metadata
if args.safe and not dir_contains_device_metadata(args.device_root):
print(f"Supplied folder contains no device metadata. "
f"Please setup a device root directory before using this command")
print(f'Supplied folder contains no device metadata. '
f'Please setup a device root directory before using this command')
exit(ReturnCodes.ABORTED)
elif dir_contains_device_metadata(args.device_root):
device_metadata_filename = args.device_root / DEVICE_METADATA_FILE
device_data = DeviceMetadata.load_from_json(device_metadata_filename)
else:
name = input("Please enter a device name: ")
name = input('Please enter a device name: ')
args.device_root.mkdir(parents=True, exist_ok=True)
device_data = DeviceMetadata(name, args.device_root)
# start constructing environment for capture
@ -100,10 +100,10 @@ def handle_capture(args):
make_capture_src_folder(capture_dir)
capture_metadata = CaptureMetadata(device_data, capture_dir)
capture_metadata.set_interface(args.capture_interface)
capture_metadata.interface = args.capture_interface
cmd = ['sudo', 'tcpdump', '-i', args.capture_interface]
cmd = build_tcpdump_args(args, cmd, capture_metadata)
capture_metadata.set_tcpdump_command(cmd)
capture_metadata.tcpdump_command = cmd
print('Executing: ' + ' '.join(cmd))
@ -112,16 +112,16 @@ def handle_capture(args):
start_time = datetime.now().strftime('%H:%M:%S')
run_tcpdump(cmd)
stop_time = datetime.now().strftime('%H:%M:%S')
capture_metadata.set_start_time(start_time)
capture_metadata.set_stop_time(stop_time)
capture_metadata.start_time = start_time
capture_metadata.stop_time = stop_time
except KeyboardInterrupt:
print("Received keyboard interrupt.")
print('Received keyboard interrupt.')
exit(ReturnCodes.ABORTED)
except subprocess.CalledProcessError as e:
print(f"Failed to capture packet: {e}")
print(f'Failed to capture packet: {e}')
exit(ReturnCodes.FAILURE)
except Exception as e:
print(f"Failed to capture packet: {e}")
print(f'Failed to capture packet: {e}')
exit(ReturnCodes.FAILURE)
return ReturnCodes.SUCCESS
@ -141,18 +141,18 @@ def build_tcpdump_args(args, cmd, capture_metadata: CaptureMetadata):
cmd.append('-c')
cmd.append(str(args.count))
elif args.mins:
assert False, "Unimplemented option"
assert False, 'Unimplemented option'
if args.app_name is not None:
capture_metadata.set_app_name(args.app_name)
capture_metadata.app = args.app_name
capture_metadata.build_capture_file_name()
cmd.append('-w')
cmd.append(capture_metadata.get_capfile_name())
cmd.append(capture_metadata.capture_file)
if args.safe:
cmd.append(f'host {args.device_ip}') # if not specified, filter 'any' implied by tcpdump
capture_metadata.set_device_ip_address(args.device_ip)
capture_metadata.device_id = args.device_ip
return cmd
@ -162,7 +162,7 @@ def build_tcpdump_args(args, cmd, capture_metadata: CaptureMetadata):
# if args.app_name is not None:
# capture_file_prefix = args.app_name
# capture_metadata.set_app(args.app_name)
# capfile_name = capture_file_prefix + "_" + str(capture_metadata.get_capture_id()) + ".pcap"
# capfile_name = capture_file_prefix + '_' + str(capture_metadata.get_capture_id()) + '.pcap'
# capture_metadata.set_capture_file(capfile_name)
# capfile_abs_path = capture_dir / capfile_name
# capture_metadata.set_capture_file(capfile_name)

View File

@ -16,29 +16,29 @@ def get_capture_date_folder(device_root: Path):
try:
today_folder.mkdir()
except FileExistsError:
print(f"Folder {today_folder} already exists")
print(f'Folder {today_folder} already exists')
return today_folder
raise FileNotFoundError(f"Given path {device_root} is not a device root directory")
raise FileNotFoundError(f'Given path {device_root} is not a device root directory')
def get_capture_src_folder(device_folder: Path):
assert device_folder.is_dir(), f"Given path {device_folder} is not a folder"
assert device_folder.is_dir(), f'Given path {device_folder} is not a folder'
today_iso = get_iso_date()
max_sequence_number = 1
for d in device_folder.iterdir():
if d.is_dir() and d.name.startswith(f'{today_iso}_capture_'):
name = d.name
num = int(name.split("_")[2])
num = int(name.split('_')[2])
max_sequence_number = max(max_sequence_number, num)
next_sequence_number = max_sequence_number + 1
return device_folder.joinpath(f"{today_iso}_capture_{next_sequence_number:03}")
return device_folder.joinpath(f'{today_iso}_capture_{next_sequence_number:03}')
def make_capture_src_folder(capture_src_folder: Path):
try:
capture_src_folder.mkdir()
except FileExistsError:
print(f"Folder {capture_src_folder} already exists")
print(f'Folder {capture_src_folder} already exists')
finally:
return capture_src_folder

View File

@ -12,7 +12,7 @@ def check_installed() -> bool:
def ensure_installed():
"""Ensure that tcpdump is installed, raise an error if not."""
if not check_installed():
raise RuntimeError("tcpdump is not installed. Please install it to continue.")
raise RuntimeError('tcpdump is not installed. Please install it to continue.')
def list_interfaces() -> str:
@ -22,8 +22,8 @@ def list_interfaces() -> str:
result = subprocess.run(['tcpdump', '--list-interfaces'], capture_output=True, text=True, check=True)
return result.stdout
except subprocess.CalledProcessError as e:
print(f"Failed to list interfaces: {e}")
return ""
print(f'Failed to list interfaces: {e}')
return ''
def is_valid_ipv4(ip: str) -> bool:

View File

@ -13,6 +13,6 @@ def subfolder_exists(parent: Path, child: str):
def generate_unique_string_with_prefix(prefix: str):
return prefix + "_" + str(uuid.uuid4())
return prefix + '_' + str(uuid.uuid4())

View File

@ -0,0 +1,44 @@
import sys
import unittest
from io import StringIO
from unittest.mock import patch, MagicMock
from pathlib import Path
from iottb.definitions import DEVICE_METADATA_FILE
import shutil
from iottb.__main__ import main
class TestDeviceMetadataFileCreation(unittest.TestCase):
def setUp(self):
self.test_dir = Path('/tmp/iottbtest/test_add_device')
self.test_dir.mkdir(parents=True, exist_ok=True)
# self.captured_output = StringIO()
# sys.stdout = self.captured_output
def tearDown(self):
# shutil.rmtree(str(self.test_dir))
for item in self.test_dir.iterdir():
if item.is_dir():
item.rmdir()
else:
item.unlink()
self.test_dir.rmdir()
# sys.stdout = sys.__stdout__
@patch('builtins.input', side_effect=['iPhone 14', 'y', 'y'])
def test_guided_device_setup(self, mock_input):
sys.argv = ['__main__.py', 'add', '--root_dir', str(self.test_dir), '--guided']
main()
expected_file = self.test_dir / DEVICE_METADATA_FILE
self.assertTrue(expected_file.exists()), f'Expected file not created: {expected_file}'
@patch('builtins.input', side_effect=['y']) # need mock_input else wont work
def test_device_setup(self, mock_input):
sys.argv = ['__main__.py', 'add', '--root_dir', str(self.test_dir), '--name', 'iPhone 14']
main()
expected_file = self.test_dir / DEVICE_METADATA_FILE
self.assertTrue(expected_file.exists()), f'Expected file not created: {expected_file}'
if __name__ == '__main__':
unittest.main()

View File

@ -1,6 +1,2 @@
import json
from pathlib import Path
from unittest.mock import mock_open, patch
import pytest
from iottb.utils.capture_metadata_utils import set_device_ip_address