Add add-device subcommand.

This commit is contained in:
Sebastian Lenzlinger 2024-06-28 17:29:23 +02:00
parent 30e3c17920
commit 0d16d5b6a2
6 changed files with 375 additions and 33 deletions

View File

@ -0,0 +1,152 @@
import json
import click
from pathlib import Path
import logging
import re
from iottb import definitions
from iottb.contexts import DeviceMetadata, IottbConfig
from iottb.definitions import CFG_FILE_PATH
logger = logging.getLogger(__name__)
def add_device_guided(ctx, cn, db):
click.echo('TODO: Implement')
logger.info('Adding device interactively')
#logger.debug(f'Parameters: {params}. value: {value}')
@click.command('add-device', help='Add a device to a database')
@click.option('--dev', '--device-name', type=str, required=True,
help='The name of the device to be added. If this string contains spaces or other special characters \
normalization is performed to derive a canonical name')
@click.option('--db', '--database', type=click.Path(exists=True, file_okay=False, dir_okay=True, path_type=Path),
envvar='IOTTB_DB', show_envvar=True,
help='Database in which to add this device. If not specified use default from config.')
@click.option('--guided', is_flag=True, default=False, show_default=True, envvar='IOTTB_GUIDED_ADD', show_envvar=True,
help='Add device interactively')
def add_device(dev, db, guided):
"""Add a new device to a database
Device name must be supplied unless in an interactive setup. Database is taken from config by default.
"""
logger.info('add-device invoked')
# Step 1: Load Config
# Dependency: Config file must exist
config = IottbConfig(Path(CFG_FILE_PATH))
logger.debug(f'Config loaded: {config}')
# Step 2: Load database
# dependency: Database folder must exist
if db:
database = db
path = config.db_path_dict
logger.debug(f'Resolved (path, db) {path}, {database}')
else:
path = config.default_db_location
database = config.default_database
logger.debug(f'Default (path, db) {path}, {database}')
click.secho(f'Using database {database}')
full_db_path = Path(path) / database
if not full_db_path.is_dir():
logger.warning(f'No database at {database}')
click.echo(f'Could not find a database.')
click.echo(f'You need to initialize the testbed before before you add devices!')
click.echo(f'To initialize the testbed in the default location run "iottb init-db"')
click.echo('Exiting...')
exit()
# Step 3: Check if device already exists in database
# dependency: DeviceMetadata object
device_metadata = DeviceMetadata(device_name=dev)
device_dir = full_db_path / device_metadata.canonical_name
# Check if device is already registered
if device_dir.exists():
logger.warning(f'Device directory {device_dir} already exists.')
click.echo(f'Device {dev} already exists in the database.')
click.echo('Exiting...')
exit()
try:
device_dir.mkdir()
except OSError as e:
logger.error(f'Error trying to create device {e}')
click.echo('Exiting...')
exit()
# Step 4: Save metadata into device_dir
metadata_path = device_dir / definitions.DEVICE_METADATA_FILE_NAME
with metadata_path.open('w') as metadata_file:
json.dump(device_metadata.__dict__, metadata_file, indent=4)
click.echo(f'Successfully added device {dev} to database')
logger.debug(f'Added device {dev} to database {database}. Full path of metadata {metadata_path}')
logger.info(f'Metadata for {dev} {device_metadata.print_attributes()}')
# @click.command('add-device', help='Add a device to a database')
# @click.option('-d', '--dev', '--device-name', type=str, required=True,
# help='The name of the device to be added. If this string contains spaces or other special characters \
# normalization is performed to derive a canonical name')
# @click.option('--db', '--database', type=click.Path(exists=True, file_okay=False, dir_okay=True, path_type=Path),
# envvar='IOTTB_DB', show_envvar=True,
# help='Name of in which to add this device. If not specified use default from config.')
# @click.option('--guided', is_flag=True, default=False, show_default=True, envvar='IOTTB_GUIDED_ADD', show_envvar=True,
# help='Add device interactively')
# # @click.option('-m', '--model', default="", help='Model of the device')
# # @click.option('--manufacturer', default="", help='Manufacturer of the device')
# # @click.option('--firmware-version', default="", help='Current firmware version of the device')
# # @click.option('--device-type', default="", help='Type of the device')
# # @click.option('-i', '--interfaces', default="", help='Supported interfaces of the device')
# # @click.option('--apps', '--companion-applications', default="", help='Companion applications of the device')
# # @click.option('--desc', '--description', default="", help='Description of the device')
# @click.pass_context
# def add_device_inactive(ctx, dev, db, guided):
# """Add a new device to a database
#
# Device name must be supplied unless in an interactive setup. Database is taken from config by default.
# """
# logger.info('add-device invoked')
# config = ctx.obj['CONFIG']
# logger.debug(f'{str(config)}')
# database = None
# # Setep1: Determine the current db
# if db:
# db_path = str(config.get_database_path)
# database = str(config.get_full_default_path())
# logger.debug(f'database: {database}. variable type: {type(database)}')
# click.echo(f'No db specified, using default database')
# if not Path(database).is_dir():
# logger.warning(f'No database at {database}')
# click.echo(f'Could not find a database.')
# click.echo(f'You need to initialize the testbed before before you add devices!')
# click.echo(f'To initialize the testbed in the default location run "iottb init-db"')
# exit()
#
# # Step 2: Check if device already exists
# if guided:
# add_device_guided(ctx, dev, database)
# else:
# device_metadata = DeviceMetadata(device_name=dev)
# device_metadata.print_attributes()
# click.echo('TODO: Unguided device add path')
#
# logger.info(f'Device {dev} added.')
def normalize_device_name(name):
"""Normalizes the device name to get a shorter representation which is easier to use at the command line.
This function derives a device name which can be more easily specified at the command line.
The first two occurrences of white space are turned into a dash, the rest of the name is dropped.
Name should be an ASCII string.
"""
logger.info(f'Normalizing name {name}')
chars_to_replace = definitions.REPLACEMENT_SET_CANONICAL_DEVICE_NAMES
pattern = re.compile('|'.join(re.escape(char) for char in chars_to_replace))
norm_name = pattern.sub('-', name, count=2)
logger.debug(f'Name after first subst: {norm_name}')
norm_name = pattern.sub('', norm_name)
logger.debug(f'Fully normalized name: {norm_name}')
return norm_name

