Hopefully successfully integrate proper repo.

This commit is contained in:
Sebastian Lenzlinger 2024-06-30 00:02:59 +02:00
parent 01954bd5a6
commit d9d3f66fc8
30 changed files with 1497 additions and 14 deletions

14
.idea/webResources.xml generated
View File

@ -1,14 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="WebResourcesPaths">
<contentEntries>
<entry url="file://$PROJECT_DIR$">
<entryData>
<resourceRoots>
<path value="file://$PROJECT_DIR$/data" />
</resourceRoots>
</entryData>
</entry>
</contentEntries>
</component>
</project>

36
code/iottb-project/.gitignore vendored Normal file
View File

@ -0,0 +1,36 @@
__pycache__
.venv
iottb.egg-info
.idea
*.log
logs/
*.pyc
.obsidian
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio, WebStorm and Rider
# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
# User-specific stuff
.idea/**/workspace.xml
.idea/**/tasks.xml
.idea/**/usage.statistics.xml
.idea/**/dictionaries
.idea/**/shelf
# AWS User-specific
.idea/**/aws.xml
# Generated files
.idea/**/contentModel.xml
# Sensitive or high-churn files
.idea/**/dataSources/
.idea/**/dataSources.ids
.idea/**/dataSources.local.xml
.idea/**/sqlDataSources.xml
.idea/**/dynamic.xml
.idea/**/uiDesigner.xml
.idea/**/dbnavigator.xml
.private/
*.pcap

View File

@ -0,0 +1,9 @@
# Iottb
## Basic Invocation
## Configuration
### Env Vars
- IOTTB_CONF_HOME
By setting this variable you control where the basic iottb application
configuration should be looked for

View File

@ -0,0 +1,11 @@
from iottb import definitions
import logging
from iottb.utils.user_interaction import tb_echo
import click
click.echo = tb_echo # This is very hacky
logging.basicConfig(level=definitions.LOGLEVEL)
log_dir = definitions.LOGDIR
# Ensure logs dir exists before new handlers are registered in main.py
if not log_dir.is_dir():
log_dir.mkdir()

View File

@ -0,0 +1,89 @@
import json
import click
from pathlib import Path
import logging
import re
from iottb import definitions
from iottb.models.device_metadata import DeviceMetadata
from iottb.models.iottb_config import IottbConfig
from iottb.definitions import CFG_FILE_PATH, TB_ECHO_STYLES
logger = logging.getLogger(__name__)
def add_device_guided(ctx, cn, db):
click.echo('TODO: Implement')
logger.info('Adding device interactively')
#logger.debug(f'Parameters: {params}. value: {value}')
@click.command('add-device', help='Add a device to a database')
@click.option('--dev', '--device-name', type=str, required=True,
help='The name of the device to be added. If this string contains spaces or other special characters \
normalization is performed to derive a canonical name')
@click.option('--db', '--database', type=click.Path(exists=True, file_okay=False, dir_okay=True),
envvar='IOTTB_DB', show_envvar=True,
help='Database in which to add this device. If not specified use default from config.')
@click.option('--guided', is_flag=True, default=False, show_default=True, envvar='IOTTB_GUIDED_ADD', show_envvar=True,
help='Add device interactively')
def add_device(dev, db, guided):
"""Add a new device to a database
Device name must be supplied unless in an interactive setup. Database is taken from config by default.
"""
logger.info('add-device invoked')
# Step 1: Load Config
# Dependency: Config file must exist
config = IottbConfig(Path(CFG_FILE_PATH))
logger.debug(f'Config loaded: {config}')
# Step 2: Load database
# dependency: Database folder must exist
if db:
database = db
path = config.db_path_dict[database]
logger.debug(f'Resolved (path, db) {path}, {database}')
else:
path = config.default_db_location
database = config.default_database
logger.debug(f'Default (path, db) {path}, {database}')
click.secho(f'Using database {database}')
full_db_path = Path(path) / database
if not full_db_path.is_dir():
logger.warning(f'No database at {database}')
click.echo(f'Could not find a database.')
click.echo(f'You need to initialize the testbed before before you add devices!')
click.echo(f'To initialize the testbed in the default location run "iottb init-db"')
click.echo('Exiting...')
exit()
# Step 3: Check if device already exists in database
# dependency: DeviceMetadata object
device_metadata = DeviceMetadata(device_name=dev)
device_dir = full_db_path / device_metadata.canonical_name
# Check if device is already registered
if device_dir.exists():
logger.warning(f'Device directory {device_dir} already exists.')
click.echo(f'Device {dev} already exists in the database.')
click.echo('Exiting...')
exit()
try:
device_dir.mkdir()
except OSError as e:
logger.error(f'Error trying to create device {e}')
click.echo('Exiting...')
exit()
# Step 4: Save metadata into device_dir
metadata_path = device_dir / definitions.DEVICE_METADATA_FILE_NAME
with metadata_path.open('w') as metadata_file:
json.dump(device_metadata.__dict__, metadata_file, indent=4)
click.echo(f'Successfully added device {dev} to database')
logger.debug(f'Added device {dev} to database {database}. Full path of metadata {metadata_path}')
logger.info(f'Metadata for {dev} {device_metadata.print_attributes()}')

View File

