From a5325fc35fc7589323df9df57bdbfa1f197fa83c Mon Sep 17 00:00:00 2001 From: Sebastian Lenzlinger Date: Sat, 29 Jun 2024 00:43:47 +0200 Subject: [PATCH] Another try --- .gitignore | 33 +++++ code/iottb-project/README.md | 9 ++ code/iottb-project/iottb/__init__.py | 11 ++ code/iottb-project/iottb/commands/__init__.py | 0 .../iottb/commands/add_device.py | 89 +++++++++++++ .../iottb-project/iottb/commands/developer.py | 123 +++++++++++++++++ .../iottb/commands/initialize_testbed.py | 100 ++++++++++++++ code/iottb-project/iottb/commands/sniff.py | 118 +++++++++++++++++ code/iottb-project/iottb/definitions.py | 46 +++++++ code/iottb-project/iottb/main.py | 62 +++++++++ code/iottb-project/iottb/models/__init__.py | 0 code/iottb-project/iottb/models/database.py | 6 + .../iottb/models/device_metadata.py | 44 +++++++ .../iottb/models/iottb_config.py | 124 ++++++++++++++++++ .../iottb/models/sniff_metadata.py | 4 + code/iottb-project/iottb/utils/__init__.py | 0 .../iottb/utils/logger_config.py | 41 ++++++ .../iottb/utils/string_processing.py | 40 ++++++ .../iottb/utils/user_interaction.py | 42 ++++++ code/iottb-project/poetry.lock | 103 +++++++++++++++ code/iottb-project/pyproject.toml | 22 ++++ code/iottb-project/tests/__init__.py | 0 .../tests/test_make_canonical_name.py | 23 ++++ 23 files changed, 1040 insertions(+) create mode 100644 code/iottb-project/README.md create mode 100644 code/iottb-project/iottb/__init__.py create mode 100644 code/iottb-project/iottb/commands/__init__.py create mode 100644 code/iottb-project/iottb/commands/add_device.py create mode 100644 code/iottb-project/iottb/commands/developer.py create mode 100644 code/iottb-project/iottb/commands/initialize_testbed.py create mode 100644 code/iottb-project/iottb/commands/sniff.py create mode 100644 code/iottb-project/iottb/definitions.py create mode 100644 code/iottb-project/iottb/main.py create mode 100644 code/iottb-project/iottb/models/__init__.py create mode 100644 code/iottb-project/iottb/models/database.py create mode 100644 code/iottb-project/iottb/models/device_metadata.py create mode 100644 code/iottb-project/iottb/models/iottb_config.py create mode 100644 code/iottb-project/iottb/models/sniff_metadata.py create mode 100644 code/iottb-project/iottb/utils/__init__.py create mode 100644 code/iottb-project/iottb/utils/logger_config.py create mode 100644 code/iottb-project/iottb/utils/string_processing.py create mode 100644 code/iottb-project/iottb/utils/user_interaction.py create mode 100644 code/iottb-project/poetry.lock create mode 100644 code/iottb-project/pyproject.toml create mode 100644 code/iottb-project/tests/__init__.py create mode 100644 code/iottb-project/tests/test_make_canonical_name.py diff --git a/.gitignore b/.gitignore index d05fd96..b76c2fd 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,36 @@ __pycache__ /.idea .idea/ 2024-bsc-sebastian-lenzlinger.iml +__pycache__ +.venv +iottb.egg-info +.idea +*.log +logs/ +*.pyc +.obsidian + +# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio, WebStorm and Rider +# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 + +# User-specific stuff +.idea/**/workspace.xml +.idea/**/tasks.xml +.idea/**/usage.statistics.xml +.idea/**/dictionaries +.idea/**/shelf + +# AWS User-specific +.idea/**/aws.xml + +# Generated files +.idea/**/contentModel.xml + +# Sensitive or high-churn files +.idea/**/dataSources/ +.idea/**/dataSources.ids +.idea/**/dataSources.local.xml +.idea/**/sqlDataSources.xml +.idea/**/dynamic.xml +.idea/**/uiDesigner.xml +.idea/**/dbnavigator.xml diff --git a/code/iottb-project/README.md b/code/iottb-project/README.md new file mode 100644 index 0000000..d7a21e2 --- /dev/null +++ b/code/iottb-project/README.md @@ -0,0 +1,9 @@ +# Iottb +## Basic Invocation + +## Configuration +### Env Vars +- IOTTB_CONF_HOME + +By setting this variable you control where the basic iottb application +configuration should be looked for \ No newline at end of file diff --git a/code/iottb-project/iottb/__init__.py b/code/iottb-project/iottb/__init__.py new file mode 100644 index 0000000..1438731 --- /dev/null +++ b/code/iottb-project/iottb/__init__.py @@ -0,0 +1,11 @@ +from iottb import definitions +import logging +from iottb.utils.user_interaction import tb_echo +import click + +click.echo = tb_echo # This is very hacky +logging.basicConfig(level=definitions.LOGLEVEL) +log_dir = definitions.LOGDIR +# Ensure logs dir exists before new handlers are registered in main.py +if not log_dir.is_dir(): + log_dir.mkdir() diff --git a/code/iottb-project/iottb/commands/__init__.py b/code/iottb-project/iottb/commands/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/code/iottb-project/iottb/commands/add_device.py b/code/iottb-project/iottb/commands/add_device.py new file mode 100644 index 0000000..d518080 --- /dev/null +++ b/code/iottb-project/iottb/commands/add_device.py @@ -0,0 +1,89 @@ +import json + +import click +from pathlib import Path +import logging +import re + +from iottb import definitions +from iottb.models.device_metadata import DeviceMetadata +from iottb.models.iottb_config import IottbConfig +from iottb.definitions import CFG_FILE_PATH, TB_ECHO_STYLES + +logger = logging.getLogger(__name__) + + +def add_device_guided(ctx, cn, db): + click.echo('TODO: Implement') + logger.info('Adding device interactively') + #logger.debug(f'Parameters: {params}. value: {value}') + + +@click.command('add-device', help='Add a device to a database') +@click.option('--dev', '--device-name', type=str, required=True, + help='The name of the device to be added. If this string contains spaces or other special characters \ + normalization is performed to derive a canonical name') +@click.option('--db', '--database', type=click.Path(exists=True, file_okay=False, dir_okay=True, path_type=Path), + envvar='IOTTB_DB', show_envvar=True, + help='Database in which to add this device. If not specified use default from config.') +@click.option('--guided', is_flag=True, default=False, show_default=True, envvar='IOTTB_GUIDED_ADD', show_envvar=True, + help='Add device interactively') +def add_device(dev, db, guided): + """Add a new device to a database + + Device name must be supplied unless in an interactive setup. Database is taken from config by default. + """ + logger.info('add-device invoked') + + # Step 1: Load Config + # Dependency: Config file must exist + config = IottbConfig(Path(CFG_FILE_PATH)) + logger.debug(f'Config loaded: {config}') + + # Step 2: Load database + # dependency: Database folder must exist + if db: + database = db + path = config.db_path_dict + logger.debug(f'Resolved (path, db) {path}, {database}') + else: + path = config.default_db_location + database = config.default_database + logger.debug(f'Default (path, db) {path}, {database}') + click.secho(f'Using database {database}') + full_db_path = Path(path) / database + if not full_db_path.is_dir(): + logger.warning(f'No database at {database}') + click.echo(f'Could not find a database.') + click.echo(f'You need to initialize the testbed before before you add devices!') + click.echo(f'To initialize the testbed in the default location run "iottb init-db"') + click.echo('Exiting...') + exit() + + # Step 3: Check if device already exists in database + # dependency: DeviceMetadata object + device_metadata = DeviceMetadata(device_name=dev) + device_dir = full_db_path / device_metadata.canonical_name + + # Check if device is already registered + if device_dir.exists(): + logger.warning(f'Device directory {device_dir} already exists.') + click.echo(f'Device {dev} already exists in the database.') + click.echo('Exiting...') + exit() + try: + device_dir.mkdir() + except OSError as e: + logger.error(f'Error trying to create device {e}') + click.echo('Exiting...') + exit() + + # Step 4: Save metadata into device_dir + metadata_path = device_dir / definitions.DEVICE_METADATA_FILE_NAME + with metadata_path.open('w') as metadata_file: + json.dump(device_metadata.__dict__, metadata_file, indent=4) + click.echo(f'Successfully added device {dev} to database') + logger.debug(f'Added device {dev} to database {database}. Full path of metadata {metadata_path}') + logger.info(f'Metadata for {dev} {device_metadata.print_attributes()}') + + diff --git a/code/iottb-project/iottb/commands/developer.py b/code/iottb-project/iottb/commands/developer.py new file mode 100644 index 0000000..89ec530 --- /dev/null +++ b/code/iottb-project/iottb/commands/developer.py @@ -0,0 +1,123 @@ +from pathlib import Path +import logging +import click + +from iottb.definitions import DB_NAME, CFG_FILE_PATH +from iottb.models.iottb_config import IottbConfig + +logger = logging.getLogger(__name__) + + +@click.group('util') +def tb(): + pass + + +@click.command() +@click.option('--file', default=DB_NAME) +@click.option('--table', type=str, default='DefaultDatabase') +@click.option('--key') +@click.option('--value') +@click.pass_context +def set_key_in_table_to(ctx, file, table, key, value): + """Edit config or metadata files. TODO: Implement""" + click.echo(f'set_key_in_table_to invoked') + logger.warning("Unimplemented subcommand invoked.") + + +@click.command() +@click.confirmation_option(prompt="Are you certain that you want to delete the cfg file?") +def rm_cfg(): + """ Removes the cfg file from the filesystem. + + This is mostly a utility during development. Once non-standard database locations are implemented, + deleting this would lead to iottb not being able to find them anymore. + """ + Path(CFG_FILE_PATH).unlink() + click.echo(f'Iottb configuration removed at {CFG_FILE_PATH}') + + +@click.command() +@click.confirmation_option(prompt="Are you certain that you want to delete the databases file?") +def rm_dbs(dbs): + """ Removes ALL(!) databases from the filesystem if they're empty. + + Development utility currently unfit for use. + """ + config = IottbConfig() + paths = config.get_know_database_paths() + logger.debug(f'Known db paths: {str(paths)}') + for dbs in paths: + try: + Path(dbs).rmdir() + click.echo(f'{dbs} deleted') + except Exception as e: + logger.debug(f'Failed unlinking db {dbs} with error {e}') + logger.info(f'All databases deleted') + + +@click.command('show-cfg', help='Show the current configuration context') +@click.option('--cfg-file', type=click.Path(), default=CFG_FILE_PATH, help='Path to the config file') +@click.option('-pp', is_flag=True, default=False, help='Pretty Print') +@click.pass_context +def show_cfg(ctx, cfg_file, pp): + logger.debug(f'Pretty print option set to {pp}') + if pp: + try: + config = IottbConfig(Path(cfg_file)) + click.echo("Configuration Context:") + click.echo(f"Default Database: {config.default_database}") + click.echo(f"Default Database Path: {config.default_db_location}") + click.echo("Database Locations:") + for db_name, db_path in config.db_path_dict.items(): + click.echo(f" - {db_name}: {db_path}") + except Exception as e: + logger.error(f"Error loading configuration: {e}") + click.echo(f"Failed to load configuration from {cfg_file}") + else: + path = Path(cfg_file) + + if path.is_file(): + with path.open('r') as file: + content = file.read() + click.echo(content) + else: + click.echo(f"Configuration file not found at {cfg_file}") + + +@click.command('show-all', help='Show everything: configuration, databases, and device metadata') +@click.pass_context +def show_everything(ctx): + """Show everything that can be recursively found based on config except file contents.""" + config = ctx.obj['CONFIG'] + click.echo("Configuration Context:") + click.echo(f"Default Database: {config.default_database}") + click.echo(f"Default Database Path: {config.default_db_location}") + click.echo("Database Locations:") + for db_name, db_path in config.db_path_dict.items(): + full_db_path = Path(db_path) / db_name + click.echo(f" - {db_name}: {full_db_path}") + if full_db_path.is_dir(): + click.echo(f"Contents of {db_name} at {full_db_path}:") + for item in full_db_path.iterdir(): + if item.is_file(): + click.echo(f" - {item.name}") + try: + with item.open('r', encoding='utf-8') as file: + content = file.read() + click.echo(f" Content:\n{content}") + except UnicodeDecodeError: + click.echo(" Content is not readable as text") + elif item.is_dir(): + click.echo(f" - {item.name}/") + for subitem in item.iterdir(): + if subitem.is_file(): + click.echo(f" - {subitem.name}") + elif subitem.is_dir(): + click.echo(f" - {subitem.name}/") + else: + click.echo(f" {full_db_path} is not a directory") + + +warnstyle = {'fg': 'red', 'bold': True} +click.secho('Developer command used', **warnstyle) diff --git a/code/iottb-project/iottb/commands/initialize_testbed.py b/code/iottb-project/iottb/commands/initialize_testbed.py new file mode 100644 index 0000000..d7373ea --- /dev/null +++ b/code/iottb-project/iottb/commands/initialize_testbed.py @@ -0,0 +1,100 @@ +import click +from pathlib import Path +import logging +from logging.handlers import RotatingFileHandler +import sys +from iottb.models.iottb_config import IottbConfig +from iottb.definitions import DB_NAME + +logger = logging.getLogger(__name__) + + +@click.command() +@click.option('-d', '--dest', type=click.Path(), help='Location to put (new) iottb database') +@click.option('-n', '--name', default=DB_NAME, type=str, help='Name of new database.') +@click.option('--update-default/--no-update-default', default=True, help='If new db should be set as the new default') +@click.pass_context +def init_db(ctx, dest, name, update_default): + logger.info('init-db invoked') + config = ctx.obj['CONFIG'] + logger.debug(f'str(config)') + # Use the default path from config if dest is not provided + known_dbs = config.get_known_databases() + logger.debug(f'Known databases: {known_dbs}') + if name in known_dbs: + dest = config.get_database_location(name) + if Path(dest).joinpath(name).is_dir(): + click.echo(f'A database {name} already exists.') + logger.debug(f'DB {name} exists in {dest}') + click.echo(f'Exiting...') + exit() + logger.debug(f'DB name {name} registered but does not exist.') + if not dest: + logger.info('No dest set, choosing default destination.') + dest = Path(config.default_db_location).parent + + db_path = Path(dest).joinpath(name) + logger.debug(f'Full path for db {str(db_path)}') + # Create the directory if it doesn't exist + db_path.mkdir(parents=True, exist_ok=True) + logger.info(f"mkdir {db_path} successful") + click.echo(f'Created {db_path}') + + # Update configuration + config.set_database_location(name, str(dest)) + if update_default: + config.set_default_database(name, str(dest)) + config.save_config() + logger.info(f"Updated configuration with database {name} at {db_path}") + + +@click.command() +@click.option('-d', '--dest', type=click.Path(), help='Location to put (new) iottb database') +@click.option('-n', '--name', default=DB_NAME, type=str, help='Name of new database.') +@click.option('--update-default/--no-update-default', default=True, help='If new db should be set as the new default') +@click.pass_context +def init_db_inactive(ctx, dest, name, update_default): + logger.info('init-db invoked') + config = ctx.obj['CONFIG'] + logger.debug(f'str(config)') + + # Retrieve known databases + known_dbs = config.get_known_databases() + + # Determine destination path + if name in known_dbs: + dest = Path(config.get_database_location(name)) + if dest.joinpath(name).is_dir(): + click.echo(f'A database {name} already exists.') + logger.debug(f'DB {name} exists in {dest}') + click.echo(f'Exiting...') + exit() + logger.debug(f'DB name {name} registered but does not exist.') + elif not dest: + logger.info('No destination set, using default path from config.') + dest = Path(config.default_db_location).parent + + # Ensure destination path is absolute + dest = dest.resolve() + + # Combine destination path with database name + db_path = dest / name + logger.debug(f'Full path for database: {str(db_path)}') + + # Create the directory if it doesn't exist + try: + db_path.mkdir(parents=True, exist_ok=True) + logger.info(f'Directory {db_path} created successfully.') + click.echo(f'Created {db_path}') + except Exception as e: + logger.error(f'Failed to create directory {db_path}: {e}') + click.echo(f'Failed to create directory {db_path}: {e}', err=True) + exit(1) + + # Update configuration + config.set_database_location(name, str(db_path)) + if update_default: + config.set_default_database(name, str(db_path)) + config.save_config() + logger.info(f'Updated configuration with database {name} at {db_path}') + click.echo(f'Updated configuration with database {name} at {db_path}') diff --git a/code/iottb-project/iottb/commands/sniff.py b/code/iottb-project/iottb/commands/sniff.py new file mode 100644 index 0000000..0c5334c --- /dev/null +++ b/code/iottb-project/iottb/commands/sniff.py @@ -0,0 +1,118 @@ +import click +import subprocess +import json +from pathlib import Path +import logging +import re +from datetime import datetime +from iottb.definitions import APP_NAME, CFG_FILE_PATH +from iottb.models.iottb_config import IottbConfig +from iottb.utils.string_processing import make_canonical_name +# Setup logger +logger = logging.getLogger('iottb.sniff') + + +def is_ip_address(address): + ip_pattern = re.compile(r"^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$") + return ip_pattern.match(address) is not None + + +def is_mac_address(address): + mac_pattern = re.compile(r"^([0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2}$") + return mac_pattern.match(address) is not None + + +def load_config(cfg_file): + """Loads configuration from the given file path.""" + with open(cfg_file, 'r') as config_file: + return json.load(config_file) + + +def validate_sniff(ctx, param, value): + logger.info('Validating sniff...') + if ctx.params.get('unsafe') and not value: + return None + if not ctx.params.get('unsafe') and not value: + raise click.BadParameter('Address is required unless --unsafe is set.') + if not is_ip_address(value) and not is_mac_address(value): + raise click.BadParameter('Address must be a valid IP address or MAC address.') + return value + + +@click.command('sniff', help='Sniff packets with tcpdump') +@click.argument('device') +@click.option('-i', '--interface', callback=validate_sniff, help='Network interface to capture on', + envvar='IOTTB_CAPTURE_INTERFACE') +@click.option('-a', '--address', callback=validate_sniff, help='IP or MAC address to filter packets by', + envvar='IOTTB_CAPTURE_ADDRESS') +@click.option('--db', '--database', type=click.Path(exists=True, file_okay=False), envvar='IOTTB_DB', + help='Database of device. Only needed if not current default.') +@click.option('--unsafe', is_flag=True, default=False, envvar='IOTTB_UNSAFE', is_eager=True, + help='Disable checks for otherwise required options') +@click.option('--guided', is_flag=True, default=False) +def sniff(device, interface, address, db, unsafe, guided): + """ Sniff packets from a device """ + logger.info('sniff command invoked') + + # Step1: Load Config + config = IottbConfig(Path(CFG_FILE_PATH)) + logger.debug(f'Config loaded: {config}') + + # Step2: determine relevant database + database = db if db else config.default_database + path = config.default_db_location[database] + full_db_path = Path(path) / database + logger.debug(f'Full db path is {str(path)}') + + # 2.2: Check if it exists + if not full_db_path.is_dir(): + logger.error('DB unexpectedly missing') + click.echo('DB unexpectedly missing') + return + + canonical_name, aliases = make_canonical_name(device) + click.echo(f'Using canonical device name {canonical_name}') + device_path = full_db_path / canonical_name + + # Step 3: now the device + if not device_path.exists(): + if not unsafe: + logger.error(f'Device path {device_path} does not exist') + click.echo(f'Device path {device_path} does not exist') + return + else: + device_path.mkdir(parents=True, exist_ok=True) + logger.info(f'Device path {device_path} created') + + # Generate filter + if not unsafe: + if is_ip_address(address): + packet_filter = f"host {address}" + elif is_mac_address(address): + packet_filter = f"ether host {address}" + else: + logger.error('Invalid address format') + click.echo('Invalid address format') + return + else: + packet_filter = None + + + + +@click.command('sniff', help='Sniff packets with tcpdump') +@click.argument('device') +@click.option('-i', '--interface', required=False, help='Network interface to capture on', envvar='IOTTB_CAPTURE_INTERFACE') +@click.option('-a', '--address', required=True, help='IP or MAC address to filter packets by', envvar='IOTTB_CAPTURE_ADDRESS') +@click.option('--db', '--database', type=click.Path(exists=True, file_okay=False), envvar='IOTTB_DB', + help='Database of device. Only needed if not current default.') +@click.option('--unsafe', is_flag=True, default=False, envvar='IOTTB_UNSAFE', + help='Disable checks for otherwise required options') +@click.option('--guided', is_flag=True) +def sniff2(device, interface, address, cfg_file): + """ Sniff packets from a device """ + logger.info('sniff command invoked') + # Step 1: Load Config + # Dependency: Config file must exist + config = IottbConfig(Path(CFG_FILE_PATH)) + logger.debug(f'Config loaded: {config}') diff --git a/code/iottb-project/iottb/definitions.py b/code/iottb-project/iottb/definitions.py new file mode 100644 index 0000000..d4ee672 --- /dev/null +++ b/code/iottb-project/iottb/definitions.py @@ -0,0 +1,46 @@ +import logging +from pathlib import Path + +import click + +APP_NAME = 'iottb' +DB_NAME = 'iottb.db' +CFG_FILE_PATH = str(Path(click.get_app_dir(APP_NAME)).joinpath('iottb.cfg')) +CONSOLE_LOG_FORMATS = { + 0: '%(levelname)s - %(message)s', + 1: '%(levelname)s - %(module)s - %(message)s', + 2: '%(levelname)s - %(module)s - %(funcName)s - %(lineno)d - %(message)s' +} + +LOGFILE_LOG_FORMAT = { + 0: '%(levelname)s - %(asctime)s - %(module)s - %(message)s', + 1: '%(levelname)s - %(asctime)s - %(module)s - %(funcName)s - %(message)s', + 2: '%(levelname)s - %(asctime)s - %(module)s - %(funcName)s - %(lineno)d - %(message)s' +} +MAX_VERBOSITY = len(CONSOLE_LOG_FORMATS) - 1 +assert len(LOGFILE_LOG_FORMAT) == len(CONSOLE_LOG_FORMATS), 'Log formats must be same size' + +LOGLEVEL = logging.DEBUG +LOGDIR = Path.cwd() / 'logs' + +# Characters to just replace +REPLACEMENT_SET_CANONICAL_DEVICE_NAMES = {' ', '_', ',', '!', '@', '#', '$', '%', '^', '&', '*', '(', ')', '+', '=', + '{', '}', '[', ']', + '|', + '\\', ':', ';', '"', "'", '<', '>', '?', '/', '`', '~'} +# Characters to possibly error on +ERROR_SET_CANONICAL_DEVICE_NAMES = {',', '!', '@', '#', '$', '%', '^', '&', '*', '(', ')', '+', '=', '{', '}', '[', ']', + '|', + '\\', ':', ';', '"', "'", '<', '>', '?', '/', '`', '~'} + +DEVICE_METADATA_FILE_NAME = 'device_metadata.json' + +TB_ECHO_STYLES = { + 'w': {'fg': 'yellow', 'bold': True}, + 'i': {'fg': 'blue', 'italic': True}, + 's': {'fg': 'green', 'bold': True}, + 'e': {'fg': 'red', 'bold': True}, + 'header': {'fg': 'bright_cyan', 'bold': True, 'italic': True} +} + +NAME_OF_CAPTURE_DIR = 'sniffs' diff --git a/code/iottb-project/iottb/main.py b/code/iottb-project/iottb/main.py new file mode 100644 index 0000000..c3aaa7a --- /dev/null +++ b/code/iottb-project/iottb/main.py @@ -0,0 +1,62 @@ +import click +from pathlib import Path +import logging + +from iottb.commands.sniff import sniff +from iottb.commands.developer import set_key_in_table_to, rm_cfg, rm_dbs, show_cfg, show_everything +################################################## +# Import package modules +################################################# +from iottb.utils.logger_config import setup_logging +from iottb import definitions +from iottb.models.iottb_config import IottbConfig +from iottb.commands.initialize_testbed import init_db +from iottb.commands.add_device import add_device + +############################################################################ +# Module shortcuts for global definitions +########################################################################### +APP_NAME = definitions.APP_NAME +DB_NAME = definitions.DB_NAME +CFG_FILE_PATH = definitions.CFG_FILE_PATH +# These are (possibly) redundant when defined in definitions.py +# keeping them here until refactored and tested +MAX_VERBOSITY = definitions.MAX_VERBOSITY + +# Logger stuff +loglevel = definitions.LOGLEVEL +logger = logging.getLogger(__name__) + + +@click.group() +@click.option('-v', '--verbosity', count=True, type=click.IntRange(0, 3), default=0, + help='Set verbosity') +@click.option('-d', '--debug', is_flag=True, default=False, + help='Enable debug mode') +@click.option('--cfg-file', type=click.Path(), + default=Path(click.get_app_dir(APP_NAME)).joinpath('iottb.cfg'), + envvar='IOTTB_CONF_HOME', help='Path to iottb config file') +@click.pass_context +def cli(ctx, verbosity, debug, cfg_file): + setup_logging(verbosity, debug) # Setup logging based on the loaded configuration and other options + ctx.ensure_object(dict) # Make sure context is ready for use + logger.info("Starting execution.") + ctx.obj['CONFIG'] = IottbConfig(cfg_file) # Load configuration directly + ctx.meta['FULL_PATH_CONFIG_FILE'] = str(cfg_file) + + +################################################################################## +# Add all subcommands to group here +################################################################################# +# noinspection PyTypeChecker +cli.add_command(init_db) +cli.add_command(rm_cfg) +cli.add_command(set_key_in_table_to) +cli.add_command(rm_dbs) +# noinspection PyTypeChecker +cli.add_command(add_device) +cli.add_command(show_cfg) +cli.add_command(sniff) +cli.add_command(show_everything) +if __name__ == '__main__': + cli(auto_envvar_prefix='IOTTB', show_default=True, show_envvars=True) diff --git a/code/iottb-project/iottb/models/__init__.py b/code/iottb-project/iottb/models/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/code/iottb-project/iottb/models/database.py b/code/iottb-project/iottb/models/database.py new file mode 100644 index 0000000..63105f2 --- /dev/null +++ b/code/iottb-project/iottb/models/database.py @@ -0,0 +1,6 @@ +class Database: + + def __init__(self, name, path): + self.name = name + self.path = path + self.device_list = [] # List of the canonical names of devices registered in this database diff --git a/code/iottb-project/iottb/models/device_metadata.py b/code/iottb-project/iottb/models/device_metadata.py new file mode 100644 index 0000000..505677a --- /dev/null +++ b/code/iottb-project/iottb/models/device_metadata.py @@ -0,0 +1,44 @@ +import logging +import uuid +from datetime import datetime +import logging +import click + +from iottb.utils.string_processing import make_canonical_name + +logger = logging.getLogger(__name__) + + +class DeviceMetadata: + def __init__(self, device_name, description="", model="", manufacturer="", firmware_version="", device_type="", + supported_interfaces="", companion_applications="", save_to_file=None): + self.device_id = str(uuid.uuid4()) + self.device_name = device_name + cn, aliases = make_canonical_name(device_name) + logger.debug(f'cn, aliases = {cn}, {str(aliases)}') + self.aliases = aliases + self.canonical_name = cn + self.date_added = datetime.now().isoformat() + self.description = description + self.model = model + self.manufacturer = manufacturer + self.current_firmware_version = firmware_version + self.device_type = device_type + self.supported_interfaces = supported_interfaces + self.companion_applications = companion_applications + self.last_metadata_update = datetime.now().isoformat() + if save_to_file is not None: + click.echo('TODO: Implement saving config to file after creation!') + + def add_alias(self, alias: str = ""): + if alias == "": + return + self.aliases.append(alias) + + def get_canonical_name(self): + return self.canonical_name + + def print_attributes(self): + print(f'Printing attribute value pairs in {__name__}') + for attr, value in self.__dict__.items(): + print(f'{attr}: {value}') diff --git a/code/iottb-project/iottb/models/iottb_config.py b/code/iottb-project/iottb/models/iottb_config.py new file mode 100644 index 0000000..25736dc --- /dev/null +++ b/code/iottb-project/iottb/models/iottb_config.py @@ -0,0 +1,124 @@ +import json +from pathlib import Path + +from iottb import definitions +import logging + +logger = logging.getLogger(__name__) + +DB_NAME = definitions.DB_NAME + + +class IottbConfig: + """ Class to handle testbed configuration. + + TODO: Add instead of overwrite Database locations when initializing if a location with valid db + exists. + """ + + @staticmethod + def warn(): + logger.warning(f'DatabaseLocations are DatabaseLocationMap in the class {__name__}') + + def __init__(self, cfg_file=definitions.CFG_FILE_PATH): + logger.info('Initializing Config object') + IottbConfig.warn() + self.cfg_file = Path(cfg_file) + self.default_database = None + self.default_db_location = None + self.db_path_dict = dict() + self.load_config() + + def create_default_config(self): + """Create default iottb config file.""" + logger.info(f'Creating default config file at {self.cfg_file}') + self.default_database = DB_NAME + self.default_db_location = str(Path.home()) + self.db_path_dict = { + DB_NAME: self.default_db_location + } + + defaults = { + 'DefaultDatabase': self.default_database, + 'DefaultDatabasePath': self.default_db_location, + 'DatabaseLocations': self.db_path_dict + } + + try: + self.cfg_file.parent.mkdir(parents=True, exist_ok=True) + with self.cfg_file.open('w') as config_file: + json.dump(defaults, config_file, indent=4) + except IOError as e: + logger.error(f"Failed to create default configuration file at {self.cfg_file}: {e}") + raise RuntimeError(f"Failed to create configuration file: {e}") from e + + def load_config(self): + """Loads or creates default configuration from given file path.""" + logger.info('Loading configuration file') + if not self.cfg_file.is_file(): + logger.info('Config file does not exist.') + self.create_default_config() + else: + logger.info('Config file exists, opening.') + with self.cfg_file.open('r') as config_file: + data = json.load(config_file) + self.default_database = data.get('DefaultDatabase') + self.default_db_location = data.get('DefaultDatabasePath') + self.db_path_dict = data.get('DatabaseLocations', {}) + + def save_config(self): + """Save the current configuration to the config file.""" + data = { + 'DefaultDatabase': self.default_database, + 'DefaultDatabasePath': self.default_db_location, + 'DatabaseLocations': self.db_path_dict + } + try: + with self.cfg_file.open('w') as config_file: + json.dump(data, config_file, indent=4) + except IOError as e: + logger.error(f"Failed to save configuration file at {self.cfg_file}: {e}") + raise RuntimeError(f"Failed to save configuration file: {e}") from e + + def set_default_database(self, name, path): + """Set the default database and its path.""" + self.default_database = name + self.default_db_location = path + self.db_path_dict[name] = path + + def get_default_database_location(self): + return self.default_db_location + + def get_default_database(self): + return self.default_database + + def get_database_location(self, name): + """Get the location of a specific database.""" + return self.db_path_dict.get(name) + + def set_database_location(self, name, path): + """Set the location for a database.""" + logger.debug(f'Type of "path" parameter {type(path)}') + logger.debug(f'String value of "path" parameter {str(path)}') + logger.debug(f'Type of "name" parameter {type(name)}') + logger.debug(f'String value of "name" parameter {str(name)}') + path = Path(path) + name = Path(name) + logger.debug(f'path:name = {path}:{name}') + if path.name == name: + path = path.parent + self.db_path_dict[str(name)] = str(path) + + def get_known_databases(self): + """Get the set of known databases""" + logger.info(f'Getting known databases.') + + return self.db_path_dict.keys() + + def get_know_database_paths(self): + """Get the paths of all known databases""" + logger.info(f'Getting known database paths.') + return self.db_path_dict.values() + + def get_full_default_path(self): + return Path(self.default_db_location) / self.default_database diff --git a/code/iottb-project/iottb/models/sniff_metadata.py b/code/iottb-project/iottb/models/sniff_metadata.py new file mode 100644 index 0000000..9e66e0d --- /dev/null +++ b/code/iottb-project/iottb/models/sniff_metadata.py @@ -0,0 +1,4 @@ +import logging + +logger = logging.getLogger('iottb.sniff') # Log with sniff subcommand + diff --git a/code/iottb-project/iottb/utils/__init__.py b/code/iottb-project/iottb/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/code/iottb-project/iottb/utils/logger_config.py b/code/iottb-project/iottb/utils/logger_config.py new file mode 100644 index 0000000..5cf76ad --- /dev/null +++ b/code/iottb-project/iottb/utils/logger_config.py @@ -0,0 +1,41 @@ +import logging +import sys +from logging.handlers import RotatingFileHandler + +from iottb import definitions +from iottb.definitions import MAX_VERBOSITY, CONSOLE_LOG_FORMATS, APP_NAME, LOGFILE_LOG_FORMAT + +loglevel = definitions.LOGLEVEL + + +def setup_logging(verbosity, debug=loglevel): + """ Setup root logger for iottb """ + log_level = loglevel + handlers = [] + date_format = '%Y-%m-%d %H:%M:%S' + if verbosity > 0: + log_level = logging.WARNING + if verbosity > MAX_VERBOSITY: + verbosity = MAX_VERBOSITY + log_level = logging.INFO + assert verbosity <= MAX_VERBOSITY, f'Verbosity must be <= {MAX_VERBOSITY}' + console_handler = logging.StreamHandler(sys.stdout) + print(str(sys.stdout)) + console_handler.setFormatter(logging.Formatter(CONSOLE_LOG_FORMATS[verbosity], datefmt=date_format)) + console_handler.setLevel(logging.DEBUG) # can keep at debug since it depends on global level? + handlers.append(console_handler) + + if debug: + log_level = logging.DEBUG + + # Logfile logs INFO+, no debugs though + file_handler = RotatingFileHandler(f'{str(definitions.LOGDIR / APP_NAME)}.log', maxBytes=10240, backupCount=5) + file_handler.setFormatter(logging.Formatter(LOGFILE_LOG_FORMAT[verbosity], datefmt=date_format)) + file_handler.setLevel(logging.INFO) + + # finnish root logger setup + handlers.append(file_handler) + # Force this config to be applied to root logger + logging.basicConfig(level=log_level, handlers=handlers, force=True) + + diff --git a/code/iottb-project/iottb/utils/string_processing.py b/code/iottb-project/iottb/utils/string_processing.py new file mode 100644 index 0000000..321e842 --- /dev/null +++ b/code/iottb-project/iottb/utils/string_processing.py @@ -0,0 +1,40 @@ +import re +from iottb import definitions +import logging + +logger = logging.getLogger(__name__) + + +def normalize_string(s, chars_to_replace=None, replacement=None, allow_unicode=False): + pass + + +def make_canonical_name(name): + """ + Normalize the device name to a canonical form: + - Replace the first two occurrences of spaces and transform characters with dashes. + - Remove any remaining spaces and non-ASCII characters. + - Convert to lowercase. + """ + aliases = [name] + logger.info(f'Normalizing name {name}') + + # We first normalize + chars_to_replace = definitions.REPLACEMENT_SET_CANONICAL_DEVICE_NAMES + pattern = re.compile('|'.join(re.escape(char) for char in chars_to_replace)) + norm_name = pattern.sub('-', name) + norm_name = re.sub(r'[^\x00-\x7F]+', '', norm_name) # removes non ascii chars + + aliases.append(norm_name) + # Lower case + norm_name = norm_name.lower() + aliases.append(norm_name) + + # canonical name is only first two parts of resulting string + parts = norm_name.split('-') + canonical_name = canonical_name = '-'.join(parts[:2]) + aliases.append(canonical_name) + + logger.debug(f'Canonical name: {canonical_name}') + logger.debug(f'Aliases: {aliases}') + return canonical_name, list(set(aliases)) diff --git a/code/iottb-project/iottb/utils/user_interaction.py b/code/iottb-project/iottb/utils/user_interaction.py new file mode 100644 index 0000000..767e286 --- /dev/null +++ b/code/iottb-project/iottb/utils/user_interaction.py @@ -0,0 +1,42 @@ +# iottb/utils/user_interaction.py + +import click +from iottb.definitions import TB_ECHO_STYLES +import sys +import os + + +def tb_echo2(msg: str, lvl='i', log=True): + style = TB_ECHO_STYLES.get(lvl, {}) + click.secho(f'[IOTTB]', **style) + click.secho(f'[IOTTB] \t {msg}', **style) + + +last_prefix = None + + +def tb_echo(msg: str, lvl='i', log=True): + global last_prefix + prefix = f'Testbed [{lvl.upper()}]\n' + + if last_prefix != prefix: + click.secho(prefix, nl=False, **TB_ECHO_STYLES['header']) + last_prefix = prefix + + click.secho(f' {msg}', **TB_ECHO_STYLES[lvl]) + + +def main(): + tb_echo('Info message', 'i') + tb_echo('Warning message', 'w') + tb_echo('Error message', 'e') + tb_echo('Success message', 's') + + +if __name__ == '__main__': + # arrrgggg hacky + current_dir = os.path.dirname(os.path.abspath(__file__)) + project_root = os.path.abspath(os.path.join(current_dir, '../../')) + sys.path.insert(0, project_root) + + main() diff --git a/code/iottb-project/poetry.lock b/code/iottb-project/poetry.lock new file mode 100644 index 0000000..b23fb0e --- /dev/null +++ b/code/iottb-project/poetry.lock @@ -0,0 +1,103 @@ +# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. + +[[package]] +name = "click" +version = "8.1.7" +description = "Composable command line interface toolkit" +optional = false +python-versions = ">=3.7" +files = [ + {file = "click-8.1.7-py3-none-any.whl", hash = "sha256:ae74fb96c20a0277a1d615f1e4d73c8414f5a98db8b799a7931d1582f3390c28"}, + {file = "click-8.1.7.tar.gz", hash = "sha256:ca9853ad459e787e2192211578cc907e7594e294c7ccc834310722b41b9ca6de"}, +] + +[package.dependencies] +colorama = {version = "*", markers = "platform_system == \"Windows\""} + +[[package]] +name = "colorama" +version = "0.4.6" +description = "Cross-platform colored terminal text." +optional = false +python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,>=2.7" +files = [ + {file = "colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6"}, + {file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"}, +] + +[[package]] +name = "iniconfig" +version = "2.0.0" +description = "brain-dead simple config-ini parsing" +optional = false +python-versions = ">=3.7" +files = [ + {file = "iniconfig-2.0.0-py3-none-any.whl", hash = "sha256:b6a85871a79d2e3b22d2d1b94ac2824226a63c6b741c88f7ae975f18b6778374"}, + {file = "iniconfig-2.0.0.tar.gz", hash = "sha256:2d91e135bf72d31a410b17c16da610a82cb55f6b0477d1a902134b24a455b8b3"}, +] + +[[package]] +name = "packaging" +version = "24.1" +description = "Core utilities for Python packages" +optional = false +python-versions = ">=3.8" +files = [ + {file = "packaging-24.1-py3-none-any.whl", hash = "sha256:5b8f2217dbdbd2f7f384c41c628544e6d52f2d0f53c6d0c3ea61aa5d1d7ff124"}, + {file = "packaging-24.1.tar.gz", hash = "sha256:026ed72c8ed3fcce5bf8950572258698927fd1dbda10a5e981cdf0ac37f4f002"}, +] + +[[package]] +name = "pluggy" +version = "1.5.0" +description = "plugin and hook calling mechanisms for python" +optional = false +python-versions = ">=3.8" +files = [ + {file = "pluggy-1.5.0-py3-none-any.whl", hash = "sha256:44e1ad92c8ca002de6377e165f3e0f1be63266ab4d554740532335b9d75ea669"}, + {file = "pluggy-1.5.0.tar.gz", hash = "sha256:2cffa88e94fdc978c4c574f15f9e59b7f4201d439195c3715ca9e2486f1d0cf1"}, +] + +[package.extras] +dev = ["pre-commit", "tox"] +testing = ["pytest", "pytest-benchmark"] + +[[package]] +name = "pytest" +version = "8.2.2" +description = "pytest: simple powerful testing with Python" +optional = false +python-versions = ">=3.8" +files = [ + {file = "pytest-8.2.2-py3-none-any.whl", hash = "sha256:c434598117762e2bd304e526244f67bf66bbd7b5d6cf22138be51ff661980343"}, + {file = "pytest-8.2.2.tar.gz", hash = "sha256:de4bb8104e201939ccdc688b27a89a7be2079b22e2bd2b07f806b6ba71117977"}, +] + +[package.dependencies] +colorama = {version = "*", markers = "sys_platform == \"win32\""} +iniconfig = "*" +packaging = "*" +pluggy = ">=1.5,<2.0" + +[package.extras] +dev = ["argcomplete", "attrs (>=19.2)", "hypothesis (>=3.56)", "mock", "pygments (>=2.7.2)", "requests", "setuptools", "xmlschema"] + +[[package]] +name = "scapy" +version = "2.5.0" +description = "Scapy: interactive packet manipulation tool" +optional = false +python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, <4" +files = [ + {file = "scapy-2.5.0.tar.gz", hash = "sha256:5b260c2b754fd8d409ba83ee7aee294ecdbb2c235f9f78fe90bc11cb6e5debc2"}, +] + +[package.extras] +basic = ["ipython"] +complete = ["cryptography (>=2.0)", "ipython", "matplotlib", "pyx"] +docs = ["sphinx (>=3.0.0)", "sphinx_rtd_theme (>=0.4.3)", "tox (>=3.0.0)"] + +[metadata] +lock-version = "2.0" +python-versions = "^3.12" +content-hash = "10b2c268b0f10db15eab2cca3d2dc9dc25bc60f4b218ebf786fb780fa85557e0" diff --git a/code/iottb-project/pyproject.toml b/code/iottb-project/pyproject.toml new file mode 100644 index 0000000..eda5a1f --- /dev/null +++ b/code/iottb-project/pyproject.toml @@ -0,0 +1,22 @@ +[tool.poetry] +name = "iottb" +version = "0.1.0" +description = "IoT Testbed" +authors = ["Sebastian Lenzlinger "] +readme = "README.md" +package-mode = false + +[tool.poetry.dependencies] +python = "^3.12" +click = "^8.1" +scapy = "^2.5" + +[tool.poetry.scripts] +iottb = "iottb.main:cli" + +[tool.poetry.group.test.dependencies] +pytest = "^8.2.2" + +[build-system] +requires = ["poetry-core"] +build-backend = "poetry.core.masonry.api" diff --git a/code/iottb-project/tests/__init__.py b/code/iottb-project/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/code/iottb-project/tests/test_make_canonical_name.py b/code/iottb-project/tests/test_make_canonical_name.py new file mode 100644 index 0000000..eac541c --- /dev/null +++ b/code/iottb-project/tests/test_make_canonical_name.py @@ -0,0 +1,23 @@ +from iottb.utils.string_processing import make_canonical_name + +import pytest + + +class TestMakeCanonicalName: + + def test_normalizes_name_with_spaces_to_dashes(self): + name = "Device Name With Spaces" + expected_canonical_name = "device-name" + canonical_name, aliases = make_canonical_name(name) + assert canonical_name == expected_canonical_name + assert "device-name-with-spaces" in aliases + assert "device-name" in aliases + assert "Device Name With Spaces" in aliases + + def test_name_with_no_spaces_or_special_characters(self): + name = "DeviceName123" + expected_canonical_name = "devicename123" + canonical_name, aliases = make_canonical_name(name) + assert canonical_name == expected_canonical_name + assert "DeviceName123" in aliases + assert "devicename123" in aliases