Another try

This commit is contained in:
Sebastian Lenzlinger 2024-06-29 00:43:47 +02:00
parent 929b40983c
commit a5325fc35f
23 changed files with 1040 additions and 0 deletions

33
.gitignore vendored
View File

@ -8,3 +8,36 @@ __pycache__
/.idea
.idea/
2024-bsc-sebastian-lenzlinger.iml
__pycache__
.venv
iottb.egg-info
.idea
*.log
logs/
*.pyc
.obsidian
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio, WebStorm and Rider
# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
# User-specific stuff
.idea/**/workspace.xml
.idea/**/tasks.xml
.idea/**/usage.statistics.xml
.idea/**/dictionaries
.idea/**/shelf
# AWS User-specific
.idea/**/aws.xml
# Generated files
.idea/**/contentModel.xml
# Sensitive or high-churn files
.idea/**/dataSources/
.idea/**/dataSources.ids
.idea/**/dataSources.local.xml
.idea/**/sqlDataSources.xml
.idea/**/dynamic.xml
.idea/**/uiDesigner.xml
.idea/**/dbnavigator.xml

View File

@ -0,0 +1,9 @@
# Iottb
## Basic Invocation
## Configuration
### Env Vars
- IOTTB_CONF_HOME
By setting this variable you control where the basic iottb application
configuration should be looked for

View File

@ -0,0 +1,11 @@
from iottb import definitions
import logging
from iottb.utils.user_interaction import tb_echo
import click
click.echo = tb_echo # This is very hacky
logging.basicConfig(level=definitions.LOGLEVEL)
log_dir = definitions.LOGDIR
# Ensure logs dir exists before new handlers are registered in main.py
if not log_dir.is_dir():
log_dir.mkdir()

View File

@ -0,0 +1,89 @@
import json
import click
from pathlib import Path
import logging
import re
from iottb import definitions
from iottb.models.device_metadata import DeviceMetadata
from iottb.models.iottb_config import IottbConfig
from iottb.definitions import CFG_FILE_PATH, TB_ECHO_STYLES
logger = logging.getLogger(__name__)
def add_device_guided(ctx, cn, db):
click.echo('TODO: Implement')
logger.info('Adding device interactively')
#logger.debug(f'Parameters: {params}. value: {value}')
@click.command('add-device', help='Add a device to a database')
@click.option('--dev', '--device-name', type=str, required=True,
help='The name of the device to be added. If this string contains spaces or other special characters \
normalization is performed to derive a canonical name')
@click.option('--db', '--database', type=click.Path(exists=True, file_okay=False, dir_okay=True, path_type=Path),
envvar='IOTTB_DB', show_envvar=True,
help='Database in which to add this device. If not specified use default from config.')
@click.option('--guided', is_flag=True, default=False, show_default=True, envvar='IOTTB_GUIDED_ADD', show_envvar=True,
help='Add device interactively')
def add_device(dev, db, guided):
"""Add a new device to a database
Device name must be supplied unless in an interactive setup. Database is taken from config by default.
"""
logger.info('add-device invoked')
# Step 1: Load Config
# Dependency: Config file must exist
config = IottbConfig(Path(CFG_FILE_PATH))
logger.debug(f'Config loaded: {config}')
# Step 2: Load database
# dependency: Database folder must exist
if db:
database = db
path = config.db_path_dict
logger.debug(f'Resolved (path, db) {path}, {database}')
else:
path = config.default_db_location
database = config.default_database
logger.debug(f'Default (path, db) {path}, {database}')
click.secho(f'Using database {database}')
full_db_path = Path(path) / database
if not full_db_path.is_dir():
logger.warning(f'No database at {database}')
click.echo(f'Could not find a database.')
click.echo(f'You need to initialize the testbed before before you add devices!')
click.echo(f'To initialize the testbed in the default location run "iottb init-db"')
click.echo('Exiting...')
exit()
# Step 3: Check if device already exists in database
# dependency: DeviceMetadata object
device_metadata = DeviceMetadata(device_name=dev)
device_dir = full_db_path / device_metadata.canonical_name
# Check if device is already registered
if device_dir.exists():
logger.warning(f'Device directory {device_dir} already exists.')
click.echo(f'Device {dev} already exists in the database.')
click.echo('Exiting...')
exit()
try:
device_dir.mkdir()
except OSError as e:
logger.error(f'Error trying to create device {e}')
click.echo('Exiting...')
exit()
# Step 4: Save metadata into device_dir
metadata_path = device_dir / definitions.DEVICE_METADATA_FILE_NAME
with metadata_path.open('w') as metadata_file:
json.dump(device_metadata.__dict__, metadata_file, indent=4)
click.echo(f'Successfully added device {dev} to database')
logger.debug(f'Added device {dev} to database {database}. Full path of metadata {metadata_path}')
logger.info(f'Metadata for {dev} {device_metadata.print_attributes()}')

