diff --git a/.idea/workspace.xml b/.idea/workspace.xml
index 60695e5..2fbbf7e 100644
--- a/.idea/workspace.xml
+++ b/.idea/workspace.xml
@@ -4,15 +4,19 @@
-
-
+
+
+
-
+
+
+
+
@@ -46,27 +50,27 @@
- {
- "keyToString": {
- "ASKED_ADD_EXTERNAL_FILES": "true",
- "ASKED_MARK_IGNORED_FILES_AS_EXCLUDED": "true",
- "ASKED_SHARE_PROJECT_CONFIGURATION_FILES": "true",
- "Python.__init__.executor": "Run",
- "Python.__main__.executor": "Run",
- "Python.main.executor": "Run",
- "RunOnceActivity.ShowReadmeOnStart": "true",
- "SHARE_PROJECT_CONFIGURATION_FILES": "true",
- "git-widget-placeholder": "cli-dev",
- "last_opened_file_path": "/home/slnopriv/projects/2024-bsc-sebastian-lenzlinger/code/kydcap/utils/device_metadata_utils.py",
- "node.js.detected.package.eslint": "true",
- "node.js.detected.package.tslint": "true",
- "node.js.selected.package.eslint": "(autodetect)",
- "node.js.selected.package.tslint": "(autodetect)",
- "nodejs_package_manager_path": "npm",
- "settings.editor.selected.configurable": "com.jetbrains.python.configuration.PyActiveSdkModuleConfigurable",
- "vue.rearranger.settings.migration": "true"
+
+}]]>
@@ -154,7 +158,8 @@
-
+
+
@@ -220,7 +225,15 @@
1714919098392
-
+
+
+ 1714924463148
+
+
+
+ 1714924463148
+
+
@@ -278,7 +291,8 @@
-
+
+
diff --git a/code/iottb/__main__.py b/code/iottb/__main__.py
index 3ed6dcd..70fe0d6 100644
--- a/code/iottb/__main__.py
+++ b/code/iottb/__main__.py
@@ -4,8 +4,6 @@ import argparse
from iottb.subcommands.capture import setup_capture_parser
from iottb.subcommands.add_device import setup_init_device_root_parser
-CAP_DIR_PREFIX = ...
-
######################
# Argparse setup
diff --git a/code/iottb/cli.py b/code/iottb/cli.py
deleted file mode 100644
index 1e91ffe..0000000
--- a/code/iottb/cli.py
+++ /dev/null
@@ -1,2 +0,0 @@
-import argparse
-from .subcommands.capture import handle_sniff
\ No newline at end of file
diff --git a/code/iottb/definitions.py b/code/iottb/definitions.py
index 202f572..507e1a0 100644
--- a/code/iottb/definitions.py
+++ b/code/iottb/definitions.py
@@ -1,15 +1,18 @@
from datetime import datetime
from enum import Flag, unique, global_enum
-
-DEVICE_METADATA_FILE = "device-metadata.json"
-CAPTURE_METADATA_FILE = "capture-metadata.json"
+DEVICE_METADATA_FILE = "device_metadata.json"
+CAPTURE_METADATA_FILE = "capture_metadata.json"
TODAY_DATE_STRING = datetime.now().strftime("%d%b%Y").lower() # TODO convert to function in utils or so
+CAPTURE_FOLDER_BASENAME = "capture_###"
+
AFFIRMATIVE_USER_RESPONSE = {"yes", "y", "true", "Y", "Yes", "YES"}
NEGATIVE_USER_RESPONSE = {"no", "n", "N", "No"}
YES_DEFAULT = AFFIRMATIVE_USER_RESPONSE.union({"", " "})
NO_DEFAULT = NEGATIVE_USER_RESPONSE.union({"", " "})
+
+
@unique
@global_enum
class ReturnCodes(Flag):
diff --git a/code/iottb/models/capture_metadata_model.py b/code/iottb/models/capture_metadata_model.py
index aa320bf..dabf2e6 100644
--- a/code/iottb/models/capture_metadata_model.py
+++ b/code/iottb/models/capture_metadata_model.py
@@ -3,46 +3,155 @@ import uuid
from datetime import datetime
from pathlib import Path
from typing import Optional, Any
+from uuid import UUID
from pydantic import BaseModel, Field
-from iottb.definitions import ReturnCodes
+from iottb.definitions import ReturnCodes, CAPTURE_METADATA_FILE
+from iottb.models.device_metadata_model import DeviceMetadata
class CaptureMetadata(BaseModel):
# Required Fields
- device_id: str
+ device_metadata = Field(DeviceMetadata, exclude=True)
capture_id: uuid.UUID = Field(default_factory=lambda: str(uuid.uuid4()))
+ capture_dir: Path
+ capture_file: str
capture_date: str = Field(default_factory=lambda: datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower())
# Statistics
start_time: str
stop_time: str
+
+ # tcpdump
packet_count: Optional[int]
pcap_filter: str = ""
+ tcpdump_command: str = ""
+ interface = Field(str, default="")
# Optional Fields
- device_ip_address: Optional[str] = None
+ device_ip_address: Optional[str] = "No IP Address set"
device_mac_address: Optional[str] = None
app: Optional[str] = None
app_version: Optional[str] = None
firmware_version: Optional[str] = None
- def __init__(self, device_id: str, start_time: str, stop_time: str, /, **data: Any):
+ def __init__(self, device_metadata: DeviceMetadata, capture_dir: Path, /, **data: Any):
super().__init__(**data) # Pycharms orders
- assert isinstance(device_id, str)
- assert isinstance(start_time, str)
- assert isinstance(stop_time, str)
- self.device_id = device_id
+ self.device_metadata = device_metadata
+ self.capture_dir = capture_dir
+ assert capture_dir.is_dir()
+
+ # Getters
+ def get_device_id(self) -> str:
+ return self.device_id
+
+ def get_start_time(self) -> str:
+ return self.start_time
+
+ def get_stop_time(self) -> str:
+ return self.stop_time
+
+ def get_packet_count(self) -> int:
+ return self.packet_count
+
+ def get_pcap_filter(self) -> str:
+ return self.pcap_filter
+
+ def get_device_ip_address(self) -> str:
+ return self.device_ip_address
+
+ def get_device_mac_address(self) -> str:
+ return self.device_mac_address
+
+ def get_app(self) -> str:
+ return self.app
+
+ def get_app_version(self) -> str:
+ return self.app_version
+
+ def get_firmware_version(self) -> str:
+ return self.firmware_version
+
+ def get_capture_id(self) -> UUID:
+ return self.capture_id
+
+ def get_capture_date(self) -> str:
+ return self.capture_date
+
+ def get_capfile_name(self):
+ return self.capture_file
+
+ def get_device_metadata(self) -> DeviceMetadata:
+ return self.device_metadata
+
+ def get_interface(self):
+ return self.interface
+
+ # Setters
+ def set_capture_dir(self, capture_dir: Path):
+ self.capture_dir = capture_dir
+
+ def set_capture_file(self, capture_file: str):
+ self.capture_file = capture_file
+
+ def set_capture_date(self, capture_date: str):
+ self.capture_date = capture_date
+
+ def set_start_time(self, start_time: str):
self.start_time = start_time
+
+ def set_stop_time(self, stop_time: str):
self.stop_time = stop_time
- def save_to_json(self, file_path: Path):
+ def set_packet_count(self, packet_count: int):
+ self.packet_count = packet_count
+
+ def set_pcap_filter(self, pcap_filter: str):
+ self.pcap_filter = pcap_filter
+
+ def set_device_ip_address(self, device_ip_address: str):
+ self.device_ip_address = device_ip_address
+
+ def set_device_mac_address(self, device_mac_address: str):
+ self.device_mac_address = device_mac_address
+
+ def set_app(self, app: str):
+ self.app = app
+
+ def set_app_version(self, app_version: str):
+ self.app_version = app_version
+
+ def set_firmware_version(self, firmware_version: str):
+ self.firmware_version = firmware_version
+ self.device_metadata.set_device_firmware_version(firmware_version)
+
+ def set_interface(self, interface: str):
+ self.interface = interface
+
+ def set_tcpdump_command(self, tcpdump_command: str):
+ self.tcpdump_command = tcpdump_command
+
+ # Other
+
+ def build_capture_file_name(self):
+ prefix = ""
+ if self.app is None:
+ prefix = self.device_metadata.get_device_short_name()
+ else:
+ assert str(self.app).strip() not in {"", " "}, f"app is not a valid name: {self.app}"
+ prefix = self.get_app()
+ # assert self.capture_dir is not None, f"{self.capture_dir} does not exist"
+ filename = f"{prefix}_{str(self.capture_id)}.pcap"
+ self.set_capture_file(filename)
+
+ def save_capture_metadata_to_json(self, file_path: Path = Path(CAPTURE_METADATA_FILE)):
+ assert self.capture_dir.is_dir(), f"capture_dir is not a directory: {self.capture_dir}"
if file_path.is_file():
print(f"File {file_path} already exists, update instead.")
return ReturnCodes.FILE_ALREADY_EXISTS
- metadata = self.model_dump_json(indent=2)
+ metadata = self.model_dump_json(indent=2, exclude_unset=True, exclude_none=True)
with file_path.open('w') as file:
json.dump(metadata, file)
return ReturnCodes.SUCCESS
diff --git a/code/iottb/models/device_metadata_model.py b/code/iottb/models/device_metadata_model.py
index 9435d0a..e5d3452 100644
--- a/code/iottb/models/device_metadata_model.py
+++ b/code/iottb/models/device_metadata_model.py
@@ -5,8 +5,7 @@ from pathlib import Path
from typing import Optional, List, Any
# iottb modules
-from iottb.definitions import ReturnCodes
-
+from iottb.definitions import ReturnCodes, DEVICE_METADATA_FILE
# 3rd party libs
from pydantic import BaseModel, Field
@@ -20,6 +19,7 @@ class DeviceMetadata(BaseModel):
device_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
date_created: str = Field(default_factory=lambda: datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower())
+ device_root_path: Path
# Optional Fields
aliases: List[str] = Field(default_factory=lambda: [])
device_type: Optional[str] = None
@@ -29,15 +29,65 @@ class DeviceMetadata(BaseModel):
capture_files: Optional[List[str]] = []
- def __init__(self, device_name: str, /, **data: Any):
+ def __init__(self, device_name: str, device_root_dir: Path, /, **data: Any):
super().__init__(**data)
self.device_name = device_name
self.device_short_name = device_name.lower().replace(" ", "_")
+ assert dir_contains_device_metadata(device_root_dir), \
+ f"Directory {device_root_dir} is missing a {DEVICE_METADATA_FILE} file"
+ self.device_root_dir = device_root_dir
+
+ def get_device_id(self) -> str:
+ return self.device_id
+
+ def get_device_name(self) -> str:
+ return self.device_name
+
+ def get_device_short_name(self) -> str:
+ return self.device_short_name
+
+ def get_device_type(self) -> str:
+ return self.device_type
+
+ def get_device_serial_number(self) -> str:
+ return self.device_serial_number
+
+ def get_device_firmware_version(self) -> str:
+ return self.device_firmware_version
+
+ def get_date_updated(self) -> str:
+ return self.date_updated
+
+ def get_capture_files(self) -> List[str]:
+ return self.capture_files
+
+ def get_aliases(self) -> List[str]:
+ return self.aliases
+
+ def set_device_type(self, device_type: str) -> None:
+ self.device_type = device_type
+ self.date_updated = datetime.now().strftime('%d-%m-%YT%H:%M:%S')
+
+ def set_device_serial_number(self, device_serial_number: str) -> None:
+ self.device_serial_number = device_serial_number
+ self.date_updated = datetime.now().strftime('%d-%m-%YT%H:%M:%S')
+
+ def set_device_firmware_version(self, device_firmware_version: str) -> None:
+ self.device_firmware_version = device_firmware_version
+ self.date_updated = datetime.now().strftime('%d-%m-%YT%H:%M:%S')
+
+ def set_device_name(self, device_name: str) -> None:
+ self.device_name = device_name
+ self.device_short_name = device_name.lower().replace(" ", "_")
+ self.date_updated = datetime.now().strftime('%d-%m-%YT%H:%M:%S')
@classmethod
- def load_from_json(cls, file_path: Path):
- assert file_path.is_file()
- with file_path.open('r') as file:
+ def load_from_json(cls, root_path: Path):
+ assert root_path.is_file()
+ assert root_path.name == DEVICE_METADATA_FILE
+ assert dir_contains_device_metadata(root_path)
+ device_meta_filename = root_path / DEVICE_METADATA_FILE
+ with device_meta_filename.open('r') as file:
metadata_json = json.load(file)
metadata_model_obj = cls.model_validate_json(metadata_json)
return metadata_model_obj
@@ -65,3 +115,14 @@ class DeviceMetadata(BaseModel):
setattr(metadata, field, value)
metadata.date_updated = datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower()
pass
+
+
+def dir_contains_device_metadata(dir_path: Path):
+ if not dir_path.is_dir():
+ return False
+ else:
+ meta_file_path = dir_path / DEVICE_METADATA_FILE
+ if not meta_file_path.is_file():
+ return False
+ else:
+ return True
diff --git a/code/iottb/subcommands/capture.py b/code/iottb/subcommands/capture.py
index 513e1c2..2cf6edc 100644
--- a/code/iottb/subcommands/capture.py
+++ b/code/iottb/subcommands/capture.py
@@ -1,10 +1,10 @@
import subprocess
-import uuid
from pathlib import Path
from iottb.definitions import *
-from iottb.models.device_metadata_model import DeviceMetadata
from iottb.models.capture_metadata_model import CaptureMetadata
+from iottb.models.device_metadata_model import DeviceMetadata, dir_contains_device_metadata
+from iottb.utils.capture_utils import get_capture_src_folder, make_capture_src_folder
def setup_capture_parser(subparsers):
@@ -12,29 +12,31 @@ def setup_capture_parser(subparsers):
# metadata args
parser.add_argument("-a", "--ip-address", help="IP address of the device to sniff", dest="device_ip")
# tcpdump args
+ parser.add_argument("device-root", help="Root folder for device to sniff", dest="device_folder",
+ type=Path, required=True, default=Path.cwd())
+ parser.add_argument("-s", "--safe", help="Ensure correct device root folder before sniffing", action="store_true")
+ parser.add_argument("--app", help="Application name to sniff", dest="app_name", default=None)
+
parser_sniff_tcpdump = parser.add_argument_group('tcpdump arguments')
parser_sniff_tcpdump.add_argument("-i", "--interface", help="Interface to capture on.", dest="capture_interface",
- default="any")
+ required=True)
parser_sniff_tcpdump.add_argument("-I", "--monitor-mode", help="Put interface into monitor mode",
action="store_true")
- parser_sniff_tcpdump.add_argument("-n", help="Deactivate name resolution. Option is set by default.",
+ parser_sniff_tcpdump.add_argument("-n", help="Deactivate name resolution. True by default.",
action="store_true", dest="no_name_resolution")
parser_sniff_tcpdump.add_argument("-#", "--number",
- help="Print packet number at beginning of line. Set by default.",
+ help="Print packet number at beginning of line. True by default.",
action="store_true")
- parser_sniff_tcpdump.add_argument("-e", help="Print link layer headers. Option is set by default.",
+ parser_sniff_tcpdump.add_argument("-e", help="Print link layer headers. True by default.",
action="store_true", dest="print_link_layer")
parser_sniff_tcpdump.add_argument("-t", action="count", default=0,
help="Please see tcpdump manual for details. Unused by default.")
- # parser_sniff_tcpdump.add_argument("--filter",type=str,default="ip help=f"pcap filter expression. \
- # Defaults is '{default}'")
- # shared args
+
cap_size_group = parser.add_mutually_exclusive_group(required=False)
- cap_size_group.add_argument("-c", "--count", type=int, help="Number of packets to capture.", default=0)
- cap_size_group.add_argument("--mins", type=int, help="Time in minutes to capture.", default=60)
+ cap_size_group.add_argument("-c", "--count", type=int, help="Number of packets to capture.", default=1000)
+ cap_size_group.add_argument("--mins", type=int, help="Time in minutes to capture.", default=1)
+
parser.set_defaults(func=handle_capture)
- # return parser
- # parser.add_default(func=handle_sniff(args=sniff_args))
def cwd_is_device_root_dir() -> bool:
@@ -42,9 +44,6 @@ def cwd_is_device_root_dir() -> bool:
return device_metadata_file.is_file()
-def get_user_capture_config():
- pass
-
def start_guided_device_root_dir_setup():
assert False, "Not implemented"
@@ -63,52 +62,58 @@ def handle_metadata():
return ReturnCodes.SUCCESS
-def get_device_id_from_file(device_metadata_filename: Path) -> str:
+def get_device_metadata_from_file(device_metadata_filename: Path) -> str:
assert device_metadata_filename.is_file(), f"Device metadata file '{device_metadata_filename} does not exist"
device_metadata = DeviceMetadata.load_from_json(device_metadata_filename)
- device_id = device_metadata.device_id
- return device_id
+ return device_metadata
-def setup_capture_folder():
- pass
-
-
-def run_tcpdum(cmd):
+def run_tcpdump(cmd):
+ # TODO: Maybe specify files for stout and stderr
try:
- p = subprocess.run(cmd)
+ p = subprocess.run(cmd, capture_output=True, text=True, check=True)
+ if p.returncode != 0:
+ print(p.stderr)
+ else:
+ print(p.stdout)
except KeyboardInterrupt:
pass
-def generate_capfile_name():
- name = datetime.now().strftime("%Y%m%d_%H%M%S") + uuid.uuid4().hex
def handle_capture(args):
assert args.device_root is not None, f"Device root directory is required"
- device_metadata_file = Path.cwd() / DEVICE_METADATA_FILE
- device_id = get_device_id_from_file(device_metadata_file)
- assert device_metadata_file.is_file(), f"Device metadata file '{device_metadata_file} does not exist"
- capture_dir = setup_capture_folder(args.dev_root)
+ assert dir_contains_device_metadata(args.device_root)
+ # get device metadata
+ if args.safe and not dir_contains_device_metadata(args.device_root):
+ print(f"Supplied folder contains no device metadata. "
+ f"Please setup a device root directory before using this command")
+ exit(ReturnCodes.ABORTED)
+ elif dir_contains_device_metadata(args.device_root):
+ device_metadata_filename = args.device_root / DEVICE_METADATA_FILE
+ device_data = DeviceMetadata.load_from_json(device_metadata_filename)
+ else:
+ name = input("Please enter a device name: ")
+ args.device_root.mkdir(parents=True, exist_ok=True)
+ device_data = DeviceMetadata(name, args.device_root)
+ # start constructing environment for capture
+ capture_dir = get_capture_src_folder(args.device_root)
+ make_capture_src_folder(capture_dir)
+ capture_metadata = CaptureMetadata(device_data, capture_dir)
+
+ capture_metadata.set_interface(args.capture_interface)
cmd = ['sudo', 'tcpdump', '-i', args.capture_interface]
- if args.monitor_mode:
- cmd.append('-I')
- if args.no_name_resolution:
- cmd.append('-n')
- if args.number:
- cmd.append('-#')
- if args.print_link_layer:
- cmd.append('-e')
- if args.count:
- cmd.append('-c')
- cmd.append(str(args.count))
- elif args.mins:
- pass
+ cmd = build_tcpdump_args(args, cmd, capture_metadata)
+ capture_metadata.set_tcpdump_command(cmd)
+
print('Executing: ' + ' '.join(cmd))
- # TODO maybe dump this into file -> put into device metadata
+
+ # run capture
try:
start_time = datetime.now().strftime('%H:%M:%S')
- run_tcpdum(cmd)
+ run_tcpdump(cmd)
stop_time = datetime.now().strftime('%H:%M:%S')
+ capture_metadata.set_start_time(start_time)
+ capture_metadata.set_stop_time(stop_time)
except KeyboardInterrupt:
print("Received keyboard interrupt.")
exit(ReturnCodes.ABORTED)
@@ -122,4 +127,44 @@ def handle_capture(args):
return ReturnCodes.SUCCESS
+def build_tcpdump_args(args, cmd, capture_metadata: CaptureMetadata):
+ if args.monitor_mode:
+ cmd.append('-I')
+ if args.no_name_resolution:
+ cmd.append('-n')
+ if args.number:
+ cmd.append('-#')
+ if args.print_link_layer:
+ cmd.append('-e')
+ if args.count:
+ cmd.append('-c')
+ cmd.append(str(args.count))
+ elif args.mins:
+ assert False, "Unimplemented option"
+
+ if args.app_name is not None:
+ capture_metadata.set_app_name(args.app_name)
+
+ capture_metadata.build_capture_file_name()
+ cmd.append('-w')
+ cmd.append(capture_metadata.get_capfile_name())
+
+ if args.safe:
+ cmd.append(f'host {args.device_ip}') # if not specified, filter 'any' implied by tcpdump
+ capture_metadata.set_device_ip_address(args.device_ip)
+
+ return cmd
+
+
+# def capture_file_cmd(args, cmd, capture_dir, capture_metadata: CaptureMetadata):
+# capture_file_prefix = capture_metadata.get_device_metadata().get_device_short_name()
+# if args.app_name is not None:
+# capture_file_prefix = args.app_name
+# capture_metadata.set_app(args.app_name)
+# capfile_name = capture_file_prefix + "_" + str(capture_metadata.get_capture_id()) + ".pcap"
+# capture_metadata.set_capture_file(capfile_name)
+# capfile_abs_path = capture_dir / capfile_name
+# capture_metadata.set_capture_file(capfile_name)
+# cmd.append('-w')
+# cmd.append(str(capfile_abs_path))
diff --git a/code/iottb/subcommands/config.py b/code/iottb/subcommands/config.py
deleted file mode 100644
index e69de29..0000000
diff --git a/code/iottb/utils/capture_metadata_utils.py b/code/iottb/utils/capture_metadata_utils.py
index ae67e69..2332127 100644
--- a/code/iottb/utils/capture_metadata_utils.py
+++ b/code/iottb/utils/capture_metadata_utils.py
@@ -36,5 +36,3 @@ def set_device_mac_address(mac_addr: str, file_path: Path):
with file_path.open('w') as f:
json.dump(data, f)
return ReturnCodes.SUCCESS
-
-# TODO finnish for other fields in capture metadata
diff --git a/code/iottb/utils/capture_utils.py b/code/iottb/utils/capture_utils.py
new file mode 100644
index 0000000..03b3c2f
--- /dev/null
+++ b/code/iottb/utils/capture_utils.py
@@ -0,0 +1,44 @@
+import uuid
+from pathlib import Path
+from iottb.models.device_metadata_model import dir_contains_device_metadata
+from iottb.utils.utils import get_iso_date
+
+
+def get_capture_uuid():
+ return str(uuid.uuid4())
+
+
+def get_capture_date_folder(device_root: Path):
+ today_iso = get_iso_date()
+ today_folder = device_root / today_iso
+ if dir_contains_device_metadata(device_root):
+ if not today_folder.is_dir():
+ try:
+ today_folder.mkdir()
+ except FileExistsError:
+ print(f"Folder {today_folder} already exists")
+ return today_folder
+ raise FileNotFoundError(f"Given path {device_root} is not a device root directory")
+
+
+def get_capture_src_folder(device_folder: Path):
+ assert device_folder.is_dir(), f"Given path {device_folder} is not a folder"
+ today_iso = get_iso_date()
+ max_sequence_number = 1
+ for d in device_folder.iterdir():
+ if d.is_dir() and d.name.startswith(f'{today_iso}_capture_'):
+ name = d.name
+ num = int(name.split("_")[2])
+ max_sequence_number = max(max_sequence_number, num)
+
+ next_sequence_number = max_sequence_number + 1
+ return device_folder.joinpath(f"{today_iso}_capture_{next_sequence_number:03}")
+
+
+def make_capture_src_folder(capture_src_folder: Path):
+ try:
+ capture_src_folder.mkdir()
+ except FileExistsError:
+ print(f"Folder {capture_src_folder} already exists")
+ finally:
+ return capture_src_folder
diff --git a/code/iottb/utils/device_metadata_utils.py b/code/iottb/utils/device_metadata_utils.py
index 61f3e6c..9e80728 100644
--- a/code/iottb/utils/device_metadata_utils.py
+++ b/code/iottb/utils/device_metadata_utils.py
@@ -47,3 +47,5 @@ def update_device_type(device_type: str, file_path: Path):
with file_path.open('w') as file:
json.dump(metadata, file)
return ReturnCodes.SUCCESS
+
+
diff --git a/code/iottb/utils/tcpdump_utils.py b/code/iottb/utils/tcpdump_utils.py
index 08bfb7e..c83be66 100644
--- a/code/iottb/utils/tcpdump_utils.py
+++ b/code/iottb/utils/tcpdump_utils.py
@@ -1,5 +1,7 @@
+import ipaddress
import shutil
import subprocess
+from typing import Optional
def check_installed() -> bool:
@@ -24,5 +26,16 @@ def list_interfaces() -> str:
return ""
-def start_tcpdump():
- return None
+def is_valid_ipv4(ip: str) -> bool:
+ try:
+ ipaddress.IPv4Address(ip)
+ return True
+ except ValueError:
+ return False
+
+def str_to_ipv4(ip: str) -> (bool, Optional[ipaddress]):
+ try:
+ address = ipaddress.IPv4Address(ip)
+ return address == ipaddress.IPv4Address(ip), address
+ except ipaddress.AddressValueError:
+ return False, None
diff --git a/code/iottb/utils/utils.py b/code/iottb/utils/utils.py
new file mode 100644
index 0000000..0b0770d
--- /dev/null
+++ b/code/iottb/utils/utils.py
@@ -0,0 +1,18 @@
+import uuid
+from datetime import datetime
+from iottb.definitions import TODAY_DATE_STRING, DEVICE_METADATA_FILE, CAPTURE_METADATA_FILE
+from pathlib import Path
+
+
+def get_iso_date():
+ return datetime.now().strftime('%Y-%m-%d')
+
+
+def subfolder_exists(parent: Path, child: str):
+ return parent.joinpath(child).exists()
+
+
+def generate_unique_string_with_prefix(prefix: str):
+ return prefix + "_" + str(uuid.uuid4())
+
+