@ -0,0 +1,130 @@
from pathlib import Path
import logging
import click
from iottb import tb_echo
from iottb.definitions import DB_NAME, CFG_FILE_PATH
from iottb.models.iottb_config import IottbConfig
logger = logging.getLogger(__name__)
@click.group('util')
def tb():
pass
@click.command()
@click.option('--file', default=DB_NAME)
@click.option('--table', type=str, default='DefaultDatabase')
@click.option('--key')
@click.option('--value')
@click.pass_context
def set_key_in_table_to(ctx, file, table, key, value):
"""Edit config or metadata files. TODO: Implement"""
click.echo(f'set_key_in_table_to invoked')
logger.warning("Unimplemented subcommand invoked.")
@click.command()
@click.confirmation_option(prompt="Are you certain that you want to delete the cfg file?")
def rm_cfg():
""" Removes the cfg file from the filesystem.
This is mostly a utility during development. Once non-standard database locations are implemented,
deleting this would lead to iottb not being able to find them anymore.
"""
Path(CFG_FILE_PATH).unlink()
click.echo(f'Iottb configuration removed at {CFG_FILE_PATH}')
@click.command()
@click.confirmation_option(prompt="Are you certain that you want to delete the databases file?")
def rm_dbs(dbs):
""" Removes ALL(!) databases from the filesystem if they're empty.
Development utility currently unfit for use.
"""
config = IottbConfig()
paths = config.get_know_database_paths()
logger.debug(f'Known db paths: {str(paths)}')
for dbs in paths:
try:
Path(dbs).rmdir()
click.echo(f'{dbs} deleted')
except Exception as e:
logger.debug(f'Failed unlinking db {dbs} with error {e}')
logger.info(f'All databases deleted')
@click.command('show-cfg', help='Show the current configuration context')
@click.option('--cfg-file', type=click.Path(), default=CFG_FILE_PATH, help='Path to the config file')
@click.option('-pp', is_flag=True, default=False, help='Pretty Print')
@click.pass_context
def show_cfg(ctx, cfg_file, pp):
logger.debug(f'Pretty print option set to {pp}')
if pp:
try:
config = IottbConfig(Path(cfg_file))
click.echo("Configuration Context:")
click.echo(f"Default Database: {config.default_database}")
click.echo(f"Default Database Path: {config.default_db_location}")
click.echo("Database Locations:")
for db_name, db_path in config.db_path_dict.items():
click.echo(f" - {db_name}: {db_path}")
except Exception as e:
logger.error(f"Error loading configuration: {e}")
click.echo(f"Failed to load configuration from {cfg_file}")
else:
path = Path(cfg_file)
if path.is_file():
with path.open('r') as file:
content = file.read()
click.echo(content)
else:
click.echo(f"Configuration file not found at {cfg_file}")
@click.command('show-all', help='Show everything: configuration, databases, and device metadata')
@click.pass_context
def show_everything(ctx):
"""Show everything that can be recursively found based on config except file contents."""
config = ctx.obj['CONFIG']
click.echo("Configuration Context:")
click.echo(f"Default Database: {config.default_database}")
click.echo(f"Default Database Path: {config.default_db_location}")
click.echo("Database Locations:")
everything_dict = {}
for db_name, db_path in config.db_path_dict.items():
click.echo(f" - {db_name}: {db_path}")
for db_name, db_path in config.db_path_dict.items():
full_db_path = Path(db_path) / db_name
if full_db_path.is_dir():
click.echo(f"\nContents of {full_db_path}:")
flag = True
for item in full_db_path.iterdir():
flag = False
if item.is_file():
click.echo(f" - {item.name}")
try:
with item.open('r', encoding='utf-8') as file:
content = file.read()
click.echo(f" Content:\n{content}")
except UnicodeDecodeError:
click.echo(" Content is not readable as text")
elif item.is_dir():
click.echo(f" - {item.name}/")
for subitem in item.iterdir():
if subitem.is_file():
click.echo(f" - {subitem.name}")
elif subitem.is_dir():
click.echo(f" - {subitem.name}/")
if flag:
tb_echo(f'\t EMPTY')
else:
click.echo(f"{full_db_path} is not a directory")

View File