View File

@ -0,0 +1,123 @@
from pathlib import Path
import logging
import click
from iottb.definitions import DB_NAME, CFG_FILE_PATH
from iottb.models.iottb_config import IottbConfig
logger = logging.getLogger(__name__)
@click.group('util')
def tb():
pass
@click.command()
@click.option('--file', default=DB_NAME)
@click.option('--table', type=str, default='DefaultDatabase')
@click.option('--key')
@click.option('--value')
@click.pass_context
def set_key_in_table_to(ctx, file, table, key, value):
"""Edit config or metadata files. TODO: Implement"""
click.echo(f'set_key_in_table_to invoked')
logger.warning("Unimplemented subcommand invoked.")
@click.command()
@click.confirmation_option(prompt="Are you certain that you want to delete the cfg file?")
def rm_cfg():
""" Removes the cfg file from the filesystem.
This is mostly a utility during development. Once non-standard database locations are implemented,
deleting this would lead to iottb not being able to find them anymore.
"""
Path(CFG_FILE_PATH).unlink()
click.echo(f'Iottb configuration removed at {CFG_FILE_PATH}')
@click.command()
@click.confirmation_option(prompt="Are you certain that you want to delete the databases file?")
def rm_dbs(dbs):
""" Removes ALL(!) databases from the filesystem if they're empty.
Development utility currently unfit for use.
"""
config = IottbConfig()
paths = config.get_know_database_paths()
logger.debug(f'Known db paths: {str(paths)}')
for dbs in paths:
try:
Path(dbs).rmdir()
click.echo(f'{dbs} deleted')
except Exception as e:
logger.debug(f'Failed unlinking db {dbs} with error {e}')
logger.info(f'All databases deleted')
@click.command('show-cfg', help='Show the current configuration context')
@click.option('--cfg-file', type=click.Path(), default=CFG_FILE_PATH, help='Path to the config file')
@click.option('-pp', is_flag=True, default=False, help='Pretty Print')
@click.pass_context
def show_cfg(ctx, cfg_file, pp):
logger.debug(f'Pretty print option set to {pp}')
if pp:
try:
config = IottbConfig(Path(cfg_file))
click.echo("Configuration Context:")
click.echo(f"Default Database: {config.default_database}")
click.echo(f"Default Database Path: {config.default_db_location}")
click.echo("Database Locations:")
for db_name, db_path in config.db_path_dict.items():
click.echo(f" - {db_name}: {db_path}")
except Exception as e:
logger.error(f"Error loading configuration: {e}")
click.echo(f"Failed to load configuration from {cfg_file}")
else:
path = Path(cfg_file)
if path.is_file():
with path.open('r') as file:
content = file.read()
click.echo(content)
else:
click.echo(f"Configuration file not found at {cfg_file}")
@click.command('show-all', help='Show everything: configuration, databases, and device metadata')
@click.pass_context
def show_everything(ctx):
"""Show everything that can be recursively found based on config except file contents."""
config = ctx.obj['CONFIG']
click.echo("Configuration Context:")
click.echo(f"Default Database: {config.default_database}")
click.echo(f"Default Database Path: {config.default_db_location}")
click.echo("Database Locations:")
for db_name, db_path in config.db_path_dict.items():
full_db_path = Path(db_path) / db_name
click.echo(f" - {db_name}: {full_db_path}")
if full_db_path.is_dir():
click.echo(f"Contents of {db_name} at {full_db_path}:")
for item in full_db_path.iterdir():
if item.is_file():
click.echo(f" - {item.name}")
try:
with item.open('r', encoding='utf-8') as file:
content = file.read()
click.echo(f" Content:\n{content}")
except UnicodeDecodeError:
click.echo(" Content is not readable as text")
elif item.is_dir():
click.echo(f" - {item.name}/")
for subitem in item.iterdir():
if subitem.is_file():
click.echo(f" - {subitem.name}")
elif subitem.is_dir():
click.echo(f" - {subitem.name}/")
else:
click.echo(f" {full_db_path} is not a directory")
warnstyle = {'fg': 'red', 'bold': True}
click.secho('Developer command used', **warnstyle)

View File

