diff --git a/.idea/webResources.xml b/.idea/webResources.xml deleted file mode 100644 index aa647dc..0000000 --- a/.idea/webResources.xml +++ /dev/null @@ -1,14 +0,0 @@ - - - - - - - - - - - - - - \ No newline at end of file diff --git a/code/iottb-project/.gitignore b/code/iottb-project/.gitignore new file mode 100644 index 0000000..a457143 --- /dev/null +++ b/code/iottb-project/.gitignore @@ -0,0 +1,36 @@ +__pycache__ +.venv +iottb.egg-info +.idea +*.log +logs/ +*.pyc +.obsidian + +# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio, WebStorm and Rider +# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 + +# User-specific stuff +.idea/**/workspace.xml +.idea/**/tasks.xml +.idea/**/usage.statistics.xml +.idea/**/dictionaries +.idea/**/shelf + +# AWS User-specific +.idea/**/aws.xml + +# Generated files +.idea/**/contentModel.xml + +# Sensitive or high-churn files +.idea/**/dataSources/ +.idea/**/dataSources.ids +.idea/**/dataSources.local.xml +.idea/**/sqlDataSources.xml +.idea/**/dynamic.xml +.idea/**/uiDesigner.xml +.idea/**/dbnavigator.xml + +.private/ +*.pcap \ No newline at end of file diff --git a/code/iottb-project/README.md b/code/iottb-project/README.md new file mode 100644 index 0000000..d7a21e2 --- /dev/null +++ b/code/iottb-project/README.md @@ -0,0 +1,9 @@ +# Iottb +## Basic Invocation + +## Configuration +### Env Vars +- IOTTB_CONF_HOME + +By setting this variable you control where the basic iottb application +configuration should be looked for \ No newline at end of file diff --git a/code/iottb-project/iottb/__init__.py b/code/iottb-project/iottb/__init__.py new file mode 100644 index 0000000..1438731 --- /dev/null +++ b/code/iottb-project/iottb/__init__.py @@ -0,0 +1,11 @@ +from iottb import definitions +import logging +from iottb.utils.user_interaction import tb_echo +import click + +click.echo = tb_echo # This is very hacky +logging.basicConfig(level=definitions.LOGLEVEL) +log_dir = definitions.LOGDIR +# Ensure logs dir exists before new handlers are registered in main.py +if not log_dir.is_dir(): + log_dir.mkdir() diff --git a/code/iottb-project/iottb/commands/__init__.py b/code/iottb-project/iottb/commands/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/code/iottb-project/iottb/commands/add_device.py b/code/iottb-project/iottb/commands/add_device.py new file mode 100644 index 0000000..5524ea7 --- /dev/null +++ b/code/iottb-project/iottb/commands/add_device.py @@ -0,0 +1,89 @@ +import json + +import click +from pathlib import Path +import logging +import re + +from iottb import definitions +from iottb.models.device_metadata import DeviceMetadata +from iottb.models.iottb_config import IottbConfig +from iottb.definitions import CFG_FILE_PATH, TB_ECHO_STYLES + +logger = logging.getLogger(__name__) + + +def add_device_guided(ctx, cn, db): + click.echo('TODO: Implement') + logger.info('Adding device interactively') + #logger.debug(f'Parameters: {params}. value: {value}') + + +@click.command('add-device', help='Add a device to a database') +@click.option('--dev', '--device-name', type=str, required=True, + help='The name of the device to be added. If this string contains spaces or other special characters \ + normalization is performed to derive a canonical name') +@click.option('--db', '--database', type=click.Path(exists=True, file_okay=False, dir_okay=True), + envvar='IOTTB_DB', show_envvar=True, + help='Database in which to add this device. If not specified use default from config.') +@click.option('--guided', is_flag=True, default=False, show_default=True, envvar='IOTTB_GUIDED_ADD', show_envvar=True, + help='Add device interactively') +def add_device(dev, db, guided): + """Add a new device to a database + + Device name must be supplied unless in an interactive setup. Database is taken from config by default. + """ + logger.info('add-device invoked') + + # Step 1: Load Config + # Dependency: Config file must exist + config = IottbConfig(Path(CFG_FILE_PATH)) + logger.debug(f'Config loaded: {config}') + + # Step 2: Load database + # dependency: Database folder must exist + if db: + database = db + path = config.db_path_dict[database] + logger.debug(f'Resolved (path, db) {path}, {database}') + else: + path = config.default_db_location + database = config.default_database + logger.debug(f'Default (path, db) {path}, {database}') + click.secho(f'Using database {database}') + full_db_path = Path(path) / database + if not full_db_path.is_dir(): + logger.warning(f'No database at {database}') + click.echo(f'Could not find a database.') + click.echo(f'You need to initialize the testbed before before you add devices!') + click.echo(f'To initialize the testbed in the default location run "iottb init-db"') + click.echo('Exiting...') + exit() + + # Step 3: Check if device already exists in database + # dependency: DeviceMetadata object + device_metadata = DeviceMetadata(device_name=dev) + device_dir = full_db_path / device_metadata.canonical_name + + # Check if device is already registered + if device_dir.exists(): + logger.warning(f'Device directory {device_dir} already exists.') + click.echo(f'Device {dev} already exists in the database.') + click.echo('Exiting...') + exit() + try: + device_dir.mkdir() + except OSError as e: + logger.error(f'Error trying to create device {e}') + click.echo('Exiting...') + exit() + + # Step 4: Save metadata into device_dir + metadata_path = device_dir / definitions.DEVICE_METADATA_FILE_NAME + with metadata_path.open('w') as metadata_file: + json.dump(device_metadata.__dict__, metadata_file, indent=4) + click.echo(f'Successfully added device {dev} to database') + logger.debug(f'Added device {dev} to database {database}. Full path of metadata {metadata_path}') + logger.info(f'Metadata for {dev} {device_metadata.print_attributes()}') + + diff --git a/code/iottb-project/iottb/commands/developer.py b/code/iottb-project/iottb/commands/developer.py new file mode 100644 index 0000000..d1e992b --- /dev/null +++ b/code/iottb-project/iottb/commands/developer.py @@ -0,0 +1,130 @@ +from pathlib import Path +import logging +import click + +from iottb import tb_echo +from iottb.definitions import DB_NAME, CFG_FILE_PATH +from iottb.models.iottb_config import IottbConfig + +logger = logging.getLogger(__name__) + + +@click.group('util') +def tb(): + pass + + +@click.command() +@click.option('--file', default=DB_NAME) +@click.option('--table', type=str, default='DefaultDatabase') +@click.option('--key') +@click.option('--value') +@click.pass_context +def set_key_in_table_to(ctx, file, table, key, value): + """Edit config or metadata files. TODO: Implement""" + click.echo(f'set_key_in_table_to invoked') + logger.warning("Unimplemented subcommand invoked.") + + +@click.command() +@click.confirmation_option(prompt="Are you certain that you want to delete the cfg file?") +def rm_cfg(): + """ Removes the cfg file from the filesystem. + + This is mostly a utility during development. Once non-standard database locations are implemented, + deleting this would lead to iottb not being able to find them anymore. + """ + Path(CFG_FILE_PATH).unlink() + click.echo(f'Iottb configuration removed at {CFG_FILE_PATH}') + + +@click.command() +@click.confirmation_option(prompt="Are you certain that you want to delete the databases file?") +def rm_dbs(dbs): + """ Removes ALL(!) databases from the filesystem if they're empty. + + Development utility currently unfit for use. + """ + config = IottbConfig() + paths = config.get_know_database_paths() + logger.debug(f'Known db paths: {str(paths)}') + for dbs in paths: + try: + Path(dbs).rmdir() + click.echo(f'{dbs} deleted') + except Exception as e: + logger.debug(f'Failed unlinking db {dbs} with error {e}') + logger.info(f'All databases deleted') + + +@click.command('show-cfg', help='Show the current configuration context') +@click.option('--cfg-file', type=click.Path(), default=CFG_FILE_PATH, help='Path to the config file') +@click.option('-pp', is_flag=True, default=False, help='Pretty Print') +@click.pass_context +def show_cfg(ctx, cfg_file, pp): + logger.debug(f'Pretty print option set to {pp}') + if pp: + try: + config = IottbConfig(Path(cfg_file)) + click.echo("Configuration Context:") + click.echo(f"Default Database: {config.default_database}") + click.echo(f"Default Database Path: {config.default_db_location}") + click.echo("Database Locations:") + for db_name, db_path in config.db_path_dict.items(): + click.echo(f" - {db_name}: {db_path}") + except Exception as e: + logger.error(f"Error loading configuration: {e}") + click.echo(f"Failed to load configuration from {cfg_file}") + else: + path = Path(cfg_file) + + if path.is_file(): + with path.open('r') as file: + content = file.read() + click.echo(content) + else: + click.echo(f"Configuration file not found at {cfg_file}") + + +@click.command('show-all', help='Show everything: configuration, databases, and device metadata') +@click.pass_context +def show_everything(ctx): + """Show everything that can be recursively found based on config except file contents.""" + config = ctx.obj['CONFIG'] + click.echo("Configuration Context:") + click.echo(f"Default Database: {config.default_database}") + click.echo(f"Default Database Path: {config.default_db_location}") + click.echo("Database Locations:") + everything_dict = {} + for db_name, db_path in config.db_path_dict.items(): + + click.echo(f" - {db_name}: {db_path}") + for db_name, db_path in config.db_path_dict.items(): + full_db_path = Path(db_path) / db_name + if full_db_path.is_dir(): + click.echo(f"\nContents of {full_db_path}:") + flag = True + for item in full_db_path.iterdir(): + flag = False + if item.is_file(): + click.echo(f" - {item.name}") + try: + with item.open('r', encoding='utf-8') as file: + content = file.read() + click.echo(f" Content:\n{content}") + except UnicodeDecodeError: + click.echo(" Content is not readable as text") + elif item.is_dir(): + click.echo(f" - {item.name}/") + for subitem in item.iterdir(): + if subitem.is_file(): + click.echo(f" - {subitem.name}") + elif subitem.is_dir(): + click.echo(f" - {subitem.name}/") + if flag: + tb_echo(f'\t EMPTY') + else: + click.echo(f"{full_db_path} is not a directory") + + + diff --git a/code/iottb-project/iottb/commands/sniff.py b/code/iottb-project/iottb/commands/sniff.py new file mode 100644 index 0000000..0eef81c --- /dev/null +++ b/code/iottb-project/iottb/commands/sniff.py @@ -0,0 +1,327 @@ +import os +import shutil +import uuid +from time import time + +import click +import subprocess +import json +from pathlib import Path +import logging +import re +from datetime import datetime +from click_option_group import optgroup +from iottb.definitions import APP_NAME, CFG_FILE_PATH +from iottb.models.iottb_config import IottbConfig +from iottb.utils.string_processing import make_canonical_name + +# Setup logger +logger = logging.getLogger('iottb.sniff') + + +def is_ip_address(address): + ip_pattern = re.compile(r"^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$") + return ip_pattern.match(address) is not None + + +def is_mac_address(address): + mac_pattern = re.compile(r"^([0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2}$") + return mac_pattern.match(address) is not None + + +def load_config(cfg_file): + """Loads configuration from the given file path.""" + with open(cfg_file, 'r') as config_file: + return json.load(config_file) + + +def validate_sniff(ctx, param, value): + logger.info('Validating sniff...') + if ctx.params.get('unsafe') and not value: + return None + if not ctx.params.get('unsafe') and not value: + raise click.BadParameter('Address is required unless --unsafe is set.') + if not is_ip_address(value) and not is_mac_address(value): + raise click.BadParameter('Address must be a valid IP address or MAC address.') + return value + + +@click.command('sniff', help='Sniff packets with tcpdump') +@optgroup.group('Testbed sources') +@optgroup.option('--db', '--database', type=str, envvar='IOTTB_DB', show_envvar=True, + help='Database of device. Only needed if not current default.') +@optgroup.option('--app', type=str, help='Companion app being used during capture', required=False) +@optgroup.group('Runtime behaviour') +@optgroup.option('--unsafe', is_flag=True, default=False, envvar='IOTTB_UNSAFE', is_eager=True, + help='Disable checks for otherwise required options.\n', show_envvar=True) +@optgroup.option('--guided', is_flag=True, default=False, envvar='IOTTB_GUIDED', show_envvar=True) +@optgroup.option('--pre', type=click.Path(exists=True, executable=True), help='Script to be executed before main ' + 'command' + 'is started.') +@optgroup.group('Tcpdump options') +@optgroup.option('-i', '--interface', + help='Network interface to capture on.' + + 'If not specified tcpdump tries to find and appropriate one.\n', show_envvar=True, + envvar='IOTTB_CAPTURE_INTERFACE') +@optgroup.option('-a', '--address', callback=validate_sniff, + help='IP or MAC address to filter packets by.\n', show_envvar=True, + envvar='IOTTB_CAPTURE_ADDRESS') +@optgroup.option('-I', '--monitor-mode', help='Put interface into monitor mode.', is_flag=True) +@optgroup.option('--ff', type=str, envvar='IOTTB_CAPTURE_FILTER', show_envvar=True, + help='tcpdump filter as string or file path.') +@optgroup.option('-#', '--print-pacno', is_flag=True, default=True, + help='Print packet number at beginning of line. True by default.') +@optgroup.option('-e', '--print-ll', is_flag=True, default=False, + help='Print link layer headers. True by default.') +@optgroup.option('-c', '--count', type=int, help='Number of packets to capture.', default=1000) +# @optgroup.option('--mins', type=int, help='Time in minutes to capture.', default=1) +@click.argument('tcpdump-args', nargs=-1, required=False, metavar='[TCPDUMP-ARGS]') +@click.argument('device', required=False) +@click.pass_context +def sniff(ctx, device, interface, print_pacno, ff, count, monitor_mode, print_ll, address, db, unsafe, guided, + app, tcpdump_args, **params): + """ Sniff packets from a device """ + logger.info('sniff command invoked') + + # Step1: Load Config + config = ctx.obj['CONFIG'] + logger.debug(f'Config loaded: {config}') + + # Step2: determine relevant database + database = db if db else config.default_database + path = config.db_path_dict[database] + full_db_path = Path(path) / database + logger.debug(f'Full db path is {str(full_db_path)}') + + # 2.2: Check if it exists + if not full_db_path.is_dir(): + logger.error('DB unexpectedly missing') + click.echo('DB unexpectedly missing') + return + + canonical_name, aliases = make_canonical_name(device) + click.echo(f'Using canonical device name {canonical_name}') + device_path = full_db_path / canonical_name + + # Step 3: now the device + if not device_path.exists(): + if not unsafe: + logger.error(f'Device path {device_path} does not exist') + click.echo(f'Device path {device_path} does not exist') + return + else: + device_path.mkdir(parents=True, exist_ok=True) + logger.info(f'Device path {device_path} created') + + click.echo(f'Found device at path {device_path}') + # Step 4: Generate filter + generic_filter = None + cap_filter = None + if ff: + logger.debug(f'ff: {ff}') + if Path(ff).is_file(): + logger.info('Given filter option is a file') + with open(ff, 'r') as f: + cap_filter = f.read().strip() + else: + logger.info('Given filter option is an expression') + cap_filter = ff + else: + if address is not None: + if is_ip_address(address): + generic_filter = 'net' + cap_filter = f'{generic_filter} {address}' + elif is_mac_address(address): + generic_filter = 'ether net' + cap_filter = f'{generic_filter} {address}' + elif not unsafe: + logger.error('Invalid address format') + click.echo('Invalid address format') + return + + logger.info(f'Generic filter {generic_filter}') + click.echo(f'Using filter {cap_filter}') + + # Step 5: prep capture directory + capture_date = datetime.now().strftime('%Y-%m-%d') + capture_base_dir = device_path / f'sniffs/{capture_date}' + capture_base_dir.mkdir(parents=True, exist_ok=True) + + logger.debug(f'Previous captures {capture_base_dir.glob('cap*')}') + capture_count = sum(1 for _ in capture_base_dir.glob('cap*')) + logger.debug(f'Capture count is {capture_count}') + + capture_dir = f'cap{capture_count:04d}-{datetime.now().strftime('%H%M')}' + logger.debug(f'capture_dir: {capture_dir}') + + # Full path + capture_dir_full_path = capture_base_dir / capture_dir + capture_dir_full_path.mkdir(parents=True, exist_ok=True) + + click.echo(f'Files will be placed in {str(capture_dir_full_path)}') + logger.debug(f'successfully created capture directory') + + # Step 6: Prepare capture file names + # Generate UUID for filenames + capture_uuid = str(uuid.uuid4()) + click.echo(f'Capture has id {capture_uuid}') + + pcap_file = f"{canonical_name}_{capture_uuid}.pcap" + pcap_file_full_path = capture_dir_full_path / pcap_file + stdout_log_file = f'stdout_{capture_uuid}.log' + stderr_log_file = f'stderr_{capture_uuid}.log' + + logger.debug(f'Full pcap file path is {pcap_file_full_path}') + logger.info(f'pcap file name is {pcap_file}') + logger.info(f'stdout log file is {stdout_log_file}') + logger.info(f'stderr log file is {stderr_log_file}') + + # Step 7: Build tcpdump command + logger.debug(f'pgid {os.getpgrp()}') + logger.debug(f'ppid {os.getppid()}') + logger.debug(f'(real, effective, saved) user id: {os.getresuid()}') + logger.debug(f'(real, effective, saved) group id: {os.getresgid()}') + + cmd = ['sudo', 'tcpdump'] + + # 7.1 process flags + flags = [] + if print_pacno: + flags.append('-#') + if print_ll: + flags.append('-e') + if monitor_mode: + flags.append('-I') + flags.append('-n') # TODO: Integrate, in case name resolution is wanted! + cmd.extend(flags) + flags_string = " ".join(flags) + logger.debug(f'Flags: {flags_string}') + + # debug interlude + verbosity = ctx.obj['VERBOSITY'] + if verbosity > 0: + verbosity_flag = '-' + for i in range(0, verbosity): + verbosity_flag = verbosity_flag + 'v' + logger.debug(f'verbosity string to pass to tcpdump: {verbosity_flag}') + cmd.append(verbosity_flag) + + # 7.2 generic (i.e. reusable) kw args + generic_kw_args = [] + if count: + generic_kw_args.extend(['-c', str(count)]) + # if mins: + # generic_kw_args.extend(['-G', str(mins * 60)]) TODO: this currently loads to errors with sudo + cmd.extend(generic_kw_args) + generic_kw_args_string = " ".join(generic_kw_args) + logger.debug(f'KW args: {generic_kw_args_string}') + + # 7.3 special kw args (not a priori reusable) + non_generic_kw_args = [] + if interface: + non_generic_kw_args.extend(['-i', interface]) + non_generic_kw_args.extend(['-w', str(pcap_file_full_path)]) + cmd.extend(non_generic_kw_args) + non_generic_kw_args_string = " ".join(non_generic_kw_args) + logger.debug(f'Non transferable (special) kw args: {non_generic_kw_args_string}') + + # 7.4 add filter expression + if cap_filter: + logger.debug(f'cap_filter (not generic): {cap_filter}') + cmd.append(cap_filter) + + full_cmd_string = " ".join(cmd) + + logger.info(f'tcpdump command: {"".join(full_cmd_string)}') + click.echo('Capture setup complete!') + # Step 8: Execute tcpdump command + start_time = datetime.now().strftime("%H:%M:%S") + start = time() + try: + if guided: + click.confirm(f'Execute following command: {full_cmd_string}') + stdout_log_file_abs_path = capture_dir_full_path / stdout_log_file + stderr_log_file_abs_path = capture_dir_full_path / stderr_log_file + stdout_log_file_abs_path.touch(mode=0o777) + stderr_log_file_abs_path.touch(mode=0o777) + with open(stdout_log_file_abs_path, 'w') as out, open(stderr_log_file_abs_path, 'w') as err: + logger.debug(f'\nstdout: {out}.\nstderr: {err}.\n') + + tcp_complete = subprocess.run(cmd, check=True, capture_output=True, text=True) + + out.write(tcp_complete.stdout) + err.write(tcp_complete.stderr) + + #click.echo(f'Mock sniff execution') + click.echo(f"Capture complete. Saved to {pcap_file}") + except subprocess.CalledProcessError as e: + logger.error(f'Failed to capture packets: {e}') + click.echo(f'Failed to capture packets: {e}') + click.echo(f'Check {stderr_log_file} for more info.') + if ctx.obj['DEBUG']: + msg = [f'STDERR log {stderr_log_file} contents:\n'] + with open(capture_dir_full_path / stderr_log_file) as log: + for line in log: + msg.append(line) + + click.echo("\t".join(msg), lvl='e') + # print('DEBUG ACTIVE') + if guided: + click.prompt('Create metadata anyway?') + else: + click.echo('Aborting capture...') + exit() + end_time = datetime.now().strftime("%H:%M:%S") + end = time() + delta = end - start + click.echo(f'tcpdump took {delta:.2f} seconds.') + # Step 9: Register metadata + metadata = { + 'device': canonical_name, + 'device_id': device, + 'capture_id': capture_uuid, + 'capture_date_iso': datetime.now().isoformat(), + 'invoked_command': " ".join(map(str, cmd)), + 'capture_duration': delta, + 'generic_parameters': { + 'flags': flags_string, + 'kwargs': generic_kw_args_string, + 'filter': generic_filter + }, + 'non_generic_parameters': { + 'kwargs': non_generic_kw_args_string, + 'filter': cap_filter + }, + 'features': { + 'interface': interface, + 'address': address + }, + 'resources': { + 'pcap_file': str(pcap_file), + 'stdout_log': str(stdout_log_file), + 'stderr_log': str(stderr_log_file) + }, + 'environment': { + 'capture_dir': capture_dir, + 'database': database, + 'capture_base_dir': str(capture_base_dir), + 'capture_dir_abs_path': str(capture_dir_full_path) + } + } + + click.echo('Ensuring correct ownership of created files.') + username = os.getlogin() + gid = os.getgid() + + # Else there are issues when running with sudo: + try: + subprocess.run(f'sudo chown -R {username}:{username} {device_path}', shell=True) + except OSError as e: + click.echo(f'Some error {e}') + + click.echo(f'Saving metadata.') + metadata_abs_path = capture_dir_full_path / 'capture_metadata.json' + with open(metadata_abs_path, 'w') as f: + json.dump(metadata, f, indent=4) + + click.echo(f'END SNIFF SUBCOMMAND') diff --git a/code/iottb-project/iottb/commands/testbed.py b/code/iottb-project/iottb/commands/testbed.py new file mode 100644 index 0000000..eb26a9f --- /dev/null +++ b/code/iottb-project/iottb/commands/testbed.py @@ -0,0 +1,120 @@ +import click +from pathlib import Path +import logging +from logging.handlers import RotatingFileHandler +import sys +from iottb.models.iottb_config import IottbConfig +from iottb.definitions import DB_NAME, CFG_FILE_PATH + +logger = logging.getLogger(__name__) + + +@click.command() +@click.option('-d', '--dest', type=click.Path(), help='Location to put (new) iottb database') +@click.option('-n', '--name', default=DB_NAME, type=str, help='Name of new database.') +@click.option('--update-default/--no-update-default', default=True, help='If new db should be set as the new default') +@click.pass_context +def init_db(ctx, dest, name, update_default): + logger.info('init-db invoked') + config = ctx.obj['CONFIG'] + logger.debug(f'str(config)') + # Use the default path from config if dest is not provided + known_dbs = config.get_known_databases() + logger.debug(f'Known databases: {known_dbs}') + if name in known_dbs: + dest = config.get_database_location(name) + if Path(dest).joinpath(name).is_dir(): + click.echo(f'A database {name} already exists.') + logger.debug(f'DB {name} exists in {dest}') + click.echo(f'Exiting...') + exit() + logger.debug(f'DB name {name} registered but does not exist.') + if not dest: + logger.info('No dest set, choosing default destination.') + dest = Path(config.default_db_location).parent + + db_path = Path(dest).joinpath(name) + logger.debug(f'Full path for db {str(db_path)}') + # Create the directory if it doesn't exist + db_path.mkdir(parents=True, exist_ok=True) + logger.info(f"mkdir {db_path} successful") + click.echo(f'Created {db_path}') + + # Update configuration + config.set_database_location(name, str(dest)) + if update_default: + config.set_default_database(name, str(dest)) + config.save_config() + logger.info(f"Updated configuration with database {name} at {db_path}") + + +# @click.group('config') +# @click.pass_context +# def cfg(ctx): +# pass +# +# @click.command('set', help='Set the location of a database.') +# @click.argument('database', help='Name of database') +# @click.argument('location', help='Where the database is located (i.e. its parent directory)') +# @click.pass_context +# def set(ctx, key, value): +# click.echo(f'Setting {key} to {value} in config') +# config = ctx.obj['CONFIG'] +# logger.warning('No checks performed!') +# config.set_database_location(key, value) +# config.save_config() + + + + + +@click.command() +@click.option('-d', '--dest', type=click.Path(), help='Location to put (new) iottb database') +@click.option('-n', '--name', default=DB_NAME, type=str, help='Name of new database.') +@click.option('--update-default/--no-update-default', default=True, help='If new db should be set as the new default') +@click.pass_context +def init_db_inactive(ctx, dest, name, update_default): + logger.info('init-db invoked') + config = ctx.obj['CONFIG'] + logger.debug(f'str(config)') + + # Retrieve known databases + known_dbs = config.get_known_databases() + + # Determine destination path + if name in known_dbs: + dest = Path(config.get_database_location(name)) + if dest.joinpath(name).is_dir(): + click.echo(f'A database {name} already exists.') + logger.debug(f'DB {name} exists in {dest}') + click.echo(f'Exiting...') + exit() + logger.debug(f'DB name {name} registered but does not exist.') + elif not dest: + logger.info('No destination set, using default path from config.') + dest = Path(config.default_db_location).parent + + # Ensure destination path is absolute + dest = dest.resolve() + + # Combine destination path with database name + db_path = dest / name + logger.debug(f'Full path for database: {str(db_path)}') + + # Create the directory if it doesn't exist + try: + db_path.mkdir(parents=True, exist_ok=True) + logger.info(f'Directory {db_path} created successfully.') + click.echo(f'Created {db_path}') + except Exception as e: + logger.error(f'Failed to create directory {db_path}: {e}') + click.echo(f'Failed to create directory {db_path}: {e}', err=True) + exit(1) + + # Update configuration + config.set_database_location(name, str(db_path)) + if update_default: + config.set_default_database(name, str(db_path)) + config.save_config() + logger.info(f'Updated configuration with database {name} at {db_path}') + click.echo(f'Updated configuration with database {name} at {db_path}') diff --git a/code/iottb-project/iottb/definitions.py b/code/iottb-project/iottb/definitions.py new file mode 100644 index 0000000..76d594c --- /dev/null +++ b/code/iottb-project/iottb/definitions.py @@ -0,0 +1,48 @@ +import logging +from pathlib import Path + +import click + +APP_NAME = 'iottb' +DB_NAME = 'iottb.db' +CFG_FILE_PATH = str(Path(click.get_app_dir(APP_NAME)).joinpath('iottb.cfg')) +CONSOLE_LOG_FORMATS = { + 0: '%(levelname)s - %(message)s', + 1: '%(levelname)s - %(module)s - %(message)s', + 2: '%(levelname)s - %(module)s - %(funcName)s - %(lineno)d - %(message)s' +} + +LOGFILE_LOG_FORMAT = { + 0: '%(levelname)s - %(asctime)s - %(module)s - %(message)s', + 1: '%(levelname)s - %(asctime)s - %(module)s - %(funcName)s - %(message)s', + 2: '%(levelname)s - %(asctime)s - %(module)s - %(funcName)s - %(lineno)d - %(message)s' +} +MAX_VERBOSITY = len(CONSOLE_LOG_FORMATS) - 1 +assert len(LOGFILE_LOG_FORMAT) == len(CONSOLE_LOG_FORMATS), 'Log formats must be same size' + +LOGLEVEL = logging.DEBUG +LOGDIR = Path.cwd() / 'logs' + +# Characters to just replace +REPLACEMENT_SET_CANONICAL_DEVICE_NAMES = {' ', '_', ',', '!', '@', '#', '$', '%', '^', '&', '*', '(', ')', '+', '=', + '{', '}', '[', ']', + '|', + '\\', ':', ';', '"', "'", '<', '>', '?', '/', '`', '~'} +# Characters to possibly error on +ERROR_SET_CANONICAL_DEVICE_NAMES = {',', '!', '@', '#', '$', '%', '^', '&', '*', '(', ')', '+', '=', '{', '}', '[', ']', + '|', + '\\', ':', ';', '"', "'", '<', '>', '?', '/', '`', '~'} + +DEVICE_METADATA_FILE_NAME = 'device_metadata.json' + +TB_ECHO_STYLES = { + 'w': {'fg': 'yellow', 'bold': True}, + 'i': {'fg': 'blue', 'italic': True}, + 's': {'fg': 'green', 'bold': True}, + 'e': {'fg': 'red', 'bold': True}, + 'header': {'fg': 'bright_cyan', 'bold': True, 'italic': True} +} + +NAME_OF_CAPTURE_DIR = 'sniffs' + + diff --git a/code/iottb-project/iottb/main.py b/code/iottb-project/iottb/main.py new file mode 100644 index 0000000..acba6dd --- /dev/null +++ b/code/iottb-project/iottb/main.py @@ -0,0 +1,77 @@ +import os +import shutil + +import click +from pathlib import Path +import logging + +from iottb.commands.sniff import sniff +from iottb.commands.developer import set_key_in_table_to, rm_cfg, rm_dbs, show_cfg, show_everything + +################################################## +# Import package modules +################################################# +from iottb.utils.logger_config import setup_logging +from iottb import definitions +from iottb.models.iottb_config import IottbConfig +from iottb.commands.testbed import init_db +from iottb.commands.add_device import add_device + +############################################################################ +# Module shortcuts for global definitions +########################################################################### +APP_NAME = definitions.APP_NAME +DB_NAME = definitions.DB_NAME +CFG_FILE_PATH = definitions.CFG_FILE_PATH +# These are (possibly) redundant when defined in definitions.py +# keeping them here until refactored and tested +MAX_VERBOSITY = definitions.MAX_VERBOSITY + +# Logger stuff +loglevel = definitions.LOGLEVEL +logger = logging.getLogger(__name__) + + +@click.group(context_settings=dict(auto_envvar_prefix='IOTTB', show_default=True)) +@click.option('-v', '--verbosity', count=True, type=click.IntRange(0, 3), default=0, is_eager=True, + help='Set verbosity') +@click.option('-d', '--debug', is_flag=True, default=False, is_eager=True, + help='Enable debug mode') +@click.option('--dry-run', is_flag=True, default=True, is_eager=True) +@click.option('--cfg-file', type=click.Path(), + default=Path(click.get_app_dir(APP_NAME)).joinpath('iottb.cfg'), + envvar='IOTTB_CONF_HOME', help='Path to iottb config file') +@click.pass_context +def cli(ctx, verbosity, debug, dry_run, cfg_file): + setup_logging(verbosity, debug) # Setup logging based on the loaded configuration and other options + ctx.ensure_object(dict) # Make sure context is ready for use + logger.info("Starting execution.") + ctx.obj['CONFIG'] = IottbConfig(cfg_file) # Load configuration directly + ctx.meta['FULL_PATH_CONFIG_FILE'] = str(cfg_file) + ctx.meta['DRY_RUN'] = dry_run + logger.debug(f'Verbosity: {verbosity}') + ctx.obj['VERBOSITY'] = verbosity + logger.debug(f'Debug: {debug}') + ctx.obj['DEBUG'] = debug + + +################################################################################## +# Add all subcommands to group here +################################################################################# +# TODO: Is there a way to do this without pylint freaking out? +# noinspection PyTypeChecker +cli.add_command(init_db) +cli.add_command(rm_cfg) +cli.add_command(set_key_in_table_to) +cli.add_command(rm_dbs) +# noinspection PyTypeChecker +cli.add_command(add_device) +cli.add_command(show_cfg) +cli.add_command(sniff) +cli.add_command(show_everything) + + +if __name__ == '__main__': + cli() + for log in Path.cwd().iterdir(): + log.chmod(0o777) diff --git a/code/iottb-project/iottb/models/__init__.py b/code/iottb-project/iottb/models/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/code/iottb-project/iottb/models/database.py b/code/iottb-project/iottb/models/database.py new file mode 100644 index 0000000..63105f2 --- /dev/null +++ b/code/iottb-project/iottb/models/database.py @@ -0,0 +1,6 @@ +class Database: + + def __init__(self, name, path): + self.name = name + self.path = path + self.device_list = [] # List of the canonical names of devices registered in this database diff --git a/code/iottb-project/iottb/models/device_metadata.py b/code/iottb-project/iottb/models/device_metadata.py new file mode 100644 index 0000000..505677a --- /dev/null +++ b/code/iottb-project/iottb/models/device_metadata.py @@ -0,0 +1,44 @@ +import logging +import uuid +from datetime import datetime +import logging +import click + +from iottb.utils.string_processing import make_canonical_name + +logger = logging.getLogger(__name__) + + +class DeviceMetadata: + def __init__(self, device_name, description="", model="", manufacturer="", firmware_version="", device_type="", + supported_interfaces="", companion_applications="", save_to_file=None): + self.device_id = str(uuid.uuid4()) + self.device_name = device_name + cn, aliases = make_canonical_name(device_name) + logger.debug(f'cn, aliases = {cn}, {str(aliases)}') + self.aliases = aliases + self.canonical_name = cn + self.date_added = datetime.now().isoformat() + self.description = description + self.model = model + self.manufacturer = manufacturer + self.current_firmware_version = firmware_version + self.device_type = device_type + self.supported_interfaces = supported_interfaces + self.companion_applications = companion_applications + self.last_metadata_update = datetime.now().isoformat() + if save_to_file is not None: + click.echo('TODO: Implement saving config to file after creation!') + + def add_alias(self, alias: str = ""): + if alias == "": + return + self.aliases.append(alias) + + def get_canonical_name(self): + return self.canonical_name + + def print_attributes(self): + print(f'Printing attribute value pairs in {__name__}') + for attr, value in self.__dict__.items(): + print(f'{attr}: {value}') diff --git a/code/iottb-project/iottb/models/iottb_config.py b/code/iottb-project/iottb/models/iottb_config.py new file mode 100644 index 0000000..25736dc --- /dev/null +++ b/code/iottb-project/iottb/models/iottb_config.py @@ -0,0 +1,124 @@ +import json +from pathlib import Path + +from iottb import definitions +import logging + +logger = logging.getLogger(__name__) + +DB_NAME = definitions.DB_NAME + + +class IottbConfig: + """ Class to handle testbed configuration. + + TODO: Add instead of overwrite Database locations when initializing if a location with valid db + exists. + """ + + @staticmethod + def warn(): + logger.warning(f'DatabaseLocations are DatabaseLocationMap in the class {__name__}') + + def __init__(self, cfg_file=definitions.CFG_FILE_PATH): + logger.info('Initializing Config object') + IottbConfig.warn() + self.cfg_file = Path(cfg_file) + self.default_database = None + self.default_db_location = None + self.db_path_dict = dict() + self.load_config() + + def create_default_config(self): + """Create default iottb config file.""" + logger.info(f'Creating default config file at {self.cfg_file}') + self.default_database = DB_NAME + self.default_db_location = str(Path.home()) + self.db_path_dict = { + DB_NAME: self.default_db_location + } + + defaults = { + 'DefaultDatabase': self.default_database, + 'DefaultDatabasePath': self.default_db_location, + 'DatabaseLocations': self.db_path_dict + } + + try: + self.cfg_file.parent.mkdir(parents=True, exist_ok=True) + with self.cfg_file.open('w') as config_file: + json.dump(defaults, config_file, indent=4) + except IOError as e: + logger.error(f"Failed to create default configuration file at {self.cfg_file}: {e}") + raise RuntimeError(f"Failed to create configuration file: {e}") from e + + def load_config(self): + """Loads or creates default configuration from given file path.""" + logger.info('Loading configuration file') + if not self.cfg_file.is_file(): + logger.info('Config file does not exist.') + self.create_default_config() + else: + logger.info('Config file exists, opening.') + with self.cfg_file.open('r') as config_file: + data = json.load(config_file) + self.default_database = data.get('DefaultDatabase') + self.default_db_location = data.get('DefaultDatabasePath') + self.db_path_dict = data.get('DatabaseLocations', {}) + + def save_config(self): + """Save the current configuration to the config file.""" + data = { + 'DefaultDatabase': self.default_database, + 'DefaultDatabasePath': self.default_db_location, + 'DatabaseLocations': self.db_path_dict + } + try: + with self.cfg_file.open('w') as config_file: + json.dump(data, config_file, indent=4) + except IOError as e: + logger.error(f"Failed to save configuration file at {self.cfg_file}: {e}") + raise RuntimeError(f"Failed to save configuration file: {e}") from e + + def set_default_database(self, name, path): + """Set the default database and its path.""" + self.default_database = name + self.default_db_location = path + self.db_path_dict[name] = path + + def get_default_database_location(self): + return self.default_db_location + + def get_default_database(self): + return self.default_database + + def get_database_location(self, name): + """Get the location of a specific database.""" + return self.db_path_dict.get(name) + + def set_database_location(self, name, path): + """Set the location for a database.""" + logger.debug(f'Type of "path" parameter {type(path)}') + logger.debug(f'String value of "path" parameter {str(path)}') + logger.debug(f'Type of "name" parameter {type(name)}') + logger.debug(f'String value of "name" parameter {str(name)}') + path = Path(path) + name = Path(name) + logger.debug(f'path:name = {path}:{name}') + if path.name == name: + path = path.parent + self.db_path_dict[str(name)] = str(path) + + def get_known_databases(self): + """Get the set of known databases""" + logger.info(f'Getting known databases.') + + return self.db_path_dict.keys() + + def get_know_database_paths(self): + """Get the paths of all known databases""" + logger.info(f'Getting known database paths.') + return self.db_path_dict.values() + + def get_full_default_path(self): + return Path(self.default_db_location) / self.default_database diff --git a/code/iottb-project/iottb/models/sniff_metadata.py b/code/iottb-project/iottb/models/sniff_metadata.py new file mode 100644 index 0000000..9fa5e11 --- /dev/null +++ b/code/iottb-project/iottb/models/sniff_metadata.py @@ -0,0 +1,39 @@ +import json +import logging +import uuid +from datetime import datetime +from pathlib import Path + +logger = logging.getLogger('iottb.sniff') # Log with sniff subcommand + +class CaptureMetadata: + def __init__(self, device_id, capture_dir, interface, address, capture_file, tcpdump_command, tcpdump_stdout, tcpdump_stderr, packet_filter, alias): + self.base_data = { + 'device_id': device_id, + 'capture_id': str(uuid.uuid4()), + 'capture_date': datetime.now().isoformat(), + 'capture_dir': str(capture_dir), + 'capture_file': capture_file, + 'start_time': "", + 'stop_time': "", + 'alias': alias + } + self.features = { + 'interface': interface, + 'device_ip_address': address if address else "No IP Address set", + 'tcpdump_stdout': str(tcpdump_stdout), + 'tcpdump_stderr': str(tcpdump_stderr), + 'packet_filter': packet_filter + } + self.command = tcpdump_command + + def save_to_file(self): + metadata = { + 'base_data': self.base_data, + 'features': self.features, + 'command': self.command + } + metadata_file_path = Path(self.base_data['capture_dir']) / 'metadata.json' + with open(metadata_file_path, 'w') as f: + json.dump(metadata, f, indent=4) + logger.info(f'Metadata saved to {metadata_file_path}') diff --git a/code/iottb-project/iottb/scripts/generate_help.py b/code/iottb-project/iottb/scripts/generate_help.py new file mode 100755 index 0000000..fd4b683 --- /dev/null +++ b/code/iottb-project/iottb/scripts/generate_help.py @@ -0,0 +1,52 @@ +import click +from io import StringIO +import sys + +# Import your CLI app here +from iottb.main import cli + +"""Script to generate the help text and write to file. + + Definitely needs better formatting. + Script is also not very flexible. +""" + + +def get_help_text(command): + """Get the help text for a given command.""" + help_text = StringIO() + with click.Context(command) as ctx: + # chatgpt says this helps: was right + sys_stdout = sys.stdout + sys.stdout = help_text + try: + click.echo(command.get_help(ctx)) + finally: + sys.stdout = sys_stdout + return help_text.getvalue() + + +def write_help_to_file(cli, filename): + """Write help messages of all commands and subcommands to a file.""" + with open(filename, 'w') as f: + # main + f.write(f"Main Command: iottb\n") + f.write(get_help_text(cli)) + f.write("\n\n") + + # go through subcommands + for cmd_name, cmd in cli.commands.items(): + f.write(f"Command: {cmd_name}\n") + f.write(get_help_text(cmd)) + f.write("\n\n") + + # subcommands of subcommands + if isinstance(cmd, click.Group): + for sub_cmd_name, sub_cmd in cmd.commands.items(): + f.write(f"Subcommand: {cmd_name} {sub_cmd_name}\n") + f.write(get_help_text(sub_cmd)) + f.write("\n\n") + + +if __name__ == "__main__": + write_help_to_file(cli, "help_messages.md") diff --git a/code/iottb-project/iottb/scripts/sudo_iottb b/code/iottb-project/iottb/scripts/sudo_iottb new file mode 100644 index 0000000..1f1af62 --- /dev/null +++ b/code/iottb-project/iottb/scripts/sudo_iottb @@ -0,0 +1,4 @@ +#/bin/sh +echo 'Running iottb as sudo' +sudo $(which python) iottb $@ +echo 'Finished executing iottb with sudo' \ No newline at end of file diff --git a/code/iottb-project/iottb/utils/__init__.py b/code/iottb-project/iottb/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/code/iottb-project/iottb/utils/logger_config.py b/code/iottb-project/iottb/utils/logger_config.py new file mode 100644 index 0000000..5cf76ad --- /dev/null +++ b/code/iottb-project/iottb/utils/logger_config.py @@ -0,0 +1,41 @@ +import logging +import sys +from logging.handlers import RotatingFileHandler + +from iottb import definitions +from iottb.definitions import MAX_VERBOSITY, CONSOLE_LOG_FORMATS, APP_NAME, LOGFILE_LOG_FORMAT + +loglevel = definitions.LOGLEVEL + + +def setup_logging(verbosity, debug=loglevel): + """ Setup root logger for iottb """ + log_level = loglevel + handlers = [] + date_format = '%Y-%m-%d %H:%M:%S' + if verbosity > 0: + log_level = logging.WARNING + if verbosity > MAX_VERBOSITY: + verbosity = MAX_VERBOSITY + log_level = logging.INFO + assert verbosity <= MAX_VERBOSITY, f'Verbosity must be <= {MAX_VERBOSITY}' + console_handler = logging.StreamHandler(sys.stdout) + print(str(sys.stdout)) + console_handler.setFormatter(logging.Formatter(CONSOLE_LOG_FORMATS[verbosity], datefmt=date_format)) + console_handler.setLevel(logging.DEBUG) # can keep at debug since it depends on global level? + handlers.append(console_handler) + + if debug: + log_level = logging.DEBUG + + # Logfile logs INFO+, no debugs though + file_handler = RotatingFileHandler(f'{str(definitions.LOGDIR / APP_NAME)}.log', maxBytes=10240, backupCount=5) + file_handler.setFormatter(logging.Formatter(LOGFILE_LOG_FORMAT[verbosity], datefmt=date_format)) + file_handler.setLevel(logging.INFO) + + # finnish root logger setup + handlers.append(file_handler) + # Force this config to be applied to root logger + logging.basicConfig(level=log_level, handlers=handlers, force=True) + + diff --git a/code/iottb-project/iottb/utils/string_processing.py b/code/iottb-project/iottb/utils/string_processing.py new file mode 100644 index 0000000..7b2ae39 --- /dev/null +++ b/code/iottb-project/iottb/utils/string_processing.py @@ -0,0 +1,40 @@ +import re +from iottb import definitions +import logging + +logger = logging.getLogger(__name__) + + +def normalize_string(s, chars_to_replace=None, replacement=None, allow_unicode=False): + pass + + +def make_canonical_name(name): + """ + Normalize the device name to a canonical form: + - Replace the first two occurrences of spaces and transform characters with dashes. + - Remove any remaining spaces and non-ASCII characters. + - Convert to lowercase. + """ + aliases = [name] + logger.info(f'Normalizing name {name}') + + # We first normalize + chars_to_replace = definitions.REPLACEMENT_SET_CANONICAL_DEVICE_NAMES + pattern = re.compile('|'.join(re.escape(char) for char in chars_to_replace)) + norm_name = pattern.sub('-', name) + norm_name = re.sub(r'[^\x00-\x7F]+', '', norm_name) # removes non ascii chars + + aliases.append(norm_name) + # Lower case + norm_name = norm_name.lower() + aliases.append(norm_name) + + # canonical name is only first two parts of resulting string + parts = norm_name.split('-') + canonical_name = canonical_name = '-'.join(parts[:2]) + aliases.append(canonical_name) + aliases = list(set(aliases)) + logger.debug(f'Canonical name: {canonical_name}') + logger.debug(f'Aliases: {aliases}') + return canonical_name, aliases diff --git a/code/iottb-project/iottb/utils/user_interaction.py b/code/iottb-project/iottb/utils/user_interaction.py new file mode 100644 index 0000000..767e286 --- /dev/null +++ b/code/iottb-project/iottb/utils/user_interaction.py @@ -0,0 +1,42 @@ +# iottb/utils/user_interaction.py + +import click +from iottb.definitions import TB_ECHO_STYLES +import sys +import os + + +def tb_echo2(msg: str, lvl='i', log=True): + style = TB_ECHO_STYLES.get(lvl, {}) + click.secho(f'[IOTTB]', **style) + click.secho(f'[IOTTB] \t {msg}', **style) + + +last_prefix = None + + +def tb_echo(msg: str, lvl='i', log=True): + global last_prefix + prefix = f'Testbed [{lvl.upper()}]\n' + + if last_prefix != prefix: + click.secho(prefix, nl=False, **TB_ECHO_STYLES['header']) + last_prefix = prefix + + click.secho(f' {msg}', **TB_ECHO_STYLES[lvl]) + + +def main(): + tb_echo('Info message', 'i') + tb_echo('Warning message', 'w') + tb_echo('Error message', 'e') + tb_echo('Success message', 's') + + +if __name__ == '__main__': + # arrrgggg hacky + current_dir = os.path.dirname(os.path.abspath(__file__)) + project_root = os.path.abspath(os.path.join(current_dir, '../../')) + sys.path.insert(0, project_root) + + main() diff --git a/code/iottb-project/poetry.lock b/code/iottb-project/poetry.lock new file mode 100644 index 0000000..b23fb0e --- /dev/null +++ b/code/iottb-project/poetry.lock @@ -0,0 +1,103 @@ +# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. + +[[package]] +name = "click" +version = "8.1.7" +description = "Composable command line interface toolkit" +optional = false +python-versions = ">=3.7" +files = [ + {file = "click-8.1.7-py3-none-any.whl", hash = "sha256:ae74fb96c20a0277a1d615f1e4d73c8414f5a98db8b799a7931d1582f3390c28"}, + {file = "click-8.1.7.tar.gz", hash = "sha256:ca9853ad459e787e2192211578cc907e7594e294c7ccc834310722b41b9ca6de"}, +] + +[package.dependencies] +colorama = {version = "*", markers = "platform_system == \"Windows\""} + +[[package]] +name = "colorama" +version = "0.4.6" +description = "Cross-platform colored terminal text." +optional = false +python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,>=2.7" +files = [ + {file = "colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6"}, + {file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"}, +] + +[[package]] +name = "iniconfig" +version = "2.0.0" +description = "brain-dead simple config-ini parsing" +optional = false +python-versions = ">=3.7" +files = [ + {file = "iniconfig-2.0.0-py3-none-any.whl", hash = "sha256:b6a85871a79d2e3b22d2d1b94ac2824226a63c6b741c88f7ae975f18b6778374"}, + {file = "iniconfig-2.0.0.tar.gz", hash = "sha256:2d91e135bf72d31a410b17c16da610a82cb55f6b0477d1a902134b24a455b8b3"}, +] + +[[package]] +name = "packaging" +version = "24.1" +description = "Core utilities for Python packages" +optional = false +python-versions = ">=3.8" +files = [ + {file = "packaging-24.1-py3-none-any.whl", hash = "sha256:5b8f2217dbdbd2f7f384c41c628544e6d52f2d0f53c6d0c3ea61aa5d1d7ff124"}, + {file = "packaging-24.1.tar.gz", hash = "sha256:026ed72c8ed3fcce5bf8950572258698927fd1dbda10a5e981cdf0ac37f4f002"}, +] + +[[package]] +name = "pluggy" +version = "1.5.0" +description = "plugin and hook calling mechanisms for python" +optional = false +python-versions = ">=3.8" +files = [ + {file = "pluggy-1.5.0-py3-none-any.whl", hash = "sha256:44e1ad92c8ca002de6377e165f3e0f1be63266ab4d554740532335b9d75ea669"}, + {file = "pluggy-1.5.0.tar.gz", hash = "sha256:2cffa88e94fdc978c4c574f15f9e59b7f4201d439195c3715ca9e2486f1d0cf1"}, +] + +[package.extras] +dev = ["pre-commit", "tox"] +testing = ["pytest", "pytest-benchmark"] + +[[package]] +name = "pytest" +version = "8.2.2" +description = "pytest: simple powerful testing with Python" +optional = false +python-versions = ">=3.8" +files = [ + {file = "pytest-8.2.2-py3-none-any.whl", hash = "sha256:c434598117762e2bd304e526244f67bf66bbd7b5d6cf22138be51ff661980343"}, + {file = "pytest-8.2.2.tar.gz", hash = "sha256:de4bb8104e201939ccdc688b27a89a7be2079b22e2bd2b07f806b6ba71117977"}, +] + +[package.dependencies] +colorama = {version = "*", markers = "sys_platform == \"win32\""} +iniconfig = "*" +packaging = "*" +pluggy = ">=1.5,<2.0" + +[package.extras] +dev = ["argcomplete", "attrs (>=19.2)", "hypothesis (>=3.56)", "mock", "pygments (>=2.7.2)", "requests", "setuptools", "xmlschema"] + +[[package]] +name = "scapy" +version = "2.5.0" +description = "Scapy: interactive packet manipulation tool" +optional = false +python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, <4" +files = [ + {file = "scapy-2.5.0.tar.gz", hash = "sha256:5b260c2b754fd8d409ba83ee7aee294ecdbb2c235f9f78fe90bc11cb6e5debc2"}, +] + +[package.extras] +basic = ["ipython"] +complete = ["cryptography (>=2.0)", "ipython", "matplotlib", "pyx"] +docs = ["sphinx (>=3.0.0)", "sphinx_rtd_theme (>=0.4.3)", "tox (>=3.0.0)"] + +[metadata] +lock-version = "2.0" +python-versions = "^3.12" +content-hash = "10b2c268b0f10db15eab2cca3d2dc9dc25bc60f4b218ebf786fb780fa85557e0" diff --git a/code/iottb-project/pyproject.toml b/code/iottb-project/pyproject.toml new file mode 100644 index 0000000..80acba6 --- /dev/null +++ b/code/iottb-project/pyproject.toml @@ -0,0 +1,21 @@ +[tool.poetry] +name = "iottb" +version = "0.1.0" +description = "IoT Testbed" +authors = ["Sebastian Lenzlinger "] +readme = "README.md" + +[tool.poetry.dependencies] +python = "^3.12" +click = "^8.1" +scapy = "^2.5" + +[tool.poetry.scripts] +iottb = "iottb.main:cli" + +[tool.poetry.group.test.dependencies] +pytest = "^8.2.2" + +[build-system] +requires = ["poetry-core"] +build-backend = "poetry.core.masonry.api" diff --git a/code/iottb-project/tests/__init__.py b/code/iottb-project/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/code/iottb-project/tests/test_make_canonical_name.py b/code/iottb-project/tests/test_make_canonical_name.py new file mode 100644 index 0000000..eac541c --- /dev/null +++ b/code/iottb-project/tests/test_make_canonical_name.py @@ -0,0 +1,23 @@ +from iottb.utils.string_processing import make_canonical_name + +import pytest + + +class TestMakeCanonicalName: + + def test_normalizes_name_with_spaces_to_dashes(self): + name = "Device Name With Spaces" + expected_canonical_name = "device-name" + canonical_name, aliases = make_canonical_name(name) + assert canonical_name == expected_canonical_name + assert "device-name-with-spaces" in aliases + assert "device-name" in aliases + assert "Device Name With Spaces" in aliases + + def test_name_with_no_spaces_or_special_characters(self): + name = "DeviceName123" + expected_canonical_name = "devicename123" + canonical_name, aliases = make_canonical_name(name) + assert canonical_name == expected_canonical_name + assert "DeviceName123" in aliases + assert "devicename123" in aliases diff --git a/code/iottb/scripts/wifi_ctl.sh b/code/iottb/scripts/wifi_ctl.sh new file mode 100644 index 0000000..076d2fd --- /dev/null +++ b/code/iottb/scripts/wifi_ctl.sh @@ -0,0 +1,55 @@ +#!/usr/bin/env bash +# Note, this is not my original work. Source: https://linuxtldr.com/changing-interface-mode/ + +function list_nic_info () { + ip addr show +} + +function enable_monm_iw () { + interface=$1 + sudo ip link set "$interface" down + sudo iw "$interface" set monitor control + sudo ip link set "$interface" up +} + +function disable_monm_iw () { + interface=$1 + sudo ip link set "$interface" down + sudo iw "$interface" set type managed + sudo ip link set "$interface" up +} + +function enable_monm_iwconfig () { + interface=$1 + sudo ifconfig "$interface" down + sudo iwconfig "$interface" mode monitor + sudo ifconfig "$interface" up +} + +function disable_monm_iwconfig () { + interface=$1 + sudo ifconfig "$interface" down + sudo iwconfig "$interface" mode managed + sudo ifconfig "$interface" up +} + +function enable_monm_acng () { + interface=$1 + sudo airmon-ng check + sudo airmon-ng check kill + sudo airmon-ng start "$interface" +} + +function disable_monm_acng () { + interface="${1}mon" + sudo airmon-ng stop "$interface" + sudo systemctl restart NetworkManager +} + +if declare -f "$1" > /dev/null +then + "$@" +else + echo "Unknown function '$1'" >&2 + exit 1 +fi \ No newline at end of file diff --git a/code/iottb/utils/diagram1.py b/code/iottb/utils/diagram1.py new file mode 100644 index 0000000..28a9657 --- /dev/null +++ b/code/iottb/utils/diagram1.py @@ -0,0 +1,29 @@ +import matplotlib.pyplot as plt +import networkx as nx + +# Create the graph +G1 = nx.DiGraph() + +# Add nodes with positions +G1.add_node("IoT Device", pos=(1, 3)) +G1.add_node("AP", pos=(3, 3)) +G1.add_node("Switch (Port Mirroring Enabled)", pos=(5, 3)) +G1.add_node("Gateway Router", pos=(7, 3)) +G1.add_node("Internet", pos=(9, 3)) +G1.add_node("Capture Device", pos=(5, 1)) + +# Add edges +G1.add_edge("IoT Device", "AP") +G1.add_edge("AP", "Switch (Port Mirroring Enabled)") +G1.add_edge("Switch (Port Mirroring Enabled)", "Gateway Router") +G1.add_edge("Gateway Router", "Internet") +G1.add_edge("Switch (Port Mirroring Enabled)", "Capture Device") + +# Draw the graph +pos = nx.get_node_attributes(G1, 'pos') +plt.figure(figsize=(12, 8)) +nx.draw(G1, pos, with_labels=True, node_size=3000, node_color='lightblue', font_size=10, font_weight='bold') +nx.draw_networkx_edge_labels(G1, pos, edge_labels={("Switch (Port Mirroring Enabled)", "Capture Device"): "Mirrored Traffic"}, font_color='red') + +plt.title("IoT Device Connected via AP to Gateway Router via Switch with Port Mirroring Enabled") +plt.show() diff --git a/code/iottb/utils/diagramm2.py b/code/iottb/utils/diagramm2.py new file mode 100644 index 0000000..4e723da --- /dev/null +++ b/code/iottb/utils/diagramm2.py @@ -0,0 +1,27 @@ +import matplotlib.pyplot as plt +import networkx as nx + +# Create the graph +G2 = nx.DiGraph() + +# Add nodes with positions +G2.add_node("IoT Device", pos=(1, 3)) +G2.add_node("Capture Device (Hotspot)", pos=(3, 3)) +G2.add_node("Ethernet Connection", pos=(5, 3)) +G2.add_node("Gateway Router", pos=(7, 3)) +G2.add_node("Internet", pos=(9, 3)) + +# Add edges +G2.add_edge("IoT Device", "Capture Device (Hotspot)") +G2.add_edge("Capture Device (Hotspot)", "Ethernet Connection") +G2.add_edge("Ethernet Connection", "Gateway Router") +G2.add_edge("Gateway Router", "Internet") + +# Draw the graph +pos = nx.get_node_attributes(G2, 'pos') +plt.figure(figsize=(12, 8)) +nx.draw(G2, pos, with_labels=True, node_size=3000, node_color='lightblue', font_size=10, font_weight='bold') +nx.draw_networkx_edge_labels(G2, pos, edge_labels={("Capture Device (Hotspot)", "Ethernet Connection"): "Bridged Traffic"}, font_color='red') + +plt.title("Capture Device Provides Hotspot and Bridges to Ethernet for Internet") +plt.show() diff --git a/notes/journal/Untitled.md b/notes/journal/Untitled.md new file mode 100644 index 0000000..e69de29