@ -0,0 +1,327 @@
import os
import shutil
import uuid
from time import time
import click
import subprocess
import json
from pathlib import Path
import logging
import re
from datetime import datetime
from click_option_group import optgroup
from iottb.definitions import APP_NAME, CFG_FILE_PATH
from iottb.models.iottb_config import IottbConfig
from iottb.utils.string_processing import make_canonical_name
# Setup logger
logger = logging.getLogger('iottb.sniff')
def is_ip_address(address):
ip_pattern = re.compile(r"^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$")
return ip_pattern.match(address) is not None
def is_mac_address(address):
mac_pattern = re.compile(r"^([0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2}$")
return mac_pattern.match(address) is not None
def load_config(cfg_file):
"""Loads configuration from the given file path."""
with open(cfg_file, 'r') as config_file:
return json.load(config_file)
def validate_sniff(ctx, param, value):
logger.info('Validating sniff...')
if ctx.params.get('unsafe') and not value:
return None
if not ctx.params.get('unsafe') and not value:
raise click.BadParameter('Address is required unless --unsafe is set.')
if not is_ip_address(value) and not is_mac_address(value):
raise click.BadParameter('Address must be a valid IP address or MAC address.')
return value
@click.command('sniff', help='Sniff packets with tcpdump')
@optgroup.group('Testbed sources')
@optgroup.option('--db', '--database', type=str, envvar='IOTTB_DB', show_envvar=True,
help='Database of device. Only needed if not current default.')
@optgroup.option('--app', type=str, help='Companion app being used during capture', required=False)
@optgroup.group('Runtime behaviour')
@optgroup.option('--unsafe', is_flag=True, default=False, envvar='IOTTB_UNSAFE', is_eager=True,
help='Disable checks for otherwise required options.\n', show_envvar=True)
@optgroup.option('--guided', is_flag=True, default=False, envvar='IOTTB_GUIDED', show_envvar=True)
@optgroup.option('--pre', type=click.Path(exists=True, executable=True), help='Script to be executed before main '
'command'
'is started.')
@optgroup.group('Tcpdump options')
@optgroup.option('-i', '--interface',
help='Network interface to capture on.' +
'If not specified tcpdump tries to find and appropriate one.\n', show_envvar=True,
envvar='IOTTB_CAPTURE_INTERFACE')
@optgroup.option('-a', '--address', callback=validate_sniff,
help='IP or MAC address to filter packets by.\n', show_envvar=True,
envvar='IOTTB_CAPTURE_ADDRESS')
@optgroup.option('-I', '--monitor-mode', help='Put interface into monitor mode.', is_flag=True)
@optgroup.option('--ff', type=str, envvar='IOTTB_CAPTURE_FILTER', show_envvar=True,
help='tcpdump filter as string or file path.')
@optgroup.option('-#', '--print-pacno', is_flag=True, default=True,
help='Print packet number at beginning of line. True by default.')
@optgroup.option('-e', '--print-ll', is_flag=True, default=False,
help='Print link layer headers. True by default.')
@optgroup.option('-c', '--count', type=int, help='Number of packets to capture.', default=1000)
# @optgroup.option('--mins', type=int, help='Time in minutes to capture.', default=1)
@click.argument('tcpdump-args', nargs=-1, required=False, metavar='[TCPDUMP-ARGS]')
@click.argument('device', required=False)
@click.pass_context
def sniff(ctx, device, interface, print_pacno, ff, count, monitor_mode, print_ll, address, db, unsafe, guided,
app, tcpdump_args, **params):
""" Sniff packets from a device """
logger.info('sniff command invoked')
# Step1: Load Config
config = ctx.obj['CONFIG']
logger.debug(f'Config loaded: {config}')
# Step2: determine relevant database
database = db if db else config.default_database
path = config.db_path_dict[database]
full_db_path = Path(path) / database
logger.debug(f'Full db path is {str(full_db_path)}')
# 2.2: Check if it exists
if not full_db_path.is_dir():
logger.error('DB unexpectedly missing')
click.echo('DB unexpectedly missing')
return
canonical_name, aliases = make_canonical_name(device)
click.echo(f'Using canonical device name {canonical_name}')
device_path = full_db_path / canonical_name
# Step 3: now the device
if not device_path.exists():
if not unsafe:
logger.error(f'Device path {device_path} does not exist')
click.echo(f'Device path {device_path} does not exist')
return
else:
device_path.mkdir(parents=True, exist_ok=True)
logger.info(f'Device path {device_path} created')
click.echo(f'Found device at path {device_path}')
# Step 4: Generate filter
generic_filter = None
cap_filter = None
if ff:
logger.debug(f'ff: {ff}')
if Path(ff).is_file():
logger.info('Given filter option is a file')
with open(ff, 'r') as f:
cap_filter = f.read().strip()
else:
logger.info('Given filter option is an expression')
cap_filter = ff
else:
if address is not None:
if is_ip_address(address):
generic_filter = 'net'
cap_filter = f'{generic_filter} {address}'
elif is_mac_address(address):
generic_filter = 'ether net'
cap_filter = f'{generic_filter} {address}'
elif not unsafe:
logger.error('Invalid address format')
click.echo('Invalid address format')
return
logger.info(f'Generic filter {generic_filter}')
click.echo(f'Using filter {cap_filter}')
# Step 5: prep capture directory
capture_date = datetime.now().strftime('%Y-%m-%d')
capture_base_dir = device_path / f'sniffs/{capture_date}'
capture_base_dir.mkdir(parents=True, exist_ok=True)
logger.debug(f'Previous captures {capture_base_dir.glob('cap*')}')
capture_count = sum(1 for _ in capture_base_dir.glob('cap*'))
logger.debug(f'Capture count is {capture_count}')
capture_dir = f'cap{capture_count:04d}-{datetime.now().strftime('%H%M')}'
logger.debug(f'capture_dir: {capture_dir}')
# Full path
capture_dir_full_path = capture_base_dir / capture_dir
capture_dir_full_path.mkdir(parents=True, exist_ok=True)
click.echo(f'Files will be placed in {str(capture_dir_full_path)}')
logger.debug(f'successfully created capture directory')
# Step 6: Prepare capture file names
# Generate UUID for filenames
capture_uuid = str(uuid.uuid4())
click.echo(f'Capture has id {capture_uuid}')
pcap_file = f"{canonical_name}_{capture_uuid}.pcap"
pcap_file_full_path = capture_dir_full_path / pcap_file
stdout_log_file = f'stdout_{capture_uuid}.log'
stderr_log_file = f'stderr_{capture_uuid}.log'
logger.debug(f'Full pcap file path is {pcap_file_full_path}')
logger.info(f'pcap file name is {pcap_file}')
logger.info(f'stdout log file is {stdout_log_file}')
logger.info(f'stderr log file is {stderr_log_file}')
# Step 7: Build tcpdump command
logger.debug(f'pgid {os.getpgrp()}')
logger.debug(f'ppid {os.getppid()}')
logger.debug(f'(real, effective, saved) user id: {os.getresuid()}')
logger.debug(f'(real, effective, saved) group id: {os.getresgid()}')
cmd = ['sudo', 'tcpdump']
# 7.1 process flags
flags = []
if print_pacno:
flags.append('-#')
if print_ll:
flags.append('-e')
if monitor_mode:
flags.append('-I')
flags.append('-n') # TODO: Integrate, in case name resolution is wanted!
cmd.extend(flags)
flags_string = " ".join(flags)
logger.debug(f'Flags: {flags_string}')
# debug interlude
verbosity = ctx.obj['VERBOSITY']
if verbosity > 0:
verbosity_flag = '-'
for i in range(0, verbosity):
verbosity_flag = verbosity_flag + 'v'
logger.debug(f'verbosity string to pass to tcpdump: {verbosity_flag}')
cmd.append(verbosity_flag)
# 7.2 generic (i.e. reusable) kw args
generic_kw_args = []
if count:
generic_kw_args.extend(['-c', str(count)])
# if mins:
# generic_kw_args.extend(['-G', str(mins * 60)]) TODO: this currently loads to errors with sudo
cmd.extend(generic_kw_args)
generic_kw_args_string = " ".join(generic_kw_args)
logger.debug(f'KW args: {generic_kw_args_string}')
# 7.3 special kw args (not a priori reusable)
non_generic_kw_args = []
if interface:
non_generic_kw_args.extend(['-i', interface])
non_generic_kw_args.extend(['-w', str(pcap_file_full_path)])
cmd.extend(non_generic_kw_args)
non_generic_kw_args_string = " ".join(non_generic_kw_args)
logger.debug(f'Non transferable (special) kw args: {non_generic_kw_args_string}')
# 7.4 add filter expression
if cap_filter:
logger.debug(f'cap_filter (not generic): {cap_filter}')
cmd.append(cap_filter)
full_cmd_string = " ".join(cmd)
logger.info(f'tcpdump command: {"".join(full_cmd_string)}')
click.echo('Capture setup complete!')
# Step 8: Execute tcpdump command
start_time = datetime.now().strftime("%H:%M:%S")
start = time()
try:
if guided:
click.confirm(f'Execute following command: {full_cmd_string}')
stdout_log_file_abs_path = capture_dir_full_path / stdout_log_file
stderr_log_file_abs_path = capture_dir_full_path / stderr_log_file
stdout_log_file_abs_path.touch(mode=0o777)
stderr_log_file_abs_path.touch(mode=0o777)
with open(stdout_log_file_abs_path, 'w') as out, open(stderr_log_file_abs_path, 'w') as err:
logger.debug(f'\nstdout: {out}.\nstderr: {err}.\n')
tcp_complete = subprocess.run(cmd, check=True, capture_output=True, text=True)
out.write(tcp_complete.stdout)
err.write(tcp_complete.stderr)
#click.echo(f'Mock sniff execution')
click.echo(f"Capture complete. Saved to {pcap_file}")
except subprocess.CalledProcessError as e:
logger.error(f'Failed to capture packets: {e}')
click.echo(f'Failed to capture packets: {e}')
click.echo(f'Check {stderr_log_file} for more info.')
if ctx.obj['DEBUG']:
msg = [f'STDERR log {stderr_log_file} contents:\n']
with open(capture_dir_full_path / stderr_log_file) as log:
for line in log:
msg.append(line)
click.echo("\t".join(msg), lvl='e')
# print('DEBUG ACTIVE')
if guided:
click.prompt('Create metadata anyway?')
else:
click.echo('Aborting capture...')
exit()
end_time = datetime.now().strftime("%H:%M:%S")
end = time()
delta = end - start
click.echo(f'tcpdump took {delta:.2f} seconds.')
# Step 9: Register metadata
metadata = {
'device': canonical_name,
'device_id': device,
'capture_id': capture_uuid,
'capture_date_iso': datetime.now().isoformat(),
'invoked_command': " ".join(map(str, cmd)),
'capture_duration': delta,
'generic_parameters': {
'flags': flags_string,
'kwargs': generic_kw_args_string,
'filter': generic_filter
},
'non_generic_parameters': {
'kwargs': non_generic_kw_args_string,
'filter': cap_filter
},
'features': {
'interface': interface,
'address': address
},
'resources': {
'pcap_file': str(pcap_file),
'stdout_log': str(stdout_log_file),
'stderr_log': str(stderr_log_file)
},
'environment': {
'capture_dir': capture_dir,
'database': database,
'capture_base_dir': str(capture_base_dir),
'capture_dir_abs_path': str(capture_dir_full_path)
}
}
click.echo('Ensuring correct ownership of created files.')
username = os.getlogin()
gid = os.getgid()
# Else there are issues when running with sudo:
try:
subprocess.run(f'sudo chown -R {username}:{username} {device_path}', shell=True)
except OSError as e:
click.echo(f'Some error {e}')
click.echo(f'Saving metadata.')
metadata_abs_path = capture_dir_full_path / 'capture_metadata.json'
with open(metadata_abs_path, 'w') as f:
json.dump(metadata, f, indent=4)
click.echo(f'END SNIFF SUBCOMMAND')