View File

@ -10,8 +10,8 @@ logger = logging.getLogger(__name__)
@click.command() @click.command()
@click.option('-d', '--dest', type=Path, help='Location to put (new) iottb database') @click.option('-d', '--dest', type=click.Path(), help='Location to put (new) iottb database')
@click.option('--name', default=DB_NAME, type=str, help='Name of new database.') @click.option('-n', '--name', default=DB_NAME, type=str, help='Name of new database.')
@click.option('--update-default/--no-update-default', default=True, help='If new db should be set as the new default') @click.option('--update-default/--no-update-default', default=True, help='If new db should be set as the new default')
@click.pass_context @click.pass_context
def init_db(ctx, dest, name, update_default): def init_db(ctx, dest, name, update_default):
@ -20,25 +20,81 @@ def init_db(ctx, dest, name, update_default):
logger.debug(f'str(config)') logger.debug(f'str(config)')
# Use the default path from config if dest is not provided # Use the default path from config if dest is not provided
known_dbs = config.get_known_databases() known_dbs = config.get_known_databases()
logger.debug(f'Known databases: {known_dbs}')
if name in known_dbs: if name in known_dbs:
click.echo(f'A database {name} already exists.') dest = config.get_database_location(name)
logger.info(f'Exiting...') if Path(dest).joinpath(name).is_dir():
exit() click.echo(f'A database {name} already exists.')
logger.debug(f'DB {name} exists in {dest}')
click.echo(f'Exiting...')
exit()
logger.debug(f'DB name {name} registered but does not exist.')
if not dest: if not dest:
logger.info('No dest set, choosing default destination.') logger.info('No dest set, choosing default destination.')
dest = Path(config.default_path).parent dest = Path(config.default_db_location).parent
db_path = dest / name db_path = Path(dest).joinpath(name)
logger.debug(f'Full path for db {db_path}') logger.debug(f'Full path for db {str(db_path)}')
# Create the directory if it doesn't exist # Create the directory if it doesn't exist
db_path.mkdir(parents=True, exist_ok=True) db_path.mkdir(parents=True, exist_ok=True)
logger.info(f"Created directory {db_path.parent} for the database") logger.info(f"mkdir {db_path} successful")
click.echo(f'Created {db_path}')
# Update configuration
config.set_database_location(name, str(dest))
if update_default:
config.set_default_database(name, str(dest))
config.save_config()
logger.info(f"Updated configuration with database {name} at {db_path}")
@click.command()
@click.option('-d', '--dest', type=click.Path(), help='Location to put (new) iottb database')
@click.option('-n', '--name', default=DB_NAME, type=str, help='Name of new database.')
@click.option('--update-default/--no-update-default', default=True, help='If new db should be set as the new default')
@click.pass_context
def init_db_inactive(ctx, dest, name, update_default):
logger.info('init-db invoked')
config = ctx.obj['CONFIG']
logger.debug(f'str(config)')
# Retrieve known databases
known_dbs = config.get_known_databases()
# Determine destination path
if name in known_dbs:
dest = Path(config.get_database_location(name))
if dest.joinpath(name).is_dir():
click.echo(f'A database {name} already exists.')
logger.debug(f'DB {name} exists in {dest}')
click.echo(f'Exiting...')
exit()
logger.debug(f'DB name {name} registered but does not exist.')
elif not dest:
logger.info('No destination set, using default path from config.')
dest = Path(config.default_db_location).parent
# Ensure destination path is absolute
dest = dest.resolve()
# Combine destination path with database name
db_path = dest / name
logger.debug(f'Full path for database: {str(db_path)}')
# Create the directory if it doesn't exist
try:
db_path.mkdir(parents=True, exist_ok=True)
logger.info(f'Directory {db_path} created successfully.')
click.echo(f'Created {db_path}')
except Exception as e:
logger.error(f'Failed to create directory {db_path}: {e}')
click.echo(f'Failed to create directory {db_path}: {e}', err=True)
exit(1)
# Update configuration # Update configuration
config.set_database_location(name, str(db_path)) config.set_database_location(name, str(db_path))
if update_default: if update_default:
config.set_default_database(name, str(db_path)) config.set_default_database(name, str(db_path))
config.save_config() config.save_config()
logger.info(f"Updated configuration with database {name} at {db_path}") logger.info(f'Updated configuration with database {name} at {db_path}')
click.echo(f'Updated configuration with database {name} at {db_path}')