@ -0,0 +1,100 @@
import click
from pathlib import Path
import logging
from logging.handlers import RotatingFileHandler
import sys
from iottb.models.iottb_config import IottbConfig
from iottb.definitions import DB_NAME
logger = logging.getLogger(__name__)
@click.command()
@click.option('-d', '--dest', type=click.Path(), help='Location to put (new) iottb database')
@click.option('-n', '--name', default=DB_NAME, type=str, help='Name of new database.')
@click.option('--update-default/--no-update-default', default=True, help='If new db should be set as the new default')
@click.pass_context
def init_db(ctx, dest, name, update_default):
logger.info('init-db invoked')
config = ctx.obj['CONFIG']
logger.debug(f'str(config)')
# Use the default path from config if dest is not provided
known_dbs = config.get_known_databases()
logger.debug(f'Known databases: {known_dbs}')
if name in known_dbs:
dest = config.get_database_location(name)
if Path(dest).joinpath(name).is_dir():
click.echo(f'A database {name} already exists.')
logger.debug(f'DB {name} exists in {dest}')
click.echo(f'Exiting...')
exit()
logger.debug(f'DB name {name} registered but does not exist.')
if not dest:
logger.info('No dest set, choosing default destination.')
dest = Path(config.default_db_location).parent
db_path = Path(dest).joinpath(name)
logger.debug(f'Full path for db {str(db_path)}')
# Create the directory if it doesn't exist
db_path.mkdir(parents=True, exist_ok=True)
logger.info(f"mkdir {db_path} successful")
click.echo(f'Created {db_path}')
# Update configuration
config.set_database_location(name, str(dest))
if update_default:
config.set_default_database(name, str(dest))
config.save_config()
logger.info(f"Updated configuration with database {name} at {db_path}")
@click.command()
@click.option('-d', '--dest', type=click.Path(), help='Location to put (new) iottb database')
@click.option('-n', '--name', default=DB_NAME, type=str, help='Name of new database.')
@click.option('--update-default/--no-update-default', default=True, help='If new db should be set as the new default')
@click.pass_context
def init_db_inactive(ctx, dest, name, update_default):
logger.info('init-db invoked')
config = ctx.obj['CONFIG']
logger.debug(f'str(config)')
# Retrieve known databases
known_dbs = config.get_known_databases()
# Determine destination path
if name in known_dbs:
dest = Path(config.get_database_location(name))
if dest.joinpath(name).is_dir():
click.echo(f'A database {name} already exists.')
logger.debug(f'DB {name} exists in {dest}')
click.echo(f'Exiting...')
exit()
logger.debug(f'DB name {name} registered but does not exist.')
elif not dest:
logger.info('No destination set, using default path from config.')
dest = Path(config.default_db_location).parent
# Ensure destination path is absolute
dest = dest.resolve()
# Combine destination path with database name
db_path = dest / name
logger.debug(f'Full path for database: {str(db_path)}')
# Create the directory if it doesn't exist
try:
db_path.mkdir(parents=True, exist_ok=True)
logger.info(f'Directory {db_path} created successfully.')
click.echo(f'Created {db_path}')
except Exception as e:
logger.error(f'Failed to create directory {db_path}: {e}')
click.echo(f'Failed to create directory {db_path}: {e}', err=True)
exit(1)
# Update configuration
config.set_database_location(name, str(db_path))
if update_default:
config.set_default_database(name, str(db_path))
config.save_config()
logger.info(f'Updated configuration with database {name} at {db_path}')
click.echo(f'Updated configuration with database {name} at {db_path}')

View File