View File

@ -0,0 +1,120 @@
import click
from pathlib import Path
import logging
from logging.handlers import RotatingFileHandler
import sys
from iottb.models.iottb_config import IottbConfig
from iottb.definitions import DB_NAME, CFG_FILE_PATH
logger = logging.getLogger(__name__)
@click.command()
@click.option('-d', '--dest', type=click.Path(), help='Location to put (new) iottb database')
@click.option('-n', '--name', default=DB_NAME, type=str, help='Name of new database.')
@click.option('--update-default/--no-update-default', default=True, help='If new db should be set as the new default')
@click.pass_context
def init_db(ctx, dest, name, update_default):
logger.info('init-db invoked')
config = ctx.obj['CONFIG']
logger.debug(f'str(config)')
# Use the default path from config if dest is not provided
known_dbs = config.get_known_databases()
logger.debug(f'Known databases: {known_dbs}')
if name in known_dbs:
dest = config.get_database_location(name)
if Path(dest).joinpath(name).is_dir():
click.echo(f'A database {name} already exists.')
logger.debug(f'DB {name} exists in {dest}')
click.echo(f'Exiting...')
exit()
logger.debug(f'DB name {name} registered but does not exist.')
if not dest:
logger.info('No dest set, choosing default destination.')
dest = Path(config.default_db_location).parent
db_path = Path(dest).joinpath(name)
logger.debug(f'Full path for db {str(db_path)}')
# Create the directory if it doesn't exist
db_path.mkdir(parents=True, exist_ok=True)
logger.info(f"mkdir {db_path} successful")
click.echo(f'Created {db_path}')
# Update configuration
config.set_database_location(name, str(dest))
if update_default:
config.set_default_database(name, str(dest))
config.save_config()
logger.info(f"Updated configuration with database {name} at {db_path}")
# @click.group('config')
# @click.pass_context
# def cfg(ctx):
# pass
#
# @click.command('set', help='Set the location of a database.')
# @click.argument('database', help='Name of database')
# @click.argument('location', help='Where the database is located (i.e. its parent directory)')
# @click.pass_context
# def set(ctx, key, value):
# click.echo(f'Setting {key} to {value} in config')
# config = ctx.obj['CONFIG']
# logger.warning('No checks performed!')
# config.set_database_location(key, value)
# config.save_config()
@click.command()
@click.option('-d', '--dest', type=click.Path(), help='Location to put (new) iottb database')
@click.option('-n', '--name', default=DB_NAME, type=str, help='Name of new database.')
@click.option('--update-default/--no-update-default', default=True, help='If new db should be set as the new default')
@click.pass_context
def init_db_inactive(ctx, dest, name, update_default):
logger.info('init-db invoked')
config = ctx.obj['CONFIG']
logger.debug(f'str(config)')
# Retrieve known databases
known_dbs = config.get_known_databases()
# Determine destination path
if name in known_dbs:
dest = Path(config.get_database_location(name))
if dest.joinpath(name).is_dir():
click.echo(f'A database {name} already exists.')
logger.debug(f'DB {name} exists in {dest}')
click.echo(f'Exiting...')
exit()
logger.debug(f'DB name {name} registered but does not exist.')
elif not dest:
logger.info('No destination set, using default path from config.')
dest = Path(config.default_db_location).parent
# Ensure destination path is absolute
dest = dest.resolve()
# Combine destination path with database name
db_path = dest / name
logger.debug(f'Full path for database: {str(db_path)}')
# Create the directory if it doesn't exist
try:
db_path.mkdir(parents=True, exist_ok=True)
logger.info(f'Directory {db_path} created successfully.')
click.echo(f'Created {db_path}')
except Exception as e:
logger.error(f'Failed to create directory {db_path}: {e}')
click.echo(f'Failed to create directory {db_path}: {e}', err=True)
exit(1)
# Update configuration
config.set_database_location(name, str(db_path))
if update_default:
config.set_default_database(name, str(db_path))
config.save_config()
logger.info(f'Updated configuration with database {name} at {db_path}')
click.echo(f'Updated configuration with database {name} at {db_path}')

View File

