diff --git a/.idea/2024-bsc-sebastian-lenzlinger.iml b/.idea/2024-bsc-sebastian-lenzlinger.iml deleted file mode 100644 index aad402c..0000000 --- a/.idea/2024-bsc-sebastian-lenzlinger.iml +++ /dev/null @@ -1,10 +0,0 @@ - - - - - - - \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml deleted file mode 100644 index 0ea184e..0000000 --- a/.idea/misc.xml +++ /dev/null @@ -1,7 +0,0 @@ - - - - - - \ No newline at end of file diff --git a/.idea/webResources.xml b/.idea/webResources.xml deleted file mode 100644 index aa647dc..0000000 --- a/.idea/webResources.xml +++ /dev/null @@ -1,14 +0,0 @@ - - - - - - - - - - - - - - \ No newline at end of file diff --git a/code/.gitkeep b/archive/.gitkeep similarity index 100% rename from code/.gitkeep rename to archive/.gitkeep diff --git a/code/iottb/subcommands/__init__.py b/archive/__init__.py similarity index 100% rename from code/iottb/subcommands/__init__.py rename to archive/__init__.py diff --git a/archive/functions_dump.py b/archive/functions_dump.py index 4d94ad9..1639e2a 100644 --- a/archive/functions_dump.py +++ b/archive/functions_dump.py @@ -30,3 +30,30 @@ def setup_sniff_parser(subparsers): def setup_pcap_filter_parser(parser_sniff): parser_pcap_filter = parser_sniff.add_argument_parser('pcap-filter expression') pass + +def check_iottb_env(): + # This makes the option '--root-dir' obsolescent # TODO How to streamline this?\ + try: + iottb_home = environ['IOTTB_HOME'] # TODO WARN implicit declaration of env var name! + except KeyError: + logger.error(f"Environment variable 'IOTTB_HOME' is not set." + f"Setting environment variable 'IOTTB_HOME' to '~/{IOTTB_HOME_ABS}'") + environ['IOTTB_HOME'] = IOTTB_HOME_ABS + finally: + if not Path(IOTTB_HOME_ABS).exists(): + print(f'"{IOTTB_HOME_ABS}" does not exist.') + response = input('Do you want to create it now? [y/N]') + logger.debug(f'response: {response}') + if response.lower() != 'y': + logger.debug(f'Not setting "IOTTB_HOME"') + print('TODO') + print("Aborting execution...") + return ReturnCodes.ABORTED + else: + print(f'Setting environment variable IOTTB_HOME""') + Path(IOTTB_HOME_ABS).mkdir(parents=True, + exist_ok=False) # Should always work since in 'not exist' code path + return ReturnCodes.SUCCESS + logger.info(f'"{IOTTB_HOME_ABS}" exists.') + # TODO: Check that it is a valid iottb dir or can we say it is valid by definition if? + return ReturnCodes.SUCCESS diff --git a/code/tests/__init__.py b/archive/iottb/__init__.py similarity index 100% rename from code/tests/__init__.py rename to archive/iottb/__init__.py diff --git a/archive/iottb/__main__.py b/archive/iottb/__main__.py new file mode 100644 index 0000000..208076e --- /dev/null +++ b/archive/iottb/__main__.py @@ -0,0 +1,107 @@ +#!/usr/bin/env python3 +import argparse +from os import environ +from pathlib import Path +import logging +from archive.iottb.subcommands.add_device import setup_init_device_root_parser +# from iottb.subcommands.capture import setup_capture_parser +from iottb.subcommands.sniff import setup_sniff_parser +from iottb.utils.tcpdump_utils import list_interfaces +from iottb.logger import setup_logging + +logger = logging.getLogger('iottbLogger.__main__') +logger.setLevel(logging.DEBUG) + + +###################### +# Argparse setup +###################### +def setup_argparse(): + # create top level parser + root_parser = argparse.ArgumentParser(prog='iottb') + # shared options + root_parser.add_argument('--verbose', '-v', action='count', default=0) + root_parser.add_argument('--script-mode', action='store_true', help='Run in script mode (non-interactive)') + # Group of args w.r.t iottb.db creation + group = root_parser.add_argument_group('database options') + group.add_argument('--db-home', default=Path.home() / 'IoTtb.db') + group.add_argument('--config-home', default=Path.home() / '.config' / 'iottb.conf', type=Path, ) + group.add_argument('--user', default=Path.home().stem, type=Path, ) + + # configure subcommands + subparsers = root_parser.add_subparsers(title='subcommands', required=True, dest='command') + # setup_capture_parser(subparsers) + setup_init_device_root_parser(subparsers) + setup_sniff_parser(subparsers) + # Utility to list interfaces directly with iottb instead of relying on external tooling + + interfaces_parser = subparsers.add_parser('list-interfaces', aliases=['li', 'if'], + help='List available network interfaces.') + interfaces_parser.set_defaults(func=list_interfaces) + + return root_parser + + +### +# Where put ?! +### +class IoTdb: + def __init__(self, db_home=Path.home() / 'IoTtb.db', iottb_config=Path.home() / '.conf' / 'iottb.conf', + user=Path.home().stem): + self.db_home = db_home + self.config_home = iottb_config + self.default_filters_home = db_home / 'default_filters' + self.user = user + + def create_db(self, mode=0o777, parents=False, exist_ok=False): + logger.info(f'Creating db at {self.db_home}') + try: + self.db_home.mkdir(mode=mode, parents=parents, exist_ok=exist_ok) + except FileExistsError: + logger.error(f'Database path already at {self.db_home} exists and is not a directory') + finally: + logger.debug(f'Leaving finally clause in create_db') + + def create_device_tree(self, mode=0o777, parents=False, exist_ok=False): + logger.info(f'Creating device tree at {self.db_home / 'devices'}') + #TODO + + def parse_db_config(self): + pass + + def parse_iottb_config(self): + pass + + def get_known_devices(self): + pass + + +def iottb_db_exists(db_home=Path.home() / 'IoTtb.db'): + res = db_home.is_dir() + + +def main(): + logger.debug(f'Pre setup_argparse()') + parser = setup_argparse() + logger.debug('Post setup_argparse().') + args = parser.parse_args() + logger.debug(f'Args parsed: {args}') + if args.command: + try: + args.func(args) + except KeyboardInterrupt: + print('Received keyboard interrupt. Exiting...') + exit(1) + except Exception as e: + logger.debug(f'Error in main: {e}') + print(f'Error: {e}') + # create_capture_directory(args.device_name) + + +if __name__ == '__main__': + setup_logging() + logger.debug("Debug level is working") + logger.info("Info level is working") + logger.warning("Warning level is working") + + main() diff --git a/code/iottb/definitions.py b/archive/iottb/definitions.py similarity index 100% rename from code/iottb/definitions.py rename to archive/iottb/definitions.py diff --git a/archive/iottb/logger.py b/archive/iottb/logger.py new file mode 100644 index 0000000..cc0cdb5 --- /dev/null +++ b/archive/iottb/logger.py @@ -0,0 +1,35 @@ +import logging +import sys +import os +from logging.handlers import RotatingFileHandler + + +def setup_logging(): + # Ensure the logs directory exists + log_directory = 'logs' + if not os.path.exists(log_directory): + os.makedirs(log_directory) + + # Create handlers + file_handler = RotatingFileHandler(os.path.join(log_directory, 'iottb.log'), maxBytes=1048576, backupCount=5) + console_handler = logging.StreamHandler(sys.stdout) + + # Create formatters and add it to handlers + file_fmt = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') + console_fmt = logging.Formatter( + '%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s') + + file_handler.setFormatter(file_fmt) + console_handler.setFormatter(console_fmt) + + # Get the root logger and add handlers + root_logger = logging.getLogger() + root_logger.setLevel(logging.DEBUG) + root_logger.addHandler(file_handler) + root_logger.addHandler(console_handler) + + # Prevent propagation to the root logger to avoid duplicate logs + root_logger.propagate = False + + +setup_logging() diff --git a/code/tests/fixtures/__init__.py b/archive/iottb/models/__init__.py similarity index 100% rename from code/tests/fixtures/__init__.py rename to archive/iottb/models/__init__.py diff --git a/code/iottb/models/capture_metadata_model.py b/archive/iottb/models/capture_metadata_model.py similarity index 91% rename from code/iottb/models/capture_metadata_model.py rename to archive/iottb/models/capture_metadata_model.py index 4213212..9ddec56 100644 --- a/code/iottb/models/capture_metadata_model.py +++ b/archive/iottb/models/capture_metadata_model.py @@ -6,17 +6,20 @@ from typing import Optional from iottb.definitions import ReturnCodes, CAPTURE_METADATA_FILE from iottb.models.device_metadata_model import DeviceMetadata -from iottb.logger import logger +import logging + +logger = logging.getLogger('iottbLogger.capture_metadata_model') +logger.setLevel(logging.DEBUG) class CaptureMetadata: # Required Fields device_metadata: DeviceMetadata - capture_id: str = lambda: str(uuid.uuid4()) + device_id: str capture_dir: Path capture_file: str - capture_date: str = lambda: datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower() + # Statistics start_time: str @@ -39,7 +42,8 @@ class CaptureMetadata: def __init__(self, device_metadata: DeviceMetadata, capture_dir: Path): logger.info(f'Creating CaptureMetadata model from DeviceMetadata: {device_metadata}') self.device_metadata = device_metadata - + self.capture_id = str(uuid.uuid4()) + self.capture_date = datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower() self.capture_dir = capture_dir assert capture_dir.is_dir(), f'Capture directory {capture_dir} does not exist' @@ -47,7 +51,7 @@ class CaptureMetadata: logger.info(f'Building capture file name') if self.app is None: logger.debug(f'No app specified') - prefix = self.device_metadata.device_short_name + prefix = "iphone-14" #self.device_metadata.device_short_name else: logger.debug(f'App specified: {self.app}') assert str(self.app).strip() not in {'', ' '}, f'app is not a valid name: {self.app}' diff --git a/code/iottb/models/device_metadata_model.py b/archive/iottb/models/device_metadata_model.py similarity index 97% rename from code/iottb/models/device_metadata_model.py rename to archive/iottb/models/device_metadata_model.py index 359aa96..b0ea063 100644 --- a/code/iottb/models/device_metadata_model.py +++ b/archive/iottb/models/device_metadata_model.py @@ -6,7 +6,10 @@ from typing import Optional, List # iottb modules from iottb.definitions import ReturnCodes, DEVICE_METADATA_FILE -from iottb.logger import logger +import logging + +logger = logging.getLogger('iottbLogger.device_metadata_model') +logger.setLevel(logging.DEBUG) # 3rd party libs IMMUTABLE_FIELDS = {'device_name', 'device_short_name', 'device_id', 'date_created'} diff --git a/code/tests/models/test_capture_metadata_model.py b/archive/iottb/subcommands/__init__.py similarity index 100% rename from code/tests/models/test_capture_metadata_model.py rename to archive/iottb/subcommands/__init__.py diff --git a/code/iottb/subcommands/add_device.py b/archive/iottb/subcommands/add_device.py similarity index 77% rename from code/iottb/subcommands/add_device.py rename to archive/iottb/subcommands/add_device.py index d3fb9f8..425bb16 100644 --- a/code/iottb/subcommands/add_device.py +++ b/archive/iottb/subcommands/add_device.py @@ -1,18 +1,21 @@ import logging +import os import pathlib from iottb import definitions from iottb.definitions import DEVICE_METADATA_FILE, ReturnCodes -from iottb.logger import logger from iottb.models.device_metadata_model import DeviceMetadata -logger.setLevel(logging.INFO) # Since module currently passes all tests - +# logger.setLevel(logging.INFO) # Since module currently passes all tests +logger = logging.getLogger('iottbLogger.add_device') +logger.setLevel(logging.INFO) def setup_init_device_root_parser(subparsers): + #assert os.environ['IOTTB_HOME'] is not None, "IOTTB_HOME environment variable is not set" parser = subparsers.add_parser('add-device', aliases=['add-device-root', 'add'], help='Initialize a folder for a device.') - parser.add_argument('--root_dir', type=pathlib.Path, default=pathlib.Path.cwd()) + parser.add_argument('--root_dir', type=pathlib.Path, + default=definitions.IOTTB_HOME_ABS) # TODO: Refactor code to not use this or handle iottb here group = parser.add_mutually_exclusive_group() group.add_argument('--guided', action='store_true', help='Guided setup', default=False) group.add_argument('--name', action='store', type=str, help='name of device') @@ -20,14 +23,12 @@ def setup_init_device_root_parser(subparsers): def handle_add(args): + # TODO: This whole function should be refactored into using the fact that IOTTB_HOME is set, and the dir exists logger.info(f'Add device handler called with args {args}') - args.root_dir.mkdir(parents=True, - exist_ok=True) # else metadata.save_to_file will fail TODO: unclear what to assume - if args.guided: logger.debug('begin guided setup') - metadata = guided_setup(args.root_dir) + metadata = guided_setup(args.root_dir) # TODO refactor to use IOTTB_HOME logger.debug('guided setup complete') else: logger.debug('Setup through passed args: setup') @@ -36,7 +37,7 @@ def handle_add(args): return ReturnCodes.ERROR metadata = DeviceMetadata(args.name, args.root_dir) - file_path = args.root_dir / DEVICE_METADATA_FILE + file_path = args.root_dir / DEVICE_METADATA_FILE # TODO IOTTB_HOME REFACTOR if file_path.exists(): print('Directory already contains a metadata file. Aborting.') return ReturnCodes.ABORTED diff --git a/code/iottb/subcommands/capture.py b/archive/iottb/subcommands/capture.py similarity index 96% rename from code/iottb/subcommands/capture.py rename to archive/iottb/subcommands/capture.py index a67b5e8..46ff6e1 100644 --- a/code/iottb/subcommands/capture.py +++ b/archive/iottb/subcommands/capture.py @@ -2,11 +2,15 @@ import subprocess from pathlib import Path from iottb.definitions import * +import logging from iottb.models.capture_metadata_model import CaptureMetadata from iottb.models.device_metadata_model import DeviceMetadata, dir_contains_device_metadata from iottb.utils.capture_utils import get_capture_src_folder, make_capture_src_folder from iottb.utils.tcpdump_utils import check_installed +logger = logging.getLogger('iottbLogger.capture') +logger.setLevel(logging.DEBUG) + def setup_capture_parser(subparsers): parser = subparsers.add_parser('sniff', help='Sniff packets with tcpdump') # metadata args @@ -33,7 +37,7 @@ def setup_capture_parser(subparsers): help='Please see tcpdump manual for details. Unused by default.') cap_size_group = parser.add_mutually_exclusive_group(required=False) - cap_size_group.add_argument('-c', '--count', type=int, help='Number of packets to capture.', default=1000) + cap_size_group.add_argument('-c', '--count', type=int, help='Number of packets to capture.', default=10) cap_size_group.add_argument('--mins', type=int, help='Time in minutes to capture.', default=1) parser.set_defaults(func=handle_capture) @@ -88,6 +92,7 @@ def handle_capture(args): assert args.device_root is not None, f'Device root directory is required' assert dir_contains_device_metadata(args.device_root), f'Device metadata file \'{args.device_root}\' does not exist' # get device metadata + logger.info(f'Device root directory: {args.device_root}') if args.safe and not dir_contains_device_metadata(args.device_root): print(f'Supplied folder contains no device metadata. ' f'Please setup a device root directory before using this command') @@ -98,6 +103,7 @@ def handle_capture(args): else: name = input('Please enter a device name: ') args.device_root.mkdir(parents=True, exist_ok=True) + device_data = DeviceMetadata(name, args.device_root) # start constructing environment for capture capture_dir = get_capture_src_folder(args.device_root) @@ -152,7 +158,7 @@ def build_tcpdump_args(args, cmd, capture_metadata: CaptureMetadata): capture_metadata.build_capture_file_name() cmd.append('-w') - cmd.append(capture_metadata.capture_file) + cmd.append(str(capture_metadata.capture_dir) + "/" + capture_metadata.capture_file) if args.safe: cmd.append(f'host {args.device_ip}') # if not specified, filter 'any' implied by tcpdump @@ -160,7 +166,6 @@ def build_tcpdump_args(args, cmd, capture_metadata: CaptureMetadata): return cmd - # def capture_file_cmd(args, cmd, capture_dir, capture_metadata: CaptureMetadata): # capture_file_prefix = capture_metadata.get_device_metadata().get_device_short_name() # if args.app_name is not None: diff --git a/archive/iottb/subcommands/sniff.py b/archive/iottb/subcommands/sniff.py new file mode 100644 index 0000000..554a9c7 --- /dev/null +++ b/archive/iottb/subcommands/sniff.py @@ -0,0 +1,63 @@ +import subprocess +import logging + + +logger = logging.getLogger('iottbLogger.capture') +logger.setLevel(logging.DEBUG) +class Sniffer: + def __init__(self): + pass + + +def setup_sniff_parser(subparsers): + parser = subparsers.add_parser('sniff', help='Sniff packets with tcpdump') + # metadata args + parser.add_argument('-a', '--addr', help='IP or MAC address of IoT device') + # tcpdump args + parser.add_argument('--app', help='Application name to sniff', default=None) + + parser_sniff_tcpdump = parser.add_argument_group('tcpdump arguments') + + parser_sniff_tcpdump.add_argument('-i', '--interface', help='Interface to capture on.', dest='capture_interface', + required=True) + parser_sniff_tcpdump.add_argument('-I', '--monitor-mode', help='Put interface into monitor mode', + action='store_true') + parser_sniff_tcpdump.add_argument('-n', help='Deactivate name resolution. True by default.', + action='store_true', dest='no_name_resolution') + parser_sniff_tcpdump.add_argument('-#', '--number', + help='Print packet number at beginning of line. True by default.', + action='store_true') + parser_sniff_tcpdump.add_argument('-e', help='Print link layer headers. True by default.', + action='store_true', dest='print_link_layer') + parser_sniff_tcpdump.add_argument('-t', action='count', default=0, + help='Please see tcpdump manual for details. Unused by default.') + + cap_size_group = parser.add_mutually_exclusive_group(required=False) + cap_size_group.add_argument('-c', '--count', type=int, help='Number of packets to capture.', default=10) + cap_size_group.add_argument('--mins', type=int, help='Time in minutes to capture.', default=1) + + parser.set_defaults(func=sniff) + + +def parse_addr(addr): + #TODO Implement + pass + + +def sniff(args): + if args.addr is None: + print('You must supply either a MAC or IP(v4) address to use this tool!') + logger.info("Exiting on account of missing MAC/IP.") + exit(1) + else: + (type, value) = parse_addr(args.addr) + #TODO Get this party started + +def sniff_tcpdump(args, filter): + pass + +def sniff_mitmproxy(args, filter): + pass + +def sniff_raw(cmd,args): + pass \ No newline at end of file diff --git a/code/tests/models/test_device_metadata_model.py b/archive/iottb/utils/__init__.py similarity index 100% rename from code/tests/models/test_device_metadata_model.py rename to archive/iottb/utils/__init__.py diff --git a/archive/iottb/utils/capture_utils.py b/archive/iottb/utils/capture_utils.py new file mode 100644 index 0000000..8c4d60f --- /dev/null +++ b/archive/iottb/utils/capture_utils.py @@ -0,0 +1,44 @@ +import uuid +from pathlib import Path +from iottb.models.device_metadata_model import dir_contains_device_metadata +from iottb.utils.utils import get_iso_date + + +def get_capture_uuid(): + return str(uuid.uuid4()) + + +def get_capture_date_folder(device_root: Path): + today_iso = get_iso_date() + today_folder = device_root / today_iso + if dir_contains_device_metadata(device_root): + if not today_folder.is_dir(): + try: + today_folder.mkdir() + except FileExistsError: + print(f'Folder {today_folder} already exists') + return today_folder + raise FileNotFoundError(f'Given path {device_root} is not a device root directory') + + +def get_capture_src_folder(device_folder: Path): + assert device_folder.is_dir(), f'Given path {device_folder} is not a folder' + today_iso = get_iso_date() + max_sequence_number = 1 + for d in device_folder.iterdir(): + if d.is_dir() and d.name.startswith(f'{today_iso}_capture_'): + name = d.name + num = int(name.split('_')[2]) + max_sequence_number = max(max_sequence_number, num) + + next_sequence_number = max_sequence_number + 1 + return device_folder.joinpath(f'{today_iso}_capture_{next_sequence_number:03}') + + +def make_capture_src_folder(capture_src_folder: Path): + try: + capture_src_folder.mkdir() + except FileExistsError: + print(f'Folder {capture_src_folder} already exists') + finally: + return capture_src_folder diff --git a/archive/iottb/utils/tcpdump_utils.py b/archive/iottb/utils/tcpdump_utils.py new file mode 100644 index 0000000..6870202 --- /dev/null +++ b/archive/iottb/utils/tcpdump_utils.py @@ -0,0 +1,41 @@ +import ipaddress +import shutil +import subprocess +from typing import Optional + + +def check_installed() -> bool: + """Check if tcpdump is installed and available on the system path.""" + return shutil.which('tcpdump') is not None + + +def ensure_installed(): + """Ensure that tcpdump is installed, raise an error if not.""" + if not check_installed(): + raise RuntimeError('tcpdump is not installed. Please install it to continue.') + + +def list_interfaces(args) -> str: + """List available network interfaces using tcpdump.""" + ensure_installed() + try: + result = subprocess.run(['tcpdump', '--list-interfaces'], capture_output=True, text=True, check=True) + print(result.stdout) + except subprocess.CalledProcessError as e: + print(f'Failed to list interfaces: {e}') + return '' + + +def is_valid_ipv4(ip: str) -> bool: + try: + ipaddress.IPv4Address(ip) + return True + except ValueError: + return False + +def str_to_ipv4(ip: str) -> (bool, Optional[ipaddress]): + try: + address = ipaddress.IPv4Address(ip) + return address == ipaddress.IPv4Address(ip), address + except ipaddress.AddressValueError: + return False, None diff --git a/code/iottb/utils/utils.py b/archive/iottb/utils/utils.py similarity index 100% rename from code/iottb/utils/utils.py rename to archive/iottb/utils/utils.py diff --git a/code/misc/dnsmasq.conf b/archive/misc/dnsmasq.conf similarity index 100% rename from code/misc/dnsmasq.conf rename to archive/misc/dnsmasq.conf diff --git a/code/misc/enable-forwarding.sh b/archive/misc/enable-forwarding.sh similarity index 100% rename from code/misc/enable-forwarding.sh rename to archive/misc/enable-forwarding.sh diff --git a/code/misc/hostapd.conf b/archive/misc/hostapd.conf similarity index 100% rename from code/misc/hostapd.conf rename to archive/misc/hostapd.conf diff --git a/code/misc/hostapd.conf.bak b/archive/misc/hostapd.conf.bak similarity index 100% rename from code/misc/hostapd.conf.bak rename to archive/misc/hostapd.conf.bak diff --git a/code/misc/initSwAP b/archive/misc/initSwAP similarity index 100% rename from code/misc/initSwAP rename to archive/misc/initSwAP diff --git a/code/misc/initSwAP_nftables b/archive/misc/initSwAP_nftables similarity index 100% rename from code/misc/initSwAP_nftables rename to archive/misc/initSwAP_nftables diff --git a/code/misc/make_ap.sh b/archive/misc/make_ap.sh similarity index 100% rename from code/misc/make_ap.sh rename to archive/misc/make_ap.sh diff --git a/archive/pyproject.toml b/archive/pyproject.toml new file mode 100644 index 0000000..1261bc7 --- /dev/null +++ b/archive/pyproject.toml @@ -0,0 +1,16 @@ +[build-system] +requires = ["setuptools>=42", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = 'iottb' +version = '0.1.0' +authors = [{name = "Sebastian Lenzlinger", email = "sebastian.lenzlinger@unibas.ch"}] +description = "Automation Tool for Capturing Network packets of IoT devices." +requires-python = ">=3.8" + +[tool.setuptools] +packages = ["iottb"] + +[project.scripts] +iottb = "iottb.__main__:main" \ No newline at end of file diff --git a/code/tests/test_main.py b/archive/tests/__init__.py similarity index 100% rename from code/tests/test_main.py rename to archive/tests/__init__.py diff --git a/code/tests/utils/test_device_metadata_utils.py b/archive/tests/fixtures/__init__.py similarity index 100% rename from code/tests/utils/test_device_metadata_utils.py rename to archive/tests/fixtures/__init__.py diff --git a/code/tests/fixtures/shared_fixtures.py b/archive/tests/fixtures/shared_fixtures.py similarity index 100% rename from code/tests/fixtures/shared_fixtures.py rename to archive/tests/fixtures/shared_fixtures.py diff --git a/code/tests/utils/test_tcpdump_utils.py b/archive/tests/models/test_capture_metadata_model.py similarity index 100% rename from code/tests/utils/test_tcpdump_utils.py rename to archive/tests/models/test_capture_metadata_model.py diff --git a/archive/tests/models/test_device_metadata_model.py b/archive/tests/models/test_device_metadata_model.py new file mode 100644 index 0000000..e69de29 diff --git a/code/tests/subcommands/test_add_device.py b/archive/tests/subcommands/test_add_device.py similarity index 100% rename from code/tests/subcommands/test_add_device.py rename to archive/tests/subcommands/test_add_device.py diff --git a/code/tests/test_capture_metadata_model.py b/archive/tests/test_capture_metadata_model.py similarity index 100% rename from code/tests/test_capture_metadata_model.py rename to archive/tests/test_capture_metadata_model.py diff --git a/archive/tests/test_main.py b/archive/tests/test_main.py new file mode 100644 index 0000000..e69de29 diff --git a/code/tests/utils/test_capture_metadata_utils.py b/archive/tests/utils/test_capture_metadata_utils.py similarity index 100% rename from code/tests/utils/test_capture_metadata_utils.py rename to archive/tests/utils/test_capture_metadata_utils.py diff --git a/archive/tests/utils/test_device_metadata_utils.py b/archive/tests/utils/test_device_metadata_utils.py new file mode 100644 index 0000000..e69de29 diff --git a/archive/tests/utils/test_tcpdump_utils.py b/archive/tests/utils/test_tcpdump_utils.py new file mode 100644 index 0000000..e69de29 diff --git a/code/iottb-project/.gitignore b/code/iottb-project/.gitignore new file mode 100644 index 0000000..a457143 --- /dev/null +++ b/code/iottb-project/.gitignore @@ -0,0 +1,36 @@ +__pycache__ +.venv +iottb.egg-info +.idea +*.log +logs/ +*.pyc +.obsidian + +# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio, WebStorm and Rider +# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 + +# User-specific stuff +.idea/**/workspace.xml +.idea/**/tasks.xml +.idea/**/usage.statistics.xml +.idea/**/dictionaries +.idea/**/shelf + +# AWS User-specific +.idea/**/aws.xml + +# Generated files +.idea/**/contentModel.xml + +# Sensitive or high-churn files +.idea/**/dataSources/ +.idea/**/dataSources.ids +.idea/**/dataSources.local.xml +.idea/**/sqlDataSources.xml +.idea/**/dynamic.xml +.idea/**/uiDesigner.xml +.idea/**/dbnavigator.xml + +.private/ +*.pcap \ No newline at end of file diff --git a/code/iottb-project/iottb/commands/add_device.py b/code/iottb-project/iottb/commands/add_device.py index d518080..5524ea7 100644 --- a/code/iottb-project/iottb/commands/add_device.py +++ b/code/iottb-project/iottb/commands/add_device.py @@ -23,7 +23,7 @@ def add_device_guided(ctx, cn, db): @click.option('--dev', '--device-name', type=str, required=True, help='The name of the device to be added. If this string contains spaces or other special characters \ normalization is performed to derive a canonical name') -@click.option('--db', '--database', type=click.Path(exists=True, file_okay=False, dir_okay=True, path_type=Path), +@click.option('--db', '--database', type=click.Path(exists=True, file_okay=False, dir_okay=True), envvar='IOTTB_DB', show_envvar=True, help='Database in which to add this device. If not specified use default from config.') @click.option('--guided', is_flag=True, default=False, show_default=True, envvar='IOTTB_GUIDED_ADD', show_envvar=True, @@ -44,7 +44,7 @@ def add_device(dev, db, guided): # dependency: Database folder must exist if db: database = db - path = config.db_path_dict + path = config.db_path_dict[database] logger.debug(f'Resolved (path, db) {path}, {database}') else: path = config.default_db_location diff --git a/code/iottb-project/iottb/commands/developer.py b/code/iottb-project/iottb/commands/developer.py index 89ec530..d1e992b 100644 --- a/code/iottb-project/iottb/commands/developer.py +++ b/code/iottb-project/iottb/commands/developer.py @@ -2,6 +2,7 @@ from pathlib import Path import logging import click +from iottb import tb_echo from iottb.definitions import DB_NAME, CFG_FILE_PATH from iottb.models.iottb_config import IottbConfig @@ -94,12 +95,17 @@ def show_everything(ctx): click.echo(f"Default Database: {config.default_database}") click.echo(f"Default Database Path: {config.default_db_location}") click.echo("Database Locations:") + everything_dict = {} + for db_name, db_path in config.db_path_dict.items(): + + click.echo(f" - {db_name}: {db_path}") for db_name, db_path in config.db_path_dict.items(): full_db_path = Path(db_path) / db_name - click.echo(f" - {db_name}: {full_db_path}") if full_db_path.is_dir(): - click.echo(f"Contents of {db_name} at {full_db_path}:") + click.echo(f"\nContents of {full_db_path}:") + flag = True for item in full_db_path.iterdir(): + flag = False if item.is_file(): click.echo(f" - {item.name}") try: @@ -115,9 +121,10 @@ def show_everything(ctx): click.echo(f" - {subitem.name}") elif subitem.is_dir(): click.echo(f" - {subitem.name}/") + if flag: + tb_echo(f'\t EMPTY') else: - click.echo(f" {full_db_path} is not a directory") + click.echo(f"{full_db_path} is not a directory") + -warnstyle = {'fg': 'red', 'bold': True} -click.secho('Developer command used', **warnstyle) diff --git a/code/iottb-project/iottb/commands/sniff.py b/code/iottb-project/iottb/commands/sniff.py index 0c5334c..0eef81c 100644 --- a/code/iottb-project/iottb/commands/sniff.py +++ b/code/iottb-project/iottb/commands/sniff.py @@ -1,3 +1,8 @@ +import os +import shutil +import uuid +from time import time + import click import subprocess import json @@ -5,9 +10,11 @@ from pathlib import Path import logging import re from datetime import datetime +from click_option_group import optgroup from iottb.definitions import APP_NAME, CFG_FILE_PATH from iottb.models.iottb_config import IottbConfig from iottb.utils.string_processing import make_canonical_name + # Setup logger logger = logging.getLogger('iottb.sniff') @@ -40,29 +47,51 @@ def validate_sniff(ctx, param, value): @click.command('sniff', help='Sniff packets with tcpdump') -@click.argument('device') -@click.option('-i', '--interface', callback=validate_sniff, help='Network interface to capture on', - envvar='IOTTB_CAPTURE_INTERFACE') -@click.option('-a', '--address', callback=validate_sniff, help='IP or MAC address to filter packets by', - envvar='IOTTB_CAPTURE_ADDRESS') -@click.option('--db', '--database', type=click.Path(exists=True, file_okay=False), envvar='IOTTB_DB', - help='Database of device. Only needed if not current default.') -@click.option('--unsafe', is_flag=True, default=False, envvar='IOTTB_UNSAFE', is_eager=True, - help='Disable checks for otherwise required options') -@click.option('--guided', is_flag=True, default=False) -def sniff(device, interface, address, db, unsafe, guided): +@optgroup.group('Testbed sources') +@optgroup.option('--db', '--database', type=str, envvar='IOTTB_DB', show_envvar=True, + help='Database of device. Only needed if not current default.') +@optgroup.option('--app', type=str, help='Companion app being used during capture', required=False) +@optgroup.group('Runtime behaviour') +@optgroup.option('--unsafe', is_flag=True, default=False, envvar='IOTTB_UNSAFE', is_eager=True, + help='Disable checks for otherwise required options.\n', show_envvar=True) +@optgroup.option('--guided', is_flag=True, default=False, envvar='IOTTB_GUIDED', show_envvar=True) +@optgroup.option('--pre', type=click.Path(exists=True, executable=True), help='Script to be executed before main ' + 'command' + 'is started.') +@optgroup.group('Tcpdump options') +@optgroup.option('-i', '--interface', + help='Network interface to capture on.' + + 'If not specified tcpdump tries to find and appropriate one.\n', show_envvar=True, + envvar='IOTTB_CAPTURE_INTERFACE') +@optgroup.option('-a', '--address', callback=validate_sniff, + help='IP or MAC address to filter packets by.\n', show_envvar=True, + envvar='IOTTB_CAPTURE_ADDRESS') +@optgroup.option('-I', '--monitor-mode', help='Put interface into monitor mode.', is_flag=True) +@optgroup.option('--ff', type=str, envvar='IOTTB_CAPTURE_FILTER', show_envvar=True, + help='tcpdump filter as string or file path.') +@optgroup.option('-#', '--print-pacno', is_flag=True, default=True, + help='Print packet number at beginning of line. True by default.') +@optgroup.option('-e', '--print-ll', is_flag=True, default=False, + help='Print link layer headers. True by default.') +@optgroup.option('-c', '--count', type=int, help='Number of packets to capture.', default=1000) +# @optgroup.option('--mins', type=int, help='Time in minutes to capture.', default=1) +@click.argument('tcpdump-args', nargs=-1, required=False, metavar='[TCPDUMP-ARGS]') +@click.argument('device', required=False) +@click.pass_context +def sniff(ctx, device, interface, print_pacno, ff, count, monitor_mode, print_ll, address, db, unsafe, guided, + app, tcpdump_args, **params): """ Sniff packets from a device """ logger.info('sniff command invoked') # Step1: Load Config - config = IottbConfig(Path(CFG_FILE_PATH)) + config = ctx.obj['CONFIG'] logger.debug(f'Config loaded: {config}') # Step2: determine relevant database database = db if db else config.default_database - path = config.default_db_location[database] + path = config.db_path_dict[database] full_db_path = Path(path) / database - logger.debug(f'Full db path is {str(path)}') + logger.debug(f'Full db path is {str(full_db_path)}') # 2.2: Check if it exists if not full_db_path.is_dir(): @@ -84,35 +113,215 @@ def sniff(device, interface, address, db, unsafe, guided): device_path.mkdir(parents=True, exist_ok=True) logger.info(f'Device path {device_path} created') - # Generate filter - if not unsafe: - if is_ip_address(address): - packet_filter = f"host {address}" - elif is_mac_address(address): - packet_filter = f"ether host {address}" + click.echo(f'Found device at path {device_path}') + # Step 4: Generate filter + generic_filter = None + cap_filter = None + if ff: + logger.debug(f'ff: {ff}') + if Path(ff).is_file(): + logger.info('Given filter option is a file') + with open(ff, 'r') as f: + cap_filter = f.read().strip() else: - logger.error('Invalid address format') - click.echo('Invalid address format') - return + logger.info('Given filter option is an expression') + cap_filter = ff else: - packet_filter = None + if address is not None: + if is_ip_address(address): + generic_filter = 'net' + cap_filter = f'{generic_filter} {address}' + elif is_mac_address(address): + generic_filter = 'ether net' + cap_filter = f'{generic_filter} {address}' + elif not unsafe: + logger.error('Invalid address format') + click.echo('Invalid address format') + return + logger.info(f'Generic filter {generic_filter}') + click.echo(f'Using filter {cap_filter}') + # Step 5: prep capture directory + capture_date = datetime.now().strftime('%Y-%m-%d') + capture_base_dir = device_path / f'sniffs/{capture_date}' + capture_base_dir.mkdir(parents=True, exist_ok=True) + logger.debug(f'Previous captures {capture_base_dir.glob('cap*')}') + capture_count = sum(1 for _ in capture_base_dir.glob('cap*')) + logger.debug(f'Capture count is {capture_count}') -@click.command('sniff', help='Sniff packets with tcpdump') -@click.argument('device') -@click.option('-i', '--interface', required=False, help='Network interface to capture on', envvar='IOTTB_CAPTURE_INTERFACE') -@click.option('-a', '--address', required=True, help='IP or MAC address to filter packets by', envvar='IOTTB_CAPTURE_ADDRESS') -@click.option('--db', '--database', type=click.Path(exists=True, file_okay=False), envvar='IOTTB_DB', - help='Database of device. Only needed if not current default.') -@click.option('--unsafe', is_flag=True, default=False, envvar='IOTTB_UNSAFE', - help='Disable checks for otherwise required options') -@click.option('--guided', is_flag=True) -def sniff2(device, interface, address, cfg_file): - """ Sniff packets from a device """ - logger.info('sniff command invoked') - # Step 1: Load Config - # Dependency: Config file must exist - config = IottbConfig(Path(CFG_FILE_PATH)) - logger.debug(f'Config loaded: {config}') + capture_dir = f'cap{capture_count:04d}-{datetime.now().strftime('%H%M')}' + logger.debug(f'capture_dir: {capture_dir}') + + # Full path + capture_dir_full_path = capture_base_dir / capture_dir + capture_dir_full_path.mkdir(parents=True, exist_ok=True) + + click.echo(f'Files will be placed in {str(capture_dir_full_path)}') + logger.debug(f'successfully created capture directory') + + # Step 6: Prepare capture file names + # Generate UUID for filenames + capture_uuid = str(uuid.uuid4()) + click.echo(f'Capture has id {capture_uuid}') + + pcap_file = f"{canonical_name}_{capture_uuid}.pcap" + pcap_file_full_path = capture_dir_full_path / pcap_file + stdout_log_file = f'stdout_{capture_uuid}.log' + stderr_log_file = f'stderr_{capture_uuid}.log' + + logger.debug(f'Full pcap file path is {pcap_file_full_path}') + logger.info(f'pcap file name is {pcap_file}') + logger.info(f'stdout log file is {stdout_log_file}') + logger.info(f'stderr log file is {stderr_log_file}') + + # Step 7: Build tcpdump command + logger.debug(f'pgid {os.getpgrp()}') + logger.debug(f'ppid {os.getppid()}') + logger.debug(f'(real, effective, saved) user id: {os.getresuid()}') + logger.debug(f'(real, effective, saved) group id: {os.getresgid()}') + + cmd = ['sudo', 'tcpdump'] + + # 7.1 process flags + flags = [] + if print_pacno: + flags.append('-#') + if print_ll: + flags.append('-e') + if monitor_mode: + flags.append('-I') + flags.append('-n') # TODO: Integrate, in case name resolution is wanted! + cmd.extend(flags) + flags_string = " ".join(flags) + logger.debug(f'Flags: {flags_string}') + + # debug interlude + verbosity = ctx.obj['VERBOSITY'] + if verbosity > 0: + verbosity_flag = '-' + for i in range(0, verbosity): + verbosity_flag = verbosity_flag + 'v' + logger.debug(f'verbosity string to pass to tcpdump: {verbosity_flag}') + cmd.append(verbosity_flag) + + # 7.2 generic (i.e. reusable) kw args + generic_kw_args = [] + if count: + generic_kw_args.extend(['-c', str(count)]) + # if mins: + # generic_kw_args.extend(['-G', str(mins * 60)]) TODO: this currently loads to errors with sudo + cmd.extend(generic_kw_args) + generic_kw_args_string = " ".join(generic_kw_args) + logger.debug(f'KW args: {generic_kw_args_string}') + + # 7.3 special kw args (not a priori reusable) + non_generic_kw_args = [] + if interface: + non_generic_kw_args.extend(['-i', interface]) + non_generic_kw_args.extend(['-w', str(pcap_file_full_path)]) + cmd.extend(non_generic_kw_args) + non_generic_kw_args_string = " ".join(non_generic_kw_args) + logger.debug(f'Non transferable (special) kw args: {non_generic_kw_args_string}') + + # 7.4 add filter expression + if cap_filter: + logger.debug(f'cap_filter (not generic): {cap_filter}') + cmd.append(cap_filter) + + full_cmd_string = " ".join(cmd) + + logger.info(f'tcpdump command: {"".join(full_cmd_string)}') + click.echo('Capture setup complete!') + # Step 8: Execute tcpdump command + start_time = datetime.now().strftime("%H:%M:%S") + start = time() + try: + if guided: + click.confirm(f'Execute following command: {full_cmd_string}') + stdout_log_file_abs_path = capture_dir_full_path / stdout_log_file + stderr_log_file_abs_path = capture_dir_full_path / stderr_log_file + stdout_log_file_abs_path.touch(mode=0o777) + stderr_log_file_abs_path.touch(mode=0o777) + with open(stdout_log_file_abs_path, 'w') as out, open(stderr_log_file_abs_path, 'w') as err: + logger.debug(f'\nstdout: {out}.\nstderr: {err}.\n') + + tcp_complete = subprocess.run(cmd, check=True, capture_output=True, text=True) + + out.write(tcp_complete.stdout) + err.write(tcp_complete.stderr) + + #click.echo(f'Mock sniff execution') + click.echo(f"Capture complete. Saved to {pcap_file}") + except subprocess.CalledProcessError as e: + logger.error(f'Failed to capture packets: {e}') + click.echo(f'Failed to capture packets: {e}') + click.echo(f'Check {stderr_log_file} for more info.') + if ctx.obj['DEBUG']: + msg = [f'STDERR log {stderr_log_file} contents:\n'] + with open(capture_dir_full_path / stderr_log_file) as log: + for line in log: + msg.append(line) + + click.echo("\t".join(msg), lvl='e') + # print('DEBUG ACTIVE') + if guided: + click.prompt('Create metadata anyway?') + else: + click.echo('Aborting capture...') + exit() + end_time = datetime.now().strftime("%H:%M:%S") + end = time() + delta = end - start + click.echo(f'tcpdump took {delta:.2f} seconds.') + # Step 9: Register metadata + metadata = { + 'device': canonical_name, + 'device_id': device, + 'capture_id': capture_uuid, + 'capture_date_iso': datetime.now().isoformat(), + 'invoked_command': " ".join(map(str, cmd)), + 'capture_duration': delta, + 'generic_parameters': { + 'flags': flags_string, + 'kwargs': generic_kw_args_string, + 'filter': generic_filter + }, + 'non_generic_parameters': { + 'kwargs': non_generic_kw_args_string, + 'filter': cap_filter + }, + 'features': { + 'interface': interface, + 'address': address + }, + 'resources': { + 'pcap_file': str(pcap_file), + 'stdout_log': str(stdout_log_file), + 'stderr_log': str(stderr_log_file) + }, + 'environment': { + 'capture_dir': capture_dir, + 'database': database, + 'capture_base_dir': str(capture_base_dir), + 'capture_dir_abs_path': str(capture_dir_full_path) + } + } + + click.echo('Ensuring correct ownership of created files.') + username = os.getlogin() + gid = os.getgid() + + # Else there are issues when running with sudo: + try: + subprocess.run(f'sudo chown -R {username}:{username} {device_path}', shell=True) + except OSError as e: + click.echo(f'Some error {e}') + + click.echo(f'Saving metadata.') + metadata_abs_path = capture_dir_full_path / 'capture_metadata.json' + with open(metadata_abs_path, 'w') as f: + json.dump(metadata, f, indent=4) + + click.echo(f'END SNIFF SUBCOMMAND') diff --git a/code/iottb-project/iottb/commands/testbed.py b/code/iottb-project/iottb/commands/testbed.py new file mode 100644 index 0000000..eb26a9f --- /dev/null +++ b/code/iottb-project/iottb/commands/testbed.py @@ -0,0 +1,120 @@ +import click +from pathlib import Path +import logging +from logging.handlers import RotatingFileHandler +import sys +from iottb.models.iottb_config import IottbConfig +from iottb.definitions import DB_NAME, CFG_FILE_PATH + +logger = logging.getLogger(__name__) + + +@click.command() +@click.option('-d', '--dest', type=click.Path(), help='Location to put (new) iottb database') +@click.option('-n', '--name', default=DB_NAME, type=str, help='Name of new database.') +@click.option('--update-default/--no-update-default', default=True, help='If new db should be set as the new default') +@click.pass_context +def init_db(ctx, dest, name, update_default): + logger.info('init-db invoked') + config = ctx.obj['CONFIG'] + logger.debug(f'str(config)') + # Use the default path from config if dest is not provided + known_dbs = config.get_known_databases() + logger.debug(f'Known databases: {known_dbs}') + if name in known_dbs: + dest = config.get_database_location(name) + if Path(dest).joinpath(name).is_dir(): + click.echo(f'A database {name} already exists.') + logger.debug(f'DB {name} exists in {dest}') + click.echo(f'Exiting...') + exit() + logger.debug(f'DB name {name} registered but does not exist.') + if not dest: + logger.info('No dest set, choosing default destination.') + dest = Path(config.default_db_location).parent + + db_path = Path(dest).joinpath(name) + logger.debug(f'Full path for db {str(db_path)}') + # Create the directory if it doesn't exist + db_path.mkdir(parents=True, exist_ok=True) + logger.info(f"mkdir {db_path} successful") + click.echo(f'Created {db_path}') + + # Update configuration + config.set_database_location(name, str(dest)) + if update_default: + config.set_default_database(name, str(dest)) + config.save_config() + logger.info(f"Updated configuration with database {name} at {db_path}") + + +# @click.group('config') +# @click.pass_context +# def cfg(ctx): +# pass +# +# @click.command('set', help='Set the location of a database.') +# @click.argument('database', help='Name of database') +# @click.argument('location', help='Where the database is located (i.e. its parent directory)') +# @click.pass_context +# def set(ctx, key, value): +# click.echo(f'Setting {key} to {value} in config') +# config = ctx.obj['CONFIG'] +# logger.warning('No checks performed!') +# config.set_database_location(key, value) +# config.save_config() + + + + + +@click.command() +@click.option('-d', '--dest', type=click.Path(), help='Location to put (new) iottb database') +@click.option('-n', '--name', default=DB_NAME, type=str, help='Name of new database.') +@click.option('--update-default/--no-update-default', default=True, help='If new db should be set as the new default') +@click.pass_context +def init_db_inactive(ctx, dest, name, update_default): + logger.info('init-db invoked') + config = ctx.obj['CONFIG'] + logger.debug(f'str(config)') + + # Retrieve known databases + known_dbs = config.get_known_databases() + + # Determine destination path + if name in known_dbs: + dest = Path(config.get_database_location(name)) + if dest.joinpath(name).is_dir(): + click.echo(f'A database {name} already exists.') + logger.debug(f'DB {name} exists in {dest}') + click.echo(f'Exiting...') + exit() + logger.debug(f'DB name {name} registered but does not exist.') + elif not dest: + logger.info('No destination set, using default path from config.') + dest = Path(config.default_db_location).parent + + # Ensure destination path is absolute + dest = dest.resolve() + + # Combine destination path with database name + db_path = dest / name + logger.debug(f'Full path for database: {str(db_path)}') + + # Create the directory if it doesn't exist + try: + db_path.mkdir(parents=True, exist_ok=True) + logger.info(f'Directory {db_path} created successfully.') + click.echo(f'Created {db_path}') + except Exception as e: + logger.error(f'Failed to create directory {db_path}: {e}') + click.echo(f'Failed to create directory {db_path}: {e}', err=True) + exit(1) + + # Update configuration + config.set_database_location(name, str(db_path)) + if update_default: + config.set_default_database(name, str(db_path)) + config.save_config() + logger.info(f'Updated configuration with database {name} at {db_path}') + click.echo(f'Updated configuration with database {name} at {db_path}') diff --git a/code/iottb-project/iottb/definitions.py b/code/iottb-project/iottb/definitions.py index d4ee672..76d594c 100644 --- a/code/iottb-project/iottb/definitions.py +++ b/code/iottb-project/iottb/definitions.py @@ -44,3 +44,5 @@ TB_ECHO_STYLES = { } NAME_OF_CAPTURE_DIR = 'sniffs' + + diff --git a/code/iottb-project/iottb/main.py b/code/iottb-project/iottb/main.py index c3aaa7a..acba6dd 100644 --- a/code/iottb-project/iottb/main.py +++ b/code/iottb-project/iottb/main.py @@ -1,16 +1,20 @@ +import os +import shutil + import click from pathlib import Path import logging from iottb.commands.sniff import sniff from iottb.commands.developer import set_key_in_table_to, rm_cfg, rm_dbs, show_cfg, show_everything + ################################################## # Import package modules ################################################# from iottb.utils.logger_config import setup_logging from iottb import definitions from iottb.models.iottb_config import IottbConfig -from iottb.commands.initialize_testbed import init_db +from iottb.commands.testbed import init_db from iottb.commands.add_device import add_device ############################################################################ @@ -28,26 +32,33 @@ loglevel = definitions.LOGLEVEL logger = logging.getLogger(__name__) -@click.group() -@click.option('-v', '--verbosity', count=True, type=click.IntRange(0, 3), default=0, +@click.group(context_settings=dict(auto_envvar_prefix='IOTTB', show_default=True)) +@click.option('-v', '--verbosity', count=True, type=click.IntRange(0, 3), default=0, is_eager=True, help='Set verbosity') -@click.option('-d', '--debug', is_flag=True, default=False, +@click.option('-d', '--debug', is_flag=True, default=False, is_eager=True, help='Enable debug mode') +@click.option('--dry-run', is_flag=True, default=True, is_eager=True) @click.option('--cfg-file', type=click.Path(), default=Path(click.get_app_dir(APP_NAME)).joinpath('iottb.cfg'), envvar='IOTTB_CONF_HOME', help='Path to iottb config file') @click.pass_context -def cli(ctx, verbosity, debug, cfg_file): +def cli(ctx, verbosity, debug, dry_run, cfg_file): setup_logging(verbosity, debug) # Setup logging based on the loaded configuration and other options ctx.ensure_object(dict) # Make sure context is ready for use logger.info("Starting execution.") ctx.obj['CONFIG'] = IottbConfig(cfg_file) # Load configuration directly ctx.meta['FULL_PATH_CONFIG_FILE'] = str(cfg_file) + ctx.meta['DRY_RUN'] = dry_run + logger.debug(f'Verbosity: {verbosity}') + ctx.obj['VERBOSITY'] = verbosity + logger.debug(f'Debug: {debug}') + ctx.obj['DEBUG'] = debug ################################################################################## # Add all subcommands to group here ################################################################################# +# TODO: Is there a way to do this without pylint freaking out? # noinspection PyTypeChecker cli.add_command(init_db) cli.add_command(rm_cfg) @@ -58,5 +69,9 @@ cli.add_command(add_device) cli.add_command(show_cfg) cli.add_command(sniff) cli.add_command(show_everything) + + if __name__ == '__main__': - cli(auto_envvar_prefix='IOTTB', show_default=True, show_envvars=True) + cli() + for log in Path.cwd().iterdir(): + log.chmod(0o777) diff --git a/code/iottb-project/iottb/models/sniff_metadata.py b/code/iottb-project/iottb/models/sniff_metadata.py index 9e66e0d..9fa5e11 100644 --- a/code/iottb-project/iottb/models/sniff_metadata.py +++ b/code/iottb-project/iottb/models/sniff_metadata.py @@ -1,4 +1,39 @@ +import json import logging +import uuid +from datetime import datetime +from pathlib import Path logger = logging.getLogger('iottb.sniff') # Log with sniff subcommand +class CaptureMetadata: + def __init__(self, device_id, capture_dir, interface, address, capture_file, tcpdump_command, tcpdump_stdout, tcpdump_stderr, packet_filter, alias): + self.base_data = { + 'device_id': device_id, + 'capture_id': str(uuid.uuid4()), + 'capture_date': datetime.now().isoformat(), + 'capture_dir': str(capture_dir), + 'capture_file': capture_file, + 'start_time': "", + 'stop_time': "", + 'alias': alias + } + self.features = { + 'interface': interface, + 'device_ip_address': address if address else "No IP Address set", + 'tcpdump_stdout': str(tcpdump_stdout), + 'tcpdump_stderr': str(tcpdump_stderr), + 'packet_filter': packet_filter + } + self.command = tcpdump_command + + def save_to_file(self): + metadata = { + 'base_data': self.base_data, + 'features': self.features, + 'command': self.command + } + metadata_file_path = Path(self.base_data['capture_dir']) / 'metadata.json' + with open(metadata_file_path, 'w') as f: + json.dump(metadata, f, indent=4) + logger.info(f'Metadata saved to {metadata_file_path}') diff --git a/code/iottb-project/iottb/scripts/generate_help.py b/code/iottb-project/iottb/scripts/generate_help.py new file mode 100755 index 0000000..fd4b683 --- /dev/null +++ b/code/iottb-project/iottb/scripts/generate_help.py @@ -0,0 +1,52 @@ +import click +from io import StringIO +import sys + +# Import your CLI app here +from iottb.main import cli + +"""Script to generate the help text and write to file. + + Definitely needs better formatting. + Script is also not very flexible. +""" + + +def get_help_text(command): + """Get the help text for a given command.""" + help_text = StringIO() + with click.Context(command) as ctx: + # chatgpt says this helps: was right + sys_stdout = sys.stdout + sys.stdout = help_text + try: + click.echo(command.get_help(ctx)) + finally: + sys.stdout = sys_stdout + return help_text.getvalue() + + +def write_help_to_file(cli, filename): + """Write help messages of all commands and subcommands to a file.""" + with open(filename, 'w') as f: + # main + f.write(f"Main Command: iottb\n") + f.write(get_help_text(cli)) + f.write("\n\n") + + # go through subcommands + for cmd_name, cmd in cli.commands.items(): + f.write(f"Command: {cmd_name}\n") + f.write(get_help_text(cmd)) + f.write("\n\n") + + # subcommands of subcommands + if isinstance(cmd, click.Group): + for sub_cmd_name, sub_cmd in cmd.commands.items(): + f.write(f"Subcommand: {cmd_name} {sub_cmd_name}\n") + f.write(get_help_text(sub_cmd)) + f.write("\n\n") + + +if __name__ == "__main__": + write_help_to_file(cli, "help_messages.md") diff --git a/code/iottb-project/iottb/scripts/sudo_iottb b/code/iottb-project/iottb/scripts/sudo_iottb new file mode 100644 index 0000000..1f1af62 --- /dev/null +++ b/code/iottb-project/iottb/scripts/sudo_iottb @@ -0,0 +1,4 @@ +#/bin/sh +echo 'Running iottb as sudo' +sudo $(which python) iottb $@ +echo 'Finished executing iottb with sudo' \ No newline at end of file diff --git a/code/iottb-project/iottb/utils/string_processing.py b/code/iottb-project/iottb/utils/string_processing.py index 321e842..7b2ae39 100644 --- a/code/iottb-project/iottb/utils/string_processing.py +++ b/code/iottb-project/iottb/utils/string_processing.py @@ -34,7 +34,7 @@ def make_canonical_name(name): parts = norm_name.split('-') canonical_name = canonical_name = '-'.join(parts[:2]) aliases.append(canonical_name) - + aliases = list(set(aliases)) logger.debug(f'Canonical name: {canonical_name}') logger.debug(f'Aliases: {aliases}') - return canonical_name, list(set(aliases)) + return canonical_name, aliases diff --git a/code/iottb-project/pyproject.toml b/code/iottb-project/pyproject.toml index eda5a1f..80acba6 100644 --- a/code/iottb-project/pyproject.toml +++ b/code/iottb-project/pyproject.toml @@ -4,7 +4,6 @@ version = "0.1.0" description = "IoT Testbed" authors = ["Sebastian Lenzlinger "] readme = "README.md" -package-mode = false [tool.poetry.dependencies] python = "^3.12" diff --git a/code/iottb/.gitignore b/code/iottb/.gitignore new file mode 100644 index 0000000..852cc91 --- /dev/null +++ b/code/iottb/.gitignore @@ -0,0 +1,3 @@ +__pycache__ +.venv +iottb.egg-info \ No newline at end of file diff --git a/code/iottb/__main__.py b/code/iottb/__main__.py index 1512c70..fb99a10 100644 --- a/code/iottb/__main__.py +++ b/code/iottb/__main__.py @@ -1,82 +1,30 @@ -#!/usr/bin/env python3 import argparse -from os import environ +import logging from pathlib import Path -from iottb.logger import logger -from iottb.subcommands.add_device import setup_init_device_root_parser -from iottb.subcommands.capture import setup_capture_parser -from iottb.utils.tcpdump_utils import list_interfaces -from definitions import IOTTB_HOME_ABS, ReturnCodes +from .commands.sniff import setup_sniff_parser +from .config import Config +from .utils.file_utils import ensure_directory_exists -###################### -# Argparse setup -###################### -def setup_argparse(): - # create top level parser - root_parser = argparse.ArgumentParser(prog='iottb') - subparsers = root_parser.add_subparsers(title='subcommands', required=True, dest='command') - - # shared options - root_parser.add_argument('--verbose', '-v', action='count', default=0) - # configure subcommands - setup_capture_parser(subparsers) - setup_init_device_root_parser(subparsers) - - # Utility to list interfaces directly with iottb instead of relying on external tooling - - interfaces_parser = subparsers.add_parser('list-interfaces', aliases=['li', 'if'], - help='List available network interfaces.') - interfaces_parser.set_defaults(func=list_interfaces) - - return root_parser - - -def check_iottb_env(): - # This makes the option '--root-dir' obsolescent # TODO How to streamline this?\ - try: - iottb_home = environ['IOTTB_HOME'] # TODO WARN implicit declaration of env var name! - except KeyError: - logger.error(f"Environment variable 'IOTTB_HOME' is not set." - f"Setting environment variable 'IOTTB_HOME' to '~/{IOTTB_HOME_ABS}'") - environ['IOTTB_HOME'] = IOTTB_HOME_ABS - finally: - if not Path(IOTTB_HOME_ABS).exists(): - print(f'"{IOTTB_HOME_ABS}" does not exist.') - response = input('Do you want to create it now? [y/N]') - logger.debug(f'response: {response}') - if response.lower() != 'y': - logger.debug(f'Not creating "{environ['IOTTB_HOME']}"') - print('TODO') - print("Aborting execution...") - return ReturnCodes.ABORTED - else: - print(f'Creating "{environ['IOTTB_HOME']}"') - Path(IOTTB_HOME_ABS).mkdir(parents=True, - exist_ok=False) # Should always work since in 'not exist' code path - return ReturnCodes.OK - logger.info(f'"{IOTTB_HOME_ABS}" exists.') - # TODO: Check that it is a valid iottb dir or can we say it is valid by definition if? - return ReturnCodes.OK +def setup_logging(): + logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(name)s: %(message)s') def main(): - if check_iottb_env() != ReturnCodes.OK: - exit(ReturnCodes.ABORTED) - parser = setup_argparse() + setup_logging() + + parser = argparse.ArgumentParser(description='IoT Testbed') + subparsers = parser.add_subparsers() + + setup_sniff_parser(subparsers) + args = parser.parse_args() - print(args) - if args.command: - try: - args.func(args) - except KeyboardInterrupt: - print('Received keyboard interrupt. Exiting...') - exit(1) - except Exception as e: - print(f'Error: {e}') - # create_capture_directory(args.device_name) + if hasattr(args, 'func'): + args.func(args) + else: + parser.print_help() -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/code/iottb/commands/__init__.py b/code/iottb/commands/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/code/iottb/commands/sniff.py b/code/iottb/commands/sniff.py new file mode 100644 index 0000000..f8e97ad --- /dev/null +++ b/code/iottb/commands/sniff.py @@ -0,0 +1,108 @@ +import subprocess +import re +from datetime import datetime +from pathlib import Path +import logging +from models.capture_metadata import CaptureMetadata +from models.device_metadata import DeviceMetadata +from utils.capture_utils import get_capture_src_folder, make_capture_src_folder +from utils.tcpdump_utils import check_installed +from utils.file_utils import ensure_directory_exists +from config import Config + +logger = logging.getLogger('iottb.sniff') + + +def is_ip_address(address): + ip_pattern = re.compile(r"^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$") + return ip_pattern.match(address) is not None + + +def is_mac_address(address): + mac_pattern = re.compile(r"^([0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2}$") + return mac_pattern.match(address) is not None + + +class Sniffer: + def __init__(self, device_id, capture_interface, address=None, safe=True): + #TODO Decide if we want to use the device_id as the device_name (seems unhandy) + self.device_id = device_id + self.capture_interface = capture_interface + self.address = address + self.safe = safe + self.config = Config().load_config() + self.device_path = self.initialize_device(device_id) + self.filter = self.generate_filter() + + def initialize_device(self, device_id): + db_path = Path(self.config.get('database_path', '~/iottb.db')).expanduser() + device_path = db_path / device_id + ensure_directory_exists(device_path) + + metadata_file = device_path / 'device_metadata.json' + if not metadata_file.exists(): + device_metadata = DeviceMetadata(device_name=device_id, device_root_path=device_path) + device_metadata.save_to_file() + return device_path + + def get_capture_metadata(self, capture_dir): + metadata = CaptureMetadata(device_id=self.device_id, capture_dir=capture_dir) + metadata.build_capture_file_name() + metadata.interface = self.capture_interface + metadata.device_ip_address = self.address or "No IP Address set" + return metadata + + def generate_filter(self): + + if not self.address and self.safe: + raise ValueError("Address must be provided in safe mode.") + + if is_ip_address(self.address): + return f"host {self.address}" + elif is_mac_address(self.address): + return f"ether host {self.address}" + else: + raise ValueError("Invalid address format.") + + def capture(self): + if not check_installed(): + print('Please install tcpdump first') + return + + capture_dir = make_capture_src_folder(get_capture_src_folder(self.device_path)) + metadata = self.get_capture_metadata(capture_dir) + pcap_file = capture_dir / metadata.capture_file + cmd = ['sudo', 'tcpdump', '-i', self.capture_interface, '-w', str(pcap_file)] + + if self.filter: + cmd.append(self.filter) + + metadata.tcpdump_command = ' '.join(cmd) + print(f'Executing: {metadata.tcpdump_command}') + + try: + metadata.start_time = datetime.now().isoformat() + subprocess.run(cmd, check=True) + metadata.stop_time = datetime.now().isoformat() + except subprocess.CalledProcessError as e: + logger.error(f'Failed to capture packets: {e}') + return + + metadata.save_to_file() + print(f"Capture complete. Metadata saved to {capture_dir / 'metadata.json'}") + + +def setup_sniff_parser(subparsers): + parser = subparsers.add_parser('sniff', help='Sniff packets with tcpdump') + parser.add_argument('device_id', help='ID of the device to sniff') + parser.add_argument('-i', '--interface', required=True, help='Network interface to capture on') + parser.add_argument('-a', '--address', help='IP or MAC address to filter packets by') + parser.add_argument('-u', '--unsafe', action='store_true', help='Run in unsafe mode without supplying an address. ' + 'Highly discouraged.') + parser.set_defaults(func=handle_sniff) + + +def handle_sniff(args): + sniffer = Sniffer(device_id=args.device_id, capture_interface=args.interface, address=args.address, + safe=not args.unsafe) + sniffer.capture() diff --git a/code/iottb/config.json b/code/iottb/config.json new file mode 100644 index 0000000..ec311ec --- /dev/null +++ b/code/iottb/config.json @@ -0,0 +1 @@ +{"database_path": "~/.iottb.db", "log_level": "INFO"} \ No newline at end of file diff --git a/code/iottb/config.py b/code/iottb/config.py new file mode 100644 index 0000000..6c0e532 --- /dev/null +++ b/code/iottb/config.py @@ -0,0 +1,45 @@ +import json +from pathlib import Path +import logging + +logger = logging.getLogger('iottb.config') + + +class Config: + DEFAULT_CONFIG = { + "database_path": "~/.iottb.db", + "log_level": "INFO" + } + + def __init__(self, config_file=None): + self.config_file = Path(config_file or "config.json") + if not self.config_file.exists(): + self.create_default_config() + else: + self.config = self.load_config() + + def create_default_config(self): + try: + self.save_config(self.DEFAULT_CONFIG) + except (IsADirectoryError, PermissionError) as e: + logger.error(f"Error creating default config: {e}") + raise + + def load_config(self): + try: + with open(self.config_file, "r") as file: + return json.load(file) + except IsADirectoryError as e: + logger.error(f"Error loading config: {e}") + raise + except PermissionError as e: + logger.error(f"Error loading config: {e}") + raise + + def save_config(self, config): + try: + with open(self.config_file, "w") as f: + json.dump(config, f, indent=2) + except (IsADirectoryError, PermissionError) as e: + logger.error(f"Error saving config: {e}") + raise diff --git a/code/iottb/logger.py b/code/iottb/logger.py index ea6add2..e69de29 100644 --- a/code/iottb/logger.py +++ b/code/iottb/logger.py @@ -1,28 +0,0 @@ -import logging -import sys -from logging.handlers import RotatingFileHandler - - -def setup_logging(): - logger_obj = logging.getLogger('iottbLogger') - logger_obj.setLevel(logging.DEBUG) - - file_handler = RotatingFileHandler('iottb.log') - console_handler = logging.StreamHandler(sys.stdout) - - file_handler.setLevel(logging.INFO) - console_handler.setLevel(logging.DEBUG) - - file_fmt = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') - console_fmt = logging.Formatter('%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s') - - file_handler.setFormatter(file_fmt) - console_handler.setFormatter(console_fmt) - - logger_obj.addHandler(file_handler) - logger_obj.addHandler(console_handler) - - return logger_obj - - -logger = setup_logging() diff --git a/code/iottb/models/capture_metadata.py b/code/iottb/models/capture_metadata.py new file mode 100644 index 0000000..4c7e1c0 --- /dev/null +++ b/code/iottb/models/capture_metadata.py @@ -0,0 +1,29 @@ +import json +import uuid +from datetime import datetime +from pathlib import Path + + +class CaptureMetadata: + def __init__(self, device_id, capture_dir): + self.device_id = device_id + self.capture_id = str(uuid.uuid4()) + self.capture_date = datetime.now().isoformat() + self.capture_dir = Path(capture_dir) + self.capture_file = "" + self.start_time = "" + self.stop_time = "" + self.tcpdump_command = "" + self.interface = "" + self.device_ip_address = "" + + def build_capture_file_name(self): + self.capture_file = f"{self.device_id}_{self.capture_id}.pcap" + + def to_dict(self): + return self.__dict__ + + def save_to_file(self, file_path=None): + file_path = file_path or self.capture_dir / 'metadata.json' + with open(file_path, 'w') as f: + json.dump(self.to_dict(), f, indent=4) diff --git a/code/iottb/models/device_metadata.py b/code/iottb/models/device_metadata.py new file mode 100644 index 0000000..5c5cb91 --- /dev/null +++ b/code/iottb/models/device_metadata.py @@ -0,0 +1,27 @@ +import json +import uuid +from datetime import datetime +from pathlib import Path + + +class DeviceMetadata: + def __init__(self, device_name, device_root_path): + self.device_name = device_name + self.device_short_name = device_name.lower().replace(' ', '_') + self.device_id = str(uuid.uuid4()) + self.date_created = datetime.now().isoformat() + self.device_root_path = Path(device_root_path) + + def to_dict(self): + return self.__dict__ + + def save_to_file(self): + file_path = self.device_root_path / 'device_metadata.json' + with open(file_path, 'w') as f: + json.dump(self.to_dict(), f, indent=4) + + @classmethod + def load_from_file(cls, file_path): + with open(file_path, 'r') as f: + data = json.load(f) + return cls(**data) diff --git a/code/iottb/pyproject.toml b/code/iottb/pyproject.toml new file mode 100644 index 0000000..580c3e8 --- /dev/null +++ b/code/iottb/pyproject.toml @@ -0,0 +1,18 @@ +[build-system] +requires = ["setuptools>=42", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "iottb" +version = "0.1.0" +authors = [{name = "Sebastian Lenzlinger", email = "sebastian.lenzlinger@unibas.ch"}] +description = "Automation Tool for Capturing Network packets of IoT devices." +requires-python = ">=3.8" +dependencies = [] + +[tool.setuptools.packages.find] +where = ["."] +exclude = ["tests*", "docs*"] + +[project.scripts] +iottb = "iottb.__main__:main" diff --git a/code/iottb/scripts/wifi_ctl.sh b/code/iottb/scripts/wifi_ctl.sh new file mode 100644 index 0000000..076d2fd --- /dev/null +++ b/code/iottb/scripts/wifi_ctl.sh @@ -0,0 +1,55 @@ +#!/usr/bin/env bash +# Note, this is not my original work. Source: https://linuxtldr.com/changing-interface-mode/ + +function list_nic_info () { + ip addr show +} + +function enable_monm_iw () { + interface=$1 + sudo ip link set "$interface" down + sudo iw "$interface" set monitor control + sudo ip link set "$interface" up +} + +function disable_monm_iw () { + interface=$1 + sudo ip link set "$interface" down + sudo iw "$interface" set type managed + sudo ip link set "$interface" up +} + +function enable_monm_iwconfig () { + interface=$1 + sudo ifconfig "$interface" down + sudo iwconfig "$interface" mode monitor + sudo ifconfig "$interface" up +} + +function disable_monm_iwconfig () { + interface=$1 + sudo ifconfig "$interface" down + sudo iwconfig "$interface" mode managed + sudo ifconfig "$interface" up +} + +function enable_monm_acng () { + interface=$1 + sudo airmon-ng check + sudo airmon-ng check kill + sudo airmon-ng start "$interface" +} + +function disable_monm_acng () { + interface="${1}mon" + sudo airmon-ng stop "$interface" + sudo systemctl restart NetworkManager +} + +if declare -f "$1" > /dev/null +then + "$@" +else + echo "Unknown function '$1'" >&2 + exit 1 +fi \ No newline at end of file diff --git a/code/iottb/templates/capture_metadata_template.json b/code/iottb/templates/capture_metadata_template.json new file mode 100644 index 0000000..ec49e81 --- /dev/null +++ b/code/iottb/templates/capture_metadata_template.json @@ -0,0 +1,15 @@ +{ + "device_id": "", + "capture_id": "", + "capture_date": "", + "capture_file": "", + "start_time": "", + "stop_time": "", + "capture_duration": "", + "interfaces": "", + "device_ip_address": "", + "device_mac_address": "", + "contacted_ip_address": [], + "device_firmware_version": "", + "campanion_app": "" +} \ No newline at end of file diff --git a/code/iottb/templates/config_template.json b/code/iottb/templates/config_template.json new file mode 100644 index 0000000..89338dc --- /dev/null +++ b/code/iottb/templates/config_template.json @@ -0,0 +1,4 @@ +{ + "database_path": "~/.iottb.db", + "log_level": "INFO" +} \ No newline at end of file diff --git a/code/iottb/templates/device_metadata_template.json b/code/iottb/templates/device_metadata_template.json new file mode 100644 index 0000000..79a17f1 --- /dev/null +++ b/code/iottb/templates/device_metadata_template.json @@ -0,0 +1,14 @@ +{ + "device_id": "", + "device_name": "", + "device_short_name": "", + "date_created": "", + "description": "", + "model": "", + "manufacturer": "", + "firmware_version": "", + "device_type": "", + "supported_interfaces": "", + "companion_applications": "", + "last_metadata_update": "" +} \ No newline at end of file diff --git a/code/iottb/test/test_Config.py b/code/iottb/test/test_Config.py new file mode 100644 index 0000000..cb21267 --- /dev/null +++ b/code/iottb/test/test_Config.py @@ -0,0 +1,43 @@ +import json +from pathlib import Path +from unittest import mock + +from config import Config + +import unittest + + +class TestConfig(unittest.TestCase): + + def test_creates_new_config_file_if_not_exists(self): + config_path = Path("test_config.json") + if config_path.exists(): + config_path.unlink() + config = Config(config_file=config_path) + self.assertTrue(config_path.exists()) + config_path.unlink() + + def test_writes_default_configuration_to_config_file(self): + config_path = Path("test_config.json") + if config_path.exists(): + config_path.unlink() + config = Config(config_file=config_path) + with open(config_path, "r") as f: + data = json.load(f) + self.assertEqual(data, {"database_path": "~/.iottb.db", "log_level": "INFO"}) + config_path.unlink() + + @unittest.mock.patch("builtins.open", side_effect=PermissionError) + def test_config_file_path_not_writable(self, mock_open): + config_path = Path("test_config.json") + with self.assertRaises(PermissionError): + config = Config(config_file=config_path) + config.create_default_config() + + def test_config_file_path_is_directory(self): + config_dir = Path("test_config_dir") + config_dir.mkdir(exist_ok=True) + with self.assertRaises(IsADirectoryError): + config = Config(config_file=config_dir) + config.create_default_config() + config_dir.rmdir() diff --git a/code/iottb/test/test_ensure_directory_exists.py b/code/iottb/test/test_ensure_directory_exists.py new file mode 100644 index 0000000..ac5b1fd --- /dev/null +++ b/code/iottb/test/test_ensure_directory_exists.py @@ -0,0 +1,38 @@ +from pathlib import Path + +# Generated by CodiumAI +import unittest + +from utils.file_utils import ensure_directory_exists + + +class TestEnsureDirectoryExists(unittest.TestCase): + + # creates directory if it does not exist + def test_creates_directory_if_not_exists(self): + path = Path('/tmp/testdir') + if path.exists(): + path.rmdir() + ensure_directory_exists(path) + self.assertTrue(path.exists()) + path.rmdir() + + # does not create directory if it already exists + def test_does_not_create_directory_if_exists(self): + path = Path('/tmp/testdir') + path.mkdir(exist_ok=True) + ensure_directory_exists(path) + self.assertTrue(path.exists()) + path.rmdir() + + # path is a symbolic link + def test_path_is_a_symbolic_link(self): + target_dir = Path('/tmp/targetdir') + symlink_path = Path('/tmp/symlinkdir') + target_dir.mkdir(exist_ok=True) + symlink_path.symlink_to(target_dir) + ensure_directory_exists(symlink_path) + self.assertTrue(symlink_path.exists()) + self.assertTrue(symlink_path.is_symlink()) + symlink_path.unlink() + target_dir.rmdir() diff --git a/code/iottb/test/test_is_ip_address.py b/code/iottb/test/test_is_ip_address.py new file mode 100644 index 0000000..a13c679 --- /dev/null +++ b/code/iottb/test/test_is_ip_address.py @@ -0,0 +1,62 @@ +from commands.sniff import is_ip_address + +import unittest + + +class TestIsIpAddress(unittest.TestCase): + + def test_valid_ipv4_address_all_octets_in_range(self): + self.assertTrue(is_ip_address("192.168.1.1")) + self.assertTrue(is_ip_address("0.0.0.0")) + self.assertTrue(is_ip_address("255.255.255.255")) + + def test_ipv4_address_with_leading_zeros(self): + self.assertTrue(is_ip_address("192.168.001.001")) + self.assertTrue(is_ip_address("0.0.0.0")) + self.assertTrue(is_ip_address("255.255.255.255")) + + def test_ipv4_address_mixed_single_double_digit_octets(self): + self.assertTrue(is_ip_address("192.168.1.01")) + self.assertTrue(is_ip_address("0.0.0.0")) + self.assertTrue(is_ip_address("255.255.255.255")) + + def test_ipv4_address_maximum_values_in_octets(self): + self.assertTrue(is_ip_address("255.255.255.255")) + self.assertTrue(is_ip_address("0.0.0.0")) + self.assertTrue(is_ip_address("192.168.1.1")) + + + def test_ipv4_address_minimum_values_in_octets(self): + self.assertTrue(is_ip_address("0.0.0.0")) + self.assertTrue(is_ip_address("192.168.1.1")) + self.assertTrue(is_ip_address("255.255.255.255")) + + + def test_ipv4_address_more_than_four_octets_invalid(self): + self.assertFalse(is_ip_address("192.168.1.1.1")) + self.assertFalse(is_ip_address("0.0.0.0.0")) + self.assertFalse(is_ip_address("255.255.255.255.255")) + + + def test_ipv4_address_fewer_than_four_octets_invalid(self): + self.assertFalse(is_ip_address("192.168.1")) + self.assertFalse(is_ip_address("0.0")) + self.assertFalse(is_ip_address("255")) + + + def test_ipv4_address_non_numeric_characters_invalid(self): + self.assertFalse(is_ip_address("192.a.b.c")) + self.assertFalse(is_ip_address("0.x.y.z")) + self.assertFalse(is_ip_address("255.q.w.e")) + + + def test_ipv4_address_octets_out_of_range_invalid(self): + self.assertFalse(is_ip_address("256.256.256.256")) + self.assertFalse(is_ip_address("300.300.300.300")) + self.assertFalse(is_ip_address("999.999.999.999")) + + + def test_ipv4_address_empty_string_invalid(self): + self.assertFalse(is_ip_address("")) + self.assertFalse(is_ip_address(" ")) + self.assertFalse(is_ip_address(None)) diff --git a/code/iottb/test/test_is_mac_address.py b/code/iottb/test/test_is_mac_address.py new file mode 100644 index 0000000..09e11a2 --- /dev/null +++ b/code/iottb/test/test_is_mac_address.py @@ -0,0 +1,64 @@ + +from commands.sniff import is_mac_address + +import unittest + +class TestIsMacAddress(unittest.TestCase): + + + def test_valid_mac_address_lowercase(self): + self.assertTrue(is_mac_address("aa:bb:cc:dd:ee:ff")) + self.assertFalse(is_mac_address("192.168.1.1")) + self.assertFalse(is_mac_address("aa:bb:cc:dd:ee:ff:gg")) + + + def test_valid_mac_address_uppercase(self): + self.assertTrue(is_mac_address("AA:BB:CC:DD:EE:FF")) + self.assertFalse(is_mac_address("10.0.0.1")) + self.assertFalse(is_mac_address("AA:BB:CC:DD:EE")) + + + def test_valid_mac_address_mixed_case(self): + self.assertTrue(is_mac_address("Aa:Bb:Cc:Dd:Ee:Ff")) + self.assertFalse(is_mac_address("172.16.0.1")) + self.assertFalse(is_mac_address("Aa:Bb:Cc:Dd:Ee:Ff:Gg")) + + + def test_valid_mac_address_digits(self): + self.assertTrue(is_mac_address("00:11:22:33:44:55")) + self.assertFalse(is_mac_address("8.8.8.8")) + self.assertFalse(is_mac_address("00:11:22:33:44")) + + # returns False for an empty string + def test_empty_string(self): + self.assertFalse(is_mac_address("")) + self.assertFalse(is_mac_address(":")) + + def test_invalid_characters(self): + self.assertFalse(is_mac_address("gh:ij:kl:mn:op:qr")) + self.assertFalse(is_mac_address("192.168.0.256")) + self.assertFalse(is_mac_address("ghij::klmn::opqr")) + + # returns False for a MAC address with incorrect length + def test_incorrect_length(self): + self.assertFalse(is_mac_address("aa:bb:cc")) + self.assertFalse(is_mac_address("10.0.0.256")) + self.assertFalse(is_mac_address("aa::bb::cc::dd::ee::ff::gg")) + + # returns False for a MAC address with missing colons + def test_missing_colons(self): + self.assertFalse(is_mac_address("aabbccddeeff")) + self.assertFalse(is_mac_address("127.0.0.1")) + self.assertFalse(is_mac_address("aabbccddeeffgg")) + + # returns False for a MAC address with extra colons + def test_extra_colons(self): + self.assertFalse(is_mac_address("aa::bb::cc::dd::ee::ff")) + self.assertFalse(is_mac_address("192.168.1.256")) + self.assertFalse(is_mac_address("aa::bb::cc::dd::ee::ff::gg")) + + # returns False for a MAC address with spaces + def test_spaces_in_mac(self): + self.assertFalse(is_mac_address("aa bb cc dd ee ff")) + self.assertFalse(is_mac_address("8.8.4.4")) + self.assertFalse(is_mac_address("aa bb cc dd ee ff gg")) diff --git a/code/iottb/utils/capture_utils.py b/code/iottb/utils/capture_utils.py index 8c4d60f..d6d73b3 100644 --- a/code/iottb/utils/capture_utils.py +++ b/code/iottb/utils/capture_utils.py @@ -1,44 +1,20 @@ -import uuid from pathlib import Path -from iottb.models.device_metadata_model import dir_contains_device_metadata -from iottb.utils.utils import get_iso_date +from datetime import datetime -def get_capture_uuid(): - return str(uuid.uuid4()) +def get_capture_src_folder(device_path): + today_str = datetime.now().strftime('%Y-%m-%d') + capture_base_path = device_path / today_str + capture_base_path.mkdir(parents=True, exist_ok=True) + + existing_captures = [d for d in capture_base_path.iterdir() if d.is_dir()] + nth_capture = len(existing_captures) + 1 + capture_dir = capture_base_path / f'capture_{nth_capture}' + capture_dir.mkdir(parents=True, exist_ok=True) + + return capture_dir -def get_capture_date_folder(device_root: Path): - today_iso = get_iso_date() - today_folder = device_root / today_iso - if dir_contains_device_metadata(device_root): - if not today_folder.is_dir(): - try: - today_folder.mkdir() - except FileExistsError: - print(f'Folder {today_folder} already exists') - return today_folder - raise FileNotFoundError(f'Given path {device_root} is not a device root directory') - - -def get_capture_src_folder(device_folder: Path): - assert device_folder.is_dir(), f'Given path {device_folder} is not a folder' - today_iso = get_iso_date() - max_sequence_number = 1 - for d in device_folder.iterdir(): - if d.is_dir() and d.name.startswith(f'{today_iso}_capture_'): - name = d.name - num = int(name.split('_')[2]) - max_sequence_number = max(max_sequence_number, num) - - next_sequence_number = max_sequence_number + 1 - return device_folder.joinpath(f'{today_iso}_capture_{next_sequence_number:03}') - - -def make_capture_src_folder(capture_src_folder: Path): - try: - capture_src_folder.mkdir() - except FileExistsError: - print(f'Folder {capture_src_folder} already exists') - finally: - return capture_src_folder +def make_capture_src_folder(capture_src_folder): + capture_src_folder.mkdir(parents=True, exist_ok=True) + return capture_src_folder diff --git a/code/iottb/utils/diagram1.py b/code/iottb/utils/diagram1.py new file mode 100644 index 0000000..28a9657 --- /dev/null +++ b/code/iottb/utils/diagram1.py @@ -0,0 +1,29 @@ +import matplotlib.pyplot as plt +import networkx as nx + +# Create the graph +G1 = nx.DiGraph() + +# Add nodes with positions +G1.add_node("IoT Device", pos=(1, 3)) +G1.add_node("AP", pos=(3, 3)) +G1.add_node("Switch (Port Mirroring Enabled)", pos=(5, 3)) +G1.add_node("Gateway Router", pos=(7, 3)) +G1.add_node("Internet", pos=(9, 3)) +G1.add_node("Capture Device", pos=(5, 1)) + +# Add edges +G1.add_edge("IoT Device", "AP") +G1.add_edge("AP", "Switch (Port Mirroring Enabled)") +G1.add_edge("Switch (Port Mirroring Enabled)", "Gateway Router") +G1.add_edge("Gateway Router", "Internet") +G1.add_edge("Switch (Port Mirroring Enabled)", "Capture Device") + +# Draw the graph +pos = nx.get_node_attributes(G1, 'pos') +plt.figure(figsize=(12, 8)) +nx.draw(G1, pos, with_labels=True, node_size=3000, node_color='lightblue', font_size=10, font_weight='bold') +nx.draw_networkx_edge_labels(G1, pos, edge_labels={("Switch (Port Mirroring Enabled)", "Capture Device"): "Mirrored Traffic"}, font_color='red') + +plt.title("IoT Device Connected via AP to Gateway Router via Switch with Port Mirroring Enabled") +plt.show() diff --git a/code/iottb/utils/diagramm2.py b/code/iottb/utils/diagramm2.py new file mode 100644 index 0000000..4e723da --- /dev/null +++ b/code/iottb/utils/diagramm2.py @@ -0,0 +1,27 @@ +import matplotlib.pyplot as plt +import networkx as nx + +# Create the graph +G2 = nx.DiGraph() + +# Add nodes with positions +G2.add_node("IoT Device", pos=(1, 3)) +G2.add_node("Capture Device (Hotspot)", pos=(3, 3)) +G2.add_node("Ethernet Connection", pos=(5, 3)) +G2.add_node("Gateway Router", pos=(7, 3)) +G2.add_node("Internet", pos=(9, 3)) + +# Add edges +G2.add_edge("IoT Device", "Capture Device (Hotspot)") +G2.add_edge("Capture Device (Hotspot)", "Ethernet Connection") +G2.add_edge("Ethernet Connection", "Gateway Router") +G2.add_edge("Gateway Router", "Internet") + +# Draw the graph +pos = nx.get_node_attributes(G2, 'pos') +plt.figure(figsize=(12, 8)) +nx.draw(G2, pos, with_labels=True, node_size=3000, node_color='lightblue', font_size=10, font_weight='bold') +nx.draw_networkx_edge_labels(G2, pos, edge_labels={("Capture Device (Hotspot)", "Ethernet Connection"): "Bridged Traffic"}, font_color='red') + +plt.title("Capture Device Provides Hotspot and Bridges to Ethernet for Internet") +plt.show() diff --git a/code/iottb/utils/file_utils.py b/code/iottb/utils/file_utils.py new file mode 100644 index 0000000..5ccb42f --- /dev/null +++ b/code/iottb/utils/file_utils.py @@ -0,0 +1,19 @@ +import json +from pathlib import Path + + +def load_json_template(template_path): + with open(template_path, 'r') as f: + return json.load(f) + + +def save_json(data, file_path): + with open(file_path, 'w') as f: + json.dump(data, f, indent=4) + + +def ensure_directory_exists(path): + path = Path(path) + if not path.exists(): + path.mkdir(parents=True, exist_ok=True) + return path diff --git a/code/iottb/utils/tcpdump_utils.py b/code/iottb/utils/tcpdump_utils.py index d9df1d5..53b77e8 100644 --- a/code/iottb/utils/tcpdump_utils.py +++ b/code/iottb/utils/tcpdump_utils.py @@ -1,41 +1,9 @@ -import ipaddress -import shutil import subprocess -from typing import Optional -def check_installed() -> bool: - """Check if tcpdump is installed and available on the system path.""" - return shutil.which('tcpdump') is not None - - -def ensure_installed(): - """Ensure that tcpdump is installed, raise an error if not.""" - if not check_installed(): - raise RuntimeError('tcpdump is not installed. Please install it to continue.') - - -def list_interfaces() -> str: - """List available network interfaces using tcpdump.""" - ensure_installed() +def check_installed(): try: - result = subprocess.run(['tcpdump', '--list-interfaces'], capture_output=True, text=True, check=True) - return result.stdout - except subprocess.CalledProcessError as e: - print(f'Failed to list interfaces: {e}') - return '' - - -def is_valid_ipv4(ip: str) -> bool: - try: - ipaddress.IPv4Address(ip) + subprocess.run(['tcpdump', '--version'], check=True, capture_output=True) return True - except ValueError: + except subprocess.CalledProcessError: return False - -def str_to_ipv4(ip: str) -> (bool, Optional[ipaddress]): - try: - address = ipaddress.IPv4Address(ip) - return address == ipaddress.IPv4Address(ip), address - except ipaddress.AddressValueError: - return False, None diff --git a/notes/journal/2024-05-15-wed.md b/notes/journal/2024-05-15-wed.md index 39af607..2145e86 100644 --- a/notes/journal/2024-05-15-wed.md +++ b/notes/journal/2024-05-15-wed.md @@ -11,4 +11,5 @@ With the above idea it would be possible to also refactor or rewrite how tcpdump I want an option such that one can automatically convert a captures resulting file into a csv. Probably will focus on tcpdump for now, since other tools like [[mitmproxy]] have different output files. ## Defining Experiment -I want a pair of commands that 1. provide a guided cli interface to define an experiment and 2. to run that experiment -> Here [Collective Knowledge Framework](https://github.com/mlcommons/ck) might actually come in handy. The already have tooling for setting up and defining aspects of experiments so that they become reproducible. So maybe one part of the `iottb` as a tool would be to write the correct json files into the directory which contain the informatin on how the command was run. Caveat: All all option values are the same, basically only, if it was used or not (flagging options) or that it was used (e.g. an ip address was used in the filter but the specific value of the ip is of no use for reproducing). Also, Collective Minds tooling relies very common ML algos/framework and static data. So maybe this only comes into play after a capture has been done. So maybe a feature extraction tool (see [[further considerations#Usage paths/ Workflows]]) should create the data and built the database separately. \ No newline at end of file +I want a pair of commands that 1. provide a guided cli interface to define an experiment and 2. to run that experiment -> Here [Collective Knowledge Framework](https://github.com/mlcommons/ck) might actually come in handy. The already have tooling for setting up and defining aspects of experiments so that they become reproducible. So maybe one part of the `iottb` as a tool would be to write the correct json files into the directory which contain the informatin on how the command was run. Caveat: All all option values are the same, basically only, if it was used or not (flagging options) or that it was used (e.g. an ip address was used in the filter but the specific value of the ip is of no use for reproducing). Also, Collective Minds tooling relies very common ML algos/framework and static data. So maybe this only comes into play after a capture has been done. So maybe a feature extraction tool (see [[further considerations#Usage paths/ Workflows]]) should create the data and built the database separately. +#remark TCP dump filter could also be exported into an environment variable? But then again what is the use of defining a conformance, then could use the raw capture idea for tcpdump, too. \ No newline at end of file diff --git a/notes/journal/Untitled.md b/notes/journal/Untitled.md new file mode 100644 index 0000000..e69de29 diff --git a/notes/scrible b/notes/scrible new file mode 100644 index 0000000..3663610 --- /dev/null +++ b/notes/scrible @@ -0,0 +1,13 @@ +`iottb sniff`: + min: nothing + min meaningfull: interface + min usefull: ip/mac addr of dev + good: ip/mac, device type + better: + +`iottb device` + `add`: add new device config + +`iottb db` + `init` initialize device database + `add` add device diff --git a/notes/scrible.py b/notes/scrible.py new file mode 100644 index 0000000..dae9072 --- /dev/null +++ b/notes/scrible.py @@ -0,0 +1,7 @@ +class Config: + db_dir = Path.home() + app_config_dir = Path.home /.Config + db_name = 'IoTtb.db' + app_config_name = 'iottb.conf' + +