From 0d16d5b6a2890788d87e5690b7bb21404b02de49 Mon Sep 17 00:00:00 2001 From: Sebastian Lenzlinger Date: Fri, 28 Jun 2024 17:29:23 +0200 Subject: [PATCH] Add add-device subcommand. --- iottb/commands/add_device.py | 152 +++++++++++++++++++++++++++ iottb/commands/initialize_testbed.py | 80 +++++++++++--- iottb/contexts.py | 124 ++++++++++++++++++---- iottb/definitions.py | 9 ++ iottb/main.py | 37 ++++++- iottb/utils/string_processing.py | 6 ++ 6 files changed, 375 insertions(+), 33 deletions(-) create mode 100644 iottb/commands/add_device.py create mode 100644 iottb/utils/string_processing.py diff --git a/iottb/commands/add_device.py b/iottb/commands/add_device.py new file mode 100644 index 0000000..8dbb071 --- /dev/null +++ b/iottb/commands/add_device.py @@ -0,0 +1,152 @@ +import json + +import click +from pathlib import Path +import logging +import re + +from iottb import definitions +from iottb.contexts import DeviceMetadata, IottbConfig +from iottb.definitions import CFG_FILE_PATH + +logger = logging.getLogger(__name__) + + +def add_device_guided(ctx, cn, db): + click.echo('TODO: Implement') + logger.info('Adding device interactively') + #logger.debug(f'Parameters: {params}. value: {value}') + +@click.command('add-device', help='Add a device to a database') +@click.option('--dev', '--device-name', type=str, required=True, + help='The name of the device to be added. If this string contains spaces or other special characters \ + normalization is performed to derive a canonical name') +@click.option('--db', '--database', type=click.Path(exists=True, file_okay=False, dir_okay=True, path_type=Path), + envvar='IOTTB_DB', show_envvar=True, + help='Database in which to add this device. If not specified use default from config.') +@click.option('--guided', is_flag=True, default=False, show_default=True, envvar='IOTTB_GUIDED_ADD', show_envvar=True, + help='Add device interactively') +def add_device(dev, db, guided): + """Add a new device to a database + + Device name must be supplied unless in an interactive setup. Database is taken from config by default. + """ + logger.info('add-device invoked') + + # Step 1: Load Config + # Dependency: Config file must exist + config = IottbConfig(Path(CFG_FILE_PATH)) + logger.debug(f'Config loaded: {config}') + + # Step 2: Load database + # dependency: Database folder must exist + if db: + database = db + path = config.db_path_dict + logger.debug(f'Resolved (path, db) {path}, {database}') + else: + path = config.default_db_location + database = config.default_database + logger.debug(f'Default (path, db) {path}, {database}') + click.secho(f'Using database {database}') + full_db_path = Path(path) / database + if not full_db_path.is_dir(): + logger.warning(f'No database at {database}') + click.echo(f'Could not find a database.') + click.echo(f'You need to initialize the testbed before before you add devices!') + click.echo(f'To initialize the testbed in the default location run "iottb init-db"') + click.echo('Exiting...') + exit() + + # Step 3: Check if device already exists in database + # dependency: DeviceMetadata object + device_metadata = DeviceMetadata(device_name=dev) + device_dir = full_db_path / device_metadata.canonical_name + + # Check if device is already registered + if device_dir.exists(): + logger.warning(f'Device directory {device_dir} already exists.') + click.echo(f'Device {dev} already exists in the database.') + click.echo('Exiting...') + exit() + try: + device_dir.mkdir() + except OSError as e: + logger.error(f'Error trying to create device {e}') + click.echo('Exiting...') + exit() + + # Step 4: Save metadata into device_dir + metadata_path = device_dir / definitions.DEVICE_METADATA_FILE_NAME + with metadata_path.open('w') as metadata_file: + json.dump(device_metadata.__dict__, metadata_file, indent=4) + click.echo(f'Successfully added device {dev} to database') + logger.debug(f'Added device {dev} to database {database}. Full path of metadata {metadata_path}') + logger.info(f'Metadata for {dev} {device_metadata.print_attributes()}') + + +# @click.command('add-device', help='Add a device to a database') +# @click.option('-d', '--dev', '--device-name', type=str, required=True, +# help='The name of the device to be added. If this string contains spaces or other special characters \ +# normalization is performed to derive a canonical name') +# @click.option('--db', '--database', type=click.Path(exists=True, file_okay=False, dir_okay=True, path_type=Path), +# envvar='IOTTB_DB', show_envvar=True, +# help='Name of in which to add this device. If not specified use default from config.') +# @click.option('--guided', is_flag=True, default=False, show_default=True, envvar='IOTTB_GUIDED_ADD', show_envvar=True, +# help='Add device interactively') +# # @click.option('-m', '--model', default="", help='Model of the device') +# # @click.option('--manufacturer', default="", help='Manufacturer of the device') +# # @click.option('--firmware-version', default="", help='Current firmware version of the device') +# # @click.option('--device-type', default="", help='Type of the device') +# # @click.option('-i', '--interfaces', default="", help='Supported interfaces of the device') +# # @click.option('--apps', '--companion-applications', default="", help='Companion applications of the device') +# # @click.option('--desc', '--description', default="", help='Description of the device') +# @click.pass_context +# def add_device_inactive(ctx, dev, db, guided): +# """Add a new device to a database +# +# Device name must be supplied unless in an interactive setup. Database is taken from config by default. +# """ +# logger.info('add-device invoked') +# config = ctx.obj['CONFIG'] +# logger.debug(f'{str(config)}') +# database = None +# # Setep1: Determine the current db +# if db: +# db_path = str(config.get_database_path) +# database = str(config.get_full_default_path()) +# logger.debug(f'database: {database}. variable type: {type(database)}') +# click.echo(f'No db specified, using default database') +# if not Path(database).is_dir(): +# logger.warning(f'No database at {database}') +# click.echo(f'Could not find a database.') +# click.echo(f'You need to initialize the testbed before before you add devices!') +# click.echo(f'To initialize the testbed in the default location run "iottb init-db"') +# exit() +# +# # Step 2: Check if device already exists +# if guided: +# add_device_guided(ctx, dev, database) +# else: +# device_metadata = DeviceMetadata(device_name=dev) +# device_metadata.print_attributes() +# click.echo('TODO: Unguided device add path') +# +# logger.info(f'Device {dev} added.') + + +def normalize_device_name(name): + """Normalizes the device name to get a shorter representation which is easier to use at the command line. + + This function derives a device name which can be more easily specified at the command line. + The first two occurrences of white space are turned into a dash, the rest of the name is dropped. + Name should be an ASCII string. + """ + logger.info(f'Normalizing name {name}') + chars_to_replace = definitions.REPLACEMENT_SET_CANONICAL_DEVICE_NAMES + pattern = re.compile('|'.join(re.escape(char) for char in chars_to_replace)) + norm_name = pattern.sub('-', name, count=2) + logger.debug(f'Name after first subst: {norm_name}') + norm_name = pattern.sub('', norm_name) + logger.debug(f'Fully normalized name: {norm_name}') + return norm_name diff --git a/iottb/commands/initialize_testbed.py b/iottb/commands/initialize_testbed.py index 114a0c7..56493a6 100644 --- a/iottb/commands/initialize_testbed.py +++ b/iottb/commands/initialize_testbed.py @@ -10,8 +10,8 @@ logger = logging.getLogger(__name__) @click.command() -@click.option('-d', '--dest', type=Path, help='Location to put (new) iottb database') -@click.option('--name', default=DB_NAME, type=str, help='Name of new database.') +@click.option('-d', '--dest', type=click.Path(), help='Location to put (new) iottb database') +@click.option('-n', '--name', default=DB_NAME, type=str, help='Name of new database.') @click.option('--update-default/--no-update-default', default=True, help='If new db should be set as the new default') @click.pass_context def init_db(ctx, dest, name, update_default): @@ -20,25 +20,81 @@ def init_db(ctx, dest, name, update_default): logger.debug(f'str(config)') # Use the default path from config if dest is not provided known_dbs = config.get_known_databases() + logger.debug(f'Known databases: {known_dbs}') if name in known_dbs: - click.echo(f'A database {name} already exists.') - logger.info(f'Exiting...') - exit() + dest = config.get_database_location(name) + if Path(dest).joinpath(name).is_dir(): + click.echo(f'A database {name} already exists.') + logger.debug(f'DB {name} exists in {dest}') + click.echo(f'Exiting...') + exit() + logger.debug(f'DB name {name} registered but does not exist.') if not dest: logger.info('No dest set, choosing default destination.') - dest = Path(config.default_path).parent + dest = Path(config.default_db_location).parent - db_path = dest / name - logger.debug(f'Full path for db {db_path}') + db_path = Path(dest).joinpath(name) + logger.debug(f'Full path for db {str(db_path)}') # Create the directory if it doesn't exist db_path.mkdir(parents=True, exist_ok=True) - logger.info(f"Created directory {db_path.parent} for the database") + logger.info(f"mkdir {db_path} successful") + click.echo(f'Created {db_path}') + + # Update configuration + config.set_database_location(name, str(dest)) + if update_default: + config.set_default_database(name, str(dest)) + config.save_config() + logger.info(f"Updated configuration with database {name} at {db_path}") + + +@click.command() +@click.option('-d', '--dest', type=click.Path(), help='Location to put (new) iottb database') +@click.option('-n', '--name', default=DB_NAME, type=str, help='Name of new database.') +@click.option('--update-default/--no-update-default', default=True, help='If new db should be set as the new default') +@click.pass_context +def init_db_inactive(ctx, dest, name, update_default): + logger.info('init-db invoked') + config = ctx.obj['CONFIG'] + logger.debug(f'str(config)') + + # Retrieve known databases + known_dbs = config.get_known_databases() + + # Determine destination path + if name in known_dbs: + dest = Path(config.get_database_location(name)) + if dest.joinpath(name).is_dir(): + click.echo(f'A database {name} already exists.') + logger.debug(f'DB {name} exists in {dest}') + click.echo(f'Exiting...') + exit() + logger.debug(f'DB name {name} registered but does not exist.') + elif not dest: + logger.info('No destination set, using default path from config.') + dest = Path(config.default_db_location).parent + + # Ensure destination path is absolute + dest = dest.resolve() + + # Combine destination path with database name + db_path = dest / name + logger.debug(f'Full path for database: {str(db_path)}') + + # Create the directory if it doesn't exist + try: + db_path.mkdir(parents=True, exist_ok=True) + logger.info(f'Directory {db_path} created successfully.') + click.echo(f'Created {db_path}') + except Exception as e: + logger.error(f'Failed to create directory {db_path}: {e}') + click.echo(f'Failed to create directory {db_path}: {e}', err=True) + exit(1) # Update configuration config.set_database_location(name, str(db_path)) if update_default: config.set_default_database(name, str(db_path)) config.save_config() - logger.info(f"Updated configuration with database {name} at {db_path}") - - + logger.info(f'Updated configuration with database {name} at {db_path}') + click.echo(f'Updated configuration with database {name} at {db_path}') diff --git a/iottb/contexts.py b/iottb/contexts.py index 9d5b7b8..cb0ffea 100644 --- a/iottb/contexts.py +++ b/iottb/contexts.py @@ -1,6 +1,14 @@ import json from pathlib import Path import logging +import json +from pathlib import Path +from datetime import datetime +import uuid +import unicodedata +import re + +import click from iottb import definitions @@ -25,23 +33,23 @@ class IottbConfig: IottbConfig.warn() self.cfg_file = Path(cfg_file) self.default_database = None - self.default_path = None - self.DatabaseLocationMap = {} + self.default_db_location = None + self.db_path_dict = dict() self.load_config() def create_default_config(self): """Create default iottb config file.""" logger.info(f'Creating default config file at {self.cfg_file}') self.default_database = DB_NAME - self.default_path = str(Path.home() / DB_NAME) - self.DatabaseLocationMap = { - DB_NAME: self.default_path + self.default_db_location = str(Path.home()) + self.db_path_dict = { + DB_NAME: self.default_db_location } defaults = { 'DefaultDatabase': self.default_database, - 'DefaultDatabasePath': self.default_path, - 'DatabaseLocations': self.DatabaseLocationMap + 'DefaultDatabasePath': self.default_db_location, + 'DatabaseLocations': self.db_path_dict } try: @@ -63,15 +71,15 @@ class IottbConfig: with self.cfg_file.open('r') as config_file: data = json.load(config_file) self.default_database = data.get('DefaultDatabase') - self.default_path = data.get('DefaultDatabasePath') - self.DatabaseLocationMap = data.get('DatabaseLocations', {}) + self.default_db_location = data.get('DefaultDatabasePath') + self.db_path_dict = data.get('DatabaseLocations', {}) def save_config(self): """Save the current configuration to the config file.""" data = { 'DefaultDatabase': self.default_database, - 'DefaultDatabasePath': self.default_path, - 'DatabaseLocations': self.DatabaseLocationMap + 'DefaultDatabasePath': self.default_db_location, + 'DatabaseLocations': self.db_path_dict } try: with self.cfg_file.open('w') as config_file: @@ -83,29 +91,107 @@ class IottbConfig: def set_default_database(self, name, path): """Set the default database and its path.""" self.default_database = name - self.default_path = path - self.DatabaseLocationMap[name] = path + self.default_db_location = path + self.db_path_dict[name] = path + + def get_default_database_location(self): + return self.default_db_location + + def get_default_database(self): + return self.default_database def get_database_location(self, name): """Get the location of a specific database.""" - return self.DatabaseLocationMap.get(name) + return self.db_path_dict.get(name) def set_database_location(self, name, path): """Set the location for a database.""" - self.DatabaseLocationMap[name] = path + logger.debug(f'Type of "path" parameter {type(path)}') + logger.debug(f'String value of "path" parameter {str(path)}') + logger.debug(f'Type of "name" parameter {type(name)}') + logger.debug(f'String value of "name" parameter {str(name)}') + path = Path(path) + name = Path(name) + logger.debug(f'path:name = {path}:{name}') + if path.name == name: + path = path.parent + self.db_path_dict[str(name)] = str(path) def get_known_databases(self): """Get the set of known databases""" logger.info(f'Getting known databases.') - return self.DatabaseLocationMap.keys() + return self.db_path_dict.keys() def get_know_database_paths(self): """Get the paths of all known databases""" logger.info(f'Getting known database paths.') - return self.DatabaseLocationMap.values() + return self.db_path_dict.values() + + def get_full_default_path(self): + return Path(self.default_db_location) / self.default_database -# TODO: Know issue: class Database: - pass + + def __init__(self, name, path): + self.name = name + self.path = path + self.device_list = [] # List of the canonical names of devices registered in this database + + +class DeviceMetadata: + def __init__(self, device_name, description="", model="", manufacturer="", firmware_version="", device_type="", + supported_interfaces="", companion_applications="", save_to_file=None): + self.device_id = str(uuid.uuid4()) + self.device_name = device_name + cn, aliases = make_canonical_name(device_name) + logger.debug(f'cn, aliases = {cn}, {str(aliases)}') + self.aliases = aliases + self.canonical_name = cn + self.date_added = datetime.now().isoformat() + self.description = description + self.model = model + self.manufacturer = manufacturer + self.current_firmware_version = firmware_version + self.device_type = device_type + self.supported_interfaces = supported_interfaces + self.companion_applications = companion_applications + self.last_metadata_update = datetime.now().isoformat() + if save_to_file is not None: + click.echo('TODO: Implement saving config to file after creation!') + + def add_alias(self, alias: str = ""): + if alias == "": + return + self.aliases.append(alias) + + def get_canonical_name(self): + return self.canonical_name + + def print_attributes(self): + print(f'Printing attribute value pairs in {__name__}') + for attr, value in self.__dict__.items(): + print(f'{attr}: {value}') + + +def make_canonical_name(name): + """ + Normalize the device name to a canonical form: + - Replace the first two occurrences of spaces and transform characters with dashes. + - Remove any remaining spaces and non-ASCII characters. + - Convert to lowercase. + """ + aliases = [name] + logger.info(f'Normalizing name {name}') + chars_to_replace = definitions.REPLACEMENT_SET_CANONICAL_DEVICE_NAMES + pattern = re.compile('|'.join(re.escape(char) for char in chars_to_replace)) + norm_name = pattern.sub('-', name, count=2) + aliases.append(norm_name) + logger.debug(f'Name after first subst: {norm_name}') + norm_name = pattern.sub('', norm_name) + aliases.append(norm_name) + logger.debug(f'Fully normalized name: {norm_name}') + norm_name = norm_name.lower() + aliases.append(norm_name) + return norm_name, list(set(aliases)) diff --git a/iottb/definitions.py b/iottb/definitions.py index c3a0780..db86d51 100644 --- a/iottb/definitions.py +++ b/iottb/definitions.py @@ -22,3 +22,12 @@ assert len(LOGFILE_LOG_FORMAT) == len(CONSOLE_LOG_FORMATS), 'Log formats must be LOGLEVEL = logging.DEBUG LOGDIR = Path.cwd() / 'logs' + +# Characters to just replace +REPLACEMENT_SET_CANONICAL_DEVICE_NAMES = {' ', '_', } +# Characters to possibly error on +ERROR_SET_CANONICAL_DEVICE_NAMES = {',', '!', '@', '#', '$', '%', '^', '&', '*', '(', ')', '+', '=', '{', '}', '[', ']', + '|', + '\\', ':', ';', '"', "'", '<', '>', '?', '/', '`', '~'} + +DEVICE_METADATA_FILE_NAME = 'device_metadata.json' diff --git a/iottb/main.py b/iottb/main.py index 8d014e1..4c0efdc 100644 --- a/iottb/main.py +++ b/iottb/main.py @@ -10,6 +10,7 @@ from iottb.utils.logger_config import setup_logging from iottb import definitions from iottb.contexts import IottbConfig from iottb.commands.initialize_testbed import init_db +from iottb.commands.add_device import add_device ############################################################################ # Module shortcuts for global definitions @@ -40,6 +41,7 @@ def cli(ctx, verbosity, debug, cfg_file): ctx.ensure_object(dict) # Make sure context is ready for use logger.info("Starting execution.") ctx.obj['CONFIG'] = IottbConfig(cfg_file) # Load configuration directly + # ctx.meta['FULL_PATH_CONFIG_FILE'] = @click.command() @@ -85,6 +87,35 @@ def rm_dbs(dbs): logger.info(f'All databases deleted') +@click.command('show-cfg', help='Show the current configuration context') +@click.option('--cfg-file', type=click.Path(), default=CFG_FILE_PATH, help='Path to the config file') +@click.option('-pp', is_flag=True, default=False, help='Pretty Print') +@click.pass_context +def show_cfg(ctx, cfg_file, pp): + logger.debug(f'Pretty print option set to {pp}') + if pp: + try: + config = IottbConfig(Path(cfg_file)) + click.echo("Configuration Context:") + click.echo(f"Default Database: {config.default_database}") + click.echo(f"Default Database Path: {config.default_db_location}") + click.echo("Database Locations:") + for db_name, db_path in config.db_path_dict.items(): + click.echo(f" - {db_name}: {db_path}") + except Exception as e: + logger.error(f"Error loading configuration: {e}") + click.echo(f"Failed to load configuration from {cfg_file}") + else: + path = Path(cfg_file) + + if path.is_file(): + with path.open('r') as file: + content = file.read() + click.echo(content) + else: + click.echo(f"Configuration file not found at {cfg_file}") + + ################################################################################## # Add all subcommands to group here ################################################################################# @@ -93,6 +124,8 @@ cli.add_command(init_db) cli.add_command(rm_cfg) cli.add_command(set_key_in_table_to) cli.add_command(rm_dbs) - +# noinspection PyTypeChecker +cli.add_command(add_device) +cli.add_command(show_cfg) if __name__ == '__main__': - cli(auto_envvar_prefix='IOTTB') + cli(auto_envvar_prefix='IOTTB', show_default=True, show_envvars=True) diff --git a/iottb/utils/string_processing.py b/iottb/utils/string_processing.py new file mode 100644 index 0000000..a18d095 --- /dev/null +++ b/iottb/utils/string_processing.py @@ -0,0 +1,6 @@ +import re +from iottb import definitions + + +def normalize_string(s, chars_to_replace=None, replacement=None, allow_unicode=False): + pass