@ -0,0 +1,48 @@
import logging
from pathlib import Path
import click
APP_NAME = 'iottb'
DB_NAME = 'iottb.db'
CFG_FILE_PATH = str(Path(click.get_app_dir(APP_NAME)).joinpath('iottb.cfg'))
CONSOLE_LOG_FORMATS = {
0: '%(levelname)s - %(message)s',
1: '%(levelname)s - %(module)s - %(message)s',
2: '%(levelname)s - %(module)s - %(funcName)s - %(lineno)d - %(message)s'
}
LOGFILE_LOG_FORMAT = {
0: '%(levelname)s - %(asctime)s - %(module)s - %(message)s',
1: '%(levelname)s - %(asctime)s - %(module)s - %(funcName)s - %(message)s',
2: '%(levelname)s - %(asctime)s - %(module)s - %(funcName)s - %(lineno)d - %(message)s'
}
MAX_VERBOSITY = len(CONSOLE_LOG_FORMATS) - 1
assert len(LOGFILE_LOG_FORMAT) == len(CONSOLE_LOG_FORMATS), 'Log formats must be same size'
LOGLEVEL = logging.DEBUG
LOGDIR = Path.cwd() / 'logs'
# Characters to just replace
REPLACEMENT_SET_CANONICAL_DEVICE_NAMES = {' ', '_', ',', '!', '@', '#', '$', '%', '^', '&', '*', '(', ')', '+', '=',
'{', '}', '[', ']',
'|',
'\\', ':', ';', '"', "'", '<', '>', '?', '/', '`', '~'}
# Characters to possibly error on
ERROR_SET_CANONICAL_DEVICE_NAMES = {',', '!', '@', '#', '$', '%', '^', '&', '*', '(', ')', '+', '=', '{', '}', '[', ']',
'|',
'\\', ':', ';', '"', "'", '<', '>', '?', '/', '`', '~'}
DEVICE_METADATA_FILE_NAME = 'device_metadata.json'
TB_ECHO_STYLES = {
'w': {'fg': 'yellow', 'bold': True},
'i': {'fg': 'blue', 'italic': True},
's': {'fg': 'green', 'bold': True},
'e': {'fg': 'red', 'bold': True},
'header': {'fg': 'bright_cyan', 'bold': True, 'italic': True}
}
NAME_OF_CAPTURE_DIR = 'sniffs'

View File

@ -0,0 +1,77 @@
import os
import shutil
import click
from pathlib import Path
import logging
from iottb.commands.sniff import sniff
from iottb.commands.developer import set_key_in_table_to, rm_cfg, rm_dbs, show_cfg, show_everything
##################################################
# Import package modules
#################################################
from iottb.utils.logger_config import setup_logging
from iottb import definitions
from iottb.models.iottb_config import IottbConfig
from iottb.commands.testbed import init_db
from iottb.commands.add_device import add_device
############################################################################
# Module shortcuts for global definitions
###########################################################################
APP_NAME = definitions.APP_NAME
DB_NAME = definitions.DB_NAME
CFG_FILE_PATH = definitions.CFG_FILE_PATH
# These are (possibly) redundant when defined in definitions.py
# keeping them here until refactored and tested
MAX_VERBOSITY = definitions.MAX_VERBOSITY
# Logger stuff
loglevel = definitions.LOGLEVEL
logger = logging.getLogger(__name__)
@click.group(context_settings=dict(auto_envvar_prefix='IOTTB', show_default=True))
@click.option('-v', '--verbosity', count=True, type=click.IntRange(0, 3), default=0, is_eager=True,
help='Set verbosity')
@click.option('-d', '--debug', is_flag=True, default=False, is_eager=True,
help='Enable debug mode')
@click.option('--dry-run', is_flag=True, default=True, is_eager=True)
@click.option('--cfg-file', type=click.Path(),
default=Path(click.get_app_dir(APP_NAME)).joinpath('iottb.cfg'),
envvar='IOTTB_CONF_HOME', help='Path to iottb config file')
@click.pass_context
def cli(ctx, verbosity, debug, dry_run, cfg_file):
setup_logging(verbosity, debug) # Setup logging based on the loaded configuration and other options
ctx.ensure_object(dict) # Make sure context is ready for use
logger.info("Starting execution.")
ctx.obj['CONFIG'] = IottbConfig(cfg_file) # Load configuration directly
ctx.meta['FULL_PATH_CONFIG_FILE'] = str(cfg_file)
ctx.meta['DRY_RUN'] = dry_run
logger.debug(f'Verbosity: {verbosity}')
ctx.obj['VERBOSITY'] = verbosity
logger.debug(f'Debug: {debug}')
ctx.obj['DEBUG'] = debug
##################################################################################
# Add all subcommands to group here
#################################################################################
# TODO: Is there a way to do this without pylint freaking out?
# noinspection PyTypeChecker
cli.add_command(init_db)
cli.add_command(rm_cfg)
cli.add_command(set_key_in_table_to)
cli.add_command(rm_dbs)
# noinspection PyTypeChecker
cli.add_command(add_device)
cli.add_command(show_cfg)
cli.add_command(sniff)
cli.add_command(show_everything)
if __name__ == '__main__':
cli()
for log in Path.cwd().iterdir():
log.chmod(0o777)

View File

@ -0,0 +1,6 @@
class Database:
def __init__(self, name, path):
self.name = name
self.path = path
self.device_list = [] # List of the canonical names of devices registered in this database

View File

@ -0,0 +1,44 @@
import logging
import uuid
from datetime import datetime
import logging
import click
from iottb.utils.string_processing import make_canonical_name
logger = logging.getLogger(__name__)
class DeviceMetadata:
def __init__(self, device_name, description="", model="", manufacturer="", firmware_version="", device_type="",
supported_interfaces="", companion_applications="", save_to_file=None):
self.device_id = str(uuid.uuid4())
self.device_name = device_name
cn, aliases = make_canonical_name(device_name)
logger.debug(f'cn, aliases = {cn}, {str(aliases)}')
self.aliases = aliases
self.canonical_name = cn
self.date_added = datetime.now().isoformat()
self.description = description
self.model = model
self.manufacturer = manufacturer
self.current_firmware_version = firmware_version
self.device_type = device_type
self.supported_interfaces = supported_interfaces
self.companion_applications = companion_applications
self.last_metadata_update = datetime.now().isoformat()
if save_to_file is not None:
click.echo('TODO: Implement saving config to file after creation!')
def add_alias(self, alias: str = ""):
if alias == "":
return
self.aliases.append(alias)
def get_canonical_name(self):
return self.canonical_name
def print_attributes(self):
print(f'Printing attribute value pairs in {__name__}')
for attr, value in self.__dict__.items():
print(f'{attr}: {value}')

View File

