diff --git a/.idea/workspace.xml b/.idea/workspace.xml
index 4e4bd90..dfa4ef1 100644
--- a/.idea/workspace.xml
+++ b/.idea/workspace.xml
@@ -4,13 +4,13 @@
-
-
-
-
+
+
-
-
+
+
+
+
@@ -28,7 +28,7 @@
@@ -44,27 +44,28 @@
- {
- "keyToString": {
- "ASKED_ADD_EXTERNAL_FILES": "true",
- "ASKED_MARK_IGNORED_FILES_AS_EXCLUDED": "true",
- "ASKED_SHARE_PROJECT_CONFIGURATION_FILES": "true",
- "Python.__init__.executor": "Run",
- "Python.__main__.executor": "Run",
- "Python.main.executor": "Run",
- "RunOnceActivity.ShowReadmeOnStart": "true",
- "SHARE_PROJECT_CONFIGURATION_FILES": "true",
- "git-widget-placeholder": "main",
- "last_opened_file_path": "/home/slnopriv/projects/2024-bsc-sebastian-lenzlinger/code/kydcap/utils/device_metadata_utils.py",
- "node.js.detected.package.eslint": "true",
- "node.js.detected.package.tslint": "true",
- "node.js.selected.package.eslint": "(autodetect)",
- "node.js.selected.package.tslint": "(autodetect)",
- "nodejs_package_manager_path": "npm",
- "settings.editor.selected.configurable": "com.jetbrains.python.configuration.PyActiveSdkModuleConfigurable",
- "vue.rearranger.settings.migration": "true"
+
+}]]>
@@ -98,7 +99,7 @@
-
+
@@ -107,13 +108,13 @@
-
+
-
-
+
+
@@ -127,6 +128,7 @@
+
@@ -150,8 +152,10 @@
-
-
+
+
+
+
@@ -201,15 +205,39 @@
1714617266799
-
+
- 1714622469786
+ 1714823516954
- 1714622469786
+ 1714823516954
-
+
+
+ 1714919098392
+
+
+
+ 1714919098392
+
+
+
+ 1714924463148
+
+
+
+ 1714924463148
+
+
+
+ 1715101138312
+
+
+
+ 1715101138312
+
+
@@ -265,10 +293,15 @@
-
-
+
+
+
+
+
+
+
diff --git a/archive/metadata_utils.py b/archive/metadata_utils.py
index 770fe63..c0cc93e 100644
--- a/archive/metadata_utils.py
+++ b/archive/metadata_utils.py
@@ -3,8 +3,8 @@ from pathlib import Path
from pydantic import BaseModel
-from kydcap.models.device_metadata_model import DeviceMetadata
-from kydcap.config import DEVICE_METADATA_FILE
+from iottb.models.device_metadata_model import DeviceMetadata
+from iottb.definitions import DEVICE_METADATA_FILE
def write_device_metadata_to_file(metadata: DeviceMetadata, device_path: Path):
diff --git a/code/kydcap/__init__.py b/code/iottb/__init__.py
similarity index 100%
rename from code/kydcap/__init__.py
rename to code/iottb/__init__.py
diff --git a/code/kydcap/__main__.py b/code/iottb/__main__.py
similarity index 70%
rename from code/kydcap/__main__.py
rename to code/iottb/__main__.py
index 773ed35..70fe0d6 100644
--- a/code/kydcap/__main__.py
+++ b/code/iottb/__main__.py
@@ -1,10 +1,8 @@
#!/usr/bin/env python3
import argparse
-from kydcap.subcommands.sniff import setup_sniff_parser
-from kydcap.subcommands.initialize_device_root_dir import setup_init_root_dir_parser
-
-CAP_DIR_PREFIX = ...
+from iottb.subcommands.capture import setup_capture_parser
+from iottb.subcommands.add_device import setup_init_device_root_parser
######################
@@ -12,11 +10,11 @@ CAP_DIR_PREFIX = ...
######################
def setup_argparse():
# create top level parser
- root_parser = argparse.ArgumentParser(prog="kydcap")
+ root_parser = argparse.ArgumentParser(prog="iottb")
subparsers = root_parser.add_subparsers(title="subcommands", required=True, dest="command")
- setup_sniff_parser(subparsers)
- setup_init_root_dir_parser(subparsers)
+ setup_capture_parser(subparsers)
+ setup_init_device_root_parser(subparsers)
return root_parser
diff --git a/code/iottb/definitions.py b/code/iottb/definitions.py
new file mode 100644
index 0000000..507e1a0
--- /dev/null
+++ b/code/iottb/definitions.py
@@ -0,0 +1,26 @@
+from datetime import datetime
+from enum import Flag, unique, global_enum
+
+DEVICE_METADATA_FILE = "device_metadata.json"
+CAPTURE_METADATA_FILE = "capture_metadata.json"
+TODAY_DATE_STRING = datetime.now().strftime("%d%b%Y").lower() # TODO convert to function in utils or so
+
+CAPTURE_FOLDER_BASENAME = "capture_###"
+
+AFFIRMATIVE_USER_RESPONSE = {"yes", "y", "true", "Y", "Yes", "YES"}
+NEGATIVE_USER_RESPONSE = {"no", "n", "N", "No"}
+YES_DEFAULT = AFFIRMATIVE_USER_RESPONSE.union({"", " "})
+NO_DEFAULT = NEGATIVE_USER_RESPONSE.union({"", " "})
+
+
+@unique
+@global_enum
+class ReturnCodes(Flag):
+ SUCCESS = 0
+ ABORTED = 1
+ FAILURE = 2
+ UNKNOWN = 3
+ FILE_NOT_FOUND = 4
+ FILE_ALREADY_EXISTS = 5
+ INVALID_ARGUMENT = 6
+ INVALID_ARGUMENT_VALUE = 7
diff --git a/code/iottb/logger.py b/code/iottb/logger.py
new file mode 100644
index 0000000..33f787f
--- /dev/null
+++ b/code/iottb/logger.py
@@ -0,0 +1,28 @@
+import logging
+import sys
+from logging.handlers import RotatingFileHandler
+
+
+def setup_logging():
+ logger_obj = logging.getLogger('iottbLogger')
+ logger_obj.setLevel(logging.INFO)
+
+ file_handler = RotatingFileHandler('iottb.log')
+ console_handler = logging.StreamHandler(sys.stdout)
+
+ file_handler.setLevel(logging.DEBUG)
+ console_handler.setLevel(logging.INFO)
+
+ file_fmt = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+ console_fmt = logging.Formatter('%(name)s - %(levelname)s - %(message)s')
+
+ file_handler.setFormatter(file_fmt)
+ console_handler.setFormatter(console_fmt)
+
+ logger_obj.addHandler(file_handler)
+ logger_obj.addHandler(console_handler)
+
+ return logger_obj
+
+
+logger = setup_logging()
diff --git a/code/kydcap/models/__init__.py b/code/iottb/models/__init__.py
similarity index 100%
rename from code/kydcap/models/__init__.py
rename to code/iottb/models/__init__.py
diff --git a/code/iottb/models/capture_metadata_model.py b/code/iottb/models/capture_metadata_model.py
new file mode 100644
index 0000000..0073679
--- /dev/null
+++ b/code/iottb/models/capture_metadata_model.py
@@ -0,0 +1,157 @@
+import json
+import uuid
+from datetime import datetime
+from pathlib import Path
+from typing import Optional, Any
+from uuid import UUID
+
+from pydantic import BaseModel, Field
+
+from iottb.definitions import ReturnCodes, CAPTURE_METADATA_FILE
+from iottb.models.device_metadata_model import DeviceMetadata
+
+
+class CaptureMetadata(BaseModel):
+ # Required Fields
+ device_metadata: DeviceMetadata = Field(exclude=True)
+ capture_id: uuid.UUID = Field(default_factory=lambda: str(uuid.uuid4()))
+ capture_dir: Path
+ capture_file: str
+ capture_date: str = Field(default_factory=lambda: datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower())
+
+ # Statistics
+ start_time: str
+ stop_time: str
+
+ # tcpdump
+ packet_count: Optional[int]
+ pcap_filter: str = ""
+ tcpdump_command: str = ""
+ interface: str = ""
+
+ # Optional Fields
+ device_ip_address: Optional[str] = "No IP Address set"
+ device_mac_address: Optional[str] = None
+
+ app: Optional[str] = None
+ app_version: Optional[str] = None
+ firmware_version: Optional[str] = None
+
+ def __init__(self, device_metadata: DeviceMetadata, capture_dir: Path, /, **data: Any):
+ super().__init__(**data) # Pycharms orders
+ self.device_metadata = device_metadata
+ self.capture_dir = capture_dir
+ assert capture_dir.is_dir()
+
+ # Getters
+ def get_device_id(self) -> str:
+ return self.device_id
+
+ def get_start_time(self) -> str:
+ return self.start_time
+
+ def get_stop_time(self) -> str:
+ return self.stop_time
+
+ def get_packet_count(self) -> int:
+ return self.packet_count
+
+ def get_pcap_filter(self) -> str:
+ return self.pcap_filter
+
+ def get_device_ip_address(self) -> str:
+ return self.device_ip_address
+
+ def get_device_mac_address(self) -> str:
+ return self.device_mac_address
+
+ def get_app(self) -> str:
+ return self.app
+
+ def get_app_version(self) -> str:
+ return self.app_version
+
+ def get_firmware_version(self) -> str:
+ return self.firmware_version
+
+ def get_capture_id(self) -> UUID:
+ return self.capture_id
+
+ def get_capture_date(self) -> str:
+ return self.capture_date
+
+ def get_capfile_name(self):
+ return self.capture_file
+
+ def get_device_metadata(self) -> DeviceMetadata:
+ return self.device_metadata
+
+ def get_interface(self):
+ return self.interface
+
+ # Setters
+ def set_capture_dir(self, capture_dir: Path):
+ self.capture_dir = capture_dir
+
+ def set_capture_file(self, capture_file: str):
+ self.capture_file = capture_file
+
+ def set_capture_date(self, capture_date: str):
+ self.capture_date = capture_date
+
+ def set_start_time(self, start_time: str):
+ self.start_time = start_time
+
+ def set_stop_time(self, stop_time: str):
+ self.stop_time = stop_time
+
+ def set_packet_count(self, packet_count: int):
+ self.packet_count = packet_count
+
+ def set_pcap_filter(self, pcap_filter: str):
+ self.pcap_filter = pcap_filter
+
+ def set_device_ip_address(self, device_ip_address: str):
+ self.device_ip_address = device_ip_address
+
+ def set_device_mac_address(self, device_mac_address: str):
+ self.device_mac_address = device_mac_address
+
+ def set_app(self, app: str):
+ self.app = app
+
+ def set_app_version(self, app_version: str):
+ self.app_version = app_version
+
+ def set_firmware_version(self, firmware_version: str):
+ self.firmware_version = firmware_version
+ self.device_metadata.set_device_firmware_version(firmware_version)
+
+ def set_interface(self, interface: str):
+ self.interface = interface
+
+ def set_tcpdump_command(self, tcpdump_command: str):
+ self.tcpdump_command = tcpdump_command
+
+ # Other
+
+ def build_capture_file_name(self):
+ prefix = ""
+ if self.app is None:
+ prefix = self.device_metadata.get_device_short_name()
+ else:
+ assert str(self.app).strip() not in {"", " "}, f"app is not a valid name: {self.app}"
+ prefix = self.get_app()
+ # assert self.capture_dir is not None, f"{self.capture_dir} does not exist"
+ filename = f"{prefix}_{str(self.capture_id)}.pcap"
+ self.set_capture_file(filename)
+
+ def save_capture_metadata_to_json(self, file_path: Path = Path(CAPTURE_METADATA_FILE)):
+ assert self.capture_dir.is_dir(), f"capture_dir is not a directory: {self.capture_dir}"
+ if file_path.is_file():
+ print(f"File {file_path} already exists, update instead.")
+ return ReturnCodes.FILE_ALREADY_EXISTS
+ metadata = self.model_dump_json(indent=2, exclude_unset=True, exclude_none=True)
+ with file_path.open('w') as file:
+ json.dump(metadata, file)
+ return ReturnCodes.SUCCESS
diff --git a/code/iottb/models/device_metadata_model.py b/code/iottb/models/device_metadata_model.py
new file mode 100644
index 0000000..bb75188
--- /dev/null
+++ b/code/iottb/models/device_metadata_model.py
@@ -0,0 +1,128 @@
+import json
+import uuid
+from datetime import datetime
+from pathlib import Path
+from typing import Optional, List, Any
+
+# iottb modules
+from iottb.definitions import ReturnCodes, DEVICE_METADATA_FILE
+# 3rd party libs
+from pydantic import BaseModel, Field
+
+IMMUTABLE_FIELDS = {"device_name", "device_short_name", "device_id", "date_created"}
+
+
+class DeviceMetadata(BaseModel):
+ # Required fields
+ device_name: str
+ device_short_name: str
+ device_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
+ date_created: str = Field(default_factory=lambda: datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower())
+
+ device_root_path: Path
+ # Optional Fields
+ aliases: List[str] = Field(default_factory=lambda: [])
+ device_type: Optional[str] = None
+ device_serial_number: Optional[str] = None
+ device_firmware_version: Optional[str] = None
+ date_updated: Optional[str] = None
+
+ capture_files: Optional[List[str]] = []
+
+ def __init__(self, device_name: str, device_root_dir: Path, /, **data: Any):
+ super().__init__(**data)
+ self.device_name = device_name
+ self.device_short_name = device_name.lower().replace(" ", "_")
+ # assert dir_contains_device_metadata(device_root_dir), \
+ # f"Directory {device_root_dir} is missing a {DEVICE_METADATA_FILE} file"
+ self.device_root_dir = device_root_dir
+
+ def get_device_id(self) -> str:
+ return self.device_id
+
+ def get_device_name(self) -> str:
+ return self.device_name
+
+ def get_device_short_name(self) -> str:
+ return self.device_short_name
+
+ def get_device_type(self) -> str:
+ return self.device_type
+
+ def get_device_serial_number(self) -> str:
+ return self.device_serial_number
+
+ def get_device_firmware_version(self) -> str:
+ return self.device_firmware_version
+
+ def get_date_updated(self) -> str:
+ return self.date_updated
+
+ def get_capture_files(self) -> List[str]:
+ return self.capture_files
+
+ def get_aliases(self) -> List[str]:
+ return self.aliases
+
+ def set_device_type(self, device_type: str) -> None:
+ self.device_type = device_type
+ self.date_updated = datetime.now().strftime('%d-%m-%YT%H:%M:%S')
+
+ def set_device_serial_number(self, device_serial_number: str) -> None:
+ self.device_serial_number = device_serial_number
+ self.date_updated = datetime.now().strftime('%d-%m-%YT%H:%M:%S')
+
+ def set_device_firmware_version(self, device_firmware_version: str) -> None:
+ self.device_firmware_version = device_firmware_version
+ self.date_updated = datetime.now().strftime('%d-%m-%YT%H:%M:%S')
+
+ def set_device_name(self, device_name: str) -> None:
+ self.device_name = device_name
+ self.device_short_name = device_name.lower().replace(" ", "_")
+ self.date_updated = datetime.now().strftime('%d-%m-%YT%H:%M:%S')
+
+ @classmethod
+ def load_from_json(cls, device_file_path: Path):
+ assert device_file_path.is_file(), f"{device_file_path} is not a file"
+ assert device_file_path.name == DEVICE_METADATA_FILE, f"{device_file_path} is not a {DEVICE_METADATA_FILE}"
+ device_meta_filename = device_file_path
+ with device_meta_filename.open('r') as file:
+ metadata_json = json.load(file)
+ metadata_model_obj = cls.model_validate_json(metadata_json)
+ return metadata_model_obj
+
+ def save_to_json(self, file_path: Path):
+ if file_path.is_file():
+ print(f"File {file_path} already exists, update instead.")
+ return ReturnCodes.FILE_ALREADY_EXISTS
+ metadata = self.model_dump_json(indent=2)
+ with file_path.open('w') as file:
+ json.dump(metadata, file)
+ return ReturnCodes.SUCCESS
+
+ @classmethod
+ def update_metadata_in_json(cls, file_path: Path, **kwargs):
+ # TODO Maybe not needed at all.
+ assert file_path.is_file()
+ for field in IMMUTABLE_FIELDS:
+ if field in kwargs:
+ print(f"Field {field} is immutable")
+ return ReturnCodes.IMMUTABLE
+ metadata = cls.load_from_json(file_path)
+ for field, value in kwargs.items():
+ if field in metadata.model_fields_set:
+ setattr(metadata, field, value)
+ metadata.date_updated = datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower()
+ pass
+
+
+def dir_contains_device_metadata(dir_path: Path):
+ if not dir_path.is_dir():
+ return False
+ else:
+ meta_file_path = dir_path / DEVICE_METADATA_FILE
+ print(f"Device metadata file path {str(meta_file_path)}")
+ if not meta_file_path.is_file():
+ return False
+ else:
+ return True
diff --git a/code/kydcap/subcommands/__init__.py b/code/iottb/subcommands/__init__.py
similarity index 100%
rename from code/kydcap/subcommands/__init__.py
rename to code/iottb/subcommands/__init__.py
diff --git a/code/iottb/subcommands/add_device.py b/code/iottb/subcommands/add_device.py
new file mode 100644
index 0000000..5afddab
--- /dev/null
+++ b/code/iottb/subcommands/add_device.py
@@ -0,0 +1,59 @@
+import pathlib
+
+from iottb import definitions
+from iottb.definitions import DEVICE_METADATA_FILE, ReturnCodes
+from iottb.models.device_metadata_model import DeviceMetadata
+from iottb.utils.device_metadata_utils import *
+
+
+def setup_init_device_root_parser(subparsers):
+ parser = subparsers.add_parser("add-device", aliases=["add-device-root", "add"])
+ parser.add_argument("--root_dir", type=pathlib.Path, default=pathlib.Path.cwd())
+ group = parser.add_mutually_exclusive_group()
+ group.add_argument("--guided", action="store_true", help="Guided setup", default=False)
+ group.add_argument("--name", action="store", type=str, help="name of device")
+ parser.set_defaults(func=handle_add)
+
+
+def handle_add(args):
+ print("Entered add-device-root")
+
+ if args.guided:
+ metadata = guided_setup(args.root_dir)
+ else:
+ device_name = args.name
+ args.root_dir.mkdir(parents=True, exist_ok=True)
+ args.root_dir.chdir()
+ metadata = DeviceMetadata(device_name, args.root_dir)
+
+ file_path = args.root_dir / DEVICE_METADATA_FILE
+ response = input(f"Confirm device metadata: {metadata.model_dump()} [y/N]")
+ if response.lower() not in definitions.AFFIRMATIVE_USER_RESPONSE.add(""):
+ configure_metadata()
+ assert False, "TODO implement dynamic setup"
+ assert metadata.model_dump() != ""
+ if metadata.save_to_json(file_path) == ReturnCodes.FILE_ALREADY_EXISTS:
+ print("Directory already contains a device metadata file. Aborting operation.")
+ return ReturnCodes.ABORTED
+ assert Path(file_path).exists(), f"{file_path} does not exist"
+ return ReturnCodes.SUCCESS
+
+
+def configure_metadata():
+ pass
+
+
+def guided_setup(device_root) -> DeviceMetadata:
+ response = "N"
+ device_name = ""
+ while response.upper() == "N":
+ device_name = input("Please enter name of device: ")
+ if device_name == "" or device_name is None:
+ print("Name cannot be empty")
+ response = input(f"Confirm device name: {device_name} [y/N] ")
+
+ assert response.lower() in definitions.AFFIRMATIVE_USER_RESPONSE.add(""), f"{response.upper()} not supported"
+ assert device_name != ""
+ assert device_name is not None
+ return DeviceMetadata(device_name, device_root)
+
diff --git a/code/iottb/subcommands/capture.py b/code/iottb/subcommands/capture.py
new file mode 100644
index 0000000..a657493
--- /dev/null
+++ b/code/iottb/subcommands/capture.py
@@ -0,0 +1,170 @@
+import subprocess
+from pathlib import Path
+
+from iottb.definitions import *
+from iottb.models.capture_metadata_model import CaptureMetadata
+from iottb.models.device_metadata_model import DeviceMetadata, dir_contains_device_metadata
+from iottb.utils.capture_utils import get_capture_src_folder, make_capture_src_folder
+
+
+def setup_capture_parser(subparsers):
+ parser = subparsers.add_parser('sniff', help='Sniff packets with tcpdump')
+ # metadata args
+ parser.add_argument("-a", "--ip-address", help="IP address of the device to sniff", dest="device_ip")
+ # tcpdump args
+ parser.add_argument("device_root", help="Root folder for device to sniff",
+ type=Path, default=Path.cwd())
+ parser.add_argument("-s", "--safe", help="Ensure correct device root folder before sniffing", action="store_true")
+ parser.add_argument("--app", help="Application name to sniff", dest="app_name", default=None)
+
+ parser_sniff_tcpdump = parser.add_argument_group('tcpdump arguments')
+ parser_sniff_tcpdump.add_argument("-i", "--interface", help="Interface to capture on.", dest="capture_interface",
+ required=True)
+ parser_sniff_tcpdump.add_argument("-I", "--monitor-mode", help="Put interface into monitor mode",
+ action="store_true")
+ parser_sniff_tcpdump.add_argument("-n", help="Deactivate name resolution. True by default.",
+ action="store_true", dest="no_name_resolution")
+ parser_sniff_tcpdump.add_argument("-#", "--number",
+ help="Print packet number at beginning of line. True by default.",
+ action="store_true")
+ parser_sniff_tcpdump.add_argument("-e", help="Print link layer headers. True by default.",
+ action="store_true", dest="print_link_layer")
+ parser_sniff_tcpdump.add_argument("-t", action="count", default=0,
+ help="Please see tcpdump manual for details. Unused by default.")
+
+ cap_size_group = parser.add_mutually_exclusive_group(required=False)
+ cap_size_group.add_argument("-c", "--count", type=int, help="Number of packets to capture.", default=1000)
+ cap_size_group.add_argument("--mins", type=int, help="Time in minutes to capture.", default=1)
+
+ parser.set_defaults(func=handle_capture)
+
+
+def cwd_is_device_root_dir() -> bool:
+ device_metadata_file = Path.cwd() / DEVICE_METADATA_FILE
+ return device_metadata_file.is_file()
+
+
+def start_guided_device_root_dir_setup():
+ assert False, "Not implemented"
+
+
+def handle_metadata():
+ assert not cwd_is_device_root_dir()
+ print(f"Unable to find {DEVICE_METADATA_FILE} in current working directory")
+ print("You need to setup a device root directory before using this command")
+ response = input("Would you like to be guided through the setup? [y/n]")
+ if response.lower() == "y":
+ start_guided_device_root_dir_setup()
+ else:
+ print("'iottb init-device-root --help' for more information.")
+ exit(ReturnCodes.ABORTED)
+ # device_id = handle_capture_metadata()
+ return ReturnCodes.SUCCESS
+
+
+def get_device_metadata_from_file(device_metadata_filename: Path) -> str:
+ assert device_metadata_filename.is_file(), f"Device metadata file '{device_metadata_filename} does not exist"
+ device_metadata = DeviceMetadata.load_from_json(device_metadata_filename)
+ return device_metadata
+
+
+def run_tcpdump(cmd):
+ # TODO: Maybe specify files for stout and stderr
+ try:
+ p = subprocess.run(cmd, capture_output=True, text=True, check=True)
+ if p.returncode != 0:
+ print(f"Error running tcpdump {p.stderr}")
+ else:
+ print(f"tcpdump run successfully\n: {p.stdout}")
+ except KeyboardInterrupt:
+ pass
+
+
+def handle_capture(args):
+ assert args.device_root is not None, f"Device root directory is required"
+ assert dir_contains_device_metadata(args.device_root), f"Device metadata file '{args.device_root}' does not exist"
+ # get device metadata
+ if args.safe and not dir_contains_device_metadata(args.device_root):
+ print(f"Supplied folder contains no device metadata. "
+ f"Please setup a device root directory before using this command")
+ exit(ReturnCodes.ABORTED)
+ elif dir_contains_device_metadata(args.device_root):
+ device_metadata_filename = args.device_root / DEVICE_METADATA_FILE
+ device_data = DeviceMetadata.load_from_json(device_metadata_filename)
+ else:
+ name = input("Please enter a device name: ")
+ args.device_root.mkdir(parents=True, exist_ok=True)
+ device_data = DeviceMetadata(name, args.device_root)
+ # start constructing environment for capture
+ capture_dir = get_capture_src_folder(args.device_root)
+ make_capture_src_folder(capture_dir)
+ capture_metadata = CaptureMetadata(device_data, capture_dir)
+
+ capture_metadata.set_interface(args.capture_interface)
+ cmd = ['sudo', 'tcpdump', '-i', args.capture_interface]
+ cmd = build_tcpdump_args(args, cmd, capture_metadata)
+ capture_metadata.set_tcpdump_command(cmd)
+
+ print('Executing: ' + ' '.join(cmd))
+
+ # run capture
+ try:
+ start_time = datetime.now().strftime('%H:%M:%S')
+ run_tcpdump(cmd)
+ stop_time = datetime.now().strftime('%H:%M:%S')
+ capture_metadata.set_start_time(start_time)
+ capture_metadata.set_stop_time(stop_time)
+ except KeyboardInterrupt:
+ print("Received keyboard interrupt.")
+ exit(ReturnCodes.ABORTED)
+ except subprocess.CalledProcessError as e:
+ print(f"Failed to capture packet: {e}")
+ exit(ReturnCodes.FAILURE)
+ except Exception as e:
+ print(f"Failed to capture packet: {e}")
+ exit(ReturnCodes.FAILURE)
+
+ return ReturnCodes.SUCCESS
+
+
+def build_tcpdump_args(args, cmd, capture_metadata: CaptureMetadata):
+ if args.monitor_mode:
+ cmd.append('-I')
+ if args.no_name_resolution:
+ cmd.append('-n')
+ if args.number:
+ cmd.append('-#')
+ if args.print_link_layer:
+ cmd.append('-e')
+
+ if args.count:
+ cmd.append('-c')
+ cmd.append(str(args.count))
+ elif args.mins:
+ assert False, "Unimplemented option"
+
+ if args.app_name is not None:
+ capture_metadata.set_app_name(args.app_name)
+
+ capture_metadata.build_capture_file_name()
+ cmd.append('-w')
+ cmd.append(capture_metadata.get_capfile_name())
+
+ if args.safe:
+ cmd.append(f'host {args.device_ip}') # if not specified, filter 'any' implied by tcpdump
+ capture_metadata.set_device_ip_address(args.device_ip)
+
+ return cmd
+
+
+# def capture_file_cmd(args, cmd, capture_dir, capture_metadata: CaptureMetadata):
+# capture_file_prefix = capture_metadata.get_device_metadata().get_device_short_name()
+# if args.app_name is not None:
+# capture_file_prefix = args.app_name
+# capture_metadata.set_app(args.app_name)
+# capfile_name = capture_file_prefix + "_" + str(capture_metadata.get_capture_id()) + ".pcap"
+# capture_metadata.set_capture_file(capfile_name)
+# capfile_abs_path = capture_dir / capfile_name
+# capture_metadata.set_capture_file(capfile_name)
+# cmd.append('-w')
+# cmd.append(str(capfile_abs_path))
diff --git a/code/kydcap/utils/__init__.py b/code/iottb/utils/__init__.py
similarity index 100%
rename from code/kydcap/utils/__init__.py
rename to code/iottb/utils/__init__.py
diff --git a/code/kydcap/utils/capture_metadata_utils.py b/code/iottb/utils/capture_metadata_utils.py
similarity index 93%
rename from code/kydcap/utils/capture_metadata_utils.py
rename to code/iottb/utils/capture_metadata_utils.py
index fa5b93a..2332127 100644
--- a/code/kydcap/utils/capture_metadata_utils.py
+++ b/code/iottb/utils/capture_metadata_utils.py
@@ -1,7 +1,7 @@
import json
from pathlib import Path
-from kydcap.config import ReturnCodes
+from iottb.definitions import ReturnCodes
def set_device_ip_address(ip_addr: str, file_path: Path):
@@ -36,5 +36,3 @@ def set_device_mac_address(mac_addr: str, file_path: Path):
with file_path.open('w') as f:
json.dump(data, f)
return ReturnCodes.SUCCESS
-
-# TODO finnish for other fields in capture metadata
diff --git a/code/iottb/utils/capture_utils.py b/code/iottb/utils/capture_utils.py
new file mode 100644
index 0000000..03b3c2f
--- /dev/null
+++ b/code/iottb/utils/capture_utils.py
@@ -0,0 +1,44 @@
+import uuid
+from pathlib import Path
+from iottb.models.device_metadata_model import dir_contains_device_metadata
+from iottb.utils.utils import get_iso_date
+
+
+def get_capture_uuid():
+ return str(uuid.uuid4())
+
+
+def get_capture_date_folder(device_root: Path):
+ today_iso = get_iso_date()
+ today_folder = device_root / today_iso
+ if dir_contains_device_metadata(device_root):
+ if not today_folder.is_dir():
+ try:
+ today_folder.mkdir()
+ except FileExistsError:
+ print(f"Folder {today_folder} already exists")
+ return today_folder
+ raise FileNotFoundError(f"Given path {device_root} is not a device root directory")
+
+
+def get_capture_src_folder(device_folder: Path):
+ assert device_folder.is_dir(), f"Given path {device_folder} is not a folder"
+ today_iso = get_iso_date()
+ max_sequence_number = 1
+ for d in device_folder.iterdir():
+ if d.is_dir() and d.name.startswith(f'{today_iso}_capture_'):
+ name = d.name
+ num = int(name.split("_")[2])
+ max_sequence_number = max(max_sequence_number, num)
+
+ next_sequence_number = max_sequence_number + 1
+ return device_folder.joinpath(f"{today_iso}_capture_{next_sequence_number:03}")
+
+
+def make_capture_src_folder(capture_src_folder: Path):
+ try:
+ capture_src_folder.mkdir()
+ except FileExistsError:
+ print(f"Folder {capture_src_folder} already exists")
+ finally:
+ return capture_src_folder
diff --git a/code/kydcap/utils/device_metadata_utils.py b/code/iottb/utils/device_metadata_utils.py
similarity index 97%
rename from code/kydcap/utils/device_metadata_utils.py
rename to code/iottb/utils/device_metadata_utils.py
index 380ff3a..9e80728 100644
--- a/code/kydcap/utils/device_metadata_utils.py
+++ b/code/iottb/utils/device_metadata_utils.py
@@ -2,7 +2,7 @@ import json
from datetime import datetime
from pathlib import Path
-from kydcap.config import ReturnCodes
+from iottb.definitions import ReturnCodes
def update_firmware_version(version: str, file_path: Path):
@@ -47,3 +47,5 @@ def update_device_type(device_type: str, file_path: Path):
with file_path.open('w') as file:
json.dump(metadata, file)
return ReturnCodes.SUCCESS
+
+
diff --git a/code/kydcap/utils/utils.py b/code/iottb/utils/tcpdump_utils.py
similarity index 53%
rename from code/kydcap/utils/utils.py
rename to code/iottb/utils/tcpdump_utils.py
index a6d0ff2..c83be66 100644
--- a/code/kydcap/utils/utils.py
+++ b/code/iottb/utils/tcpdump_utils.py
@@ -1,16 +1,17 @@
+import ipaddress
import shutil
import subprocess
+from typing import Optional
-DEPENDENCIES =
-def check_installed(tool) -> bool:
+def check_installed() -> bool:
"""Check if tcpdump is installed and available on the system path."""
- return shutil.which(f'{tool}') is not None
+ return shutil.which('tcpdump') is not None
-def ensure_installed(tool):
+def ensure_installed():
"""Ensure that tcpdump is installed, raise an error if not."""
- if not check_installed(tool):
+ if not check_installed():
raise RuntimeError("tcpdump is not installed. Please install it to continue.")
@@ -25,5 +26,16 @@ def list_interfaces() -> str:
return ""
-def start_tcpdump():
- return None
+def is_valid_ipv4(ip: str) -> bool:
+ try:
+ ipaddress.IPv4Address(ip)
+ return True
+ except ValueError:
+ return False
+
+def str_to_ipv4(ip: str) -> (bool, Optional[ipaddress]):
+ try:
+ address = ipaddress.IPv4Address(ip)
+ return address == ipaddress.IPv4Address(ip), address
+ except ipaddress.AddressValueError:
+ return False, None
diff --git a/code/iottb/utils/utils.py b/code/iottb/utils/utils.py
new file mode 100644
index 0000000..0b0770d
--- /dev/null
+++ b/code/iottb/utils/utils.py
@@ -0,0 +1,18 @@
+import uuid
+from datetime import datetime
+from iottb.definitions import TODAY_DATE_STRING, DEVICE_METADATA_FILE, CAPTURE_METADATA_FILE
+from pathlib import Path
+
+
+def get_iso_date():
+ return datetime.now().strftime('%Y-%m-%d')
+
+
+def subfolder_exists(parent: Path, child: str):
+ return parent.joinpath(child).exists()
+
+
+def generate_unique_string_with_prefix(prefix: str):
+ return prefix + "_" + str(uuid.uuid4())
+
+
diff --git a/code/kydcap/config.py b/code/kydcap/config.py
deleted file mode 100644
index e39c53c..0000000
--- a/code/kydcap/config.py
+++ /dev/null
@@ -1,20 +0,0 @@
-from datetime import datetime
-from enum import Flag, unique, global_enum
-
-
-DEVICE_METADATA_FILE = "device-metadata.json"
-CAPTURE_METADATA_FILE = "capture-metadata.json"
-TODAY_DATE_STRING = datetime.now().strftime("%d%b%Y").lower()
-
-
-@unique
-@global_enum
-class ReturnCodes(Flag):
- SUCCESS = 0
- ABORTED = 1
- FAILURE = 2
- UNKNOWN = 3
- FILE_NOT_FOUND = 4
- FILE_ALREADY_EXISTS = 5
- INVALID_ARGUMENT = 6
- INVALID_ARGUMENT_VALUE = 7
diff --git a/code/kydcap/models/capture_metadata_model.py b/code/kydcap/models/capture_metadata_model.py
deleted file mode 100644
index 6e26975..0000000
--- a/code/kydcap/models/capture_metadata_model.py
+++ /dev/null
@@ -1,47 +0,0 @@
-import json
-import uuid
-from datetime import datetime
-from pathlib import Path
-from typing import Optional, Any
-
-from pydantic import BaseModel, Field
-
-from kydcap.config import ReturnCodes
-
-
-class KydcapCaptureMetadata(BaseModel):
- # Required Fields
- device_id: str
- capture_id: uuid.UUID = Field(default_factory=lambda: str(uuid.uuid4()))
- capture_date: str = Field(default_factory=lambda: datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower())
-
- # Statistics
- start_time: str
- stop_time: str
- packet_count: Optional[int]
-
- # Optional Fields
- device_ip_address: Optional[str] = None
- device_mac_address: Optional[str] = None
-
- app: Optional[str] = None
- app_version: Optional[str] = None
- firmware_version: Optional[str] = None
-
- def __init__(self, device_id: str, start_time: str, stop_time: str, /, **data: Any):
- super().__init__(**data) # Pycharms orders
- assert isinstance(device_id, str)
- assert isinstance(start_time, str)
- assert isinstance(stop_time, str)
- self.device_id = device_id
- self.start_time = start_time
- self.stop_time = stop_time
-
- def save_to_json(self, file_path: Path):
- if file_path.is_file():
- print(f"File {file_path} already exists, update instead.")
- return ReturnCodes.FILE_ALREADY_EXISTS
- metadata = self.model_dump_json(indent=2)
- with file_path.open('w') as file:
- json.dump(metadata, file)
- return ReturnCodes.SUCCESS
diff --git a/code/kydcap/models/device_metadata_model.py b/code/kydcap/models/device_metadata_model.py
deleted file mode 100644
index 173f92f..0000000
--- a/code/kydcap/models/device_metadata_model.py
+++ /dev/null
@@ -1,66 +0,0 @@
-import json
-import uuid
-from datetime import datetime
-from pathlib import Path
-from typing import Optional, List, Any
-
-# kydcap modules
-from kydcap.config import ReturnCodes
-
-# 3rd party libs
-from pydantic import BaseModel, Field
-
-IMMUTABLE_FIELDS = {"device_name", "device_short_name", "device_id", "date_created"}
-
-
-class DeviceMetadata(BaseModel):
- # Required fields
- device_name: str
- device_short_name: str
- device_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
- date_created: str = Field(default_factory=lambda: datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower())
-
- # Optional Fields
- device_type: Optional[str] = None
- device_serial_number: Optional[str] = None
- device_firmware_version: Optional[str] = None
- date_updated: Optional[str] = None
-
- capture_files: Optional[List[str]] = []
-
- def __init__(self, device_name: str, /, **data: Any):
- super().__init__(**data)
- self.device_name = device_name
- self.device_short_name = device_name.lower().replace(" ", "_")
-
- @classmethod
- def load_from_json(cls, file_path: Path):
- assert file_path.is_file()
- with file_path.open('r') as file:
- metadata_json = json.load(file)
- metadata_model_obj = cls.model_validate_json(metadata_json)
- return metadata_model_obj
-
- def save_to_json(self, file_path: Path):
- if file_path.is_file():
- print(f"File {file_path} already exists, update instead.")
- return ReturnCodes.FILE_ALREADY_EXISTS
- metadata = self.model_dump_json(indent=2)
- with file_path.open('w') as file:
- json.dump(metadata, file)
- return ReturnCodes.SUCCESS
-
- @classmethod
- def update_metadata_in_json(cls, file_path: Path, **kwargs):
- # TODO Maybe not needed at all.
- assert file_path.is_file()
- for field in IMMUTABLE_FIELDS:
- if field in kwargs:
- print(f"Field {field} is immutable")
- return ReturnCodes.IMMUTABLE
- metadata = cls.load_from_json(file_path)
- for field, value in kwargs.items():
- if field in metadata.model_fields_set:
- setattr(metadata, field, value)
- metadata.date_updated = datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower()
- pass
diff --git a/code/kydcap/scripts/wifi_ctl.sh b/code/kydcap/scripts/wifi_ctl.sh
deleted file mode 100644
index 076d2fd..0000000
--- a/code/kydcap/scripts/wifi_ctl.sh
+++ /dev/null
@@ -1,55 +0,0 @@
-#!/usr/bin/env bash
-# Note, this is not my original work. Source: https://linuxtldr.com/changing-interface-mode/
-
-function list_nic_info () {
- ip addr show
-}
-
-function enable_monm_iw () {
- interface=$1
- sudo ip link set "$interface" down
- sudo iw "$interface" set monitor control
- sudo ip link set "$interface" up
-}
-
-function disable_monm_iw () {
- interface=$1
- sudo ip link set "$interface" down
- sudo iw "$interface" set type managed
- sudo ip link set "$interface" up
-}
-
-function enable_monm_iwconfig () {
- interface=$1
- sudo ifconfig "$interface" down
- sudo iwconfig "$interface" mode monitor
- sudo ifconfig "$interface" up
-}
-
-function disable_monm_iwconfig () {
- interface=$1
- sudo ifconfig "$interface" down
- sudo iwconfig "$interface" mode managed
- sudo ifconfig "$interface" up
-}
-
-function enable_monm_acng () {
- interface=$1
- sudo airmon-ng check
- sudo airmon-ng check kill
- sudo airmon-ng start "$interface"
-}
-
-function disable_monm_acng () {
- interface="${1}mon"
- sudo airmon-ng stop "$interface"
- sudo systemctl restart NetworkManager
-}
-
-if declare -f "$1" > /dev/null
-then
- "$@"
-else
- echo "Unknown function '$1'" >&2
- exit 1
-fi
\ No newline at end of file
diff --git a/code/kydcap/subcommands/initialize_device_root_dir.py b/code/kydcap/subcommands/initialize_device_root_dir.py
deleted file mode 100644
index 88f4ef3..0000000
--- a/code/kydcap/subcommands/initialize_device_root_dir.py
+++ /dev/null
@@ -1,40 +0,0 @@
-import pathlib
-
-from kydcap.config import DEVICE_METADATA_FILE
-from kydcap.models.device_metadata_model import DeviceMetadata
-
-
-def setup_init_root_dir_parser(subparsers):
- parser = subparsers.add_parser("init-device-root", aliases=["idr"])
- parser.add_argument("--root_dir", type=pathlib.Path, default=pathlib.Path.cwd())
- group = parser.add_mutually_exclusive_group()
- group.add_argument("--dynamic", action="store_true", help="enable guided setup", default=False)
- group.add_argument("-n", "--name", action="store", type=str, help="name of device")
- parser.set_defaults(func=handle_idr)
-
-
-def handle_idr(args):
- print("Entered kydcap initialize-device-root")
- root_dir = args.root_dir
- device_name = None
- if args.dynamic:
- response = "N"
- while response == "N":
- name = input("Please enter name of device: ")
- # TODO extended config for other fields like apps, firmware etc.
- response = input(f"Confirm device name: {name} [y/N]")
- device_name = name
- else:
- device_name = args.name
- root_dir.mkdir(parents=True, exist_ok=True)
- root_dir.chdir()
- dev_metadata_model = DeviceMetadata(device_name)
- file_path = root_dir / device_name / DEVICE_METADATA_FILE
- assert not file_path.exists(), f"{file_path} already exists"
- if args.dynamic:
- response = input(f"Confirm device metadata: {dev_metadata_model.model_dump()} [y/N]")
- if response.lower() != "y":
- assert False, "TODO implement dynamic setup"
- code = dev_metadata_model.save_to_json(file_path)
- print(f"Device metadata saved to {file_path}")
- return code
diff --git a/code/kydcap/subcommands/sniff.py b/code/kydcap/subcommands/sniff.py
deleted file mode 100644
index 87df0a5..0000000
--- a/code/kydcap/subcommands/sniff.py
+++ /dev/null
@@ -1,104 +0,0 @@
-import subprocess
-from pathlib import Path
-
-from kydcap.config import *
-from kydcap.models.device_metadata_model import DeviceMetadata
-
-
-def setup_sniff_parser(subparsers):
- parser = subparsers.add_parser('sniff', help='Sniff packets with tcpdump')
- # metadata args
- parser.add_argument("-a", "--ip-address=", help="IP address of the device to sniff", dest="device_ip")
- # tcpdump args
- parser_sniff_tcpdump = parser.add_argument_group('tcpdump arguments')
- parser_sniff_tcpdump.add_argument("-i", "--interface=", help="Interface to capture on.", dest="capture_interface",
- default="any")
- parser_sniff_tcpdump.add_argument("-I", "--monitor-mode", help="Put interface into monitor mode",
- action="store_true")
- parser_sniff_tcpdump.add_argument("-n", help="Deactivate name resolution. Option is set by default.",
- action="store_true", dest="no_name_resolution")
- parser_sniff_tcpdump.add_argument("-#", "--number",
- help="Print packet number at beginning of line. Set by default.",
- action="store_true")
- parser_sniff_tcpdump.add_argument("-e", help="Print link layer headers. Option is set by default.",
- action="store_true", dest="print_link_layer")
- parser_sniff_tcpdump.add_argument("-t", action="count", default=0,
- help="Please see tcpdump manual for details. Unused by default.")
- # parser_sniff_tcpdump.add_argument("--filter",type=str,default="ip help=f"pcap filter expression. \
- # Defaults is '{default}'")
- # shared args
- cap_size_group = parser.add_mutually_exclusive_group(required=False)
- cap_size_group.add_argument("-c", "--count", type=int, help="Number of packets to capture.", default=0)
- cap_size_group.add_argument("--mins", type=int, help="Time in minutes to capture.", default=60)
- parser.set_defaults(func=handle_sniff)
- # return parser
- # parser.add_default(func=handle_sniff(args=sniff_args))
-
-
-def cwd_is_device_root_dir() -> bool:
- device_metadata_file = Path.cwd() / DEVICE_METADATA_FILE
- return device_metadata_file.exists()
-
-
-def start_guided_device_root_dir_setup():
- assert False, "Not implemented"
-
-
-def handle_metadata():
- assert not cwd_is_device_root_dir()
- print(f"Unable to find {DEVICE_METADATA_FILE} in current working directory")
- print("You need to setup a device root directory before using this command")
- response = input("Would you like to be guided through the setup? [y/n]")
- if response.lower() == "y":
- start_guided_device_root_dir_setup()
- else:
- print("'kydcap init-device-root --help' for more information.")
- exit(ReturnCodes.ABORTED)
- # device_id = handle_capture_metadata()
- return ReturnCodes.SUCCESS
-
-
-def handle_capture_metadata():
- device_metadata_json = Path.cwd() / DEVICE_METADATA_FILE
- device_metadata = DeviceMetadata.load_from_json(device_metadata_json)
- device_id = device_metadata.device_id
- return device_id
-
-
-def handle_sniff(args):
- if not cwd_is_device_root_dir():
- handle_metadata()
- else:
- cmd = ['sudo', 'tcpdump', '-i', args.capture_interface]
- if args.monitor_mode:
- cmd.append('-I')
- if args.no_name_resolution:
- cmd.append('-n')
- if args.number:
- cmd.append('-#')
- if args.print_link_layer:
- cmd.append('-e')
- if args.count:
- cmd.append('-c')
- cmd.append(str(args.count))
- elif args.mins:
- pass
- print('Complete command:' + ' '.join(cmd))
- # TODO maybe dump this into file -> put into device metadata
- # TODO generate pcap filename
- # TODO construct capture metadata file
- try:
- start_time = datetime.now().strftime('%H:%M:%S')
- subprocess.run(cmd)
- stop_time = datetime.now().strftime('%H:%M:%S')
- except KeyboardInterrupt:
- print("Received keyboard interrupt.")
- exit(ReturnCodes.ABORTED)
- except subprocess.CalledProcessError as e:
- print(f"Failed to capture packet: {e}")
- exit(ReturnCodes.FAILURE)
- except Exception as e:
- print(f"Failed to capture packet: {e}")
- exit(ReturnCodes.FAILURE)
-
- return ReturnCodes.SUCCESS
diff --git a/code/kydcap/utils/wifi_ctrl_utils.py b/code/kydcap/utils/wifi_ctrl_utils.py
deleted file mode 100644
index 671d038..0000000
--- a/code/kydcap/utils/wifi_ctrl_utils.py
+++ /dev/null
@@ -1,10 +0,0 @@
-import subprocess
-
-def enable_monitor_mode(interface):
- pass
-
-def disable_monitor_mode(interface):
- pass
-
-def get_ap_channel(interface):
- pass
\ No newline at end of file
diff --git a/code/tests/test_capture_metadata_model.py b/code/tests/test_capture_metadata_model.py
new file mode 100644
index 0000000..632490a
--- /dev/null
+++ b/code/tests/test_capture_metadata_model.py
@@ -0,0 +1,2 @@
+def test_save_to_json():
+ assert False
diff --git a/code/tests/utils/test_capture_metadata_utils.py b/code/tests/utils/test_capture_metadata_utils.py
index 26647c8..71278c3 100644
--- a/code/tests/utils/test_capture_metadata_utils.py
+++ b/code/tests/utils/test_capture_metadata_utils.py
@@ -3,4 +3,4 @@ from pathlib import Path
from unittest.mock import mock_open, patch
import pytest
-from kydcap.utils.capture_metadata_utils import set_device_ip_address
\ No newline at end of file
+from iottb.utils.capture_metadata_utils import set_device_ip_address
\ No newline at end of file