@ -0,0 +1,118 @@
import click
import subprocess
import json
from pathlib import Path
import logging
import re
from datetime import datetime
from iottb.definitions import APP_NAME, CFG_FILE_PATH
from iottb.models.iottb_config import IottbConfig
from iottb.utils.string_processing import make_canonical_name
# Setup logger
logger = logging.getLogger('iottb.sniff')
def is_ip_address(address):
ip_pattern = re.compile(r"^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$")
return ip_pattern.match(address) is not None
def is_mac_address(address):
mac_pattern = re.compile(r"^([0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2}$")
return mac_pattern.match(address) is not None
def load_config(cfg_file):
"""Loads configuration from the given file path."""
with open(cfg_file, 'r') as config_file:
return json.load(config_file)
def validate_sniff(ctx, param, value):
logger.info('Validating sniff...')
if ctx.params.get('unsafe') and not value:
return None
if not ctx.params.get('unsafe') and not value:
raise click.BadParameter('Address is required unless --unsafe is set.')
if not is_ip_address(value) and not is_mac_address(value):
raise click.BadParameter('Address must be a valid IP address or MAC address.')
return value
@click.command('sniff', help='Sniff packets with tcpdump')
@click.argument('device')
@click.option('-i', '--interface', callback=validate_sniff, help='Network interface to capture on',
envvar='IOTTB_CAPTURE_INTERFACE')
@click.option('-a', '--address', callback=validate_sniff, help='IP or MAC address to filter packets by',
envvar='IOTTB_CAPTURE_ADDRESS')
@click.option('--db', '--database', type=click.Path(exists=True, file_okay=False), envvar='IOTTB_DB',
help='Database of device. Only needed if not current default.')
@click.option('--unsafe', is_flag=True, default=False, envvar='IOTTB_UNSAFE', is_eager=True,
help='Disable checks for otherwise required options')
@click.option('--guided', is_flag=True, default=False)
def sniff(device, interface, address, db, unsafe, guided):
""" Sniff packets from a device """
logger.info('sniff command invoked')
# Step1: Load Config
config = IottbConfig(Path(CFG_FILE_PATH))
logger.debug(f'Config loaded: {config}')
# Step2: determine relevant database
database = db if db else config.default_database
path = config.default_db_location[database]
full_db_path = Path(path) / database
logger.debug(f'Full db path is {str(path)}')
# 2.2: Check if it exists
if not full_db_path.is_dir():
logger.error('DB unexpectedly missing')
click.echo('DB unexpectedly missing')
return
canonical_name, aliases = make_canonical_name(device)
click.echo(f'Using canonical device name {canonical_name}')
device_path = full_db_path / canonical_name
# Step 3: now the device
if not device_path.exists():
if not unsafe:
logger.error(f'Device path {device_path} does not exist')
click.echo(f'Device path {device_path} does not exist')
return
else:
device_path.mkdir(parents=True, exist_ok=True)
logger.info(f'Device path {device_path} created')
# Generate filter
if not unsafe:
if is_ip_address(address):
packet_filter = f"host {address}"
elif is_mac_address(address):
packet_filter = f"ether host {address}"
else:
logger.error('Invalid address format')
click.echo('Invalid address format')
return
else:
packet_filter = None
@click.command('sniff', help='Sniff packets with tcpdump')
@click.argument('device')
@click.option('-i', '--interface', required=False, help='Network interface to capture on', envvar='IOTTB_CAPTURE_INTERFACE')
@click.option('-a', '--address', required=True, help='IP or MAC address to filter packets by', envvar='IOTTB_CAPTURE_ADDRESS')
@click.option('--db', '--database', type=click.Path(exists=True, file_okay=False), envvar='IOTTB_DB',
help='Database of device. Only needed if not current default.')
@click.option('--unsafe', is_flag=True, default=False, envvar='IOTTB_UNSAFE',
help='Disable checks for otherwise required options')
@click.option('--guided', is_flag=True)
def sniff2(device, interface, address, cfg_file):
""" Sniff packets from a device """
logger.info('sniff command invoked')
# Step 1: Load Config
# Dependency: Config file must exist
config = IottbConfig(Path(CFG_FILE_PATH))
logger.debug(f'Config loaded: {config}')

View File

@ -0,0 +1,46 @@
import logging
from pathlib import Path
import click
APP_NAME = 'iottb'
DB_NAME = 'iottb.db'
CFG_FILE_PATH = str(Path(click.get_app_dir(APP_NAME)).joinpath('iottb.cfg'))
CONSOLE_LOG_FORMATS = {
0: '%(levelname)s - %(message)s',
1: '%(levelname)s - %(module)s - %(message)s',
2: '%(levelname)s - %(module)s - %(funcName)s - %(lineno)d - %(message)s'
}
LOGFILE_LOG_FORMAT = {
0: '%(levelname)s - %(asctime)s - %(module)s - %(message)s',
1: '%(levelname)s - %(asctime)s - %(module)s - %(funcName)s - %(message)s',
2: '%(levelname)s - %(asctime)s - %(module)s - %(funcName)s - %(lineno)d - %(message)s'
}
MAX_VERBOSITY = len(CONSOLE_LOG_FORMATS) - 1
assert len(LOGFILE_LOG_FORMAT) == len(CONSOLE_LOG_FORMATS), 'Log formats must be same size'
LOGLEVEL = logging.DEBUG
LOGDIR = Path.cwd() / 'logs'
# Characters to just replace
REPLACEMENT_SET_CANONICAL_DEVICE_NAMES = {' ', '_', ',', '!', '@', '#', '$', '%', '^', '&', '*', '(', ')', '+', '=',
'{', '}', '[', ']',
'|',
'\\', ':', ';', '"', "'", '<', '>', '?', '/', '`', '~'}
# Characters to possibly error on
ERROR_SET_CANONICAL_DEVICE_NAMES = {',', '!', '@', '#', '$', '%', '^', '&', '*', '(', ')', '+', '=', '{', '}', '[', ']',
'|',
'\\', ':', ';', '"', "'", '<', '>', '?', '/', '`', '~'}
DEVICE_METADATA_FILE_NAME = 'device_metadata.json'
TB_ECHO_STYLES = {
'w': {'fg': 'yellow', 'bold': True},
'i': {'fg': 'blue', 'italic': True},
's': {'fg': 'green', 'bold': True},
'e': {'fg': 'red', 'bold': True},
'header': {'fg': 'bright_cyan', 'bold': True, 'italic': True}
}
NAME_OF_CAPTURE_DIR = 'sniffs'