@ -0,0 +1,124 @@
import json
from pathlib import Path
from iottb import definitions
import logging
logger = logging.getLogger(__name__)
DB_NAME = definitions.DB_NAME
class IottbConfig:
""" Class to handle testbed configuration.
TODO: Add instead of overwrite Database locations when initializing if a location with valid db
exists.
"""
@staticmethod
def warn():
logger.warning(f'DatabaseLocations are DatabaseLocationMap in the class {__name__}')
def __init__(self, cfg_file=definitions.CFG_FILE_PATH):
logger.info('Initializing Config object')
IottbConfig.warn()
self.cfg_file = Path(cfg_file)
self.default_database = None
self.default_db_location = None
self.db_path_dict = dict()
self.load_config()
def create_default_config(self):
"""Create default iottb config file."""
logger.info(f'Creating default config file at {self.cfg_file}')
self.default_database = DB_NAME
self.default_db_location = str(Path.home())
self.db_path_dict = {
DB_NAME: self.default_db_location
}
defaults = {
'DefaultDatabase': self.default_database,
'DefaultDatabasePath': self.default_db_location,
'DatabaseLocations': self.db_path_dict
}
try:
self.cfg_file.parent.mkdir(parents=True, exist_ok=True)
with self.cfg_file.open('w') as config_file:
json.dump(defaults, config_file, indent=4)
except IOError as e:
logger.error(f"Failed to create default configuration file at {self.cfg_file}: {e}")
raise RuntimeError(f"Failed to create configuration file: {e}") from e
def load_config(self):
"""Loads or creates default configuration from given file path."""
logger.info('Loading configuration file')
if not self.cfg_file.is_file():
logger.info('Config file does not exist.')
self.create_default_config()
else:
logger.info('Config file exists, opening.')
with self.cfg_file.open('r') as config_file:
data = json.load(config_file)
self.default_database = data.get('DefaultDatabase')
self.default_db_location = data.get('DefaultDatabasePath')
self.db_path_dict = data.get('DatabaseLocations', {})
def save_config(self):
"""Save the current configuration to the config file."""
data = {
'DefaultDatabase': self.default_database,
'DefaultDatabasePath': self.default_db_location,
'DatabaseLocations': self.db_path_dict
}
try:
with self.cfg_file.open('w') as config_file:
json.dump(data, config_file, indent=4)
except IOError as e:
logger.error(f"Failed to save configuration file at {self.cfg_file}: {e}")
raise RuntimeError(f"Failed to save configuration file: {e}") from e
def set_default_database(self, name, path):
"""Set the default database and its path."""
self.default_database = name
self.default_db_location = path
self.db_path_dict[name] = path
def get_default_database_location(self):
return self.default_db_location
def get_default_database(self):
return self.default_database
def get_database_location(self, name):
"""Get the location of a specific database."""
return self.db_path_dict.get(name)
def set_database_location(self, name, path):
"""Set the location for a database."""
logger.debug(f'Type of "path" parameter {type(path)}')
logger.debug(f'String value of "path" parameter {str(path)}')
logger.debug(f'Type of "name" parameter {type(name)}')
logger.debug(f'String value of "name" parameter {str(name)}')
path = Path(path)
name = Path(name)
logger.debug(f'path:name = {path}:{name}')
if path.name == name:
path = path.parent
self.db_path_dict[str(name)] = str(path)
def get_known_databases(self):
"""Get the set of known databases"""
logger.info(f'Getting known databases.')
return self.db_path_dict.keys()
def get_know_database_paths(self):
"""Get the paths of all known databases"""
logger.info(f'Getting known database paths.')
return self.db_path_dict.values()
def get_full_default_path(self):
return Path(self.default_db_location) / self.default_database

View File

@ -0,0 +1,39 @@
import json
import logging
import uuid
from datetime import datetime
from pathlib import Path
logger = logging.getLogger('iottb.sniff') # Log with sniff subcommand
class CaptureMetadata:
def __init__(self, device_id, capture_dir, interface, address, capture_file, tcpdump_command, tcpdump_stdout, tcpdump_stderr, packet_filter, alias):
self.base_data = {
'device_id': device_id,
'capture_id': str(uuid.uuid4()),
'capture_date': datetime.now().isoformat(),
'capture_dir': str(capture_dir),
'capture_file': capture_file,
'start_time': "",
'stop_time': "",
'alias': alias
}
self.features = {
'interface': interface,
'device_ip_address': address if address else "No IP Address set",
'tcpdump_stdout': str(tcpdump_stdout),
'tcpdump_stderr': str(tcpdump_stderr),
'packet_filter': packet_filter
}
self.command = tcpdump_command
def save_to_file(self):
metadata = {
'base_data': self.base_data,
'features': self.features,
'command': self.command
}
metadata_file_path = Path(self.base_data['capture_dir']) / 'metadata.json'
with open(metadata_file_path, 'w') as f:
json.dump(metadata, f, indent=4)
logger.info(f'Metadata saved to {metadata_file_path}')

View File

@ -0,0 +1,52 @@
import click
from io import StringIO
import sys
# Import your CLI app here
from iottb.main import cli
"""Script to generate the help text and write to file.
Definitely needs better formatting.
Script is also not very flexible.
"""
def get_help_text(command):
"""Get the help text for a given command."""
help_text = StringIO()
with click.Context(command) as ctx:
# chatgpt says this helps: was right
sys_stdout = sys.stdout
sys.stdout = help_text
try:
click.echo(command.get_help(ctx))
finally:
sys.stdout = sys_stdout
return help_text.getvalue()
def write_help_to_file(cli, filename):
"""Write help messages of all commands and subcommands to a file."""
with open(filename, 'w') as f:
# main
f.write(f"Main Command: iottb\n")
f.write(get_help_text(cli))
f.write("\n\n")
# go through subcommands
for cmd_name, cmd in cli.commands.items():
f.write(f"Command: {cmd_name}\n")
f.write(get_help_text(cmd))
f.write("\n\n")
# subcommands of subcommands
if isinstance(cmd, click.Group):
for sub_cmd_name, sub_cmd in cmd.commands.items():
f.write(f"Subcommand: {cmd_name} {sub_cmd_name}\n")
f.write(get_help_text(sub_cmd))
f.write("\n\n")
if __name__ == "__main__":
write_help_to_file(cli, "help_messages.md")

View File

@ -0,0 +1,4 @@
#/bin/sh
echo 'Running iottb as sudo'
sudo $(which python) iottb $@
echo 'Finished executing iottb with sudo'

View File

