207 lines
7.2 KiB
Python
207 lines
7.2 KiB
Python
import json
|
|
from pathlib import Path
|
|
import logging
|
|
import json
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
import uuid
|
|
import unicodedata
|
|
import re
|
|
|
|
import click
|
|
|
|
from iottb import definitions
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
DB_NAME = 'iottb.db'
|
|
|
|
|
|
class IottbConfig:
|
|
""" Class to handle operations on the testbed configuration.
|
|
|
|
TODO: Add instead of overwrite Database locations when initializing if a location with valid db
|
|
exists.
|
|
"""
|
|
|
|
@staticmethod
|
|
def warn():
|
|
logger.warning(f'DatabaseLocations are DatabaseLocationMap in the class {__name__}')
|
|
|
|
def __init__(self, cfg_file=definitions.CFG_FILE_PATH):
|
|
logger.info('Initializing Config object')
|
|
IottbConfig.warn()
|
|
self.cfg_file = Path(cfg_file)
|
|
self.default_database = None
|
|
self.default_db_location = None
|
|
self.db_path_dict = dict()
|
|
self.load_config()
|
|
|
|
def create_default_config(self):
|
|
"""Create default iottb config file."""
|
|
logger.info(f'Creating default config file at {self.cfg_file}')
|
|
self.default_database = DB_NAME
|
|
self.default_db_location = str(Path.home())
|
|
self.db_path_dict = {
|
|
DB_NAME: self.default_db_location
|
|
}
|
|
|
|
defaults = {
|
|
'DefaultDatabase': self.default_database,
|
|
'DefaultDatabasePath': self.default_db_location,
|
|
'DatabaseLocations': self.db_path_dict
|
|
}
|
|
|
|
try:
|
|
self.cfg_file.parent.mkdir(parents=True, exist_ok=True)
|
|
with self.cfg_file.open('w') as config_file:
|
|
json.dump(defaults, config_file, indent=4)
|
|
except IOError as e:
|
|
logger.error(f"Failed to create default configuration file at {self.cfg_file}: {e}")
|
|
raise RuntimeError(f"Failed to create configuration file: {e}") from e
|
|
|
|
def load_config(self):
|
|
"""Loads or creates default configuration from given file path."""
|
|
logger.info('Loading configuration file')
|
|
if not self.cfg_file.is_file():
|
|
logger.info('Config file does not exist.')
|
|
self.create_default_config()
|
|
else:
|
|
logger.info('Config file exists, opening.')
|
|
with self.cfg_file.open('r') as config_file:
|
|
data = json.load(config_file)
|
|
self.default_database = data.get('DefaultDatabase')
|
|
self.default_db_location = data.get('DefaultDatabasePath')
|
|
self.db_path_dict = data.get('DatabaseLocations', {})
|
|
|
|
def save_config(self):
|
|
"""Save the current configuration to the config file."""
|
|
data = {
|
|
'DefaultDatabase': self.default_database,
|
|
'DefaultDatabasePath': self.default_db_location,
|
|
'DatabaseLocations': self.db_path_dict
|
|
}
|
|
try:
|
|
with self.cfg_file.open('w') as config_file:
|
|
json.dump(data, config_file, indent=4)
|
|
except IOError as e:
|
|
logger.error(f"Failed to save configuration file at {self.cfg_file}: {e}")
|
|
raise RuntimeError(f"Failed to save configuration file: {e}") from e
|
|
|
|
def set_default_database(self, name, path):
|
|
"""Set the default database and its path."""
|
|
self.default_database = name
|
|
self.default_db_location = path
|
|
self.db_path_dict[name] = path
|
|
|
|
def get_default_database_location(self):
|
|
return self.default_db_location
|
|
|
|
def get_default_database(self):
|
|
return self.default_database
|
|
|
|
def get_database_location(self, name):
|
|
"""Get the location of a specific database."""
|
|
return self.db_path_dict.get(name)
|
|
|
|
def set_database_location(self, name, path):
|
|
"""Set the location for a database."""
|
|
logger.debug(f'Type of "path" parameter {type(path)}')
|
|
logger.debug(f'String value of "path" parameter {str(path)}')
|
|
logger.debug(f'Type of "name" parameter {type(name)}')
|
|
logger.debug(f'String value of "name" parameter {str(name)}')
|
|
path = Path(path)
|
|
name = Path(name)
|
|
logger.debug(f'path:name = {path}:{name}')
|
|
if path.name == name:
|
|
path = path.parent
|
|
self.db_path_dict[str(name)] = str(path)
|
|
|
|
def get_known_databases(self):
|
|
"""Get the set of known databases"""
|
|
logger.info(f'Getting known databases.')
|
|
|
|
return self.db_path_dict.keys()
|
|
|
|
def get_know_database_paths(self):
|
|
"""Get the paths of all known databases"""
|
|
logger.info(f'Getting known database paths.')
|
|
return self.db_path_dict.values()
|
|
|
|
def get_full_default_path(self):
|
|
return Path(self.default_db_location) / self.default_database
|
|
|
|
|
|
class Database:
|
|
|
|
def __init__(self, name, path):
|
|
self.name = name
|
|
self.path = path
|
|
self.device_list = [] # List of the canonical names of devices registered in this database
|
|
|
|
|
|
class DeviceMetadata:
|
|
def __init__(self, device_name, description="", model="", manufacturer="", firmware_version="", device_type="",
|
|
supported_interfaces="", companion_applications="", save_to_file=None):
|
|
self.device_id = str(uuid.uuid4())
|
|
self.device_name = device_name
|
|
cn, aliases = make_canonical_name(device_name)
|
|
logger.debug(f'cn, aliases = {cn}, {str(aliases)}')
|
|
self.aliases = aliases
|
|
self.canonical_name = cn
|
|
self.date_added = datetime.now().isoformat()
|
|
self.description = description
|
|
self.model = model
|
|
self.manufacturer = manufacturer
|
|
self.current_firmware_version = firmware_version
|
|
self.device_type = device_type
|
|
self.supported_interfaces = supported_interfaces
|
|
self.companion_applications = companion_applications
|
|
self.last_metadata_update = datetime.now().isoformat()
|
|
if save_to_file is not None:
|
|
click.echo('TODO: Implement saving config to file after creation!')
|
|
|
|
def add_alias(self, alias: str = ""):
|
|
if alias == "":
|
|
return
|
|
self.aliases.append(alias)
|
|
|
|
def get_canonical_name(self):
|
|
return self.canonical_name
|
|
|
|
def print_attributes(self):
|
|
print(f'Printing attribute value pairs in {__name__}')
|
|
for attr, value in self.__dict__.items():
|
|
print(f'{attr}: {value}')
|
|
|
|
|
|
def make_canonical_name(name):
|
|
"""
|
|
Normalize the device name to a canonical form:
|
|
- Replace the first two occurrences of spaces and transform characters with dashes.
|
|
- Remove any remaining spaces and non-ASCII characters.
|
|
- Convert to lowercase.
|
|
"""
|
|
aliases = [name]
|
|
logger.info(f'Normalizing name {name}')
|
|
|
|
# We first normalize
|
|
chars_to_replace = definitions.REPLACEMENT_SET_CANONICAL_DEVICE_NAMES
|
|
pattern = re.compile('|'.join(re.escape(char) for char in chars_to_replace))
|
|
norm_name = pattern.sub('-', name)
|
|
norm_name = re.sub(r'[^\x00-\x7F]+', '', norm_name) # removes non ascii chars
|
|
|
|
aliases.append(norm_name)
|
|
# Lower case
|
|
norm_name = norm_name.lower()
|
|
aliases.append(norm_name)
|
|
|
|
# canonical name is only first two parts of resulting string
|
|
parts = norm_name.split('-')
|
|
canonical_name = canonical_name = '-'.join(parts[:2])
|
|
aliases.append(canonical_name)
|
|
|
|
logger.debug(f'Canonical name: {canonical_name}')
|
|
logger.debug(f'Aliases: {aliases}')
|
|
return canonical_name, list(set(aliases))
|