View File

@ -0,0 +1,62 @@
import click
from pathlib import Path
import logging
from iottb.commands.sniff import sniff
from iottb.commands.developer import set_key_in_table_to, rm_cfg, rm_dbs, show_cfg, show_everything
##################################################
# Import package modules
#################################################
from iottb.utils.logger_config import setup_logging
from iottb import definitions
from iottb.models.iottb_config import IottbConfig
from iottb.commands.initialize_testbed import init_db
from iottb.commands.add_device import add_device
############################################################################
# Module shortcuts for global definitions
###########################################################################
APP_NAME = definitions.APP_NAME
DB_NAME = definitions.DB_NAME
CFG_FILE_PATH = definitions.CFG_FILE_PATH
# These are (possibly) redundant when defined in definitions.py
# keeping them here until refactored and tested
MAX_VERBOSITY = definitions.MAX_VERBOSITY
# Logger stuff
loglevel = definitions.LOGLEVEL
logger = logging.getLogger(__name__)
@click.group()
@click.option('-v', '--verbosity', count=True, type=click.IntRange(0, 3), default=0,
help='Set verbosity')
@click.option('-d', '--debug', is_flag=True, default=False,
help='Enable debug mode')
@click.option('--cfg-file', type=click.Path(),
default=Path(click.get_app_dir(APP_NAME)).joinpath('iottb.cfg'),
envvar='IOTTB_CONF_HOME', help='Path to iottb config file')
@click.pass_context
def cli(ctx, verbosity, debug, cfg_file):
setup_logging(verbosity, debug) # Setup logging based on the loaded configuration and other options
ctx.ensure_object(dict) # Make sure context is ready for use
logger.info("Starting execution.")
ctx.obj['CONFIG'] = IottbConfig(cfg_file) # Load configuration directly
ctx.meta['FULL_PATH_CONFIG_FILE'] = str(cfg_file)
##################################################################################
# Add all subcommands to group here
#################################################################################
# noinspection PyTypeChecker
cli.add_command(init_db)
cli.add_command(rm_cfg)
cli.add_command(set_key_in_table_to)
cli.add_command(rm_dbs)
# noinspection PyTypeChecker
cli.add_command(add_device)
cli.add_command(show_cfg)
cli.add_command(sniff)
cli.add_command(show_everything)
if __name__ == '__main__':
cli(auto_envvar_prefix='IOTTB', show_default=True, show_envvars=True)

View File

@ -0,0 +1,6 @@
class Database:
def __init__(self, name, path):
self.name = name
self.path = path
self.device_list = [] # List of the canonical names of devices registered in this database

View File

@ -0,0 +1,44 @@
import logging
import uuid
from datetime import datetime
import logging
import click
from iottb.utils.string_processing import make_canonical_name
logger = logging.getLogger(__name__)
class DeviceMetadata:
def __init__(self, device_name, description="", model="", manufacturer="", firmware_version="", device_type="",
supported_interfaces="", companion_applications="", save_to_file=None):
self.device_id = str(uuid.uuid4())
self.device_name = device_name
cn, aliases = make_canonical_name(device_name)
logger.debug(f'cn, aliases = {cn}, {str(aliases)}')
self.aliases = aliases
self.canonical_name = cn
self.date_added = datetime.now().isoformat()
self.description = description
self.model = model
self.manufacturer = manufacturer
self.current_firmware_version = firmware_version
self.device_type = device_type
self.supported_interfaces = supported_interfaces
self.companion_applications = companion_applications
self.last_metadata_update = datetime.now().isoformat()
if save_to_file is not None:
click.echo('TODO: Implement saving config to file after creation!')
def add_alias(self, alias: str = ""):
if alias == "":
return
self.aliases.append(alias)
def get_canonical_name(self):
return self.canonical_name
def print_attributes(self):
print(f'Printing attribute value pairs in {__name__}')
for attr, value in self.__dict__.items():
print(f'{attr}: {value}')