@ -0,0 +1,41 @@
import logging
import sys
from logging.handlers import RotatingFileHandler
from iottb import definitions
from iottb.definitions import MAX_VERBOSITY, CONSOLE_LOG_FORMATS, APP_NAME, LOGFILE_LOG_FORMAT
loglevel = definitions.LOGLEVEL
def setup_logging(verbosity, debug=loglevel):
""" Setup root logger for iottb """
log_level = loglevel
handlers = []
date_format = '%Y-%m-%d %H:%M:%S'
if verbosity > 0:
log_level = logging.WARNING
if verbosity > MAX_VERBOSITY:
verbosity = MAX_VERBOSITY
log_level = logging.INFO
assert verbosity <= MAX_VERBOSITY, f'Verbosity must be <= {MAX_VERBOSITY}'
console_handler = logging.StreamHandler(sys.stdout)
print(str(sys.stdout))
console_handler.setFormatter(logging.Formatter(CONSOLE_LOG_FORMATS[verbosity], datefmt=date_format))
console_handler.setLevel(logging.DEBUG) # can keep at debug since it depends on global level?
handlers.append(console_handler)
if debug:
log_level = logging.DEBUG
# Logfile logs INFO+, no debugs though
file_handler = RotatingFileHandler(f'{str(definitions.LOGDIR / APP_NAME)}.log', maxBytes=10240, backupCount=5)
file_handler.setFormatter(logging.Formatter(LOGFILE_LOG_FORMAT[verbosity], datefmt=date_format))
file_handler.setLevel(logging.INFO)
# finnish root logger setup
handlers.append(file_handler)
# Force this config to be applied to root logger
logging.basicConfig(level=log_level, handlers=handlers, force=True)

View File

@ -0,0 +1,40 @@
import re
from iottb import definitions
import logging
logger = logging.getLogger(__name__)
def normalize_string(s, chars_to_replace=None, replacement=None, allow_unicode=False):
pass
def make_canonical_name(name):
"""
Normalize the device name to a canonical form:
- Replace the first two occurrences of spaces and transform characters with dashes.
- Remove any remaining spaces and non-ASCII characters.
- Convert to lowercase.
"""
aliases = [name]
logger.info(f'Normalizing name {name}')
# We first normalize
chars_to_replace = definitions.REPLACEMENT_SET_CANONICAL_DEVICE_NAMES
pattern = re.compile('|'.join(re.escape(char) for char in chars_to_replace))
norm_name = pattern.sub('-', name)
norm_name = re.sub(r'[^\x00-\x7F]+', '', norm_name) # removes non ascii chars
aliases.append(norm_name)
# Lower case
norm_name = norm_name.lower()
aliases.append(norm_name)
# canonical name is only first two parts of resulting string
parts = norm_name.split('-')
canonical_name = canonical_name = '-'.join(parts[:2])
aliases.append(canonical_name)
aliases = list(set(aliases))
logger.debug(f'Canonical name: {canonical_name}')
logger.debug(f'Aliases: {aliases}')
return canonical_name, aliases

View File

@ -0,0 +1,42 @@
# iottb/utils/user_interaction.py
import click
from iottb.definitions import TB_ECHO_STYLES
import sys
import os
def tb_echo2(msg: str, lvl='i', log=True):
style = TB_ECHO_STYLES.get(lvl, {})
click.secho(f'[IOTTB]', **style)
click.secho(f'[IOTTB] \t {msg}', **style)
last_prefix = None
def tb_echo(msg: str, lvl='i', log=True):
global last_prefix
prefix = f'Testbed [{lvl.upper()}]\n'
if last_prefix != prefix:
click.secho(prefix, nl=False, **TB_ECHO_STYLES['header'])
last_prefix = prefix
click.secho(f' {msg}', **TB_ECHO_STYLES[lvl])
def main():
tb_echo('Info message', 'i')
tb_echo('Warning message', 'w')
tb_echo('Error message', 'e')
tb_echo('Success message', 's')
if __name__ == '__main__':
# arrrgggg hacky
current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.abspath(os.path.join(current_dir, '../../'))
sys.path.insert(0, project_root)
main()

103
code/iottb-project/poetry.lock generated Normal file
View File