View File

@ -1,6 +1,14 @@
import json import json
from pathlib import Path from pathlib import Path
import logging import logging
import json
from pathlib import Path
from datetime import datetime
import uuid
import unicodedata
import re
import click
from iottb import definitions from iottb import definitions
@ -25,23 +33,23 @@ class IottbConfig:
IottbConfig.warn() IottbConfig.warn()
self.cfg_file = Path(cfg_file) self.cfg_file = Path(cfg_file)
self.default_database = None self.default_database = None
self.default_path = None self.default_db_location = None
self.DatabaseLocationMap = {} self.db_path_dict = dict()
self.load_config() self.load_config()
def create_default_config(self): def create_default_config(self):
"""Create default iottb config file.""" """Create default iottb config file."""
logger.info(f'Creating default config file at {self.cfg_file}') logger.info(f'Creating default config file at {self.cfg_file}')
self.default_database = DB_NAME self.default_database = DB_NAME
self.default_path = str(Path.home() / DB_NAME) self.default_db_location = str(Path.home())
self.DatabaseLocationMap = { self.db_path_dict = {
DB_NAME: self.default_path DB_NAME: self.default_db_location
} }
defaults = { defaults = {
'DefaultDatabase': self.default_database, 'DefaultDatabase': self.default_database,
'DefaultDatabasePath': self.default_path, 'DefaultDatabasePath': self.default_db_location,
'DatabaseLocations': self.DatabaseLocationMap 'DatabaseLocations': self.db_path_dict
} }
try: try:
@ -63,15 +71,15 @@ class IottbConfig:
with self.cfg_file.open('r') as config_file: with self.cfg_file.open('r') as config_file:
data = json.load(config_file) data = json.load(config_file)
self.default_database = data.get('DefaultDatabase') self.default_database = data.get('DefaultDatabase')
self.default_path = data.get('DefaultDatabasePath') self.default_db_location = data.get('DefaultDatabasePath')
self.DatabaseLocationMap = data.get('DatabaseLocations', {}) self.db_path_dict = data.get('DatabaseLocations', {})
def save_config(self): def save_config(self):
"""Save the current configuration to the config file.""" """Save the current configuration to the config file."""
data = { data = {
'DefaultDatabase': self.default_database, 'DefaultDatabase': self.default_database,
'DefaultDatabasePath': self.default_path, 'DefaultDatabasePath': self.default_db_location,
'DatabaseLocations': self.DatabaseLocationMap 'DatabaseLocations': self.db_path_dict
} }
try: try:
with self.cfg_file.open('w') as config_file: with self.cfg_file.open('w') as config_file:
@ -83,29 +91,107 @@ class IottbConfig:
def set_default_database(self, name, path): def set_default_database(self, name, path):
"""Set the default database and its path.""" """Set the default database and its path."""
self.default_database = name self.default_database = name
self.default_path = path self.default_db_location = path
self.DatabaseLocationMap[name] = path self.db_path_dict[name] = path
def get_default_database_location(self):
return self.default_db_location
def get_default_database(self):
return self.default_database
def get_database_location(self, name): def get_database_location(self, name):
"""Get the location of a specific database.""" """Get the location of a specific database."""
return self.DatabaseLocationMap.get(name) return self.db_path_dict.get(name)
def set_database_location(self, name, path): def set_database_location(self, name, path):
"""Set the location for a database.""" """Set the location for a database."""
self.DatabaseLocationMap[name] = path logger.debug(f'Type of "path" parameter {type(path)}')
logger.debug(f'String value of "path" parameter {str(path)}')
logger.debug(f'Type of "name" parameter {type(name)}')
logger.debug(f'String value of "name" parameter {str(name)}')
path = Path(path)
name = Path(name)
logger.debug(f'path:name = {path}:{name}')
if path.name == name:
path = path.parent
self.db_path_dict[str(name)] = str(path)
def get_known_databases(self): def get_known_databases(self):
"""Get the set of known databases""" """Get the set of known databases"""
logger.info(f'Getting known databases.') logger.info(f'Getting known databases.')
return self.DatabaseLocationMap.keys() return self.db_path_dict.keys()
def get_know_database_paths(self): def get_know_database_paths(self):
"""Get the paths of all known databases""" """Get the paths of all known databases"""
logger.info(f'Getting known database paths.') logger.info(f'Getting known database paths.')
return self.DatabaseLocationMap.values() return self.db_path_dict.values()
def get_full_default_path(self):
return Path(self.default_db_location) / self.default_database
# TODO: Know issue:
class Database: class Database:
pass
def __init__(self, name, path):
self.name = name
self.path = path
self.device_list = [] # List of the canonical names of devices registered in this database
class DeviceMetadata:
def __init__(self, device_name, description="", model="", manufacturer="", firmware_version="", device_type="",
supported_interfaces="", companion_applications="", save_to_file=None):
self.device_id = str(uuid.uuid4())
self.device_name = device_name
cn, aliases = make_canonical_name(device_name)
logger.debug(f'cn, aliases = {cn}, {str(aliases)}')
self.aliases = aliases
self.canonical_name = cn
self.date_added = datetime.now().isoformat()
self.description = description
self.model = model
self.manufacturer = manufacturer
self.current_firmware_version = firmware_version
self.device_type = device_type
self.supported_interfaces = supported_interfaces
self.companion_applications = companion_applications
self.last_metadata_update = datetime.now().isoformat()
if save_to_file is not None:
click.echo('TODO: Implement saving config to file after creation!')
def add_alias(self, alias: str = ""):
if alias == "":
return
self.aliases.append(alias)
def get_canonical_name(self):
return self.canonical_name
def print_attributes(self):
print(f'Printing attribute value pairs in {__name__}')
for attr, value in self.__dict__.items():
print(f'{attr}: {value}')
def make_canonical_name(name):
"""
Normalize the device name to a canonical form:
- Replace the first two occurrences of spaces and transform characters with dashes.
- Remove any remaining spaces and non-ASCII characters.
- Convert to lowercase.
"""
aliases = [name]
logger.info(f'Normalizing name {name}')
chars_to_replace = definitions.REPLACEMENT_SET_CANONICAL_DEVICE_NAMES
pattern = re.compile('|'.join(re.escape(char) for char in chars_to_replace))
norm_name = pattern.sub('-', name, count=2)
aliases.append(norm_name)
logger.debug(f'Name after first subst: {norm_name}')
norm_name = pattern.sub('', norm_name)
aliases.append(norm_name)
logger.debug(f'Fully normalized name: {norm_name}')
norm_name = norm_name.lower()
aliases.append(norm_name)
return norm_name, list(set(aliases))