View File

@ -0,0 +1,124 @@
import json
from pathlib import Path
from iottb import definitions
import logging
logger = logging.getLogger(__name__)
DB_NAME = definitions.DB_NAME
class IottbConfig:
""" Class to handle testbed configuration.
TODO: Add instead of overwrite Database locations when initializing if a location with valid db
exists.
"""
@staticmethod
def warn():
logger.warning(f'DatabaseLocations are DatabaseLocationMap in the class {__name__}')
def __init__(self, cfg_file=definitions.CFG_FILE_PATH):
logger.info('Initializing Config object')
IottbConfig.warn()
self.cfg_file = Path(cfg_file)
self.default_database = None
self.default_db_location = None
self.db_path_dict = dict()
self.load_config()
def create_default_config(self):
"""Create default iottb config file."""
logger.info(f'Creating default config file at {self.cfg_file}')
self.default_database = DB_NAME
self.default_db_location = str(Path.home())
self.db_path_dict = {
DB_NAME: self.default_db_location
}
defaults = {
'DefaultDatabase': self.default_database,
'DefaultDatabasePath': self.default_db_location,
'DatabaseLocations': self.db_path_dict
}
try:
self.cfg_file.parent.mkdir(parents=True, exist_ok=True)
with self.cfg_file.open('w') as config_file:
json.dump(defaults, config_file, indent=4)
except IOError as e:
logger.error(f"Failed to create default configuration file at {self.cfg_file}: {e}")
raise RuntimeError(f"Failed to create configuration file: {e}") from e
def load_config(self):
"""Loads or creates default configuration from given file path."""
logger.info('Loading configuration file')
if not self.cfg_file.is_file():
logger.info('Config file does not exist.')
self.create_default_config()
else:
logger.info('Config file exists, opening.')
with self.cfg_file.open('r') as config_file:
data = json.load(config_file)
self.default_database = data.get('DefaultDatabase')
self.default_db_location = data.get('DefaultDatabasePath')
self.db_path_dict = data.get('DatabaseLocations', {})
def save_config(self):
"""Save the current configuration to the config file."""
data = {
'DefaultDatabase': self.default_database,
'DefaultDatabasePath': self.default_db_location,
'DatabaseLocations': self.db_path_dict
}
try:
with self.cfg_file.open('w') as config_file:
json.dump(data, config_file, indent=4)
except IOError as e:
logger.error(f"Failed to save configuration file at {self.cfg_file}: {e}")
raise RuntimeError(f"Failed to save configuration file: {e}") from e
def set_default_database(self, name, path):
"""Set the default database and its path."""
self.default_database = name
self.default_db_location = path
self.db_path_dict[name] = path
def get_default_database_location(self):
return self.default_db_location
def get_default_database(self):
return self.default_database
def get_database_location(self, name):
"""Get the location of a specific database."""
return self.db_path_dict.get(name)
def set_database_location(self, name, path):
"""Set the location for a database."""
logger.debug(f'Type of "path" parameter {type(path)}')
logger.debug(f'String value of "path" parameter {str(path)}')
logger.debug(f'Type of "name" parameter {type(name)}')
logger.debug(f'String value of "name" parameter {str(name)}')
path = Path(path)
name = Path(name)
logger.debug(f'path:name = {path}:{name}')
if path.name == name:
path = path.parent
self.db_path_dict[str(name)] = str(path)
def get_known_databases(self):
"""Get the set of known databases"""
logger.info(f'Getting known databases.')
return self.db_path_dict.keys()
def get_know_database_paths(self):
"""Get the paths of all known databases"""
logger.info(f'Getting known database paths.')
return self.db_path_dict.values()
def get_full_default_path(self):
return Path(self.default_db_location) / self.default_database

View File

@ -0,0 +1,4 @@
import logging
logger = logging.getLogger('iottb.sniff') # Log with sniff subcommand

View File

