diff --git a/code/.gitkeep b/archive/.gitkeep similarity index 100% rename from code/.gitkeep rename to archive/.gitkeep diff --git a/code/iottb/subcommands/__init__.py b/archive/__init__.py similarity index 100% rename from code/iottb/subcommands/__init__.py rename to archive/__init__.py diff --git a/code/tests/__init__.py b/archive/iottb/__init__.py similarity index 100% rename from code/tests/__init__.py rename to archive/iottb/__init__.py diff --git a/archive/iottb/__main__.py b/archive/iottb/__main__.py new file mode 100644 index 0000000..208076e --- /dev/null +++ b/archive/iottb/__main__.py @@ -0,0 +1,107 @@ +#!/usr/bin/env python3 +import argparse +from os import environ +from pathlib import Path +import logging +from archive.iottb.subcommands.add_device import setup_init_device_root_parser +# from iottb.subcommands.capture import setup_capture_parser +from iottb.subcommands.sniff import setup_sniff_parser +from iottb.utils.tcpdump_utils import list_interfaces +from iottb.logger import setup_logging + +logger = logging.getLogger('iottbLogger.__main__') +logger.setLevel(logging.DEBUG) + + +###################### +# Argparse setup +###################### +def setup_argparse(): + # create top level parser + root_parser = argparse.ArgumentParser(prog='iottb') + # shared options + root_parser.add_argument('--verbose', '-v', action='count', default=0) + root_parser.add_argument('--script-mode', action='store_true', help='Run in script mode (non-interactive)') + # Group of args w.r.t iottb.db creation + group = root_parser.add_argument_group('database options') + group.add_argument('--db-home', default=Path.home() / 'IoTtb.db') + group.add_argument('--config-home', default=Path.home() / '.config' / 'iottb.conf', type=Path, ) + group.add_argument('--user', default=Path.home().stem, type=Path, ) + + # configure subcommands + subparsers = root_parser.add_subparsers(title='subcommands', required=True, dest='command') + # setup_capture_parser(subparsers) + setup_init_device_root_parser(subparsers) + setup_sniff_parser(subparsers) + # Utility to list interfaces directly with iottb instead of relying on external tooling + + interfaces_parser = subparsers.add_parser('list-interfaces', aliases=['li', 'if'], + help='List available network interfaces.') + interfaces_parser.set_defaults(func=list_interfaces) + + return root_parser + + +### +# Where put ?! +### +class IoTdb: + def __init__(self, db_home=Path.home() / 'IoTtb.db', iottb_config=Path.home() / '.conf' / 'iottb.conf', + user=Path.home().stem): + self.db_home = db_home + self.config_home = iottb_config + self.default_filters_home = db_home / 'default_filters' + self.user = user + + def create_db(self, mode=0o777, parents=False, exist_ok=False): + logger.info(f'Creating db at {self.db_home}') + try: + self.db_home.mkdir(mode=mode, parents=parents, exist_ok=exist_ok) + except FileExistsError: + logger.error(f'Database path already at {self.db_home} exists and is not a directory') + finally: + logger.debug(f'Leaving finally clause in create_db') + + def create_device_tree(self, mode=0o777, parents=False, exist_ok=False): + logger.info(f'Creating device tree at {self.db_home / 'devices'}') + #TODO + + def parse_db_config(self): + pass + + def parse_iottb_config(self): + pass + + def get_known_devices(self): + pass + + +def iottb_db_exists(db_home=Path.home() / 'IoTtb.db'): + res = db_home.is_dir() + + +def main(): + logger.debug(f'Pre setup_argparse()') + parser = setup_argparse() + logger.debug('Post setup_argparse().') + args = parser.parse_args() + logger.debug(f'Args parsed: {args}') + if args.command: + try: + args.func(args) + except KeyboardInterrupt: + print('Received keyboard interrupt. Exiting...') + exit(1) + except Exception as e: + logger.debug(f'Error in main: {e}') + print(f'Error: {e}') + # create_capture_directory(args.device_name) + + +if __name__ == '__main__': + setup_logging() + logger.debug("Debug level is working") + logger.info("Info level is working") + logger.warning("Warning level is working") + + main() diff --git a/code/iottb/definitions.py b/archive/iottb/definitions.py similarity index 100% rename from code/iottb/definitions.py rename to archive/iottb/definitions.py diff --git a/archive/iottb/logger.py b/archive/iottb/logger.py new file mode 100644 index 0000000..cc0cdb5 --- /dev/null +++ b/archive/iottb/logger.py @@ -0,0 +1,35 @@ +import logging +import sys +import os +from logging.handlers import RotatingFileHandler + + +def setup_logging(): + # Ensure the logs directory exists + log_directory = 'logs' + if not os.path.exists(log_directory): + os.makedirs(log_directory) + + # Create handlers + file_handler = RotatingFileHandler(os.path.join(log_directory, 'iottb.log'), maxBytes=1048576, backupCount=5) + console_handler = logging.StreamHandler(sys.stdout) + + # Create formatters and add it to handlers + file_fmt = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') + console_fmt = logging.Formatter( + '%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s') + + file_handler.setFormatter(file_fmt) + console_handler.setFormatter(console_fmt) + + # Get the root logger and add handlers + root_logger = logging.getLogger() + root_logger.setLevel(logging.DEBUG) + root_logger.addHandler(file_handler) + root_logger.addHandler(console_handler) + + # Prevent propagation to the root logger to avoid duplicate logs + root_logger.propagate = False + + +setup_logging() diff --git a/code/tests/fixtures/__init__.py b/archive/iottb/models/__init__.py similarity index 100% rename from code/tests/fixtures/__init__.py rename to archive/iottb/models/__init__.py diff --git a/code/iottb/models/capture_metadata_model.py b/archive/iottb/models/capture_metadata_model.py similarity index 100% rename from code/iottb/models/capture_metadata_model.py rename to archive/iottb/models/capture_metadata_model.py diff --git a/code/iottb/models/device_metadata_model.py b/archive/iottb/models/device_metadata_model.py similarity index 100% rename from code/iottb/models/device_metadata_model.py rename to archive/iottb/models/device_metadata_model.py diff --git a/code/tests/models/test_capture_metadata_model.py b/archive/iottb/subcommands/__init__.py similarity index 100% rename from code/tests/models/test_capture_metadata_model.py rename to archive/iottb/subcommands/__init__.py diff --git a/code/iottb/subcommands/add_device.py b/archive/iottb/subcommands/add_device.py similarity index 100% rename from code/iottb/subcommands/add_device.py rename to archive/iottb/subcommands/add_device.py diff --git a/code/iottb/subcommands/capture.py b/archive/iottb/subcommands/capture.py similarity index 100% rename from code/iottb/subcommands/capture.py rename to archive/iottb/subcommands/capture.py diff --git a/code/iottb/subcommands/sniff.py b/archive/iottb/subcommands/sniff.py similarity index 100% rename from code/iottb/subcommands/sniff.py rename to archive/iottb/subcommands/sniff.py diff --git a/code/tests/models/test_device_metadata_model.py b/archive/iottb/utils/__init__.py similarity index 100% rename from code/tests/models/test_device_metadata_model.py rename to archive/iottb/utils/__init__.py diff --git a/archive/iottb/utils/capture_utils.py b/archive/iottb/utils/capture_utils.py new file mode 100644 index 0000000..8c4d60f --- /dev/null +++ b/archive/iottb/utils/capture_utils.py @@ -0,0 +1,44 @@ +import uuid +from pathlib import Path +from iottb.models.device_metadata_model import dir_contains_device_metadata +from iottb.utils.utils import get_iso_date + + +def get_capture_uuid(): + return str(uuid.uuid4()) + + +def get_capture_date_folder(device_root: Path): + today_iso = get_iso_date() + today_folder = device_root / today_iso + if dir_contains_device_metadata(device_root): + if not today_folder.is_dir(): + try: + today_folder.mkdir() + except FileExistsError: + print(f'Folder {today_folder} already exists') + return today_folder + raise FileNotFoundError(f'Given path {device_root} is not a device root directory') + + +def get_capture_src_folder(device_folder: Path): + assert device_folder.is_dir(), f'Given path {device_folder} is not a folder' + today_iso = get_iso_date() + max_sequence_number = 1 + for d in device_folder.iterdir(): + if d.is_dir() and d.name.startswith(f'{today_iso}_capture_'): + name = d.name + num = int(name.split('_')[2]) + max_sequence_number = max(max_sequence_number, num) + + next_sequence_number = max_sequence_number + 1 + return device_folder.joinpath(f'{today_iso}_capture_{next_sequence_number:03}') + + +def make_capture_src_folder(capture_src_folder: Path): + try: + capture_src_folder.mkdir() + except FileExistsError: + print(f'Folder {capture_src_folder} already exists') + finally: + return capture_src_folder diff --git a/archive/iottb/utils/tcpdump_utils.py b/archive/iottb/utils/tcpdump_utils.py new file mode 100644 index 0000000..6870202 --- /dev/null +++ b/archive/iottb/utils/tcpdump_utils.py @@ -0,0 +1,41 @@ +import ipaddress +import shutil +import subprocess +from typing import Optional + + +def check_installed() -> bool: + """Check if tcpdump is installed and available on the system path.""" + return shutil.which('tcpdump') is not None + + +def ensure_installed(): + """Ensure that tcpdump is installed, raise an error if not.""" + if not check_installed(): + raise RuntimeError('tcpdump is not installed. Please install it to continue.') + + +def list_interfaces(args) -> str: + """List available network interfaces using tcpdump.""" + ensure_installed() + try: + result = subprocess.run(['tcpdump', '--list-interfaces'], capture_output=True, text=True, check=True) + print(result.stdout) + except subprocess.CalledProcessError as e: + print(f'Failed to list interfaces: {e}') + return '' + + +def is_valid_ipv4(ip: str) -> bool: + try: + ipaddress.IPv4Address(ip) + return True + except ValueError: + return False + +def str_to_ipv4(ip: str) -> (bool, Optional[ipaddress]): + try: + address = ipaddress.IPv4Address(ip) + return address == ipaddress.IPv4Address(ip), address + except ipaddress.AddressValueError: + return False, None diff --git a/code/iottb/utils/utils.py b/archive/iottb/utils/utils.py similarity index 100% rename from code/iottb/utils/utils.py rename to archive/iottb/utils/utils.py diff --git a/code/misc/dnsmasq.conf b/archive/misc/dnsmasq.conf similarity index 100% rename from code/misc/dnsmasq.conf rename to archive/misc/dnsmasq.conf diff --git a/code/misc/enable-forwarding.sh b/archive/misc/enable-forwarding.sh similarity index 100% rename from code/misc/enable-forwarding.sh rename to archive/misc/enable-forwarding.sh diff --git a/code/misc/hostapd.conf b/archive/misc/hostapd.conf similarity index 100% rename from code/misc/hostapd.conf rename to archive/misc/hostapd.conf diff --git a/code/misc/hostapd.conf.bak b/archive/misc/hostapd.conf.bak similarity index 100% rename from code/misc/hostapd.conf.bak rename to archive/misc/hostapd.conf.bak diff --git a/code/misc/initSwAP b/archive/misc/initSwAP similarity index 100% rename from code/misc/initSwAP rename to archive/misc/initSwAP diff --git a/code/misc/initSwAP_nftables b/archive/misc/initSwAP_nftables similarity index 100% rename from code/misc/initSwAP_nftables rename to archive/misc/initSwAP_nftables diff --git a/code/misc/make_ap.sh b/archive/misc/make_ap.sh similarity index 100% rename from code/misc/make_ap.sh rename to archive/misc/make_ap.sh diff --git a/code/pyproject.toml b/archive/pyproject.toml similarity index 100% rename from code/pyproject.toml rename to archive/pyproject.toml diff --git a/code/tests/test_main.py b/archive/tests/__init__.py similarity index 100% rename from code/tests/test_main.py rename to archive/tests/__init__.py diff --git a/code/tests/utils/test_device_metadata_utils.py b/archive/tests/fixtures/__init__.py similarity index 100% rename from code/tests/utils/test_device_metadata_utils.py rename to archive/tests/fixtures/__init__.py diff --git a/code/tests/fixtures/shared_fixtures.py b/archive/tests/fixtures/shared_fixtures.py similarity index 100% rename from code/tests/fixtures/shared_fixtures.py rename to archive/tests/fixtures/shared_fixtures.py diff --git a/code/tests/utils/test_tcpdump_utils.py b/archive/tests/models/test_capture_metadata_model.py similarity index 100% rename from code/tests/utils/test_tcpdump_utils.py rename to archive/tests/models/test_capture_metadata_model.py diff --git a/archive/tests/models/test_device_metadata_model.py b/archive/tests/models/test_device_metadata_model.py new file mode 100644 index 0000000..e69de29 diff --git a/code/tests/subcommands/test_add_device.py b/archive/tests/subcommands/test_add_device.py similarity index 100% rename from code/tests/subcommands/test_add_device.py rename to archive/tests/subcommands/test_add_device.py diff --git a/code/tests/test_capture_metadata_model.py b/archive/tests/test_capture_metadata_model.py similarity index 100% rename from code/tests/test_capture_metadata_model.py rename to archive/tests/test_capture_metadata_model.py diff --git a/archive/tests/test_main.py b/archive/tests/test_main.py new file mode 100644 index 0000000..e69de29 diff --git a/code/tests/utils/test_capture_metadata_utils.py b/archive/tests/utils/test_capture_metadata_utils.py similarity index 100% rename from code/tests/utils/test_capture_metadata_utils.py rename to archive/tests/utils/test_capture_metadata_utils.py diff --git a/archive/tests/utils/test_device_metadata_utils.py b/archive/tests/utils/test_device_metadata_utils.py new file mode 100644 index 0000000..e69de29 diff --git a/archive/tests/utils/test_tcpdump_utils.py b/archive/tests/utils/test_tcpdump_utils.py new file mode 100644 index 0000000..e69de29 diff --git a/code/iottb/.gitignore b/code/iottb/.gitignore new file mode 100644 index 0000000..852cc91 --- /dev/null +++ b/code/iottb/.gitignore @@ -0,0 +1,3 @@ +__pycache__ +.venv +iottb.egg-info \ No newline at end of file diff --git a/code/iottb/__main__.py b/code/iottb/__main__.py index 9e6d869..fb99a10 100644 --- a/code/iottb/__main__.py +++ b/code/iottb/__main__.py @@ -1,108 +1,30 @@ -#!/usr/bin/env python3 import argparse -from os import environ -from pathlib import Path import logging -from iottb.subcommands.add_device import setup_init_device_root_parser -# from iottb.subcommands.capture import setup_capture_parser -from iottb.subcommands.sniff import setup_sniff_parser -from iottb.utils.tcpdump_utils import list_interfaces -from iottb.definitions import IOTTB_HOME_ABS, ReturnCodes -from iottb.logger import setup_logging +from pathlib import Path -logger = logging.getLogger('iottbLogger.__main__') -logger.setLevel(logging.DEBUG) +from .commands.sniff import setup_sniff_parser +from .config import Config +from .utils.file_utils import ensure_directory_exists -###################### -# Argparse setup -###################### -def setup_argparse(): - # create top level parser - root_parser = argparse.ArgumentParser(prog='iottb') - # shared options - root_parser.add_argument('--verbose', '-v', action='count', default=0) - - # Group of args w.r.t iottb.db creation - group = root_parser.add_argument_group('database options') - group.add_argument('--db-home', default=Path.home() / 'IoTtb.db') - group.add_argument('--config-home', default=Path.home() / '.config' / 'iottb.conf', type=Path, ) - group.add_argument('--user', default=Path.home().stem, type=Path, ) - - # configure subcommands - subparsers = root_parser.add_subparsers(title='subcommands', required=True, dest='command') - # setup_capture_parser(subparsers) - setup_init_device_root_parser(subparsers) - setup_sniff_parser(subparsers) - # Utility to list interfaces directly with iottb instead of relying on external tooling - - interfaces_parser = subparsers.add_parser('list-interfaces', aliases=['li', 'if'], - help='List available network interfaces.') - interfaces_parser.set_defaults(func=list_interfaces) - - return root_parser - - -### -# Where put ?! -### -class IoTdb: - def __init__(self, db_home=Path.home() / 'IoTtb.db', iottb_config=Path.home() / '.conf' / 'iottb.conf', - user=Path.home().stem): - self.db_home = db_home - self.config_home = iottb_config - self.default_filters_home = db_home / 'default_filters' - self.user = user - - def create_db(self, mode=0o777, parents=False, exist_ok=False): - logger.info(f'Creating db at {self.db_home}') - try: - self.db_home.mkdir(mode=mode, parents=parents, exist_ok=exist_ok) - except FileExistsError: - logger.error(f'Database path already at {self.db_home} exists and is not a directory') - finally: - logger.debug(f'Leaving finally clause in create_db') - - def create_device_tree(self, mode=0o777, parents=False, exist_ok=False): - logger.info(f'Creating device tree at {self.db_home / 'devices'}') - #TODO - - def parse_db_config(self): - pass - - def parse_iottb_config(self): - pass - - def get_known_devices(self): - pass - - -def iottb_db_exists(db_home=Path.home() / 'IoTtb.db'): - res = db_home.is_dir() +def setup_logging(): + logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(name)s: %(message)s') def main(): - logger.debug(f'Pre setup_argparse()') - parser = setup_argparse() - logger.debug('Post setup_argparse().') - args = parser.parse_args() - logger.debug(f'Args parsed: {args}') - if args.command: - try: - args.func(args) - except KeyboardInterrupt: - print('Received keyboard interrupt. Exiting...') - exit(1) - except Exception as e: - logger.debug(f'Error in main: {e}') - print(f'Error: {e}') - # create_capture_directory(args.device_name) - - -if __name__ == '__main__': setup_logging() - logger.debug("Debug level is working") - logger.info("Info level is working") - logger.warning("Warning level is working") + parser = argparse.ArgumentParser(description='IoT Testbed') + subparsers = parser.add_subparsers() + + setup_sniff_parser(subparsers) + + args = parser.parse_args() + if hasattr(args, 'func'): + args.func(args) + else: + parser.print_help() + + +if __name__ == "__main__": main() diff --git a/code/iottb/commands/__init__.py b/code/iottb/commands/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/code/iottb/commands/sniff.py b/code/iottb/commands/sniff.py new file mode 100644 index 0000000..f8e97ad --- /dev/null +++ b/code/iottb/commands/sniff.py @@ -0,0 +1,108 @@ +import subprocess +import re +from datetime import datetime +from pathlib import Path +import logging +from models.capture_metadata import CaptureMetadata +from models.device_metadata import DeviceMetadata +from utils.capture_utils import get_capture_src_folder, make_capture_src_folder +from utils.tcpdump_utils import check_installed +from utils.file_utils import ensure_directory_exists +from config import Config + +logger = logging.getLogger('iottb.sniff') + + +def is_ip_address(address): + ip_pattern = re.compile(r"^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$") + return ip_pattern.match(address) is not None + + +def is_mac_address(address): + mac_pattern = re.compile(r"^([0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2}$") + return mac_pattern.match(address) is not None + + +class Sniffer: + def __init__(self, device_id, capture_interface, address=None, safe=True): + #TODO Decide if we want to use the device_id as the device_name (seems unhandy) + self.device_id = device_id + self.capture_interface = capture_interface + self.address = address + self.safe = safe + self.config = Config().load_config() + self.device_path = self.initialize_device(device_id) + self.filter = self.generate_filter() + + def initialize_device(self, device_id): + db_path = Path(self.config.get('database_path', '~/iottb.db')).expanduser() + device_path = db_path / device_id + ensure_directory_exists(device_path) + + metadata_file = device_path / 'device_metadata.json' + if not metadata_file.exists(): + device_metadata = DeviceMetadata(device_name=device_id, device_root_path=device_path) + device_metadata.save_to_file() + return device_path + + def get_capture_metadata(self, capture_dir): + metadata = CaptureMetadata(device_id=self.device_id, capture_dir=capture_dir) + metadata.build_capture_file_name() + metadata.interface = self.capture_interface + metadata.device_ip_address = self.address or "No IP Address set" + return metadata + + def generate_filter(self): + + if not self.address and self.safe: + raise ValueError("Address must be provided in safe mode.") + + if is_ip_address(self.address): + return f"host {self.address}" + elif is_mac_address(self.address): + return f"ether host {self.address}" + else: + raise ValueError("Invalid address format.") + + def capture(self): + if not check_installed(): + print('Please install tcpdump first') + return + + capture_dir = make_capture_src_folder(get_capture_src_folder(self.device_path)) + metadata = self.get_capture_metadata(capture_dir) + pcap_file = capture_dir / metadata.capture_file + cmd = ['sudo', 'tcpdump', '-i', self.capture_interface, '-w', str(pcap_file)] + + if self.filter: + cmd.append(self.filter) + + metadata.tcpdump_command = ' '.join(cmd) + print(f'Executing: {metadata.tcpdump_command}') + + try: + metadata.start_time = datetime.now().isoformat() + subprocess.run(cmd, check=True) + metadata.stop_time = datetime.now().isoformat() + except subprocess.CalledProcessError as e: + logger.error(f'Failed to capture packets: {e}') + return + + metadata.save_to_file() + print(f"Capture complete. Metadata saved to {capture_dir / 'metadata.json'}") + + +def setup_sniff_parser(subparsers): + parser = subparsers.add_parser('sniff', help='Sniff packets with tcpdump') + parser.add_argument('device_id', help='ID of the device to sniff') + parser.add_argument('-i', '--interface', required=True, help='Network interface to capture on') + parser.add_argument('-a', '--address', help='IP or MAC address to filter packets by') + parser.add_argument('-u', '--unsafe', action='store_true', help='Run in unsafe mode without supplying an address. ' + 'Highly discouraged.') + parser.set_defaults(func=handle_sniff) + + +def handle_sniff(args): + sniffer = Sniffer(device_id=args.device_id, capture_interface=args.interface, address=args.address, + safe=not args.unsafe) + sniffer.capture() diff --git a/code/iottb/config.json b/code/iottb/config.json new file mode 100644 index 0000000..ec311ec --- /dev/null +++ b/code/iottb/config.json @@ -0,0 +1 @@ +{"database_path": "~/.iottb.db", "log_level": "INFO"} \ No newline at end of file diff --git a/code/iottb/config.py b/code/iottb/config.py new file mode 100644 index 0000000..6c0e532 --- /dev/null +++ b/code/iottb/config.py @@ -0,0 +1,45 @@ +import json +from pathlib import Path +import logging + +logger = logging.getLogger('iottb.config') + + +class Config: + DEFAULT_CONFIG = { + "database_path": "~/.iottb.db", + "log_level": "INFO" + } + + def __init__(self, config_file=None): + self.config_file = Path(config_file or "config.json") + if not self.config_file.exists(): + self.create_default_config() + else: + self.config = self.load_config() + + def create_default_config(self): + try: + self.save_config(self.DEFAULT_CONFIG) + except (IsADirectoryError, PermissionError) as e: + logger.error(f"Error creating default config: {e}") + raise + + def load_config(self): + try: + with open(self.config_file, "r") as file: + return json.load(file) + except IsADirectoryError as e: + logger.error(f"Error loading config: {e}") + raise + except PermissionError as e: + logger.error(f"Error loading config: {e}") + raise + + def save_config(self, config): + try: + with open(self.config_file, "w") as f: + json.dump(config, f, indent=2) + except (IsADirectoryError, PermissionError) as e: + logger.error(f"Error saving config: {e}") + raise diff --git a/code/iottb/logger.py b/code/iottb/logger.py index cc0cdb5..e69de29 100644 --- a/code/iottb/logger.py +++ b/code/iottb/logger.py @@ -1,35 +0,0 @@ -import logging -import sys -import os -from logging.handlers import RotatingFileHandler - - -def setup_logging(): - # Ensure the logs directory exists - log_directory = 'logs' - if not os.path.exists(log_directory): - os.makedirs(log_directory) - - # Create handlers - file_handler = RotatingFileHandler(os.path.join(log_directory, 'iottb.log'), maxBytes=1048576, backupCount=5) - console_handler = logging.StreamHandler(sys.stdout) - - # Create formatters and add it to handlers - file_fmt = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') - console_fmt = logging.Formatter( - '%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s') - - file_handler.setFormatter(file_fmt) - console_handler.setFormatter(console_fmt) - - # Get the root logger and add handlers - root_logger = logging.getLogger() - root_logger.setLevel(logging.DEBUG) - root_logger.addHandler(file_handler) - root_logger.addHandler(console_handler) - - # Prevent propagation to the root logger to avoid duplicate logs - root_logger.propagate = False - - -setup_logging() diff --git a/code/iottb/models/capture_metadata.py b/code/iottb/models/capture_metadata.py new file mode 100644 index 0000000..4c7e1c0 --- /dev/null +++ b/code/iottb/models/capture_metadata.py @@ -0,0 +1,29 @@ +import json +import uuid +from datetime import datetime +from pathlib import Path + + +class CaptureMetadata: + def __init__(self, device_id, capture_dir): + self.device_id = device_id + self.capture_id = str(uuid.uuid4()) + self.capture_date = datetime.now().isoformat() + self.capture_dir = Path(capture_dir) + self.capture_file = "" + self.start_time = "" + self.stop_time = "" + self.tcpdump_command = "" + self.interface = "" + self.device_ip_address = "" + + def build_capture_file_name(self): + self.capture_file = f"{self.device_id}_{self.capture_id}.pcap" + + def to_dict(self): + return self.__dict__ + + def save_to_file(self, file_path=None): + file_path = file_path or self.capture_dir / 'metadata.json' + with open(file_path, 'w') as f: + json.dump(self.to_dict(), f, indent=4) diff --git a/code/iottb/models/device_metadata.py b/code/iottb/models/device_metadata.py new file mode 100644 index 0000000..5c5cb91 --- /dev/null +++ b/code/iottb/models/device_metadata.py @@ -0,0 +1,27 @@ +import json +import uuid +from datetime import datetime +from pathlib import Path + + +class DeviceMetadata: + def __init__(self, device_name, device_root_path): + self.device_name = device_name + self.device_short_name = device_name.lower().replace(' ', '_') + self.device_id = str(uuid.uuid4()) + self.date_created = datetime.now().isoformat() + self.device_root_path = Path(device_root_path) + + def to_dict(self): + return self.__dict__ + + def save_to_file(self): + file_path = self.device_root_path / 'device_metadata.json' + with open(file_path, 'w') as f: + json.dump(self.to_dict(), f, indent=4) + + @classmethod + def load_from_file(cls, file_path): + with open(file_path, 'r') as f: + data = json.load(f) + return cls(**data) diff --git a/code/iottb/pyproject.toml b/code/iottb/pyproject.toml new file mode 100644 index 0000000..580c3e8 --- /dev/null +++ b/code/iottb/pyproject.toml @@ -0,0 +1,18 @@ +[build-system] +requires = ["setuptools>=42", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "iottb" +version = "0.1.0" +authors = [{name = "Sebastian Lenzlinger", email = "sebastian.lenzlinger@unibas.ch"}] +description = "Automation Tool for Capturing Network packets of IoT devices." +requires-python = ">=3.8" +dependencies = [] + +[tool.setuptools.packages.find] +where = ["."] +exclude = ["tests*", "docs*"] + +[project.scripts] +iottb = "iottb.__main__:main" diff --git a/code/iottb/templates/capture_metadata_template.json b/code/iottb/templates/capture_metadata_template.json new file mode 100644 index 0000000..ec49e81 --- /dev/null +++ b/code/iottb/templates/capture_metadata_template.json @@ -0,0 +1,15 @@ +{ + "device_id": "", + "capture_id": "", + "capture_date": "", + "capture_file": "", + "start_time": "", + "stop_time": "", + "capture_duration": "", + "interfaces": "", + "device_ip_address": "", + "device_mac_address": "", + "contacted_ip_address": [], + "device_firmware_version": "", + "campanion_app": "" +} \ No newline at end of file diff --git a/code/iottb/templates/config_template.json b/code/iottb/templates/config_template.json new file mode 100644 index 0000000..89338dc --- /dev/null +++ b/code/iottb/templates/config_template.json @@ -0,0 +1,4 @@ +{ + "database_path": "~/.iottb.db", + "log_level": "INFO" +} \ No newline at end of file diff --git a/code/iottb/templates/device_metadata_template.json b/code/iottb/templates/device_metadata_template.json new file mode 100644 index 0000000..79a17f1 --- /dev/null +++ b/code/iottb/templates/device_metadata_template.json @@ -0,0 +1,14 @@ +{ + "device_id": "", + "device_name": "", + "device_short_name": "", + "date_created": "", + "description": "", + "model": "", + "manufacturer": "", + "firmware_version": "", + "device_type": "", + "supported_interfaces": "", + "companion_applications": "", + "last_metadata_update": "" +} \ No newline at end of file diff --git a/code/iottb/test/test_Config.py b/code/iottb/test/test_Config.py new file mode 100644 index 0000000..cb21267 --- /dev/null +++ b/code/iottb/test/test_Config.py @@ -0,0 +1,43 @@ +import json +from pathlib import Path +from unittest import mock + +from config import Config + +import unittest + + +class TestConfig(unittest.TestCase): + + def test_creates_new_config_file_if_not_exists(self): + config_path = Path("test_config.json") + if config_path.exists(): + config_path.unlink() + config = Config(config_file=config_path) + self.assertTrue(config_path.exists()) + config_path.unlink() + + def test_writes_default_configuration_to_config_file(self): + config_path = Path("test_config.json") + if config_path.exists(): + config_path.unlink() + config = Config(config_file=config_path) + with open(config_path, "r") as f: + data = json.load(f) + self.assertEqual(data, {"database_path": "~/.iottb.db", "log_level": "INFO"}) + config_path.unlink() + + @unittest.mock.patch("builtins.open", side_effect=PermissionError) + def test_config_file_path_not_writable(self, mock_open): + config_path = Path("test_config.json") + with self.assertRaises(PermissionError): + config = Config(config_file=config_path) + config.create_default_config() + + def test_config_file_path_is_directory(self): + config_dir = Path("test_config_dir") + config_dir.mkdir(exist_ok=True) + with self.assertRaises(IsADirectoryError): + config = Config(config_file=config_dir) + config.create_default_config() + config_dir.rmdir() diff --git a/code/iottb/test/test_ensure_directory_exists.py b/code/iottb/test/test_ensure_directory_exists.py new file mode 100644 index 0000000..ac5b1fd --- /dev/null +++ b/code/iottb/test/test_ensure_directory_exists.py @@ -0,0 +1,38 @@ +from pathlib import Path + +# Generated by CodiumAI +import unittest + +from utils.file_utils import ensure_directory_exists + + +class TestEnsureDirectoryExists(unittest.TestCase): + + # creates directory if it does not exist + def test_creates_directory_if_not_exists(self): + path = Path('/tmp/testdir') + if path.exists(): + path.rmdir() + ensure_directory_exists(path) + self.assertTrue(path.exists()) + path.rmdir() + + # does not create directory if it already exists + def test_does_not_create_directory_if_exists(self): + path = Path('/tmp/testdir') + path.mkdir(exist_ok=True) + ensure_directory_exists(path) + self.assertTrue(path.exists()) + path.rmdir() + + # path is a symbolic link + def test_path_is_a_symbolic_link(self): + target_dir = Path('/tmp/targetdir') + symlink_path = Path('/tmp/symlinkdir') + target_dir.mkdir(exist_ok=True) + symlink_path.symlink_to(target_dir) + ensure_directory_exists(symlink_path) + self.assertTrue(symlink_path.exists()) + self.assertTrue(symlink_path.is_symlink()) + symlink_path.unlink() + target_dir.rmdir() diff --git a/code/iottb/test/test_is_ip_address.py b/code/iottb/test/test_is_ip_address.py new file mode 100644 index 0000000..a13c679 --- /dev/null +++ b/code/iottb/test/test_is_ip_address.py @@ -0,0 +1,62 @@ +from commands.sniff import is_ip_address + +import unittest + + +class TestIsIpAddress(unittest.TestCase): + + def test_valid_ipv4_address_all_octets_in_range(self): + self.assertTrue(is_ip_address("192.168.1.1")) + self.assertTrue(is_ip_address("0.0.0.0")) + self.assertTrue(is_ip_address("255.255.255.255")) + + def test_ipv4_address_with_leading_zeros(self): + self.assertTrue(is_ip_address("192.168.001.001")) + self.assertTrue(is_ip_address("0.0.0.0")) + self.assertTrue(is_ip_address("255.255.255.255")) + + def test_ipv4_address_mixed_single_double_digit_octets(self): + self.assertTrue(is_ip_address("192.168.1.01")) + self.assertTrue(is_ip_address("0.0.0.0")) + self.assertTrue(is_ip_address("255.255.255.255")) + + def test_ipv4_address_maximum_values_in_octets(self): + self.assertTrue(is_ip_address("255.255.255.255")) + self.assertTrue(is_ip_address("0.0.0.0")) + self.assertTrue(is_ip_address("192.168.1.1")) + + + def test_ipv4_address_minimum_values_in_octets(self): + self.assertTrue(is_ip_address("0.0.0.0")) + self.assertTrue(is_ip_address("192.168.1.1")) + self.assertTrue(is_ip_address("255.255.255.255")) + + + def test_ipv4_address_more_than_four_octets_invalid(self): + self.assertFalse(is_ip_address("192.168.1.1.1")) + self.assertFalse(is_ip_address("0.0.0.0.0")) + self.assertFalse(is_ip_address("255.255.255.255.255")) + + + def test_ipv4_address_fewer_than_four_octets_invalid(self): + self.assertFalse(is_ip_address("192.168.1")) + self.assertFalse(is_ip_address("0.0")) + self.assertFalse(is_ip_address("255")) + + + def test_ipv4_address_non_numeric_characters_invalid(self): + self.assertFalse(is_ip_address("192.a.b.c")) + self.assertFalse(is_ip_address("0.x.y.z")) + self.assertFalse(is_ip_address("255.q.w.e")) + + + def test_ipv4_address_octets_out_of_range_invalid(self): + self.assertFalse(is_ip_address("256.256.256.256")) + self.assertFalse(is_ip_address("300.300.300.300")) + self.assertFalse(is_ip_address("999.999.999.999")) + + + def test_ipv4_address_empty_string_invalid(self): + self.assertFalse(is_ip_address("")) + self.assertFalse(is_ip_address(" ")) + self.assertFalse(is_ip_address(None)) diff --git a/code/iottb/test/test_is_mac_address.py b/code/iottb/test/test_is_mac_address.py new file mode 100644 index 0000000..09e11a2 --- /dev/null +++ b/code/iottb/test/test_is_mac_address.py @@ -0,0 +1,64 @@ + +from commands.sniff import is_mac_address + +import unittest + +class TestIsMacAddress(unittest.TestCase): + + + def test_valid_mac_address_lowercase(self): + self.assertTrue(is_mac_address("aa:bb:cc:dd:ee:ff")) + self.assertFalse(is_mac_address("192.168.1.1")) + self.assertFalse(is_mac_address("aa:bb:cc:dd:ee:ff:gg")) + + + def test_valid_mac_address_uppercase(self): + self.assertTrue(is_mac_address("AA:BB:CC:DD:EE:FF")) + self.assertFalse(is_mac_address("10.0.0.1")) + self.assertFalse(is_mac_address("AA:BB:CC:DD:EE")) + + + def test_valid_mac_address_mixed_case(self): + self.assertTrue(is_mac_address("Aa:Bb:Cc:Dd:Ee:Ff")) + self.assertFalse(is_mac_address("172.16.0.1")) + self.assertFalse(is_mac_address("Aa:Bb:Cc:Dd:Ee:Ff:Gg")) + + + def test_valid_mac_address_digits(self): + self.assertTrue(is_mac_address("00:11:22:33:44:55")) + self.assertFalse(is_mac_address("8.8.8.8")) + self.assertFalse(is_mac_address("00:11:22:33:44")) + + # returns False for an empty string + def test_empty_string(self): + self.assertFalse(is_mac_address("")) + self.assertFalse(is_mac_address(":")) + + def test_invalid_characters(self): + self.assertFalse(is_mac_address("gh:ij:kl:mn:op:qr")) + self.assertFalse(is_mac_address("192.168.0.256")) + self.assertFalse(is_mac_address("ghij::klmn::opqr")) + + # returns False for a MAC address with incorrect length + def test_incorrect_length(self): + self.assertFalse(is_mac_address("aa:bb:cc")) + self.assertFalse(is_mac_address("10.0.0.256")) + self.assertFalse(is_mac_address("aa::bb::cc::dd::ee::ff::gg")) + + # returns False for a MAC address with missing colons + def test_missing_colons(self): + self.assertFalse(is_mac_address("aabbccddeeff")) + self.assertFalse(is_mac_address("127.0.0.1")) + self.assertFalse(is_mac_address("aabbccddeeffgg")) + + # returns False for a MAC address with extra colons + def test_extra_colons(self): + self.assertFalse(is_mac_address("aa::bb::cc::dd::ee::ff")) + self.assertFalse(is_mac_address("192.168.1.256")) + self.assertFalse(is_mac_address("aa::bb::cc::dd::ee::ff::gg")) + + # returns False for a MAC address with spaces + def test_spaces_in_mac(self): + self.assertFalse(is_mac_address("aa bb cc dd ee ff")) + self.assertFalse(is_mac_address("8.8.4.4")) + self.assertFalse(is_mac_address("aa bb cc dd ee ff gg")) diff --git a/code/iottb/utils/capture_utils.py b/code/iottb/utils/capture_utils.py index 8c4d60f..d6d73b3 100644 --- a/code/iottb/utils/capture_utils.py +++ b/code/iottb/utils/capture_utils.py @@ -1,44 +1,20 @@ -import uuid from pathlib import Path -from iottb.models.device_metadata_model import dir_contains_device_metadata -from iottb.utils.utils import get_iso_date +from datetime import datetime -def get_capture_uuid(): - return str(uuid.uuid4()) +def get_capture_src_folder(device_path): + today_str = datetime.now().strftime('%Y-%m-%d') + capture_base_path = device_path / today_str + capture_base_path.mkdir(parents=True, exist_ok=True) + + existing_captures = [d for d in capture_base_path.iterdir() if d.is_dir()] + nth_capture = len(existing_captures) + 1 + capture_dir = capture_base_path / f'capture_{nth_capture}' + capture_dir.mkdir(parents=True, exist_ok=True) + + return capture_dir -def get_capture_date_folder(device_root: Path): - today_iso = get_iso_date() - today_folder = device_root / today_iso - if dir_contains_device_metadata(device_root): - if not today_folder.is_dir(): - try: - today_folder.mkdir() - except FileExistsError: - print(f'Folder {today_folder} already exists') - return today_folder - raise FileNotFoundError(f'Given path {device_root} is not a device root directory') - - -def get_capture_src_folder(device_folder: Path): - assert device_folder.is_dir(), f'Given path {device_folder} is not a folder' - today_iso = get_iso_date() - max_sequence_number = 1 - for d in device_folder.iterdir(): - if d.is_dir() and d.name.startswith(f'{today_iso}_capture_'): - name = d.name - num = int(name.split('_')[2]) - max_sequence_number = max(max_sequence_number, num) - - next_sequence_number = max_sequence_number + 1 - return device_folder.joinpath(f'{today_iso}_capture_{next_sequence_number:03}') - - -def make_capture_src_folder(capture_src_folder: Path): - try: - capture_src_folder.mkdir() - except FileExistsError: - print(f'Folder {capture_src_folder} already exists') - finally: - return capture_src_folder +def make_capture_src_folder(capture_src_folder): + capture_src_folder.mkdir(parents=True, exist_ok=True) + return capture_src_folder diff --git a/code/iottb/utils/file_utils.py b/code/iottb/utils/file_utils.py new file mode 100644 index 0000000..5ccb42f --- /dev/null +++ b/code/iottb/utils/file_utils.py @@ -0,0 +1,19 @@ +import json +from pathlib import Path + + +def load_json_template(template_path): + with open(template_path, 'r') as f: + return json.load(f) + + +def save_json(data, file_path): + with open(file_path, 'w') as f: + json.dump(data, f, indent=4) + + +def ensure_directory_exists(path): + path = Path(path) + if not path.exists(): + path.mkdir(parents=True, exist_ok=True) + return path diff --git a/code/iottb/utils/tcpdump_utils.py b/code/iottb/utils/tcpdump_utils.py index 6870202..53b77e8 100644 --- a/code/iottb/utils/tcpdump_utils.py +++ b/code/iottb/utils/tcpdump_utils.py @@ -1,41 +1,9 @@ -import ipaddress -import shutil import subprocess -from typing import Optional -def check_installed() -> bool: - """Check if tcpdump is installed and available on the system path.""" - return shutil.which('tcpdump') is not None - - -def ensure_installed(): - """Ensure that tcpdump is installed, raise an error if not.""" - if not check_installed(): - raise RuntimeError('tcpdump is not installed. Please install it to continue.') - - -def list_interfaces(args) -> str: - """List available network interfaces using tcpdump.""" - ensure_installed() +def check_installed(): try: - result = subprocess.run(['tcpdump', '--list-interfaces'], capture_output=True, text=True, check=True) - print(result.stdout) - except subprocess.CalledProcessError as e: - print(f'Failed to list interfaces: {e}') - return '' - - -def is_valid_ipv4(ip: str) -> bool: - try: - ipaddress.IPv4Address(ip) + subprocess.run(['tcpdump', '--version'], check=True, capture_output=True) return True - except ValueError: + except subprocess.CalledProcessError: return False - -def str_to_ipv4(ip: str) -> (bool, Optional[ipaddress]): - try: - address = ipaddress.IPv4Address(ip) - return address == ipaddress.IPv4Address(ip), address - except ipaddress.AddressValueError: - return False, None