@ -0,0 +1,103 @@
# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand.
[[package]]
name = "click"
version = "8.1.7"
description = "Composable command line interface toolkit"
optional = false
python-versions = ">=3.7"
files = [
{file = "click-8.1.7-py3-none-any.whl", hash = "sha256:ae74fb96c20a0277a1d615f1e4d73c8414f5a98db8b799a7931d1582f3390c28"},
{file = "click-8.1.7.tar.gz", hash = "sha256:ca9853ad459e787e2192211578cc907e7594e294c7ccc834310722b41b9ca6de"},
]
[package.dependencies]
colorama = {version = "*", markers = "platform_system == \"Windows\""}
[[package]]
name = "colorama"
version = "0.4.6"
description = "Cross-platform colored terminal text."
optional = false
python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,>=2.7"
files = [
{file = "colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6"},
{file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"},
]
[[package]]
name = "iniconfig"
version = "2.0.0"
description = "brain-dead simple config-ini parsing"
optional = false
python-versions = ">=3.7"
files = [
{file = "iniconfig-2.0.0-py3-none-any.whl", hash = "sha256:b6a85871a79d2e3b22d2d1b94ac2824226a63c6b741c88f7ae975f18b6778374"},
{file = "iniconfig-2.0.0.tar.gz", hash = "sha256:2d91e135bf72d31a410b17c16da610a82cb55f6b0477d1a902134b24a455b8b3"},
]
[[package]]
name = "packaging"
version = "24.1"
description = "Core utilities for Python packages"
optional = false
python-versions = ">=3.8"
files = [
{file = "packaging-24.1-py3-none-any.whl", hash = "sha256:5b8f2217dbdbd2f7f384c41c628544e6d52f2d0f53c6d0c3ea61aa5d1d7ff124"},
{file = "packaging-24.1.tar.gz", hash = "sha256:026ed72c8ed3fcce5bf8950572258698927fd1dbda10a5e981cdf0ac37f4f002"},
]
[[package]]
name = "pluggy"
version = "1.5.0"
description = "plugin and hook calling mechanisms for python"
optional = false
python-versions = ">=3.8"
files = [
{file = "pluggy-1.5.0-py3-none-any.whl", hash = "sha256:44e1ad92c8ca002de6377e165f3e0f1be63266ab4d554740532335b9d75ea669"},
{file = "pluggy-1.5.0.tar.gz", hash = "sha256:2cffa88e94fdc978c4c574f15f9e59b7f4201d439195c3715ca9e2486f1d0cf1"},
]
[package.extras]
dev = ["pre-commit", "tox"]
testing = ["pytest", "pytest-benchmark"]
[[package]]
name = "pytest"
version = "8.2.2"
description = "pytest: simple powerful testing with Python"
optional = false
python-versions = ">=3.8"
files = [
{file = "pytest-8.2.2-py3-none-any.whl", hash = "sha256:c434598117762e2bd304e526244f67bf66bbd7b5d6cf22138be51ff661980343"},
{file = "pytest-8.2.2.tar.gz", hash = "sha256:de4bb8104e201939ccdc688b27a89a7be2079b22e2bd2b07f806b6ba71117977"},
]
[package.dependencies]
colorama = {version = "*", markers = "sys_platform == \"win32\""}
iniconfig = "*"
packaging = "*"
pluggy = ">=1.5,<2.0"
[package.extras]
dev = ["argcomplete", "attrs (>=19.2)", "hypothesis (>=3.56)", "mock", "pygments (>=2.7.2)", "requests", "setuptools", "xmlschema"]
[[package]]
name = "scapy"
version = "2.5.0"
description = "Scapy: interactive packet manipulation tool"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, <4"
files = [
{file = "scapy-2.5.0.tar.gz", hash = "sha256:5b260c2b754fd8d409ba83ee7aee294ecdbb2c235f9f78fe90bc11cb6e5debc2"},
]
[package.extras]
basic = ["ipython"]
complete = ["cryptography (>=2.0)", "ipython", "matplotlib", "pyx"]
docs = ["sphinx (>=3.0.0)", "sphinx_rtd_theme (>=0.4.3)", "tox (>=3.0.0)"]
[metadata]
lock-version = "2.0"
python-versions = "^3.12"
content-hash = "10b2c268b0f10db15eab2cca3d2dc9dc25bc60f4b218ebf786fb780fa85557e0"

View File

@ -0,0 +1,21 @@
[tool.poetry]
name = "iottb"
version = "0.1.0"
description = "IoT Testbed"
authors = ["Sebastian Lenzlinger <sebastian.lenzlinger@unibas.ch>"]
readme = "README.md"
[tool.poetry.dependencies]
python = "^3.12"
click = "^8.1"
scapy = "^2.5"
[tool.poetry.scripts]
iottb = "iottb.main:cli"
[tool.poetry.group.test.dependencies]
pytest = "^8.2.2"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

View File

View File

@ -0,0 +1,23 @@
from iottb.utils.string_processing import make_canonical_name
import pytest
class TestMakeCanonicalName:
def test_normalizes_name_with_spaces_to_dashes(self):
name = "Device Name With Spaces"
expected_canonical_name = "device-name"
canonical_name, aliases = make_canonical_name(name)
assert canonical_name == expected_canonical_name
assert "device-name-with-spaces" in aliases
assert "device-name" in aliases
assert "Device Name With Spaces" in aliases
def test_name_with_no_spaces_or_special_characters(self):
name = "DeviceName123"
expected_canonical_name = "devicename123"
canonical_name, aliases = make_canonical_name(name)
assert canonical_name == expected_canonical_name
assert "DeviceName123" in aliases
assert "devicename123" in aliases

View File

@ -0,0 +1,55 @@
#!/usr/bin/env bash
# Note, this is not my original work. Source: https://linuxtldr.com/changing-interface-mode/
function list_nic_info () {
ip addr show
}
function enable_monm_iw () {
interface=$1
sudo ip link set "$interface" down
sudo iw "$interface" set monitor control
sudo ip link set "$interface" up
}
function disable_monm_iw () {
interface=$1
sudo ip link set "$interface" down
sudo iw "$interface" set type managed
sudo ip link set "$interface" up
}
function enable_monm_iwconfig () {
interface=$1
sudo ifconfig "$interface" down
sudo iwconfig "$interface" mode monitor
sudo ifconfig "$interface" up
}
function disable_monm_iwconfig () {
interface=$1
sudo ifconfig "$interface" down
sudo iwconfig "$interface" mode managed
sudo ifconfig "$interface" up
}
function enable_monm_acng () {
interface=$1
sudo airmon-ng check
sudo airmon-ng check kill
sudo airmon-ng start "$interface"
}
function disable_monm_acng () {
interface="${1}mon"
sudo airmon-ng stop "$interface"
sudo systemctl restart NetworkManager
}
if declare -f "$1" > /dev/null
then
"$@"
else
echo "Unknown function '$1'" >&2
exit 1
fi

View File

@ -0,0 +1,29 @@
import matplotlib.pyplot as plt
import networkx as nx
# Create the graph
G1 = nx.DiGraph()
# Add nodes with positions
G1.add_node("IoT Device", pos=(1, 3))
G1.add_node("AP", pos=(3, 3))
G1.add_node("Switch (Port Mirroring Enabled)", pos=(5, 3))
G1.add_node("Gateway Router", pos=(7, 3))
G1.add_node("Internet", pos=(9, 3))
G1.add_node("Capture Device", pos=(5, 1))
# Add edges
G1.add_edge("IoT Device", "AP")
G1.add_edge("AP", "Switch (Port Mirroring Enabled)")
G1.add_edge("Switch (Port Mirroring Enabled)", "Gateway Router")
G1.add_edge("Gateway Router", "Internet")
G1.add_edge("Switch (Port Mirroring Enabled)", "Capture Device")
# Draw the graph
pos = nx.get_node_attributes(G1, 'pos')
plt.figure(figsize=(12, 8))
nx.draw(G1, pos, with_labels=True, node_size=3000, node_color='lightblue', font_size=10, font_weight='bold')
nx.draw_networkx_edge_labels(G1, pos, edge_labels={("Switch (Port Mirroring Enabled)", "Capture Device"): "Mirrored Traffic"}, font_color='red')
plt.title("IoT Device Connected via AP to Gateway Router via Switch with Port Mirroring Enabled")
plt.show()

View File

@ -0,0 +1,27 @@
import matplotlib.pyplot as plt
import networkx as nx
# Create the graph
G2 = nx.DiGraph()
# Add nodes with positions
G2.add_node("IoT Device", pos=(1, 3))
G2.add_node("Capture Device (Hotspot)", pos=(3, 3))
G2.add_node("Ethernet Connection", pos=(5, 3))
G2.add_node("Gateway Router", pos=(7, 3))
G2.add_node("Internet", pos=(9, 3))
# Add edges
G2.add_edge("IoT Device", "Capture Device (Hotspot)")
G2.add_edge("Capture Device (Hotspot)", "Ethernet Connection")
G2.add_edge("Ethernet Connection", "Gateway Router")
G2.add_edge("Gateway Router", "Internet")
# Draw the graph
pos = nx.get_node_attributes(G2, 'pos')
plt.figure(figsize=(12, 8))
nx.draw(G2, pos, with_labels=True, node_size=3000, node_color='lightblue', font_size=10, font_weight='bold')
nx.draw_networkx_edge_labels(G2, pos, edge_labels={("Capture Device (Hotspot)", "Ethernet Connection"): "Bridged Traffic"}, font_color='red')
plt.title("Capture Device Provides Hotspot and Bridges to Ethernet for Internet")
plt.show()

View File