@ -0,0 +1,41 @@
import logging
import sys
from logging.handlers import RotatingFileHandler
from iottb import definitions
from iottb.definitions import MAX_VERBOSITY, CONSOLE_LOG_FORMATS, APP_NAME, LOGFILE_LOG_FORMAT
loglevel = definitions.LOGLEVEL
def setup_logging(verbosity, debug=loglevel):
""" Setup root logger for iottb """
log_level = loglevel
handlers = []
date_format = '%Y-%m-%d %H:%M:%S'
if verbosity > 0:
log_level = logging.WARNING
if verbosity > MAX_VERBOSITY:
verbosity = MAX_VERBOSITY
log_level = logging.INFO
assert verbosity <= MAX_VERBOSITY, f'Verbosity must be <= {MAX_VERBOSITY}'
console_handler = logging.StreamHandler(sys.stdout)
print(str(sys.stdout))
console_handler.setFormatter(logging.Formatter(CONSOLE_LOG_FORMATS[verbosity], datefmt=date_format))
console_handler.setLevel(logging.DEBUG) # can keep at debug since it depends on global level?
handlers.append(console_handler)
if debug:
log_level = logging.DEBUG
# Logfile logs INFO+, no debugs though
file_handler = RotatingFileHandler(f'{str(definitions.LOGDIR / APP_NAME)}.log', maxBytes=10240, backupCount=5)
file_handler.setFormatter(logging.Formatter(LOGFILE_LOG_FORMAT[verbosity], datefmt=date_format))
file_handler.setLevel(logging.INFO)
# finnish root logger setup
handlers.append(file_handler)
# Force this config to be applied to root logger
logging.basicConfig(level=log_level, handlers=handlers, force=True)

View File

@ -0,0 +1,40 @@
import re
from iottb import definitions
import logging
logger = logging.getLogger(__name__)
def normalize_string(s, chars_to_replace=None, replacement=None, allow_unicode=False):
pass
def make_canonical_name(name):
"""
Normalize the device name to a canonical form:
- Replace the first two occurrences of spaces and transform characters with dashes.
- Remove any remaining spaces and non-ASCII characters.
- Convert to lowercase.
"""
aliases = [name]
logger.info(f'Normalizing name {name}')
# We first normalize
chars_to_replace = definitions.REPLACEMENT_SET_CANONICAL_DEVICE_NAMES
pattern = re.compile('|'.join(re.escape(char) for char in chars_to_replace))
norm_name = pattern.sub('-', name)
norm_name = re.sub(r'[^\x00-\x7F]+', '', norm_name) # removes non ascii chars
aliases.append(norm_name)
# Lower case
norm_name = norm_name.lower()
aliases.append(norm_name)
# canonical name is only first two parts of resulting string
parts = norm_name.split('-')
canonical_name = canonical_name = '-'.join(parts[:2])
aliases.append(canonical_name)
logger.debug(f'Canonical name: {canonical_name}')
logger.debug(f'Aliases: {aliases}')
return canonical_name, list(set(aliases))

View File

@ -0,0 +1,42 @@
# iottb/utils/user_interaction.py
import click
from iottb.definitions import TB_ECHO_STYLES
import sys
import os
def tb_echo2(msg: str, lvl='i', log=True):
style = TB_ECHO_STYLES.get(lvl, {})
click.secho(f'[IOTTB]', **style)
click.secho(f'[IOTTB] \t {msg}', **style)
last_prefix = None
def tb_echo(msg: str, lvl='i', log=True):
global last_prefix
prefix = f'Testbed [{lvl.upper()}]\n'
if last_prefix != prefix:
click.secho(prefix, nl=False, **TB_ECHO_STYLES['header'])
last_prefix = prefix
click.secho(f' {msg}', **TB_ECHO_STYLES[lvl])
def main():
tb_echo('Info message', 'i')
tb_echo('Warning message', 'w')
tb_echo('Error message', 'e')
tb_echo('Success message', 's')
if __name__ == '__main__':
# arrrgggg hacky
current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.abspath(os.path.join(current_dir, '../../'))
sys.path.insert(0, project_root)
main()

103
code/iottb-project/poetry.lock generated Normal file
View File