View File

@ -22,3 +22,12 @@ assert len(LOGFILE_LOG_FORMAT) == len(CONSOLE_LOG_FORMATS), 'Log formats must be
LOGLEVEL = logging.DEBUG LOGLEVEL = logging.DEBUG
LOGDIR = Path.cwd() / 'logs' LOGDIR = Path.cwd() / 'logs'
# Characters to just replace
REPLACEMENT_SET_CANONICAL_DEVICE_NAMES = {' ', '_', }
# Characters to possibly error on
ERROR_SET_CANONICAL_DEVICE_NAMES = {',', '!', '@', '#', '$', '%', '^', '&', '*', '(', ')', '+', '=', '{', '}', '[', ']',
'|',
'\\', ':', ';', '"', "'", '<', '>', '?', '/', '`', '~'}
DEVICE_METADATA_FILE_NAME = 'device_metadata.json'

View File

@ -10,6 +10,7 @@ from iottb.utils.logger_config import setup_logging
from iottb import definitions from iottb import definitions
from iottb.contexts import IottbConfig from iottb.contexts import IottbConfig
from iottb.commands.initialize_testbed import init_db from iottb.commands.initialize_testbed import init_db
from iottb.commands.add_device import add_device
############################################################################ ############################################################################
# Module shortcuts for global definitions # Module shortcuts for global definitions
@ -40,6 +41,7 @@ def cli(ctx, verbosity, debug, cfg_file):
ctx.ensure_object(dict) # Make sure context is ready for use ctx.ensure_object(dict) # Make sure context is ready for use
logger.info("Starting execution.") logger.info("Starting execution.")
ctx.obj['CONFIG'] = IottbConfig(cfg_file) # Load configuration directly ctx.obj['CONFIG'] = IottbConfig(cfg_file) # Load configuration directly
# ctx.meta['FULL_PATH_CONFIG_FILE'] =
@click.command() @click.command()
@ -85,6 +87,35 @@ def rm_dbs(dbs):
logger.info(f'All databases deleted') logger.info(f'All databases deleted')
@click.command('show-cfg', help='Show the current configuration context')
@click.option('--cfg-file', type=click.Path(), default=CFG_FILE_PATH, help='Path to the config file')
@click.option('-pp', is_flag=True, default=False, help='Pretty Print')
@click.pass_context
def show_cfg(ctx, cfg_file, pp):
logger.debug(f'Pretty print option set to {pp}')
if pp:
try:
config = IottbConfig(Path(cfg_file))
click.echo("Configuration Context:")
click.echo(f"Default Database: {config.default_database}")
click.echo(f"Default Database Path: {config.default_db_location}")
click.echo("Database Locations:")
for db_name, db_path in config.db_path_dict.items():
click.echo(f" - {db_name}: {db_path}")
except Exception as e:
logger.error(f"Error loading configuration: {e}")
click.echo(f"Failed to load configuration from {cfg_file}")
else:
path = Path(cfg_file)
if path.is_file():
with path.open('r') as file:
content = file.read()
click.echo(content)
else:
click.echo(f"Configuration file not found at {cfg_file}")
################################################################################## ##################################################################################
# Add all subcommands to group here # Add all subcommands to group here
################################################################################# #################################################################################
@ -93,6 +124,8 @@ cli.add_command(init_db)
cli.add_command(rm_cfg) cli.add_command(rm_cfg)
cli.add_command(set_key_in_table_to) cli.add_command(set_key_in_table_to)
cli.add_command(rm_dbs) cli.add_command(rm_dbs)
# noinspection PyTypeChecker
cli.add_command(add_device)
cli.add_command(show_cfg)
if __name__ == '__main__': if __name__ == '__main__':
cli(auto_envvar_prefix='IOTTB') cli(auto_envvar_prefix='IOTTB', show_default=True, show_envvars=True)

View File

@ -0,0 +1,6 @@
import re
from iottb import definitions
def normalize_string(s, chars_to_replace=None, replacement=None, allow_unicode=False):
pass