diff --git a/.idea/workspace.xml b/.idea/workspace.xml
index 141240e..60695e5 100644
--- a/.idea/workspace.xml
+++ b/.idea/workspace.xml
@@ -4,26 +4,15 @@
-
-
-
-
+
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
@@ -165,7 +154,7 @@
-
+
@@ -223,7 +212,15 @@
1714823516954
-
+
+
+ 1714919098392
+
+
+
+ 1714919098392
+
+
@@ -280,7 +277,8 @@
-
+
+
diff --git a/code/iottb/__main__.py b/code/iottb/__main__.py
index 17fa694..3ed6dcd 100644
--- a/code/iottb/__main__.py
+++ b/code/iottb/__main__.py
@@ -2,7 +2,7 @@
import argparse
from iottb.subcommands.capture import setup_capture_parser
-from iottb.subcommands.init import setup_init_iottb_root_parser
+from iottb.subcommands.add_device import setup_init_device_root_parser
CAP_DIR_PREFIX = ...
@@ -16,7 +16,7 @@ def setup_argparse():
subparsers = root_parser.add_subparsers(title="subcommands", required=True, dest="command")
setup_capture_parser(subparsers)
- setup_init_iottb_root_parser(subparsers)
+ setup_init_device_root_parser(subparsers)
return root_parser
diff --git a/code/iottb/definitions.py b/code/iottb/definitions.py
index 8471ea2..202f572 100644
--- a/code/iottb/definitions.py
+++ b/code/iottb/definitions.py
@@ -6,7 +6,10 @@ DEVICE_METADATA_FILE = "device-metadata.json"
CAPTURE_METADATA_FILE = "capture-metadata.json"
TODAY_DATE_STRING = datetime.now().strftime("%d%b%Y").lower() # TODO convert to function in utils or so
-
+AFFIRMATIVE_USER_RESPONSE = {"yes", "y", "true", "Y", "Yes", "YES"}
+NEGATIVE_USER_RESPONSE = {"no", "n", "N", "No"}
+YES_DEFAULT = AFFIRMATIVE_USER_RESPONSE.union({"", " "})
+NO_DEFAULT = NEGATIVE_USER_RESPONSE.union({"", " "})
@unique
@global_enum
class ReturnCodes(Flag):
diff --git a/code/iottb/models/capture_metadata_model.py b/code/iottb/models/capture_metadata_model.py
index 47cfed4..aa320bf 100644
--- a/code/iottb/models/capture_metadata_model.py
+++ b/code/iottb/models/capture_metadata_model.py
@@ -9,7 +9,7 @@ from pydantic import BaseModel, Field
from iottb.definitions import ReturnCodes
-class KydcapCaptureMetadata(BaseModel):
+class CaptureMetadata(BaseModel):
# Required Fields
device_id: str
capture_id: uuid.UUID = Field(default_factory=lambda: str(uuid.uuid4()))
diff --git a/code/iottb/models/device_metadata_model.py b/code/iottb/models/device_metadata_model.py
index 3609ba3..9435d0a 100644
--- a/code/iottb/models/device_metadata_model.py
+++ b/code/iottb/models/device_metadata_model.py
@@ -21,6 +21,7 @@ class DeviceMetadata(BaseModel):
date_created: str = Field(default_factory=lambda: datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower())
# Optional Fields
+ aliases: List[str] = Field(default_factory=lambda: [])
device_type: Optional[str] = None
device_serial_number: Optional[str] = None
device_firmware_version: Optional[str] = None
diff --git a/code/iottb/subcommands/add_device.py b/code/iottb/subcommands/add_device.py
new file mode 100644
index 0000000..47b90ea
--- /dev/null
+++ b/code/iottb/subcommands/add_device.py
@@ -0,0 +1,59 @@
+import pathlib
+
+from iottb import definitions
+from iottb.definitions import DEVICE_METADATA_FILE, ReturnCodes
+from iottb.models.device_metadata_model import DeviceMetadata
+from iottb.utils.device_metadata_utils import *
+
+
+def setup_init_device_root_parser(subparsers):
+ parser = subparsers.add_parser("add-device", aliases=["add-device-root", "add"])
+ parser.add_argument("--root", type=pathlib.Path, default=pathlib.Path.cwd())
+ group = parser.add_mutually_exclusive_group()
+ group.add_argument("--guided", action="store_true", help="Guided setup", default=False)
+ group.add_argument("name", action="store", type=str, help="name of device")
+ parser.set_defaults(func=handle_add)
+
+
+def handle_add(args):
+ print("Entered add-device-root")
+ metadata = None
+ if args.guided:
+ metadata = guided_setup(args.root_dir)
+ else:
+ device_name = args.name
+ args.root_dir.mkdir(parents=True, exist_ok=True)
+ args.root_dir.chdir()
+ metadata = DeviceMetadata(device_name)
+
+ file_path = args.root_dir / DEVICE_METADATA_FILE
+ response = input(f"Confirm device metadata: {metadata.model_dump()} [y/N]")
+ if response.lower() not in definitions.AFFIRMATIVE_USER_RESPONSE.add(""):
+ configure_metadata()
+ assert False, "TODO implement dynamic setup"
+ assert metadata.model_dump() != ""
+ if metadata.save_to_json(file_path) == ReturnCodes.FILE_ALREADY_EXISTS:
+ print("Directory already contains a device metadata file. Aborting operation.")
+ return ReturnCodes.ABORTED
+ assert Path(file_path).exists(), f"{file_path} does not exist"
+ return ReturnCodes.SUCCESS
+
+
+def configure_metadata():
+ pass
+
+
+def guided_setup(device_root) -> DeviceMetadata:
+ response = "N"
+ device_name = ""
+ while response.upper() == "N":
+ device_name = input("Please enter name of device: ")
+ if device_name == "" or device_name is None:
+ print("Name cannot be empty")
+ response = input(f"Confirm device name: {device_name} [y/N]")
+
+ assert response.lower() in definitions.AFFIRMATIVE_USER_RESPONSE.add(""), f"{response.upper()} not supported"
+ assert device_name != ""
+ assert device_name is not None
+ return DeviceMetadata(device_name)
+
diff --git a/code/iottb/subcommands/capture.py b/code/iottb/subcommands/capture.py
index f606d52..513e1c2 100644
--- a/code/iottb/subcommands/capture.py
+++ b/code/iottb/subcommands/capture.py
@@ -1,8 +1,10 @@
import subprocess
+import uuid
from pathlib import Path
from iottb.definitions import *
from iottb.models.device_metadata_model import DeviceMetadata
+from iottb.models.capture_metadata_model import CaptureMetadata
def setup_capture_parser(subparsers):
@@ -37,7 +39,7 @@ def setup_capture_parser(subparsers):
def cwd_is_device_root_dir() -> bool:
device_metadata_file = Path.cwd() / DEVICE_METADATA_FILE
- return device_metadata_file.exists()
+ return device_metadata_file.is_file()
def get_user_capture_config():
@@ -61,57 +63,63 @@ def handle_metadata():
return ReturnCodes.SUCCESS
-def handle_capture_metadata():
- device_metadata_json = Path.cwd() / DEVICE_METADATA_FILE
- device_metadata = DeviceMetadata.load_from_json(device_metadata_json)
+def get_device_id_from_file(device_metadata_filename: Path) -> str:
+ assert device_metadata_filename.is_file(), f"Device metadata file '{device_metadata_filename} does not exist"
+ device_metadata = DeviceMetadata.load_from_json(device_metadata_filename)
device_id = device_metadata.device_id
return device_id
-def handle_date_dir():
+def setup_capture_folder():
pass
def run_tcpdum(cmd):
- subprocess.run(cmd)
+ try:
+ p = subprocess.run(cmd)
+ except KeyboardInterrupt:
+ pass
+def generate_capfile_name():
+ name = datetime.now().strftime("%Y%m%d_%H%M%S") + uuid.uuid4().hex
def handle_capture(args):
- if cwd_is_device_root_dir():
- handle_date_dir()
- cmd = ['sudo tcpdump', '-i', args.capture_interface]
- if args.monitor_mode:
- cmd.append('-I')
- if args.no_name_resolution:
- cmd.append('-n')
- if args.number:
- cmd.append('-#')
- if args.print_link_layer:
- cmd.append('-e')
- if args.count:
- cmd.append('-c')
- cmd.append(str(args.count))
- elif args.mins:
- pass
- print('Executing: ' + ' '.join(cmd))
- # TODO maybe dump this into file -> put into device metadata
- try:
- start_time = datetime.now().strftime('%H:%M:%S')
- run_tcpdum(cmd)
- stop_time = datetime.now().strftime('%H:%M:%S')
- except KeyboardInterrupt:
- print("Received keyboard interrupt.")
- exit(ReturnCodes.ABORTED)
- except subprocess.CalledProcessError as e:
- print(f"Failed to capture packet: {e}")
- exit(ReturnCodes.FAILURE)
- except Exception as e:
- print(f"Failed to capture packet: {e}")
- exit(ReturnCodes.FAILURE)
+ assert args.device_root is not None, f"Device root directory is required"
+ device_metadata_file = Path.cwd() / DEVICE_METADATA_FILE
+ device_id = get_device_id_from_file(device_metadata_file)
+ assert device_metadata_file.is_file(), f"Device metadata file '{device_metadata_file} does not exist"
+ capture_dir = setup_capture_folder(args.dev_root)
+ cmd = ['sudo', 'tcpdump', '-i', args.capture_interface]
+ if args.monitor_mode:
+ cmd.append('-I')
+ if args.no_name_resolution:
+ cmd.append('-n')
+ if args.number:
+ cmd.append('-#')
+ if args.print_link_layer:
+ cmd.append('-e')
+ if args.count:
+ cmd.append('-c')
+ cmd.append(str(args.count))
+ elif args.mins:
+ pass
+ print('Executing: ' + ' '.join(cmd))
+ # TODO maybe dump this into file -> put into device metadata
+ try:
+ start_time = datetime.now().strftime('%H:%M:%S')
+ run_tcpdum(cmd)
+ stop_time = datetime.now().strftime('%H:%M:%S')
+ except KeyboardInterrupt:
+ print("Received keyboard interrupt.")
+ exit(ReturnCodes.ABORTED)
+ except subprocess.CalledProcessError as e:
+ print(f"Failed to capture packet: {e}")
+ exit(ReturnCodes.FAILURE)
+ except Exception as e:
+ print(f"Failed to capture packet: {e}")
+ exit(ReturnCodes.FAILURE)
- return ReturnCodes.SUCCESS
- else:
- handle_metadata()
+ return ReturnCodes.SUCCESS
diff --git a/code/iottb/subcommands/init.py b/code/iottb/subcommands/init.py
deleted file mode 100644
index 204e91b..0000000
--- a/code/iottb/subcommands/init.py
+++ /dev/null
@@ -1,40 +0,0 @@
-import pathlib
-
-from iottb.definitions import DEVICE_METADATA_FILE
-from iottb.models.device_metadata_model import DeviceMetadata
-
-
-def setup_init_iottb_root_parser(subparsers):
- parser = subparsers.add_parser("init", aliases=["idr"])
- parser.add_argument("--root_dir", type=pathlib.Path, default=pathlib.Path.cwd())
- group = parser.add_mutually_exclusive_group()
- group.add_argument("--dynamic", action="store_true", help="enable guided setup", default=False)
- group.add_argument("-n", "--name", action="store", type=str, help="name of device")
- parser.set_defaults(func=handle_idr)
-
-
-def handle_idr(args):
- print("Entered iottb initialize-device-root")
- root_dir = args.root_dir
- device_name = None
- if args.dynamic:
- response = "N"
- while response == "N":
- name = input("Please enter name of device: ")
- # TODO extended config for other fields like apps, firmware etc.
- response = input(f"Confirm device name: {name} [y/N]")
- device_name = name
- else:
- device_name = args.name
- root_dir.mkdir(parents=True, exist_ok=True)
- root_dir.chdir()
- dev_metadata_model = DeviceMetadata(device_name)
- file_path = root_dir / device_name / DEVICE_METADATA_FILE
- assert not file_path.exists(), f"{file_path} already exists"
- if args.dynamic:
- response = input(f"Confirm device metadata: {dev_metadata_model.model_dump()} [y/N]")
- if response.lower() != "y":
- assert False, "TODO implement dynamic setup"
- code = dev_metadata_model.save_to_json(file_path)
- print(f"Device metadata saved to {file_path}")
- return code