@ -0,0 +1,103 @@
# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand.
[[package]]
name = "click"
version = "8.1.7"
description = "Composable command line interface toolkit"
optional = false
python-versions = ">=3.7"
files = [
{file = "click-8.1.7-py3-none-any.whl", hash = "sha256:ae74fb96c20a0277a1d615f1e4d73c8414f5a98db8b799a7931d1582f3390c28"},
{file = "click-8.1.7.tar.gz", hash = "sha256:ca9853ad459e787e2192211578cc907e7594e294c7ccc834310722b41b9ca6de"},
]
[package.dependencies]
colorama = {version = "*", markers = "platform_system == \"Windows\""}
[[package]]
name = "colorama"
version = "0.4.6"
description = "Cross-platform colored terminal text."
optional = false
python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,>=2.7"
files = [
{file = "colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6"},
{file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"},
]
[[package]]
name = "iniconfig"
version = "2.0.0"
description = "brain-dead simple config-ini parsing"
optional = false
python-versions = ">=3.7"
files = [
{file = "iniconfig-2.0.0-py3-none-any.whl", hash = "sha256:b6a85871a79d2e3b22d2d1b94ac2824226a63c6b741c88f7ae975f18b6778374"},
{file = "iniconfig-2.0.0.tar.gz", hash = "sha256:2d91e135bf72d31a410b17c16da610a82cb55f6b0477d1a902134b24a455b8b3"},
]
[[package]]
name = "packaging"
version = "24.1"
description = "Core utilities for Python packages"
optional = false
python-versions = ">=3.8"
files = [
{file = "packaging-24.1-py3-none-any.whl", hash = "sha256:5b8f2217dbdbd2f7f384c41c628544e6d52f2d0f53c6d0c3ea61aa5d1d7ff124"},
{file = "packaging-24.1.tar.gz", hash = "sha256:026ed72c8ed3fcce5bf8950572258698927fd1dbda10a5e981cdf0ac37f4f002"},
]
[[package]]
name = "pluggy"
version = "1.5.0"
description = "plugin and hook calling mechanisms for python"
optional = false
python-versions = ">=3.8"
files = [
{file = "pluggy-1.5.0-py3-none-any.whl", hash = "sha256:44e1ad92c8ca002de6377e165f3e0f1be63266ab4d554740532335b9d75ea669"},
{file = "pluggy-1.5.0.tar.gz", hash = "sha256:2cffa88e94fdc978c4c574f15f9e59b7f4201d439195c3715ca9e2486f1d0cf1"},
]
[package.extras]
dev = ["pre-commit", "tox"]
testing = ["pytest", "pytest-benchmark"]
[[package]]
name = "pytest"
version = "8.2.2"
description = "pytest: simple powerful testing with Python"
optional = false
python-versions = ">=3.8"
files = [
{file = "pytest-8.2.2-py3-none-any.whl", hash = "sha256:c434598117762e2bd304e526244f67bf66bbd7b5d6cf22138be51ff661980343"},
{file = "pytest-8.2.2.tar.gz", hash = "sha256:de4bb8104e201939ccdc688b27a89a7be2079b22e2bd2b07f806b6ba71117977"},
]
[package.dependencies]
colorama = {version = "*", markers = "sys_platform == \"win32\""}
iniconfig = "*"
packaging = "*"
pluggy = ">=1.5,<2.0"
[package.extras]
dev = ["argcomplete", "attrs (>=19.2)", "hypothesis (>=3.56)", "mock", "pygments (>=2.7.2)", "requests", "setuptools", "xmlschema"]
[[package]]
name = "scapy"
version = "2.5.0"
description = "Scapy: interactive packet manipulation tool"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, <4"
files = [
{file = "scapy-2.5.0.tar.gz", hash = "sha256:5b260c2b754fd8d409ba83ee7aee294ecdbb2c235f9f78fe90bc11cb6e5debc2"},
]
[package.extras]
basic = ["ipython"]
complete = ["cryptography (>=2.0)", "ipython", "matplotlib", "pyx"]
docs = ["sphinx (>=3.0.0)", "sphinx_rtd_theme (>=0.4.3)", "tox (>=3.0.0)"]
[metadata]
lock-version = "2.0"
python-versions = "^3.12"
content-hash = "10b2c268b0f10db15eab2cca3d2dc9dc25bc60f4b218ebf786fb780fa85557e0"

View File

@ -0,0 +1,22 @@
[tool.poetry]
name = "iottb"
version = "0.1.0"
description = "IoT Testbed"
authors = ["Sebastian Lenzlinger <sebastian.lenzlinger@unibas.ch>"]
readme = "README.md"
package-mode = false
[tool.poetry.dependencies]
python = "^3.12"
click = "^8.1"
scapy = "^2.5"
[tool.poetry.scripts]
iottb = "iottb.main:cli"
[tool.poetry.group.test.dependencies]
pytest = "^8.2.2"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

View File

View File

@ -0,0 +1,23 @@
from iottb.utils.string_processing import make_canonical_name
import pytest
class TestMakeCanonicalName:
def test_normalizes_name_with_spaces_to_dashes(self):
name = "Device Name With Spaces"
expected_canonical_name = "device-name"
canonical_name, aliases = make_canonical_name(name)
assert canonical_name == expected_canonical_name
assert "device-name-with-spaces" in aliases
assert "device-name" in aliases
assert "Device Name With Spaces" in aliases
def test_name_with_no_spaces_or_special_characters(self):
name = "DeviceName123"
expected_canonical_name = "devicename123"
canonical_name, aliases = make_canonical_name(name)
assert canonical_name == expected_canonical_name
assert "DeviceName123" in aliases
assert "devicename123" in aliases