From e5ece09c33dc2698917d917d7ccc253596ded833 Mon Sep 17 00:00:00 2001 From: Sebastian Lenzlinger Date: Wed, 12 Jun 2024 13:31:49 +0200 Subject: [PATCH 01/10] Clean Slate (or not) --- .idea/2024-bsc-sebastian-lenzlinger.iml | 12 ++++++++---- code/iottb/__main__.py | 10 +++++----- code/iottb/models/capture_metadata_model.py | 9 +++++---- code/iottb/subcommands/add_device.py | 13 +++++++------ code/iottb/subcommands/capture.py | 9 ++++++--- notes/journal/2024-05-15-wed.md | 3 ++- 6 files changed, 33 insertions(+), 23 deletions(-) diff --git a/.idea/2024-bsc-sebastian-lenzlinger.iml b/.idea/2024-bsc-sebastian-lenzlinger.iml index aad402c..8e5446a 100644 --- a/.idea/2024-bsc-sebastian-lenzlinger.iml +++ b/.idea/2024-bsc-sebastian-lenzlinger.iml @@ -1,10 +1,14 @@ - + + + + + + + + - - \ No newline at end of file diff --git a/code/iottb/__main__.py b/code/iottb/__main__.py index 1512c70..36e81c6 100644 --- a/code/iottb/__main__.py +++ b/code/iottb/__main__.py @@ -47,22 +47,22 @@ def check_iottb_env(): response = input('Do you want to create it now? [y/N]') logger.debug(f'response: {response}') if response.lower() != 'y': - logger.debug(f'Not creating "{environ['IOTTB_HOME']}"') + logger.debug(f'Not setting "IOTTB_HOME"') print('TODO') print("Aborting execution...") return ReturnCodes.ABORTED else: - print(f'Creating "{environ['IOTTB_HOME']}"') + print(f'Setting environment variable IOTTB_HOME""') Path(IOTTB_HOME_ABS).mkdir(parents=True, exist_ok=False) # Should always work since in 'not exist' code path - return ReturnCodes.OK + return ReturnCodes.SUCCESS logger.info(f'"{IOTTB_HOME_ABS}" exists.') # TODO: Check that it is a valid iottb dir or can we say it is valid by definition if? - return ReturnCodes.OK + return ReturnCodes.SUCCESS def main(): - if check_iottb_env() != ReturnCodes.OK: + if check_iottb_env() != ReturnCodes.SUCCESS: exit(ReturnCodes.ABORTED) parser = setup_argparse() args = parser.parse_args() diff --git a/code/iottb/models/capture_metadata_model.py b/code/iottb/models/capture_metadata_model.py index 4213212..33584ea 100644 --- a/code/iottb/models/capture_metadata_model.py +++ b/code/iottb/models/capture_metadata_model.py @@ -12,11 +12,11 @@ from iottb.logger import logger class CaptureMetadata: # Required Fields device_metadata: DeviceMetadata - capture_id: str = lambda: str(uuid.uuid4()) + device_id: str capture_dir: Path capture_file: str - capture_date: str = lambda: datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower() + # Statistics start_time: str @@ -39,7 +39,8 @@ class CaptureMetadata: def __init__(self, device_metadata: DeviceMetadata, capture_dir: Path): logger.info(f'Creating CaptureMetadata model from DeviceMetadata: {device_metadata}') self.device_metadata = device_metadata - + self.capture_id = str(uuid.uuid4()) + self.capture_date = datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower() self.capture_dir = capture_dir assert capture_dir.is_dir(), f'Capture directory {capture_dir} does not exist' @@ -47,7 +48,7 @@ class CaptureMetadata: logger.info(f'Building capture file name') if self.app is None: logger.debug(f'No app specified') - prefix = self.device_metadata.device_short_name + prefix = "iphone-14" #self.device_metadata.device_short_name else: logger.debug(f'App specified: {self.app}') assert str(self.app).strip() not in {'', ' '}, f'app is not a valid name: {self.app}' diff --git a/code/iottb/subcommands/add_device.py b/code/iottb/subcommands/add_device.py index d3fb9f8..14fe95b 100644 --- a/code/iottb/subcommands/add_device.py +++ b/code/iottb/subcommands/add_device.py @@ -1,4 +1,5 @@ import logging +import os import pathlib from iottb import definitions @@ -10,9 +11,11 @@ logger.setLevel(logging.INFO) # Since module currently passes all tests def setup_init_device_root_parser(subparsers): + #assert os.environ['IOTTB_HOME'] is not None, "IOTTB_HOME environment variable is not set" parser = subparsers.add_parser('add-device', aliases=['add-device-root', 'add'], help='Initialize a folder for a device.') - parser.add_argument('--root_dir', type=pathlib.Path, default=pathlib.Path.cwd()) + parser.add_argument('--root_dir', type=pathlib.Path, + default=definitions.IOTTB_HOME_ABS) # TODO: Refactor code to not use this or handle iottb here group = parser.add_mutually_exclusive_group() group.add_argument('--guided', action='store_true', help='Guided setup', default=False) group.add_argument('--name', action='store', type=str, help='name of device') @@ -20,14 +23,12 @@ def setup_init_device_root_parser(subparsers): def handle_add(args): + # TODO: This whole function should be refactored into using the fact that IOTTB_HOME is set, and the dir exists logger.info(f'Add device handler called with args {args}') - args.root_dir.mkdir(parents=True, - exist_ok=True) # else metadata.save_to_file will fail TODO: unclear what to assume - if args.guided: logger.debug('begin guided setup') - metadata = guided_setup(args.root_dir) + metadata = guided_setup(args.root_dir) # TODO refactor to use IOTTB_HOME logger.debug('guided setup complete') else: logger.debug('Setup through passed args: setup') @@ -36,7 +37,7 @@ def handle_add(args): return ReturnCodes.ERROR metadata = DeviceMetadata(args.name, args.root_dir) - file_path = args.root_dir / DEVICE_METADATA_FILE + file_path = args.root_dir / DEVICE_METADATA_FILE # TODO IOTTB_HOME REFACTOR if file_path.exists(): print('Directory already contains a metadata file. Aborting.') return ReturnCodes.ABORTED diff --git a/code/iottb/subcommands/capture.py b/code/iottb/subcommands/capture.py index a67b5e8..bb6e418 100644 --- a/code/iottb/subcommands/capture.py +++ b/code/iottb/subcommands/capture.py @@ -2,11 +2,13 @@ import subprocess from pathlib import Path from iottb.definitions import * +from iottb.logger import logger from iottb.models.capture_metadata_model import CaptureMetadata from iottb.models.device_metadata_model import DeviceMetadata, dir_contains_device_metadata from iottb.utils.capture_utils import get_capture_src_folder, make_capture_src_folder from iottb.utils.tcpdump_utils import check_installed + def setup_capture_parser(subparsers): parser = subparsers.add_parser('sniff', help='Sniff packets with tcpdump') # metadata args @@ -33,7 +35,7 @@ def setup_capture_parser(subparsers): help='Please see tcpdump manual for details. Unused by default.') cap_size_group = parser.add_mutually_exclusive_group(required=False) - cap_size_group.add_argument('-c', '--count', type=int, help='Number of packets to capture.', default=1000) + cap_size_group.add_argument('-c', '--count', type=int, help='Number of packets to capture.', default=10) cap_size_group.add_argument('--mins', type=int, help='Time in minutes to capture.', default=1) parser.set_defaults(func=handle_capture) @@ -88,6 +90,7 @@ def handle_capture(args): assert args.device_root is not None, f'Device root directory is required' assert dir_contains_device_metadata(args.device_root), f'Device metadata file \'{args.device_root}\' does not exist' # get device metadata + logger.info(f'Device root directory: {args.device_root}') if args.safe and not dir_contains_device_metadata(args.device_root): print(f'Supplied folder contains no device metadata. ' f'Please setup a device root directory before using this command') @@ -98,6 +101,7 @@ def handle_capture(args): else: name = input('Please enter a device name: ') args.device_root.mkdir(parents=True, exist_ok=True) + device_data = DeviceMetadata(name, args.device_root) # start constructing environment for capture capture_dir = get_capture_src_folder(args.device_root) @@ -152,7 +156,7 @@ def build_tcpdump_args(args, cmd, capture_metadata: CaptureMetadata): capture_metadata.build_capture_file_name() cmd.append('-w') - cmd.append(capture_metadata.capture_file) + cmd.append(str(capture_metadata.capture_dir) + "/" + capture_metadata.capture_file) if args.safe: cmd.append(f'host {args.device_ip}') # if not specified, filter 'any' implied by tcpdump @@ -160,7 +164,6 @@ def build_tcpdump_args(args, cmd, capture_metadata: CaptureMetadata): return cmd - # def capture_file_cmd(args, cmd, capture_dir, capture_metadata: CaptureMetadata): # capture_file_prefix = capture_metadata.get_device_metadata().get_device_short_name() # if args.app_name is not None: diff --git a/notes/journal/2024-05-15-wed.md b/notes/journal/2024-05-15-wed.md index 39af607..2145e86 100644 --- a/notes/journal/2024-05-15-wed.md +++ b/notes/journal/2024-05-15-wed.md @@ -11,4 +11,5 @@ With the above idea it would be possible to also refactor or rewrite how tcpdump I want an option such that one can automatically convert a captures resulting file into a csv. Probably will focus on tcpdump for now, since other tools like [[mitmproxy]] have different output files. ## Defining Experiment -I want a pair of commands that 1. provide a guided cli interface to define an experiment and 2. to run that experiment -> Here [Collective Knowledge Framework](https://github.com/mlcommons/ck) might actually come in handy. The already have tooling for setting up and defining aspects of experiments so that they become reproducible. So maybe one part of the `iottb` as a tool would be to write the correct json files into the directory which contain the informatin on how the command was run. Caveat: All all option values are the same, basically only, if it was used or not (flagging options) or that it was used (e.g. an ip address was used in the filter but the specific value of the ip is of no use for reproducing). Also, Collective Minds tooling relies very common ML algos/framework and static data. So maybe this only comes into play after a capture has been done. So maybe a feature extraction tool (see [[further considerations#Usage paths/ Workflows]]) should create the data and built the database separately. \ No newline at end of file +I want a pair of commands that 1. provide a guided cli interface to define an experiment and 2. to run that experiment -> Here [Collective Knowledge Framework](https://github.com/mlcommons/ck) might actually come in handy. The already have tooling for setting up and defining aspects of experiments so that they become reproducible. So maybe one part of the `iottb` as a tool would be to write the correct json files into the directory which contain the informatin on how the command was run. Caveat: All all option values are the same, basically only, if it was used or not (flagging options) or that it was used (e.g. an ip address was used in the filter but the specific value of the ip is of no use for reproducing). Also, Collective Minds tooling relies very common ML algos/framework and static data. So maybe this only comes into play after a capture has been done. So maybe a feature extraction tool (see [[further considerations#Usage paths/ Workflows]]) should create the data and built the database separately. +#remark TCP dump filter could also be exported into an environment variable? But then again what is the use of defining a conformance, then could use the raw capture idea for tcpdump, too. \ No newline at end of file From 2b782bbdca0143d2ee6ef217f468455b8915bbde Mon Sep 17 00:00:00 2001 From: Sebastian Lenzlinger Date: Wed, 12 Jun 2024 16:04:32 +0200 Subject: [PATCH 02/10] Cleanup --- .gitignore | 2 ++ .idea/2024-bsc-sebastian-lenzlinger.iml | 14 -------------- .idea/misc.xml | 7 ------- code/pyproject.toml | 17 +++++++++++++++++ 4 files changed, 19 insertions(+), 21 deletions(-) delete mode 100644 .idea/2024-bsc-sebastian-lenzlinger.iml delete mode 100644 .idea/misc.xml create mode 100644 code/pyproject.toml diff --git a/.gitignore b/.gitignore index d05fd96..aac33da 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,5 @@ __pycache__ /.idea .idea/ 2024-bsc-sebastian-lenzlinger.iml +logs/ +requirements.txt diff --git a/.idea/2024-bsc-sebastian-lenzlinger.iml b/.idea/2024-bsc-sebastian-lenzlinger.iml deleted file mode 100644 index 8e5446a..0000000 --- a/.idea/2024-bsc-sebastian-lenzlinger.iml +++ /dev/null @@ -1,14 +0,0 @@ - - - - - - - - - - - - \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml deleted file mode 100644 index 0ea184e..0000000 --- a/.idea/misc.xml +++ /dev/null @@ -1,7 +0,0 @@ - - - - - - \ No newline at end of file diff --git a/code/pyproject.toml b/code/pyproject.toml new file mode 100644 index 0000000..8e8bad2 --- /dev/null +++ b/code/pyproject.toml @@ -0,0 +1,17 @@ +[build-system] +requires = ["setuptools", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = 'iottb' +version = '0.1.0' +authors = [{name = "Sebastian Lenzlinger", email = "sebastian.lenzlinger@unibas.ch"}] +description = "Automation Tool for Capturing Network packets of IoT devices." +license = {file = "LICENSE"} +dependencies = [ + # List your dependencies here, e.g., "numpy", "pandas>=1.0" +] +include-package-data = true + +[project.scripts] +iottb = "iottb.__main__:main" \ No newline at end of file From 5196c2e129eacf2a6ffc38604cc9bcf4db4b84ae Mon Sep 17 00:00:00 2001 From: Sebastian Lenzlinger Date: Wed, 12 Jun 2024 16:47:45 +0200 Subject: [PATCH 03/10] Ensure no arguments are passed to list_interfaces() --- code/iottb/__main__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/code/iottb/__main__.py b/code/iottb/__main__.py index 36e81c6..d757a1e 100644 --- a/code/iottb/__main__.py +++ b/code/iottb/__main__.py @@ -7,7 +7,7 @@ from iottb.logger import logger from iottb.subcommands.add_device import setup_init_device_root_parser from iottb.subcommands.capture import setup_capture_parser from iottb.utils.tcpdump_utils import list_interfaces -from definitions import IOTTB_HOME_ABS, ReturnCodes +from iottb.definitions import IOTTB_HOME_ABS, ReturnCodes ###################### @@ -28,7 +28,7 @@ def setup_argparse(): interfaces_parser = subparsers.add_parser('list-interfaces', aliases=['li', 'if'], help='List available network interfaces.') - interfaces_parser.set_defaults(func=list_interfaces) + interfaces_parser.set_defaults(func=list_interfaces()) return root_parser From f22b06ad14ae284a7d0a440a1a3258151f954966 Mon Sep 17 00:00:00 2001 From: Sebastian Lenzlinger Date: Wed, 12 Jun 2024 16:51:17 +0200 Subject: [PATCH 04/10] Small corrections to list_interfaces(). Also make pyproject.toml usable. --- .gitignore | 1 + code/iottb/utils/tcpdump_utils.py | 2 +- code/pyproject.toml | 11 +++++------ 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/.gitignore b/.gitignore index aac33da..0ed2dec 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,4 @@ __pycache__ 2024-bsc-sebastian-lenzlinger.iml logs/ requirements.txt +*.egg-info/ diff --git a/code/iottb/utils/tcpdump_utils.py b/code/iottb/utils/tcpdump_utils.py index d9df1d5..2763f11 100644 --- a/code/iottb/utils/tcpdump_utils.py +++ b/code/iottb/utils/tcpdump_utils.py @@ -20,7 +20,7 @@ def list_interfaces() -> str: ensure_installed() try: result = subprocess.run(['tcpdump', '--list-interfaces'], capture_output=True, text=True, check=True) - return result.stdout + print(result.stdout) except subprocess.CalledProcessError as e: print(f'Failed to list interfaces: {e}') return '' diff --git a/code/pyproject.toml b/code/pyproject.toml index 8e8bad2..1261bc7 100644 --- a/code/pyproject.toml +++ b/code/pyproject.toml @@ -1,5 +1,5 @@ [build-system] -requires = ["setuptools", "wheel"] +requires = ["setuptools>=42", "wheel"] build-backend = "setuptools.build_meta" [project] @@ -7,11 +7,10 @@ name = 'iottb' version = '0.1.0' authors = [{name = "Sebastian Lenzlinger", email = "sebastian.lenzlinger@unibas.ch"}] description = "Automation Tool for Capturing Network packets of IoT devices." -license = {file = "LICENSE"} -dependencies = [ - # List your dependencies here, e.g., "numpy", "pandas>=1.0" -] -include-package-data = true +requires-python = ">=3.8" + +[tool.setuptools] +packages = ["iottb"] [project.scripts] iottb = "iottb.__main__:main" \ No newline at end of file From ae82bd3a67ec042a708ff4c1e64cebf1f67a0487 Mon Sep 17 00:00:00 2001 From: Sebastian Lenzlinger Date: Wed, 12 Jun 2024 17:33:42 +0200 Subject: [PATCH 05/10] Fix debugger --- code/iottb/__main__.py | 18 ++++++++++--- code/iottb/logger.py | 29 +++++++++++++-------- code/iottb/models/capture_metadata_model.py | 5 +++- code/iottb/models/device_metadata_model.py | 5 +++- code/iottb/subcommands/add_device.py | 6 ++--- code/iottb/subcommands/capture.py | 4 ++- 6 files changed, 46 insertions(+), 21 deletions(-) diff --git a/code/iottb/__main__.py b/code/iottb/__main__.py index d757a1e..379a7e2 100644 --- a/code/iottb/__main__.py +++ b/code/iottb/__main__.py @@ -2,14 +2,15 @@ import argparse from os import environ from pathlib import Path - -from iottb.logger import logger +import logging from iottb.subcommands.add_device import setup_init_device_root_parser from iottb.subcommands.capture import setup_capture_parser from iottb.utils.tcpdump_utils import list_interfaces from iottb.definitions import IOTTB_HOME_ABS, ReturnCodes +from iottb.logger import setup_logging - +logger = logging.getLogger('iottbLogger.__main__') +logger.setLevel(logging.DEBUG) ###################### # Argparse setup ###################### @@ -64,9 +65,12 @@ def check_iottb_env(): def main(): if check_iottb_env() != ReturnCodes.SUCCESS: exit(ReturnCodes.ABORTED) + + logger.debug(f'Pre setup_argparse()') parser = setup_argparse() + logger.debug('Post setup_argparse().') args = parser.parse_args() - print(args) + logger.debug(f'Args parsed: {args}') if args.command: try: args.func(args) @@ -74,9 +78,15 @@ def main(): print('Received keyboard interrupt. Exiting...') exit(1) except Exception as e: + logger.debug(f'Error in main: {e}') print(f'Error: {e}') # create_capture_directory(args.device_name) if __name__ == '__main__': + setup_logging() + logger.debug("Debug level is working") + logger.info("Info level is working") + logger.warning("Warning level is working") + main() diff --git a/code/iottb/logger.py b/code/iottb/logger.py index ea6add2..cc0cdb5 100644 --- a/code/iottb/logger.py +++ b/code/iottb/logger.py @@ -1,28 +1,35 @@ import logging import sys +import os from logging.handlers import RotatingFileHandler def setup_logging(): - logger_obj = logging.getLogger('iottbLogger') - logger_obj.setLevel(logging.DEBUG) + # Ensure the logs directory exists + log_directory = 'logs' + if not os.path.exists(log_directory): + os.makedirs(log_directory) - file_handler = RotatingFileHandler('iottb.log') + # Create handlers + file_handler = RotatingFileHandler(os.path.join(log_directory, 'iottb.log'), maxBytes=1048576, backupCount=5) console_handler = logging.StreamHandler(sys.stdout) - file_handler.setLevel(logging.INFO) - console_handler.setLevel(logging.DEBUG) - + # Create formatters and add it to handlers file_fmt = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') - console_fmt = logging.Formatter('%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s') + console_fmt = logging.Formatter( + '%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s') file_handler.setFormatter(file_fmt) console_handler.setFormatter(console_fmt) - logger_obj.addHandler(file_handler) - logger_obj.addHandler(console_handler) + # Get the root logger and add handlers + root_logger = logging.getLogger() + root_logger.setLevel(logging.DEBUG) + root_logger.addHandler(file_handler) + root_logger.addHandler(console_handler) - return logger_obj + # Prevent propagation to the root logger to avoid duplicate logs + root_logger.propagate = False -logger = setup_logging() +setup_logging() diff --git a/code/iottb/models/capture_metadata_model.py b/code/iottb/models/capture_metadata_model.py index 33584ea..9ddec56 100644 --- a/code/iottb/models/capture_metadata_model.py +++ b/code/iottb/models/capture_metadata_model.py @@ -6,7 +6,10 @@ from typing import Optional from iottb.definitions import ReturnCodes, CAPTURE_METADATA_FILE from iottb.models.device_metadata_model import DeviceMetadata -from iottb.logger import logger +import logging + +logger = logging.getLogger('iottbLogger.capture_metadata_model') +logger.setLevel(logging.DEBUG) class CaptureMetadata: diff --git a/code/iottb/models/device_metadata_model.py b/code/iottb/models/device_metadata_model.py index 359aa96..b0ea063 100644 --- a/code/iottb/models/device_metadata_model.py +++ b/code/iottb/models/device_metadata_model.py @@ -6,7 +6,10 @@ from typing import Optional, List # iottb modules from iottb.definitions import ReturnCodes, DEVICE_METADATA_FILE -from iottb.logger import logger +import logging + +logger = logging.getLogger('iottbLogger.device_metadata_model') +logger.setLevel(logging.DEBUG) # 3rd party libs IMMUTABLE_FIELDS = {'device_name', 'device_short_name', 'device_id', 'date_created'} diff --git a/code/iottb/subcommands/add_device.py b/code/iottb/subcommands/add_device.py index 14fe95b..425bb16 100644 --- a/code/iottb/subcommands/add_device.py +++ b/code/iottb/subcommands/add_device.py @@ -4,11 +4,11 @@ import pathlib from iottb import definitions from iottb.definitions import DEVICE_METADATA_FILE, ReturnCodes -from iottb.logger import logger from iottb.models.device_metadata_model import DeviceMetadata -logger.setLevel(logging.INFO) # Since module currently passes all tests - +# logger.setLevel(logging.INFO) # Since module currently passes all tests +logger = logging.getLogger('iottbLogger.add_device') +logger.setLevel(logging.INFO) def setup_init_device_root_parser(subparsers): #assert os.environ['IOTTB_HOME'] is not None, "IOTTB_HOME environment variable is not set" diff --git a/code/iottb/subcommands/capture.py b/code/iottb/subcommands/capture.py index bb6e418..46ff6e1 100644 --- a/code/iottb/subcommands/capture.py +++ b/code/iottb/subcommands/capture.py @@ -2,12 +2,14 @@ import subprocess from pathlib import Path from iottb.definitions import * -from iottb.logger import logger +import logging from iottb.models.capture_metadata_model import CaptureMetadata from iottb.models.device_metadata_model import DeviceMetadata, dir_contains_device_metadata from iottb.utils.capture_utils import get_capture_src_folder, make_capture_src_folder from iottb.utils.tcpdump_utils import check_installed +logger = logging.getLogger('iottbLogger.capture') +logger.setLevel(logging.DEBUG) def setup_capture_parser(subparsers): parser = subparsers.add_parser('sniff', help='Sniff packets with tcpdump') From 7d9095f113bd6cea3e9e59f5be908e8a68e27c08 Mon Sep 17 00:00:00 2001 From: Sebastian Lenzlinger Date: Wed, 12 Jun 2024 20:01:59 +0200 Subject: [PATCH 06/10] SYNC --- code/iottb/__main__.py | 2 +- code/iottb/main2.py | 10 ++++++++++ code/iottb/subcommands/sniff.py | 10 ++++++++++ code/iottb/utils/tcpdump_utils.py | 2 +- notes/scrible | 13 +++++++++++++ 5 files changed, 35 insertions(+), 2 deletions(-) create mode 100644 code/iottb/main2.py create mode 100644 code/iottb/subcommands/sniff.py create mode 100644 notes/scrible diff --git a/code/iottb/__main__.py b/code/iottb/__main__.py index 379a7e2..ff3150f 100644 --- a/code/iottb/__main__.py +++ b/code/iottb/__main__.py @@ -29,7 +29,7 @@ def setup_argparse(): interfaces_parser = subparsers.add_parser('list-interfaces', aliases=['li', 'if'], help='List available network interfaces.') - interfaces_parser.set_defaults(func=list_interfaces()) + interfaces_parser.set_defaults(func=list_interfaces) return root_parser diff --git a/code/iottb/main2.py b/code/iottb/main2.py new file mode 100644 index 0000000..cb82aba --- /dev/null +++ b/code/iottb/main2.py @@ -0,0 +1,10 @@ +import subprocess +import logging + +logger = logging.getLogger(__name__) +logging.basicConfig(level=logging.DEBUG) + + + + +if __name__ == '__main__': diff --git a/code/iottb/subcommands/sniff.py b/code/iottb/subcommands/sniff.py new file mode 100644 index 0000000..1a9ab7f --- /dev/null +++ b/code/iottb/subcommands/sniff.py @@ -0,0 +1,10 @@ +import subprocess + +def sniff_tcpdump(args, filter): + pass + +def sniff_mitmproxy(args, filter): + pass + +def sniff_raw(cmd,args): + pass \ No newline at end of file diff --git a/code/iottb/utils/tcpdump_utils.py b/code/iottb/utils/tcpdump_utils.py index 2763f11..6870202 100644 --- a/code/iottb/utils/tcpdump_utils.py +++ b/code/iottb/utils/tcpdump_utils.py @@ -15,7 +15,7 @@ def ensure_installed(): raise RuntimeError('tcpdump is not installed. Please install it to continue.') -def list_interfaces() -> str: +def list_interfaces(args) -> str: """List available network interfaces using tcpdump.""" ensure_installed() try: diff --git a/notes/scrible b/notes/scrible new file mode 100644 index 0000000..3663610 --- /dev/null +++ b/notes/scrible @@ -0,0 +1,13 @@ +`iottb sniff`: + min: nothing + min meaningfull: interface + min usefull: ip/mac addr of dev + good: ip/mac, device type + better: + +`iottb device` + `add`: add new device config + +`iottb db` + `init` initialize device database + `add` add device From bf33dfe3a839468dc213c757292051d0a6d46883 Mon Sep 17 00:00:00 2001 From: Sebastian Lenzlinger Date: Wed, 12 Jun 2024 22:33:08 +0200 Subject: [PATCH 07/10] Why am I so slow???????????? Just sync commit. Slowly but surely getting allong with this refactoring. --- archive/functions_dump.py | 27 +++++++++++ code/iottb/__main__.py | 85 +++++++++++++++++++-------------- code/iottb/main2.py | 10 ---- code/iottb/subcommands/sniff.py | 53 ++++++++++++++++++++ notes/scrible.py | 7 +++ 5 files changed, 137 insertions(+), 45 deletions(-) delete mode 100644 code/iottb/main2.py create mode 100644 notes/scrible.py diff --git a/archive/functions_dump.py b/archive/functions_dump.py index 4d94ad9..1639e2a 100644 --- a/archive/functions_dump.py +++ b/archive/functions_dump.py @@ -30,3 +30,30 @@ def setup_sniff_parser(subparsers): def setup_pcap_filter_parser(parser_sniff): parser_pcap_filter = parser_sniff.add_argument_parser('pcap-filter expression') pass + +def check_iottb_env(): + # This makes the option '--root-dir' obsolescent # TODO How to streamline this?\ + try: + iottb_home = environ['IOTTB_HOME'] # TODO WARN implicit declaration of env var name! + except KeyError: + logger.error(f"Environment variable 'IOTTB_HOME' is not set." + f"Setting environment variable 'IOTTB_HOME' to '~/{IOTTB_HOME_ABS}'") + environ['IOTTB_HOME'] = IOTTB_HOME_ABS + finally: + if not Path(IOTTB_HOME_ABS).exists(): + print(f'"{IOTTB_HOME_ABS}" does not exist.') + response = input('Do you want to create it now? [y/N]') + logger.debug(f'response: {response}') + if response.lower() != 'y': + logger.debug(f'Not setting "IOTTB_HOME"') + print('TODO') + print("Aborting execution...") + return ReturnCodes.ABORTED + else: + print(f'Setting environment variable IOTTB_HOME""') + Path(IOTTB_HOME_ABS).mkdir(parents=True, + exist_ok=False) # Should always work since in 'not exist' code path + return ReturnCodes.SUCCESS + logger.info(f'"{IOTTB_HOME_ABS}" exists.') + # TODO: Check that it is a valid iottb dir or can we say it is valid by definition if? + return ReturnCodes.SUCCESS diff --git a/code/iottb/__main__.py b/code/iottb/__main__.py index ff3150f..82199ff 100644 --- a/code/iottb/__main__.py +++ b/code/iottb/__main__.py @@ -4,27 +4,36 @@ from os import environ from pathlib import Path import logging from iottb.subcommands.add_device import setup_init_device_root_parser -from iottb.subcommands.capture import setup_capture_parser +# from iottb.subcommands.capture import setup_capture_parser +from iottb.subcommands.sniff import setup_sniff_parser from iottb.utils.tcpdump_utils import list_interfaces from iottb.definitions import IOTTB_HOME_ABS, ReturnCodes from iottb.logger import setup_logging logger = logging.getLogger('iottbLogger.__main__') logger.setLevel(logging.DEBUG) + + ###################### # Argparse setup ###################### def setup_argparse(): # create top level parser root_parser = argparse.ArgumentParser(prog='iottb') - subparsers = root_parser.add_subparsers(title='subcommands', required=True, dest='command') - # shared options root_parser.add_argument('--verbose', '-v', action='count', default=0) - # configure subcommands - setup_capture_parser(subparsers) - setup_init_device_root_parser(subparsers) + # Group of args w.r.t iottb.db creation + group = root_parser.add_argument_group('database options') + group.add_argument('--db-home', default=Path.home() / 'IoTtb.db') + group.add_argument('--config-home', default=Path.home() / '.config' / 'iottb.conf', type=Path, ) + group.add_argument('--user', default=Path.home().stem, type=Path, ) + + # configure subcommands + subparsers = root_parser.add_subparsers(title='subcommands', required=True, dest='command') + # setup_capture_parser(subparsers) + setup_init_device_root_parser(subparsers) + setup_sniff_parser(subparsers) # Utility to list interfaces directly with iottb instead of relying on external tooling interfaces_parser = subparsers.add_parser('list-interfaces', aliases=['li', 'if'], @@ -34,38 +43,44 @@ def setup_argparse(): return root_parser -def check_iottb_env(): - # This makes the option '--root-dir' obsolescent # TODO How to streamline this?\ - try: - iottb_home = environ['IOTTB_HOME'] # TODO WARN implicit declaration of env var name! - except KeyError: - logger.error(f"Environment variable 'IOTTB_HOME' is not set." - f"Setting environment variable 'IOTTB_HOME' to '~/{IOTTB_HOME_ABS}'") - environ['IOTTB_HOME'] = IOTTB_HOME_ABS - finally: - if not Path(IOTTB_HOME_ABS).exists(): - print(f'"{IOTTB_HOME_ABS}" does not exist.') - response = input('Do you want to create it now? [y/N]') - logger.debug(f'response: {response}') - if response.lower() != 'y': - logger.debug(f'Not setting "IOTTB_HOME"') - print('TODO') - print("Aborting execution...") - return ReturnCodes.ABORTED - else: - print(f'Setting environment variable IOTTB_HOME""') - Path(IOTTB_HOME_ABS).mkdir(parents=True, - exist_ok=False) # Should always work since in 'not exist' code path - return ReturnCodes.SUCCESS - logger.info(f'"{IOTTB_HOME_ABS}" exists.') - # TODO: Check that it is a valid iottb dir or can we say it is valid by definition if? - return ReturnCodes.SUCCESS +### +# Where put ?! +### +class IoTdb: + def __init__(self, db_home=Path.home() / 'IoTtb.db', iottb_config=Path.home() / '.conf' / 'iottb.conf', + user=Path.home().stem): + self.db_home = db_home + self.config_home = iottb_config + self.default_filters_home = db_home / 'default_filters' + self.user = user + + def create_db(self, mode=0o777, parents=False, exist_ok=False): + logger.info(f'Creating db at {self.db_home}') + try: + self.db_home.mkdir(mode=mode, parents=parents, exist_ok=exist_ok) + except FileExistsError: + logger.error(f'Database path already at {self.db_home} exists and is not a directory') + finally: + logger.debug(f'Leaving finally clause in create_db') + + def create_device_tree(self, mode=0o777, parents=False, exist_ok=False): + logger.info(f'Creating device tree at {self.db_home / 'devices'}') + + def parse_db_config(self): + pass + + def parse_iottb_config(self): + pass + + def get_known_devices(self): + pass + + +def iottb_db_exists(db_home=Path.home() / 'IoTtb.db'): + res = db_home.is_dir() def main(): - if check_iottb_env() != ReturnCodes.SUCCESS: - exit(ReturnCodes.ABORTED) - logger.debug(f'Pre setup_argparse()') parser = setup_argparse() logger.debug('Post setup_argparse().') diff --git a/code/iottb/main2.py b/code/iottb/main2.py deleted file mode 100644 index cb82aba..0000000 --- a/code/iottb/main2.py +++ /dev/null @@ -1,10 +0,0 @@ -import subprocess -import logging - -logger = logging.getLogger(__name__) -logging.basicConfig(level=logging.DEBUG) - - - - -if __name__ == '__main__': diff --git a/code/iottb/subcommands/sniff.py b/code/iottb/subcommands/sniff.py index 1a9ab7f..554a9c7 100644 --- a/code/iottb/subcommands/sniff.py +++ b/code/iottb/subcommands/sniff.py @@ -1,4 +1,57 @@ import subprocess +import logging + + +logger = logging.getLogger('iottbLogger.capture') +logger.setLevel(logging.DEBUG) +class Sniffer: + def __init__(self): + pass + + +def setup_sniff_parser(subparsers): + parser = subparsers.add_parser('sniff', help='Sniff packets with tcpdump') + # metadata args + parser.add_argument('-a', '--addr', help='IP or MAC address of IoT device') + # tcpdump args + parser.add_argument('--app', help='Application name to sniff', default=None) + + parser_sniff_tcpdump = parser.add_argument_group('tcpdump arguments') + + parser_sniff_tcpdump.add_argument('-i', '--interface', help='Interface to capture on.', dest='capture_interface', + required=True) + parser_sniff_tcpdump.add_argument('-I', '--monitor-mode', help='Put interface into monitor mode', + action='store_true') + parser_sniff_tcpdump.add_argument('-n', help='Deactivate name resolution. True by default.', + action='store_true', dest='no_name_resolution') + parser_sniff_tcpdump.add_argument('-#', '--number', + help='Print packet number at beginning of line. True by default.', + action='store_true') + parser_sniff_tcpdump.add_argument('-e', help='Print link layer headers. True by default.', + action='store_true', dest='print_link_layer') + parser_sniff_tcpdump.add_argument('-t', action='count', default=0, + help='Please see tcpdump manual for details. Unused by default.') + + cap_size_group = parser.add_mutually_exclusive_group(required=False) + cap_size_group.add_argument('-c', '--count', type=int, help='Number of packets to capture.', default=10) + cap_size_group.add_argument('--mins', type=int, help='Time in minutes to capture.', default=1) + + parser.set_defaults(func=sniff) + + +def parse_addr(addr): + #TODO Implement + pass + + +def sniff(args): + if args.addr is None: + print('You must supply either a MAC or IP(v4) address to use this tool!') + logger.info("Exiting on account of missing MAC/IP.") + exit(1) + else: + (type, value) = parse_addr(args.addr) + #TODO Get this party started def sniff_tcpdump(args, filter): pass diff --git a/notes/scrible.py b/notes/scrible.py new file mode 100644 index 0000000..dae9072 --- /dev/null +++ b/notes/scrible.py @@ -0,0 +1,7 @@ +class Config: + db_dir = Path.home() + app_config_dir = Path.home /.Config + db_name = 'IoTtb.db' + app_config_name = 'iottb.conf' + + From b345474a899e944235a9484446484f3267c71d52 Mon Sep 17 00:00:00 2001 From: Sebastian Lenzlinger Date: Wed, 12 Jun 2024 23:07:09 +0200 Subject: [PATCH 08/10] eod sync --- code/iottb/__main__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/code/iottb/__main__.py b/code/iottb/__main__.py index 82199ff..9e6d869 100644 --- a/code/iottb/__main__.py +++ b/code/iottb/__main__.py @@ -65,6 +65,7 @@ class IoTdb: def create_device_tree(self, mode=0o777, parents=False, exist_ok=False): logger.info(f'Creating device tree at {self.db_home / 'devices'}') + #TODO def parse_db_config(self): pass From 01954bd5a60cb193ffbe198e932f1f5b59df6428 Mon Sep 17 00:00:00 2001 From: Sebastian Lenzlinger Date: Tue, 18 Jun 2024 03:12:28 +0200 Subject: [PATCH 09/10] Introduce complete refactoring. --- {code => archive}/.gitkeep | 0 .../iottb/subcommands => archive}/__init__.py | 0 {code/tests => archive/iottb}/__init__.py | 0 archive/iottb/__main__.py | 107 ++++++++++++++++ {code => archive}/iottb/definitions.py | 0 archive/iottb/logger.py | 35 ++++++ .../iottb/models}/__init__.py | 0 .../iottb/models/capture_metadata_model.py | 0 .../iottb/models/device_metadata_model.py | 0 .../iottb/subcommands/__init__.py | 0 .../iottb/subcommands/add_device.py | 0 .../iottb/subcommands/capture.py | 0 {code => archive}/iottb/subcommands/sniff.py | 0 .../iottb/utils/__init__.py | 0 archive/iottb/utils/capture_utils.py | 44 +++++++ archive/iottb/utils/tcpdump_utils.py | 41 +++++++ {code => archive}/iottb/utils/utils.py | 0 {code => archive}/misc/dnsmasq.conf | 0 {code => archive}/misc/enable-forwarding.sh | 0 {code => archive}/misc/hostapd.conf | 0 {code => archive}/misc/hostapd.conf.bak | 0 {code => archive}/misc/initSwAP | 0 {code => archive}/misc/initSwAP_nftables | 0 {code => archive}/misc/make_ap.sh | 0 {code => archive}/pyproject.toml | 0 .../test_main.py => archive/tests/__init__.py | 0 .../tests/fixtures/__init__.py | 0 .../tests/fixtures/shared_fixtures.py | 0 .../models/test_capture_metadata_model.py | 0 .../models/test_device_metadata_model.py | 0 .../tests/subcommands/test_add_device.py | 0 .../tests/test_capture_metadata_model.py | 0 archive/tests/test_main.py | 0 .../utils/test_capture_metadata_utils.py | 0 .../tests/utils/test_device_metadata_utils.py | 0 archive/tests/utils/test_tcpdump_utils.py | 0 code/iottb/.gitignore | 3 + code/iottb/__main__.py | 116 +++--------------- code/iottb/commands/__init__.py | 0 code/iottb/commands/sniff.py | 108 ++++++++++++++++ code/iottb/config.json | 1 + code/iottb/config.py | 45 +++++++ code/iottb/logger.py | 35 ------ code/iottb/models/capture_metadata.py | 29 +++++ code/iottb/models/device_metadata.py | 27 ++++ code/iottb/pyproject.toml | 18 +++ .../templates/capture_metadata_template.json | 15 +++ code/iottb/templates/config_template.json | 4 + .../templates/device_metadata_template.json | 14 +++ code/iottb/test/test_Config.py | 43 +++++++ .../test/test_ensure_directory_exists.py | 38 ++++++ code/iottb/test/test_is_ip_address.py | 62 ++++++++++ code/iottb/test/test_is_mac_address.py | 64 ++++++++++ code/iottb/utils/capture_utils.py | 54 +++----- code/iottb/utils/file_utils.py | 19 +++ code/iottb/utils/tcpdump_utils.py | 38 +----- 56 files changed, 754 insertions(+), 206 deletions(-) rename {code => archive}/.gitkeep (100%) rename {code/iottb/subcommands => archive}/__init__.py (100%) rename {code/tests => archive/iottb}/__init__.py (100%) create mode 100644 archive/iottb/__main__.py rename {code => archive}/iottb/definitions.py (100%) create mode 100644 archive/iottb/logger.py rename {code/tests/fixtures => archive/iottb/models}/__init__.py (100%) rename {code => archive}/iottb/models/capture_metadata_model.py (100%) rename {code => archive}/iottb/models/device_metadata_model.py (100%) rename code/tests/models/test_capture_metadata_model.py => archive/iottb/subcommands/__init__.py (100%) rename {code => archive}/iottb/subcommands/add_device.py (100%) rename {code => archive}/iottb/subcommands/capture.py (100%) rename {code => archive}/iottb/subcommands/sniff.py (100%) rename code/tests/models/test_device_metadata_model.py => archive/iottb/utils/__init__.py (100%) create mode 100644 archive/iottb/utils/capture_utils.py create mode 100644 archive/iottb/utils/tcpdump_utils.py rename {code => archive}/iottb/utils/utils.py (100%) rename {code => archive}/misc/dnsmasq.conf (100%) rename {code => archive}/misc/enable-forwarding.sh (100%) rename {code => archive}/misc/hostapd.conf (100%) rename {code => archive}/misc/hostapd.conf.bak (100%) rename {code => archive}/misc/initSwAP (100%) rename {code => archive}/misc/initSwAP_nftables (100%) rename {code => archive}/misc/make_ap.sh (100%) rename {code => archive}/pyproject.toml (100%) rename code/tests/test_main.py => archive/tests/__init__.py (100%) rename code/tests/utils/test_device_metadata_utils.py => archive/tests/fixtures/__init__.py (100%) rename {code => archive}/tests/fixtures/shared_fixtures.py (100%) rename code/tests/utils/test_tcpdump_utils.py => archive/tests/models/test_capture_metadata_model.py (100%) create mode 100644 archive/tests/models/test_device_metadata_model.py rename {code => archive}/tests/subcommands/test_add_device.py (100%) rename {code => archive}/tests/test_capture_metadata_model.py (100%) create mode 100644 archive/tests/test_main.py rename {code => archive}/tests/utils/test_capture_metadata_utils.py (100%) create mode 100644 archive/tests/utils/test_device_metadata_utils.py create mode 100644 archive/tests/utils/test_tcpdump_utils.py create mode 100644 code/iottb/.gitignore create mode 100644 code/iottb/commands/__init__.py create mode 100644 code/iottb/commands/sniff.py create mode 100644 code/iottb/config.json create mode 100644 code/iottb/config.py create mode 100644 code/iottb/models/capture_metadata.py create mode 100644 code/iottb/models/device_metadata.py create mode 100644 code/iottb/pyproject.toml create mode 100644 code/iottb/templates/capture_metadata_template.json create mode 100644 code/iottb/templates/config_template.json create mode 100644 code/iottb/templates/device_metadata_template.json create mode 100644 code/iottb/test/test_Config.py create mode 100644 code/iottb/test/test_ensure_directory_exists.py create mode 100644 code/iottb/test/test_is_ip_address.py create mode 100644 code/iottb/test/test_is_mac_address.py create mode 100644 code/iottb/utils/file_utils.py diff --git a/code/.gitkeep b/archive/.gitkeep similarity index 100% rename from code/.gitkeep rename to archive/.gitkeep diff --git a/code/iottb/subcommands/__init__.py b/archive/__init__.py similarity index 100% rename from code/iottb/subcommands/__init__.py rename to archive/__init__.py diff --git a/code/tests/__init__.py b/archive/iottb/__init__.py similarity index 100% rename from code/tests/__init__.py rename to archive/iottb/__init__.py diff --git a/archive/iottb/__main__.py b/archive/iottb/__main__.py new file mode 100644 index 0000000..208076e --- /dev/null +++ b/archive/iottb/__main__.py @@ -0,0 +1,107 @@ +#!/usr/bin/env python3 +import argparse +from os import environ +from pathlib import Path +import logging +from archive.iottb.subcommands.add_device import setup_init_device_root_parser +# from iottb.subcommands.capture import setup_capture_parser +from iottb.subcommands.sniff import setup_sniff_parser +from iottb.utils.tcpdump_utils import list_interfaces +from iottb.logger import setup_logging + +logger = logging.getLogger('iottbLogger.__main__') +logger.setLevel(logging.DEBUG) + + +###################### +# Argparse setup +###################### +def setup_argparse(): + # create top level parser + root_parser = argparse.ArgumentParser(prog='iottb') + # shared options + root_parser.add_argument('--verbose', '-v', action='count', default=0) + root_parser.add_argument('--script-mode', action='store_true', help='Run in script mode (non-interactive)') + # Group of args w.r.t iottb.db creation + group = root_parser.add_argument_group('database options') + group.add_argument('--db-home', default=Path.home() / 'IoTtb.db') + group.add_argument('--config-home', default=Path.home() / '.config' / 'iottb.conf', type=Path, ) + group.add_argument('--user', default=Path.home().stem, type=Path, ) + + # configure subcommands + subparsers = root_parser.add_subparsers(title='subcommands', required=True, dest='command') + # setup_capture_parser(subparsers) + setup_init_device_root_parser(subparsers) + setup_sniff_parser(subparsers) + # Utility to list interfaces directly with iottb instead of relying on external tooling + + interfaces_parser = subparsers.add_parser('list-interfaces', aliases=['li', 'if'], + help='List available network interfaces.') + interfaces_parser.set_defaults(func=list_interfaces) + + return root_parser + + +### +# Where put ?! +### +class IoTdb: + def __init__(self, db_home=Path.home() / 'IoTtb.db', iottb_config=Path.home() / '.conf' / 'iottb.conf', + user=Path.home().stem): + self.db_home = db_home + self.config_home = iottb_config + self.default_filters_home = db_home / 'default_filters' + self.user = user + + def create_db(self, mode=0o777, parents=False, exist_ok=False): + logger.info(f'Creating db at {self.db_home}') + try: + self.db_home.mkdir(mode=mode, parents=parents, exist_ok=exist_ok) + except FileExistsError: + logger.error(f'Database path already at {self.db_home} exists and is not a directory') + finally: + logger.debug(f'Leaving finally clause in create_db') + + def create_device_tree(self, mode=0o777, parents=False, exist_ok=False): + logger.info(f'Creating device tree at {self.db_home / 'devices'}') + #TODO + + def parse_db_config(self): + pass + + def parse_iottb_config(self): + pass + + def get_known_devices(self): + pass + + +def iottb_db_exists(db_home=Path.home() / 'IoTtb.db'): + res = db_home.is_dir() + + +def main(): + logger.debug(f'Pre setup_argparse()') + parser = setup_argparse() + logger.debug('Post setup_argparse().') + args = parser.parse_args() + logger.debug(f'Args parsed: {args}') + if args.command: + try: + args.func(args) + except KeyboardInterrupt: + print('Received keyboard interrupt. Exiting...') + exit(1) + except Exception as e: + logger.debug(f'Error in main: {e}') + print(f'Error: {e}') + # create_capture_directory(args.device_name) + + +if __name__ == '__main__': + setup_logging() + logger.debug("Debug level is working") + logger.info("Info level is working") + logger.warning("Warning level is working") + + main() diff --git a/code/iottb/definitions.py b/archive/iottb/definitions.py similarity index 100% rename from code/iottb/definitions.py rename to archive/iottb/definitions.py diff --git a/archive/iottb/logger.py b/archive/iottb/logger.py new file mode 100644 index 0000000..cc0cdb5 --- /dev/null +++ b/archive/iottb/logger.py @@ -0,0 +1,35 @@ +import logging +import sys +import os +from logging.handlers import RotatingFileHandler + + +def setup_logging(): + # Ensure the logs directory exists + log_directory = 'logs' + if not os.path.exists(log_directory): + os.makedirs(log_directory) + + # Create handlers + file_handler = RotatingFileHandler(os.path.join(log_directory, 'iottb.log'), maxBytes=1048576, backupCount=5) + console_handler = logging.StreamHandler(sys.stdout) + + # Create formatters and add it to handlers + file_fmt = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') + console_fmt = logging.Formatter( + '%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s') + + file_handler.setFormatter(file_fmt) + console_handler.setFormatter(console_fmt) + + # Get the root logger and add handlers + root_logger = logging.getLogger() + root_logger.setLevel(logging.DEBUG) + root_logger.addHandler(file_handler) + root_logger.addHandler(console_handler) + + # Prevent propagation to the root logger to avoid duplicate logs + root_logger.propagate = False + + +setup_logging() diff --git a/code/tests/fixtures/__init__.py b/archive/iottb/models/__init__.py similarity index 100% rename from code/tests/fixtures/__init__.py rename to archive/iottb/models/__init__.py diff --git a/code/iottb/models/capture_metadata_model.py b/archive/iottb/models/capture_metadata_model.py similarity index 100% rename from code/iottb/models/capture_metadata_model.py rename to archive/iottb/models/capture_metadata_model.py diff --git a/code/iottb/models/device_metadata_model.py b/archive/iottb/models/device_metadata_model.py similarity index 100% rename from code/iottb/models/device_metadata_model.py rename to archive/iottb/models/device_metadata_model.py diff --git a/code/tests/models/test_capture_metadata_model.py b/archive/iottb/subcommands/__init__.py similarity index 100% rename from code/tests/models/test_capture_metadata_model.py rename to archive/iottb/subcommands/__init__.py diff --git a/code/iottb/subcommands/add_device.py b/archive/iottb/subcommands/add_device.py similarity index 100% rename from code/iottb/subcommands/add_device.py rename to archive/iottb/subcommands/add_device.py diff --git a/code/iottb/subcommands/capture.py b/archive/iottb/subcommands/capture.py similarity index 100% rename from code/iottb/subcommands/capture.py rename to archive/iottb/subcommands/capture.py diff --git a/code/iottb/subcommands/sniff.py b/archive/iottb/subcommands/sniff.py similarity index 100% rename from code/iottb/subcommands/sniff.py rename to archive/iottb/subcommands/sniff.py diff --git a/code/tests/models/test_device_metadata_model.py b/archive/iottb/utils/__init__.py similarity index 100% rename from code/tests/models/test_device_metadata_model.py rename to archive/iottb/utils/__init__.py diff --git a/archive/iottb/utils/capture_utils.py b/archive/iottb/utils/capture_utils.py new file mode 100644 index 0000000..8c4d60f --- /dev/null +++ b/archive/iottb/utils/capture_utils.py @@ -0,0 +1,44 @@ +import uuid +from pathlib import Path +from iottb.models.device_metadata_model import dir_contains_device_metadata +from iottb.utils.utils import get_iso_date + + +def get_capture_uuid(): + return str(uuid.uuid4()) + + +def get_capture_date_folder(device_root: Path): + today_iso = get_iso_date() + today_folder = device_root / today_iso + if dir_contains_device_metadata(device_root): + if not today_folder.is_dir(): + try: + today_folder.mkdir() + except FileExistsError: + print(f'Folder {today_folder} already exists') + return today_folder + raise FileNotFoundError(f'Given path {device_root} is not a device root directory') + + +def get_capture_src_folder(device_folder: Path): + assert device_folder.is_dir(), f'Given path {device_folder} is not a folder' + today_iso = get_iso_date() + max_sequence_number = 1 + for d in device_folder.iterdir(): + if d.is_dir() and d.name.startswith(f'{today_iso}_capture_'): + name = d.name + num = int(name.split('_')[2]) + max_sequence_number = max(max_sequence_number, num) + + next_sequence_number = max_sequence_number + 1 + return device_folder.joinpath(f'{today_iso}_capture_{next_sequence_number:03}') + + +def make_capture_src_folder(capture_src_folder: Path): + try: + capture_src_folder.mkdir() + except FileExistsError: + print(f'Folder {capture_src_folder} already exists') + finally: + return capture_src_folder diff --git a/archive/iottb/utils/tcpdump_utils.py b/archive/iottb/utils/tcpdump_utils.py new file mode 100644 index 0000000..6870202 --- /dev/null +++ b/archive/iottb/utils/tcpdump_utils.py @@ -0,0 +1,41 @@ +import ipaddress +import shutil +import subprocess +from typing import Optional + + +def check_installed() -> bool: + """Check if tcpdump is installed and available on the system path.""" + return shutil.which('tcpdump') is not None + + +def ensure_installed(): + """Ensure that tcpdump is installed, raise an error if not.""" + if not check_installed(): + raise RuntimeError('tcpdump is not installed. Please install it to continue.') + + +def list_interfaces(args) -> str: + """List available network interfaces using tcpdump.""" + ensure_installed() + try: + result = subprocess.run(['tcpdump', '--list-interfaces'], capture_output=True, text=True, check=True) + print(result.stdout) + except subprocess.CalledProcessError as e: + print(f'Failed to list interfaces: {e}') + return '' + + +def is_valid_ipv4(ip: str) -> bool: + try: + ipaddress.IPv4Address(ip) + return True + except ValueError: + return False + +def str_to_ipv4(ip: str) -> (bool, Optional[ipaddress]): + try: + address = ipaddress.IPv4Address(ip) + return address == ipaddress.IPv4Address(ip), address + except ipaddress.AddressValueError: + return False, None diff --git a/code/iottb/utils/utils.py b/archive/iottb/utils/utils.py similarity index 100% rename from code/iottb/utils/utils.py rename to archive/iottb/utils/utils.py diff --git a/code/misc/dnsmasq.conf b/archive/misc/dnsmasq.conf similarity index 100% rename from code/misc/dnsmasq.conf rename to archive/misc/dnsmasq.conf diff --git a/code/misc/enable-forwarding.sh b/archive/misc/enable-forwarding.sh similarity index 100% rename from code/misc/enable-forwarding.sh rename to archive/misc/enable-forwarding.sh diff --git a/code/misc/hostapd.conf b/archive/misc/hostapd.conf similarity index 100% rename from code/misc/hostapd.conf rename to archive/misc/hostapd.conf diff --git a/code/misc/hostapd.conf.bak b/archive/misc/hostapd.conf.bak similarity index 100% rename from code/misc/hostapd.conf.bak rename to archive/misc/hostapd.conf.bak diff --git a/code/misc/initSwAP b/archive/misc/initSwAP similarity index 100% rename from code/misc/initSwAP rename to archive/misc/initSwAP diff --git a/code/misc/initSwAP_nftables b/archive/misc/initSwAP_nftables similarity index 100% rename from code/misc/initSwAP_nftables rename to archive/misc/initSwAP_nftables diff --git a/code/misc/make_ap.sh b/archive/misc/make_ap.sh similarity index 100% rename from code/misc/make_ap.sh rename to archive/misc/make_ap.sh diff --git a/code/pyproject.toml b/archive/pyproject.toml similarity index 100% rename from code/pyproject.toml rename to archive/pyproject.toml diff --git a/code/tests/test_main.py b/archive/tests/__init__.py similarity index 100% rename from code/tests/test_main.py rename to archive/tests/__init__.py diff --git a/code/tests/utils/test_device_metadata_utils.py b/archive/tests/fixtures/__init__.py similarity index 100% rename from code/tests/utils/test_device_metadata_utils.py rename to archive/tests/fixtures/__init__.py diff --git a/code/tests/fixtures/shared_fixtures.py b/archive/tests/fixtures/shared_fixtures.py similarity index 100% rename from code/tests/fixtures/shared_fixtures.py rename to archive/tests/fixtures/shared_fixtures.py diff --git a/code/tests/utils/test_tcpdump_utils.py b/archive/tests/models/test_capture_metadata_model.py similarity index 100% rename from code/tests/utils/test_tcpdump_utils.py rename to archive/tests/models/test_capture_metadata_model.py diff --git a/archive/tests/models/test_device_metadata_model.py b/archive/tests/models/test_device_metadata_model.py new file mode 100644 index 0000000..e69de29 diff --git a/code/tests/subcommands/test_add_device.py b/archive/tests/subcommands/test_add_device.py similarity index 100% rename from code/tests/subcommands/test_add_device.py rename to archive/tests/subcommands/test_add_device.py diff --git a/code/tests/test_capture_metadata_model.py b/archive/tests/test_capture_metadata_model.py similarity index 100% rename from code/tests/test_capture_metadata_model.py rename to archive/tests/test_capture_metadata_model.py diff --git a/archive/tests/test_main.py b/archive/tests/test_main.py new file mode 100644 index 0000000..e69de29 diff --git a/code/tests/utils/test_capture_metadata_utils.py b/archive/tests/utils/test_capture_metadata_utils.py similarity index 100% rename from code/tests/utils/test_capture_metadata_utils.py rename to archive/tests/utils/test_capture_metadata_utils.py diff --git a/archive/tests/utils/test_device_metadata_utils.py b/archive/tests/utils/test_device_metadata_utils.py new file mode 100644 index 0000000..e69de29 diff --git a/archive/tests/utils/test_tcpdump_utils.py b/archive/tests/utils/test_tcpdump_utils.py new file mode 100644 index 0000000..e69de29 diff --git a/code/iottb/.gitignore b/code/iottb/.gitignore new file mode 100644 index 0000000..852cc91 --- /dev/null +++ b/code/iottb/.gitignore @@ -0,0 +1,3 @@ +__pycache__ +.venv +iottb.egg-info \ No newline at end of file diff --git a/code/iottb/__main__.py b/code/iottb/__main__.py index 9e6d869..fb99a10 100644 --- a/code/iottb/__main__.py +++ b/code/iottb/__main__.py @@ -1,108 +1,30 @@ -#!/usr/bin/env python3 import argparse -from os import environ -from pathlib import Path import logging -from iottb.subcommands.add_device import setup_init_device_root_parser -# from iottb.subcommands.capture import setup_capture_parser -from iottb.subcommands.sniff import setup_sniff_parser -from iottb.utils.tcpdump_utils import list_interfaces -from iottb.definitions import IOTTB_HOME_ABS, ReturnCodes -from iottb.logger import setup_logging +from pathlib import Path -logger = logging.getLogger('iottbLogger.__main__') -logger.setLevel(logging.DEBUG) +from .commands.sniff import setup_sniff_parser +from .config import Config +from .utils.file_utils import ensure_directory_exists -###################### -# Argparse setup -###################### -def setup_argparse(): - # create top level parser - root_parser = argparse.ArgumentParser(prog='iottb') - # shared options - root_parser.add_argument('--verbose', '-v', action='count', default=0) - - # Group of args w.r.t iottb.db creation - group = root_parser.add_argument_group('database options') - group.add_argument('--db-home', default=Path.home() / 'IoTtb.db') - group.add_argument('--config-home', default=Path.home() / '.config' / 'iottb.conf', type=Path, ) - group.add_argument('--user', default=Path.home().stem, type=Path, ) - - # configure subcommands - subparsers = root_parser.add_subparsers(title='subcommands', required=True, dest='command') - # setup_capture_parser(subparsers) - setup_init_device_root_parser(subparsers) - setup_sniff_parser(subparsers) - # Utility to list interfaces directly with iottb instead of relying on external tooling - - interfaces_parser = subparsers.add_parser('list-interfaces', aliases=['li', 'if'], - help='List available network interfaces.') - interfaces_parser.set_defaults(func=list_interfaces) - - return root_parser - - -### -# Where put ?! -### -class IoTdb: - def __init__(self, db_home=Path.home() / 'IoTtb.db', iottb_config=Path.home() / '.conf' / 'iottb.conf', - user=Path.home().stem): - self.db_home = db_home - self.config_home = iottb_config - self.default_filters_home = db_home / 'default_filters' - self.user = user - - def create_db(self, mode=0o777, parents=False, exist_ok=False): - logger.info(f'Creating db at {self.db_home}') - try: - self.db_home.mkdir(mode=mode, parents=parents, exist_ok=exist_ok) - except FileExistsError: - logger.error(f'Database path already at {self.db_home} exists and is not a directory') - finally: - logger.debug(f'Leaving finally clause in create_db') - - def create_device_tree(self, mode=0o777, parents=False, exist_ok=False): - logger.info(f'Creating device tree at {self.db_home / 'devices'}') - #TODO - - def parse_db_config(self): - pass - - def parse_iottb_config(self): - pass - - def get_known_devices(self): - pass - - -def iottb_db_exists(db_home=Path.home() / 'IoTtb.db'): - res = db_home.is_dir() +def setup_logging(): + logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(name)s: %(message)s') def main(): - logger.debug(f'Pre setup_argparse()') - parser = setup_argparse() - logger.debug('Post setup_argparse().') - args = parser.parse_args() - logger.debug(f'Args parsed: {args}') - if args.command: - try: - args.func(args) - except KeyboardInterrupt: - print('Received keyboard interrupt. Exiting...') - exit(1) - except Exception as e: - logger.debug(f'Error in main: {e}') - print(f'Error: {e}') - # create_capture_directory(args.device_name) - - -if __name__ == '__main__': setup_logging() - logger.debug("Debug level is working") - logger.info("Info level is working") - logger.warning("Warning level is working") + parser = argparse.ArgumentParser(description='IoT Testbed') + subparsers = parser.add_subparsers() + + setup_sniff_parser(subparsers) + + args = parser.parse_args() + if hasattr(args, 'func'): + args.func(args) + else: + parser.print_help() + + +if __name__ == "__main__": main() diff --git a/code/iottb/commands/__init__.py b/code/iottb/commands/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/code/iottb/commands/sniff.py b/code/iottb/commands/sniff.py new file mode 100644 index 0000000..f8e97ad --- /dev/null +++ b/code/iottb/commands/sniff.py @@ -0,0 +1,108 @@ +import subprocess +import re +from datetime import datetime +from pathlib import Path +import logging +from models.capture_metadata import CaptureMetadata +from models.device_metadata import DeviceMetadata +from utils.capture_utils import get_capture_src_folder, make_capture_src_folder +from utils.tcpdump_utils import check_installed +from utils.file_utils import ensure_directory_exists +from config import Config + +logger = logging.getLogger('iottb.sniff') + + +def is_ip_address(address): + ip_pattern = re.compile(r"^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$") + return ip_pattern.match(address) is not None + + +def is_mac_address(address): + mac_pattern = re.compile(r"^([0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2}$") + return mac_pattern.match(address) is not None + + +class Sniffer: + def __init__(self, device_id, capture_interface, address=None, safe=True): + #TODO Decide if we want to use the device_id as the device_name (seems unhandy) + self.device_id = device_id + self.capture_interface = capture_interface + self.address = address + self.safe = safe + self.config = Config().load_config() + self.device_path = self.initialize_device(device_id) + self.filter = self.generate_filter() + + def initialize_device(self, device_id): + db_path = Path(self.config.get('database_path', '~/iottb.db')).expanduser() + device_path = db_path / device_id + ensure_directory_exists(device_path) + + metadata_file = device_path / 'device_metadata.json' + if not metadata_file.exists(): + device_metadata = DeviceMetadata(device_name=device_id, device_root_path=device_path) + device_metadata.save_to_file() + return device_path + + def get_capture_metadata(self, capture_dir): + metadata = CaptureMetadata(device_id=self.device_id, capture_dir=capture_dir) + metadata.build_capture_file_name() + metadata.interface = self.capture_interface + metadata.device_ip_address = self.address or "No IP Address set" + return metadata + + def generate_filter(self): + + if not self.address and self.safe: + raise ValueError("Address must be provided in safe mode.") + + if is_ip_address(self.address): + return f"host {self.address}" + elif is_mac_address(self.address): + return f"ether host {self.address}" + else: + raise ValueError("Invalid address format.") + + def capture(self): + if not check_installed(): + print('Please install tcpdump first') + return + + capture_dir = make_capture_src_folder(get_capture_src_folder(self.device_path)) + metadata = self.get_capture_metadata(capture_dir) + pcap_file = capture_dir / metadata.capture_file + cmd = ['sudo', 'tcpdump', '-i', self.capture_interface, '-w', str(pcap_file)] + + if self.filter: + cmd.append(self.filter) + + metadata.tcpdump_command = ' '.join(cmd) + print(f'Executing: {metadata.tcpdump_command}') + + try: + metadata.start_time = datetime.now().isoformat() + subprocess.run(cmd, check=True) + metadata.stop_time = datetime.now().isoformat() + except subprocess.CalledProcessError as e: + logger.error(f'Failed to capture packets: {e}') + return + + metadata.save_to_file() + print(f"Capture complete. Metadata saved to {capture_dir / 'metadata.json'}") + + +def setup_sniff_parser(subparsers): + parser = subparsers.add_parser('sniff', help='Sniff packets with tcpdump') + parser.add_argument('device_id', help='ID of the device to sniff') + parser.add_argument('-i', '--interface', required=True, help='Network interface to capture on') + parser.add_argument('-a', '--address', help='IP or MAC address to filter packets by') + parser.add_argument('-u', '--unsafe', action='store_true', help='Run in unsafe mode without supplying an address. ' + 'Highly discouraged.') + parser.set_defaults(func=handle_sniff) + + +def handle_sniff(args): + sniffer = Sniffer(device_id=args.device_id, capture_interface=args.interface, address=args.address, + safe=not args.unsafe) + sniffer.capture() diff --git a/code/iottb/config.json b/code/iottb/config.json new file mode 100644 index 0000000..ec311ec --- /dev/null +++ b/code/iottb/config.json @@ -0,0 +1 @@ +{"database_path": "~/.iottb.db", "log_level": "INFO"} \ No newline at end of file diff --git a/code/iottb/config.py b/code/iottb/config.py new file mode 100644 index 0000000..6c0e532 --- /dev/null +++ b/code/iottb/config.py @@ -0,0 +1,45 @@ +import json +from pathlib import Path +import logging + +logger = logging.getLogger('iottb.config') + + +class Config: + DEFAULT_CONFIG = { + "database_path": "~/.iottb.db", + "log_level": "INFO" + } + + def __init__(self, config_file=None): + self.config_file = Path(config_file or "config.json") + if not self.config_file.exists(): + self.create_default_config() + else: + self.config = self.load_config() + + def create_default_config(self): + try: + self.save_config(self.DEFAULT_CONFIG) + except (IsADirectoryError, PermissionError) as e: + logger.error(f"Error creating default config: {e}") + raise + + def load_config(self): + try: + with open(self.config_file, "r") as file: + return json.load(file) + except IsADirectoryError as e: + logger.error(f"Error loading config: {e}") + raise + except PermissionError as e: + logger.error(f"Error loading config: {e}") + raise + + def save_config(self, config): + try: + with open(self.config_file, "w") as f: + json.dump(config, f, indent=2) + except (IsADirectoryError, PermissionError) as e: + logger.error(f"Error saving config: {e}") + raise diff --git a/code/iottb/logger.py b/code/iottb/logger.py index cc0cdb5..e69de29 100644 --- a/code/iottb/logger.py +++ b/code/iottb/logger.py @@ -1,35 +0,0 @@ -import logging -import sys -import os -from logging.handlers import RotatingFileHandler - - -def setup_logging(): - # Ensure the logs directory exists - log_directory = 'logs' - if not os.path.exists(log_directory): - os.makedirs(log_directory) - - # Create handlers - file_handler = RotatingFileHandler(os.path.join(log_directory, 'iottb.log'), maxBytes=1048576, backupCount=5) - console_handler = logging.StreamHandler(sys.stdout) - - # Create formatters and add it to handlers - file_fmt = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') - console_fmt = logging.Formatter( - '%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s') - - file_handler.setFormatter(file_fmt) - console_handler.setFormatter(console_fmt) - - # Get the root logger and add handlers - root_logger = logging.getLogger() - root_logger.setLevel(logging.DEBUG) - root_logger.addHandler(file_handler) - root_logger.addHandler(console_handler) - - # Prevent propagation to the root logger to avoid duplicate logs - root_logger.propagate = False - - -setup_logging() diff --git a/code/iottb/models/capture_metadata.py b/code/iottb/models/capture_metadata.py new file mode 100644 index 0000000..4c7e1c0 --- /dev/null +++ b/code/iottb/models/capture_metadata.py @@ -0,0 +1,29 @@ +import json +import uuid +from datetime import datetime +from pathlib import Path + + +class CaptureMetadata: + def __init__(self, device_id, capture_dir): + self.device_id = device_id + self.capture_id = str(uuid.uuid4()) + self.capture_date = datetime.now().isoformat() + self.capture_dir = Path(capture_dir) + self.capture_file = "" + self.start_time = "" + self.stop_time = "" + self.tcpdump_command = "" + self.interface = "" + self.device_ip_address = "" + + def build_capture_file_name(self): + self.capture_file = f"{self.device_id}_{self.capture_id}.pcap" + + def to_dict(self): + return self.__dict__ + + def save_to_file(self, file_path=None): + file_path = file_path or self.capture_dir / 'metadata.json' + with open(file_path, 'w') as f: + json.dump(self.to_dict(), f, indent=4) diff --git a/code/iottb/models/device_metadata.py b/code/iottb/models/device_metadata.py new file mode 100644 index 0000000..5c5cb91 --- /dev/null +++ b/code/iottb/models/device_metadata.py @@ -0,0 +1,27 @@ +import json +import uuid +from datetime import datetime +from pathlib import Path + + +class DeviceMetadata: + def __init__(self, device_name, device_root_path): + self.device_name = device_name + self.device_short_name = device_name.lower().replace(' ', '_') + self.device_id = str(uuid.uuid4()) + self.date_created = datetime.now().isoformat() + self.device_root_path = Path(device_root_path) + + def to_dict(self): + return self.__dict__ + + def save_to_file(self): + file_path = self.device_root_path / 'device_metadata.json' + with open(file_path, 'w') as f: + json.dump(self.to_dict(), f, indent=4) + + @classmethod + def load_from_file(cls, file_path): + with open(file_path, 'r') as f: + data = json.load(f) + return cls(**data) diff --git a/code/iottb/pyproject.toml b/code/iottb/pyproject.toml new file mode 100644 index 0000000..580c3e8 --- /dev/null +++ b/code/iottb/pyproject.toml @@ -0,0 +1,18 @@ +[build-system] +requires = ["setuptools>=42", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "iottb" +version = "0.1.0" +authors = [{name = "Sebastian Lenzlinger", email = "sebastian.lenzlinger@unibas.ch"}] +description = "Automation Tool for Capturing Network packets of IoT devices." +requires-python = ">=3.8" +dependencies = [] + +[tool.setuptools.packages.find] +where = ["."] +exclude = ["tests*", "docs*"] + +[project.scripts] +iottb = "iottb.__main__:main" diff --git a/code/iottb/templates/capture_metadata_template.json b/code/iottb/templates/capture_metadata_template.json new file mode 100644 index 0000000..ec49e81 --- /dev/null +++ b/code/iottb/templates/capture_metadata_template.json @@ -0,0 +1,15 @@ +{ + "device_id": "", + "capture_id": "", + "capture_date": "", + "capture_file": "", + "start_time": "", + "stop_time": "", + "capture_duration": "", + "interfaces": "", + "device_ip_address": "", + "device_mac_address": "", + "contacted_ip_address": [], + "device_firmware_version": "", + "campanion_app": "" +} \ No newline at end of file diff --git a/code/iottb/templates/config_template.json b/code/iottb/templates/config_template.json new file mode 100644 index 0000000..89338dc --- /dev/null +++ b/code/iottb/templates/config_template.json @@ -0,0 +1,4 @@ +{ + "database_path": "~/.iottb.db", + "log_level": "INFO" +} \ No newline at end of file diff --git a/code/iottb/templates/device_metadata_template.json b/code/iottb/templates/device_metadata_template.json new file mode 100644 index 0000000..79a17f1 --- /dev/null +++ b/code/iottb/templates/device_metadata_template.json @@ -0,0 +1,14 @@ +{ + "device_id": "", + "device_name": "", + "device_short_name": "", + "date_created": "", + "description": "", + "model": "", + "manufacturer": "", + "firmware_version": "", + "device_type": "", + "supported_interfaces": "", + "companion_applications": "", + "last_metadata_update": "" +} \ No newline at end of file diff --git a/code/iottb/test/test_Config.py b/code/iottb/test/test_Config.py new file mode 100644 index 0000000..cb21267 --- /dev/null +++ b/code/iottb/test/test_Config.py @@ -0,0 +1,43 @@ +import json +from pathlib import Path +from unittest import mock + +from config import Config + +import unittest + + +class TestConfig(unittest.TestCase): + + def test_creates_new_config_file_if_not_exists(self): + config_path = Path("test_config.json") + if config_path.exists(): + config_path.unlink() + config = Config(config_file=config_path) + self.assertTrue(config_path.exists()) + config_path.unlink() + + def test_writes_default_configuration_to_config_file(self): + config_path = Path("test_config.json") + if config_path.exists(): + config_path.unlink() + config = Config(config_file=config_path) + with open(config_path, "r") as f: + data = json.load(f) + self.assertEqual(data, {"database_path": "~/.iottb.db", "log_level": "INFO"}) + config_path.unlink() + + @unittest.mock.patch("builtins.open", side_effect=PermissionError) + def test_config_file_path_not_writable(self, mock_open): + config_path = Path("test_config.json") + with self.assertRaises(PermissionError): + config = Config(config_file=config_path) + config.create_default_config() + + def test_config_file_path_is_directory(self): + config_dir = Path("test_config_dir") + config_dir.mkdir(exist_ok=True) + with self.assertRaises(IsADirectoryError): + config = Config(config_file=config_dir) + config.create_default_config() + config_dir.rmdir() diff --git a/code/iottb/test/test_ensure_directory_exists.py b/code/iottb/test/test_ensure_directory_exists.py new file mode 100644 index 0000000..ac5b1fd --- /dev/null +++ b/code/iottb/test/test_ensure_directory_exists.py @@ -0,0 +1,38 @@ +from pathlib import Path + +# Generated by CodiumAI +import unittest + +from utils.file_utils import ensure_directory_exists + + +class TestEnsureDirectoryExists(unittest.TestCase): + + # creates directory if it does not exist + def test_creates_directory_if_not_exists(self): + path = Path('/tmp/testdir') + if path.exists(): + path.rmdir() + ensure_directory_exists(path) + self.assertTrue(path.exists()) + path.rmdir() + + # does not create directory if it already exists + def test_does_not_create_directory_if_exists(self): + path = Path('/tmp/testdir') + path.mkdir(exist_ok=True) + ensure_directory_exists(path) + self.assertTrue(path.exists()) + path.rmdir() + + # path is a symbolic link + def test_path_is_a_symbolic_link(self): + target_dir = Path('/tmp/targetdir') + symlink_path = Path('/tmp/symlinkdir') + target_dir.mkdir(exist_ok=True) + symlink_path.symlink_to(target_dir) + ensure_directory_exists(symlink_path) + self.assertTrue(symlink_path.exists()) + self.assertTrue(symlink_path.is_symlink()) + symlink_path.unlink() + target_dir.rmdir() diff --git a/code/iottb/test/test_is_ip_address.py b/code/iottb/test/test_is_ip_address.py new file mode 100644 index 0000000..a13c679 --- /dev/null +++ b/code/iottb/test/test_is_ip_address.py @@ -0,0 +1,62 @@ +from commands.sniff import is_ip_address + +import unittest + + +class TestIsIpAddress(unittest.TestCase): + + def test_valid_ipv4_address_all_octets_in_range(self): + self.assertTrue(is_ip_address("192.168.1.1")) + self.assertTrue(is_ip_address("0.0.0.0")) + self.assertTrue(is_ip_address("255.255.255.255")) + + def test_ipv4_address_with_leading_zeros(self): + self.assertTrue(is_ip_address("192.168.001.001")) + self.assertTrue(is_ip_address("0.0.0.0")) + self.assertTrue(is_ip_address("255.255.255.255")) + + def test_ipv4_address_mixed_single_double_digit_octets(self): + self.assertTrue(is_ip_address("192.168.1.01")) + self.assertTrue(is_ip_address("0.0.0.0")) + self.assertTrue(is_ip_address("255.255.255.255")) + + def test_ipv4_address_maximum_values_in_octets(self): + self.assertTrue(is_ip_address("255.255.255.255")) + self.assertTrue(is_ip_address("0.0.0.0")) + self.assertTrue(is_ip_address("192.168.1.1")) + + + def test_ipv4_address_minimum_values_in_octets(self): + self.assertTrue(is_ip_address("0.0.0.0")) + self.assertTrue(is_ip_address("192.168.1.1")) + self.assertTrue(is_ip_address("255.255.255.255")) + + + def test_ipv4_address_more_than_four_octets_invalid(self): + self.assertFalse(is_ip_address("192.168.1.1.1")) + self.assertFalse(is_ip_address("0.0.0.0.0")) + self.assertFalse(is_ip_address("255.255.255.255.255")) + + + def test_ipv4_address_fewer_than_four_octets_invalid(self): + self.assertFalse(is_ip_address("192.168.1")) + self.assertFalse(is_ip_address("0.0")) + self.assertFalse(is_ip_address("255")) + + + def test_ipv4_address_non_numeric_characters_invalid(self): + self.assertFalse(is_ip_address("192.a.b.c")) + self.assertFalse(is_ip_address("0.x.y.z")) + self.assertFalse(is_ip_address("255.q.w.e")) + + + def test_ipv4_address_octets_out_of_range_invalid(self): + self.assertFalse(is_ip_address("256.256.256.256")) + self.assertFalse(is_ip_address("300.300.300.300")) + self.assertFalse(is_ip_address("999.999.999.999")) + + + def test_ipv4_address_empty_string_invalid(self): + self.assertFalse(is_ip_address("")) + self.assertFalse(is_ip_address(" ")) + self.assertFalse(is_ip_address(None)) diff --git a/code/iottb/test/test_is_mac_address.py b/code/iottb/test/test_is_mac_address.py new file mode 100644 index 0000000..09e11a2 --- /dev/null +++ b/code/iottb/test/test_is_mac_address.py @@ -0,0 +1,64 @@ + +from commands.sniff import is_mac_address + +import unittest + +class TestIsMacAddress(unittest.TestCase): + + + def test_valid_mac_address_lowercase(self): + self.assertTrue(is_mac_address("aa:bb:cc:dd:ee:ff")) + self.assertFalse(is_mac_address("192.168.1.1")) + self.assertFalse(is_mac_address("aa:bb:cc:dd:ee:ff:gg")) + + + def test_valid_mac_address_uppercase(self): + self.assertTrue(is_mac_address("AA:BB:CC:DD:EE:FF")) + self.assertFalse(is_mac_address("10.0.0.1")) + self.assertFalse(is_mac_address("AA:BB:CC:DD:EE")) + + + def test_valid_mac_address_mixed_case(self): + self.assertTrue(is_mac_address("Aa:Bb:Cc:Dd:Ee:Ff")) + self.assertFalse(is_mac_address("172.16.0.1")) + self.assertFalse(is_mac_address("Aa:Bb:Cc:Dd:Ee:Ff:Gg")) + + + def test_valid_mac_address_digits(self): + self.assertTrue(is_mac_address("00:11:22:33:44:55")) + self.assertFalse(is_mac_address("8.8.8.8")) + self.assertFalse(is_mac_address("00:11:22:33:44")) + + # returns False for an empty string + def test_empty_string(self): + self.assertFalse(is_mac_address("")) + self.assertFalse(is_mac_address(":")) + + def test_invalid_characters(self): + self.assertFalse(is_mac_address("gh:ij:kl:mn:op:qr")) + self.assertFalse(is_mac_address("192.168.0.256")) + self.assertFalse(is_mac_address("ghij::klmn::opqr")) + + # returns False for a MAC address with incorrect length + def test_incorrect_length(self): + self.assertFalse(is_mac_address("aa:bb:cc")) + self.assertFalse(is_mac_address("10.0.0.256")) + self.assertFalse(is_mac_address("aa::bb::cc::dd::ee::ff::gg")) + + # returns False for a MAC address with missing colons + def test_missing_colons(self): + self.assertFalse(is_mac_address("aabbccddeeff")) + self.assertFalse(is_mac_address("127.0.0.1")) + self.assertFalse(is_mac_address("aabbccddeeffgg")) + + # returns False for a MAC address with extra colons + def test_extra_colons(self): + self.assertFalse(is_mac_address("aa::bb::cc::dd::ee::ff")) + self.assertFalse(is_mac_address("192.168.1.256")) + self.assertFalse(is_mac_address("aa::bb::cc::dd::ee::ff::gg")) + + # returns False for a MAC address with spaces + def test_spaces_in_mac(self): + self.assertFalse(is_mac_address("aa bb cc dd ee ff")) + self.assertFalse(is_mac_address("8.8.4.4")) + self.assertFalse(is_mac_address("aa bb cc dd ee ff gg")) diff --git a/code/iottb/utils/capture_utils.py b/code/iottb/utils/capture_utils.py index 8c4d60f..d6d73b3 100644 --- a/code/iottb/utils/capture_utils.py +++ b/code/iottb/utils/capture_utils.py @@ -1,44 +1,20 @@ -import uuid from pathlib import Path -from iottb.models.device_metadata_model import dir_contains_device_metadata -from iottb.utils.utils import get_iso_date +from datetime import datetime -def get_capture_uuid(): - return str(uuid.uuid4()) +def get_capture_src_folder(device_path): + today_str = datetime.now().strftime('%Y-%m-%d') + capture_base_path = device_path / today_str + capture_base_path.mkdir(parents=True, exist_ok=True) + + existing_captures = [d for d in capture_base_path.iterdir() if d.is_dir()] + nth_capture = len(existing_captures) + 1 + capture_dir = capture_base_path / f'capture_{nth_capture}' + capture_dir.mkdir(parents=True, exist_ok=True) + + return capture_dir -def get_capture_date_folder(device_root: Path): - today_iso = get_iso_date() - today_folder = device_root / today_iso - if dir_contains_device_metadata(device_root): - if not today_folder.is_dir(): - try: - today_folder.mkdir() - except FileExistsError: - print(f'Folder {today_folder} already exists') - return today_folder - raise FileNotFoundError(f'Given path {device_root} is not a device root directory') - - -def get_capture_src_folder(device_folder: Path): - assert device_folder.is_dir(), f'Given path {device_folder} is not a folder' - today_iso = get_iso_date() - max_sequence_number = 1 - for d in device_folder.iterdir(): - if d.is_dir() and d.name.startswith(f'{today_iso}_capture_'): - name = d.name - num = int(name.split('_')[2]) - max_sequence_number = max(max_sequence_number, num) - - next_sequence_number = max_sequence_number + 1 - return device_folder.joinpath(f'{today_iso}_capture_{next_sequence_number:03}') - - -def make_capture_src_folder(capture_src_folder: Path): - try: - capture_src_folder.mkdir() - except FileExistsError: - print(f'Folder {capture_src_folder} already exists') - finally: - return capture_src_folder +def make_capture_src_folder(capture_src_folder): + capture_src_folder.mkdir(parents=True, exist_ok=True) + return capture_src_folder diff --git a/code/iottb/utils/file_utils.py b/code/iottb/utils/file_utils.py new file mode 100644 index 0000000..5ccb42f --- /dev/null +++ b/code/iottb/utils/file_utils.py @@ -0,0 +1,19 @@ +import json +from pathlib import Path + + +def load_json_template(template_path): + with open(template_path, 'r') as f: + return json.load(f) + + +def save_json(data, file_path): + with open(file_path, 'w') as f: + json.dump(data, f, indent=4) + + +def ensure_directory_exists(path): + path = Path(path) + if not path.exists(): + path.mkdir(parents=True, exist_ok=True) + return path diff --git a/code/iottb/utils/tcpdump_utils.py b/code/iottb/utils/tcpdump_utils.py index 6870202..53b77e8 100644 --- a/code/iottb/utils/tcpdump_utils.py +++ b/code/iottb/utils/tcpdump_utils.py @@ -1,41 +1,9 @@ -import ipaddress -import shutil import subprocess -from typing import Optional -def check_installed() -> bool: - """Check if tcpdump is installed and available on the system path.""" - return shutil.which('tcpdump') is not None - - -def ensure_installed(): - """Ensure that tcpdump is installed, raise an error if not.""" - if not check_installed(): - raise RuntimeError('tcpdump is not installed. Please install it to continue.') - - -def list_interfaces(args) -> str: - """List available network interfaces using tcpdump.""" - ensure_installed() +def check_installed(): try: - result = subprocess.run(['tcpdump', '--list-interfaces'], capture_output=True, text=True, check=True) - print(result.stdout) - except subprocess.CalledProcessError as e: - print(f'Failed to list interfaces: {e}') - return '' - - -def is_valid_ipv4(ip: str) -> bool: - try: - ipaddress.IPv4Address(ip) + subprocess.run(['tcpdump', '--version'], check=True, capture_output=True) return True - except ValueError: + except subprocess.CalledProcessError: return False - -def str_to_ipv4(ip: str) -> (bool, Optional[ipaddress]): - try: - address = ipaddress.IPv4Address(ip) - return address == ipaddress.IPv4Address(ip), address - except ipaddress.AddressValueError: - return False, None From d9d3f66fc8ad9877512402e0f94d608436cc8b72 Mon Sep 17 00:00:00 2001 From: Sebastian Lenzlinger Date: Sun, 30 Jun 2024 00:02:59 +0200 Subject: [PATCH 10/10] Hopefully successfully integrate proper repo. --- .idea/webResources.xml | 14 - code/iottb-project/.gitignore | 36 ++ code/iottb-project/README.md | 9 + code/iottb-project/iottb/__init__.py | 11 + code/iottb-project/iottb/commands/__init__.py | 0 .../iottb/commands/add_device.py | 89 +++++ .../iottb-project/iottb/commands/developer.py | 130 +++++++ code/iottb-project/iottb/commands/sniff.py | 327 ++++++++++++++++++ code/iottb-project/iottb/commands/testbed.py | 120 +++++++ code/iottb-project/iottb/definitions.py | 48 +++ code/iottb-project/iottb/main.py | 77 +++++ code/iottb-project/iottb/models/__init__.py | 0 code/iottb-project/iottb/models/database.py | 6 + .../iottb/models/device_metadata.py | 44 +++ .../iottb/models/iottb_config.py | 124 +++++++ .../iottb/models/sniff_metadata.py | 39 +++ .../iottb/scripts/generate_help.py | 52 +++ code/iottb-project/iottb/scripts/sudo_iottb | 4 + code/iottb-project/iottb/utils/__init__.py | 0 .../iottb/utils/logger_config.py | 41 +++ .../iottb/utils/string_processing.py | 40 +++ .../iottb/utils/user_interaction.py | 42 +++ code/iottb-project/poetry.lock | 103 ++++++ code/iottb-project/pyproject.toml | 21 ++ code/iottb-project/tests/__init__.py | 0 .../tests/test_make_canonical_name.py | 23 ++ code/iottb/scripts/wifi_ctl.sh | 55 +++ code/iottb/utils/diagram1.py | 29 ++ code/iottb/utils/diagramm2.py | 27 ++ notes/journal/Untitled.md | 0 30 files changed, 1497 insertions(+), 14 deletions(-) delete mode 100644 .idea/webResources.xml create mode 100644 code/iottb-project/.gitignore create mode 100644 code/iottb-project/README.md create mode 100644 code/iottb-project/iottb/__init__.py create mode 100644 code/iottb-project/iottb/commands/__init__.py create mode 100644 code/iottb-project/iottb/commands/add_device.py create mode 100644 code/iottb-project/iottb/commands/developer.py create mode 100644 code/iottb-project/iottb/commands/sniff.py create mode 100644 code/iottb-project/iottb/commands/testbed.py create mode 100644 code/iottb-project/iottb/definitions.py create mode 100644 code/iottb-project/iottb/main.py create mode 100644 code/iottb-project/iottb/models/__init__.py create mode 100644 code/iottb-project/iottb/models/database.py create mode 100644 code/iottb-project/iottb/models/device_metadata.py create mode 100644 code/iottb-project/iottb/models/iottb_config.py create mode 100644 code/iottb-project/iottb/models/sniff_metadata.py create mode 100755 code/iottb-project/iottb/scripts/generate_help.py create mode 100644 code/iottb-project/iottb/scripts/sudo_iottb create mode 100644 code/iottb-project/iottb/utils/__init__.py create mode 100644 code/iottb-project/iottb/utils/logger_config.py create mode 100644 code/iottb-project/iottb/utils/string_processing.py create mode 100644 code/iottb-project/iottb/utils/user_interaction.py create mode 100644 code/iottb-project/poetry.lock create mode 100644 code/iottb-project/pyproject.toml create mode 100644 code/iottb-project/tests/__init__.py create mode 100644 code/iottb-project/tests/test_make_canonical_name.py create mode 100644 code/iottb/scripts/wifi_ctl.sh create mode 100644 code/iottb/utils/diagram1.py create mode 100644 code/iottb/utils/diagramm2.py create mode 100644 notes/journal/Untitled.md diff --git a/.idea/webResources.xml b/.idea/webResources.xml deleted file mode 100644 index aa647dc..0000000 --- a/.idea/webResources.xml +++ /dev/null @@ -1,14 +0,0 @@ - - - - - - - - - - - - - - \ No newline at end of file diff --git a/code/iottb-project/.gitignore b/code/iottb-project/.gitignore new file mode 100644 index 0000000..a457143 --- /dev/null +++ b/code/iottb-project/.gitignore @@ -0,0 +1,36 @@ +__pycache__ +.venv +iottb.egg-info +.idea +*.log +logs/ +*.pyc +.obsidian + +# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio, WebStorm and Rider +# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 + +# User-specific stuff +.idea/**/workspace.xml +.idea/**/tasks.xml +.idea/**/usage.statistics.xml +.idea/**/dictionaries +.idea/**/shelf + +# AWS User-specific +.idea/**/aws.xml + +# Generated files +.idea/**/contentModel.xml + +# Sensitive or high-churn files +.idea/**/dataSources/ +.idea/**/dataSources.ids +.idea/**/dataSources.local.xml +.idea/**/sqlDataSources.xml +.idea/**/dynamic.xml +.idea/**/uiDesigner.xml +.idea/**/dbnavigator.xml + +.private/ +*.pcap \ No newline at end of file diff --git a/code/iottb-project/README.md b/code/iottb-project/README.md new file mode 100644 index 0000000..d7a21e2 --- /dev/null +++ b/code/iottb-project/README.md @@ -0,0 +1,9 @@ +# Iottb +## Basic Invocation + +## Configuration +### Env Vars +- IOTTB_CONF_HOME + +By setting this variable you control where the basic iottb application +configuration should be looked for \ No newline at end of file diff --git a/code/iottb-project/iottb/__init__.py b/code/iottb-project/iottb/__init__.py new file mode 100644 index 0000000..1438731 --- /dev/null +++ b/code/iottb-project/iottb/__init__.py @@ -0,0 +1,11 @@ +from iottb import definitions +import logging +from iottb.utils.user_interaction import tb_echo +import click + +click.echo = tb_echo # This is very hacky +logging.basicConfig(level=definitions.LOGLEVEL) +log_dir = definitions.LOGDIR +# Ensure logs dir exists before new handlers are registered in main.py +if not log_dir.is_dir(): + log_dir.mkdir() diff --git a/code/iottb-project/iottb/commands/__init__.py b/code/iottb-project/iottb/commands/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/code/iottb-project/iottb/commands/add_device.py b/code/iottb-project/iottb/commands/add_device.py new file mode 100644 index 0000000..5524ea7 --- /dev/null +++ b/code/iottb-project/iottb/commands/add_device.py @@ -0,0 +1,89 @@ +import json + +import click +from pathlib import Path +import logging +import re + +from iottb import definitions +from iottb.models.device_metadata import DeviceMetadata +from iottb.models.iottb_config import IottbConfig +from iottb.definitions import CFG_FILE_PATH, TB_ECHO_STYLES + +logger = logging.getLogger(__name__) + + +def add_device_guided(ctx, cn, db): + click.echo('TODO: Implement') + logger.info('Adding device interactively') + #logger.debug(f'Parameters: {params}. value: {value}') + + +@click.command('add-device', help='Add a device to a database') +@click.option('--dev', '--device-name', type=str, required=True, + help='The name of the device to be added. If this string contains spaces or other special characters \ + normalization is performed to derive a canonical name') +@click.option('--db', '--database', type=click.Path(exists=True, file_okay=False, dir_okay=True), + envvar='IOTTB_DB', show_envvar=True, + help='Database in which to add this device. If not specified use default from config.') +@click.option('--guided', is_flag=True, default=False, show_default=True, envvar='IOTTB_GUIDED_ADD', show_envvar=True, + help='Add device interactively') +def add_device(dev, db, guided): + """Add a new device to a database + + Device name must be supplied unless in an interactive setup. Database is taken from config by default. + """ + logger.info('add-device invoked') + + # Step 1: Load Config + # Dependency: Config file must exist + config = IottbConfig(Path(CFG_FILE_PATH)) + logger.debug(f'Config loaded: {config}') + + # Step 2: Load database + # dependency: Database folder must exist + if db: + database = db + path = config.db_path_dict[database] + logger.debug(f'Resolved (path, db) {path}, {database}') + else: + path = config.default_db_location + database = config.default_database + logger.debug(f'Default (path, db) {path}, {database}') + click.secho(f'Using database {database}') + full_db_path = Path(path) / database + if not full_db_path.is_dir(): + logger.warning(f'No database at {database}') + click.echo(f'Could not find a database.') + click.echo(f'You need to initialize the testbed before before you add devices!') + click.echo(f'To initialize the testbed in the default location run "iottb init-db"') + click.echo('Exiting...') + exit() + + # Step 3: Check if device already exists in database + # dependency: DeviceMetadata object + device_metadata = DeviceMetadata(device_name=dev) + device_dir = full_db_path / device_metadata.canonical_name + + # Check if device is already registered + if device_dir.exists(): + logger.warning(f'Device directory {device_dir} already exists.') + click.echo(f'Device {dev} already exists in the database.') + click.echo('Exiting...') + exit() + try: + device_dir.mkdir() + except OSError as e: + logger.error(f'Error trying to create device {e}') + click.echo('Exiting...') + exit() + + # Step 4: Save metadata into device_dir + metadata_path = device_dir / definitions.DEVICE_METADATA_FILE_NAME + with metadata_path.open('w') as metadata_file: + json.dump(device_metadata.__dict__, metadata_file, indent=4) + click.echo(f'Successfully added device {dev} to database') + logger.debug(f'Added device {dev} to database {database}. Full path of metadata {metadata_path}') + logger.info(f'Metadata for {dev} {device_metadata.print_attributes()}') + + diff --git a/code/iottb-project/iottb/commands/developer.py b/code/iottb-project/iottb/commands/developer.py new file mode 100644 index 0000000..d1e992b --- /dev/null +++ b/code/iottb-project/iottb/commands/developer.py @@ -0,0 +1,130 @@ +from pathlib import Path +import logging +import click + +from iottb import tb_echo +from iottb.definitions import DB_NAME, CFG_FILE_PATH +from iottb.models.iottb_config import IottbConfig + +logger = logging.getLogger(__name__) + + +@click.group('util') +def tb(): + pass + + +@click.command() +@click.option('--file', default=DB_NAME) +@click.option('--table', type=str, default='DefaultDatabase') +@click.option('--key') +@click.option('--value') +@click.pass_context +def set_key_in_table_to(ctx, file, table, key, value): + """Edit config or metadata files. TODO: Implement""" + click.echo(f'set_key_in_table_to invoked') + logger.warning("Unimplemented subcommand invoked.") + + +@click.command() +@click.confirmation_option(prompt="Are you certain that you want to delete the cfg file?") +def rm_cfg(): + """ Removes the cfg file from the filesystem. + + This is mostly a utility during development. Once non-standard database locations are implemented, + deleting this would lead to iottb not being able to find them anymore. + """ + Path(CFG_FILE_PATH).unlink() + click.echo(f'Iottb configuration removed at {CFG_FILE_PATH}') + + +@click.command() +@click.confirmation_option(prompt="Are you certain that you want to delete the databases file?") +def rm_dbs(dbs): + """ Removes ALL(!) databases from the filesystem if they're empty. + + Development utility currently unfit for use. + """ + config = IottbConfig() + paths = config.get_know_database_paths() + logger.debug(f'Known db paths: {str(paths)}') + for dbs in paths: + try: + Path(dbs).rmdir() + click.echo(f'{dbs} deleted') + except Exception as e: + logger.debug(f'Failed unlinking db {dbs} with error {e}') + logger.info(f'All databases deleted') + + +@click.command('show-cfg', help='Show the current configuration context') +@click.option('--cfg-file', type=click.Path(), default=CFG_FILE_PATH, help='Path to the config file') +@click.option('-pp', is_flag=True, default=False, help='Pretty Print') +@click.pass_context +def show_cfg(ctx, cfg_file, pp): + logger.debug(f'Pretty print option set to {pp}') + if pp: + try: + config = IottbConfig(Path(cfg_file)) + click.echo("Configuration Context:") + click.echo(f"Default Database: {config.default_database}") + click.echo(f"Default Database Path: {config.default_db_location}") + click.echo("Database Locations:") + for db_name, db_path in config.db_path_dict.items(): + click.echo(f" - {db_name}: {db_path}") + except Exception as e: + logger.error(f"Error loading configuration: {e}") + click.echo(f"Failed to load configuration from {cfg_file}") + else: + path = Path(cfg_file) + + if path.is_file(): + with path.open('r') as file: + content = file.read() + click.echo(content) + else: + click.echo(f"Configuration file not found at {cfg_file}") + + +@click.command('show-all', help='Show everything: configuration, databases, and device metadata') +@click.pass_context +def show_everything(ctx): + """Show everything that can be recursively found based on config except file contents.""" + config = ctx.obj['CONFIG'] + click.echo("Configuration Context:") + click.echo(f"Default Database: {config.default_database}") + click.echo(f"Default Database Path: {config.default_db_location}") + click.echo("Database Locations:") + everything_dict = {} + for db_name, db_path in config.db_path_dict.items(): + + click.echo(f" - {db_name}: {db_path}") + for db_name, db_path in config.db_path_dict.items(): + full_db_path = Path(db_path) / db_name + if full_db_path.is_dir(): + click.echo(f"\nContents of {full_db_path}:") + flag = True + for item in full_db_path.iterdir(): + flag = False + if item.is_file(): + click.echo(f" - {item.name}") + try: + with item.open('r', encoding='utf-8') as file: + content = file.read() + click.echo(f" Content:\n{content}") + except UnicodeDecodeError: + click.echo(" Content is not readable as text") + elif item.is_dir(): + click.echo(f" - {item.name}/") + for subitem in item.iterdir(): + if subitem.is_file(): + click.echo(f" - {subitem.name}") + elif subitem.is_dir(): + click.echo(f" - {subitem.name}/") + if flag: + tb_echo(f'\t EMPTY') + else: + click.echo(f"{full_db_path} is not a directory") + + + diff --git a/code/iottb-project/iottb/commands/sniff.py b/code/iottb-project/iottb/commands/sniff.py new file mode 100644 index 0000000..0eef81c --- /dev/null +++ b/code/iottb-project/iottb/commands/sniff.py @@ -0,0 +1,327 @@ +import os +import shutil +import uuid +from time import time + +import click +import subprocess +import json +from pathlib import Path +import logging +import re +from datetime import datetime +from click_option_group import optgroup +from iottb.definitions import APP_NAME, CFG_FILE_PATH +from iottb.models.iottb_config import IottbConfig +from iottb.utils.string_processing import make_canonical_name + +# Setup logger +logger = logging.getLogger('iottb.sniff') + + +def is_ip_address(address): + ip_pattern = re.compile(r"^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$") + return ip_pattern.match(address) is not None + + +def is_mac_address(address): + mac_pattern = re.compile(r"^([0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2}$") + return mac_pattern.match(address) is not None + + +def load_config(cfg_file): + """Loads configuration from the given file path.""" + with open(cfg_file, 'r') as config_file: + return json.load(config_file) + + +def validate_sniff(ctx, param, value): + logger.info('Validating sniff...') + if ctx.params.get('unsafe') and not value: + return None + if not ctx.params.get('unsafe') and not value: + raise click.BadParameter('Address is required unless --unsafe is set.') + if not is_ip_address(value) and not is_mac_address(value): + raise click.BadParameter('Address must be a valid IP address or MAC address.') + return value + + +@click.command('sniff', help='Sniff packets with tcpdump') +@optgroup.group('Testbed sources') +@optgroup.option('--db', '--database', type=str, envvar='IOTTB_DB', show_envvar=True, + help='Database of device. Only needed if not current default.') +@optgroup.option('--app', type=str, help='Companion app being used during capture', required=False) +@optgroup.group('Runtime behaviour') +@optgroup.option('--unsafe', is_flag=True, default=False, envvar='IOTTB_UNSAFE', is_eager=True, + help='Disable checks for otherwise required options.\n', show_envvar=True) +@optgroup.option('--guided', is_flag=True, default=False, envvar='IOTTB_GUIDED', show_envvar=True) +@optgroup.option('--pre', type=click.Path(exists=True, executable=True), help='Script to be executed before main ' + 'command' + 'is started.') +@optgroup.group('Tcpdump options') +@optgroup.option('-i', '--interface', + help='Network interface to capture on.' + + 'If not specified tcpdump tries to find and appropriate one.\n', show_envvar=True, + envvar='IOTTB_CAPTURE_INTERFACE') +@optgroup.option('-a', '--address', callback=validate_sniff, + help='IP or MAC address to filter packets by.\n', show_envvar=True, + envvar='IOTTB_CAPTURE_ADDRESS') +@optgroup.option('-I', '--monitor-mode', help='Put interface into monitor mode.', is_flag=True) +@optgroup.option('--ff', type=str, envvar='IOTTB_CAPTURE_FILTER', show_envvar=True, + help='tcpdump filter as string or file path.') +@optgroup.option('-#', '--print-pacno', is_flag=True, default=True, + help='Print packet number at beginning of line. True by default.') +@optgroup.option('-e', '--print-ll', is_flag=True, default=False, + help='Print link layer headers. True by default.') +@optgroup.option('-c', '--count', type=int, help='Number of packets to capture.', default=1000) +# @optgroup.option('--mins', type=int, help='Time in minutes to capture.', default=1) +@click.argument('tcpdump-args', nargs=-1, required=False, metavar='[TCPDUMP-ARGS]') +@click.argument('device', required=False) +@click.pass_context +def sniff(ctx, device, interface, print_pacno, ff, count, monitor_mode, print_ll, address, db, unsafe, guided, + app, tcpdump_args, **params): + """ Sniff packets from a device """ + logger.info('sniff command invoked') + + # Step1: Load Config + config = ctx.obj['CONFIG'] + logger.debug(f'Config loaded: {config}') + + # Step2: determine relevant database + database = db if db else config.default_database + path = config.db_path_dict[database] + full_db_path = Path(path) / database + logger.debug(f'Full db path is {str(full_db_path)}') + + # 2.2: Check if it exists + if not full_db_path.is_dir(): + logger.error('DB unexpectedly missing') + click.echo('DB unexpectedly missing') + return + + canonical_name, aliases = make_canonical_name(device) + click.echo(f'Using canonical device name {canonical_name}') + device_path = full_db_path / canonical_name + + # Step 3: now the device + if not device_path.exists(): + if not unsafe: + logger.error(f'Device path {device_path} does not exist') + click.echo(f'Device path {device_path} does not exist') + return + else: + device_path.mkdir(parents=True, exist_ok=True) + logger.info(f'Device path {device_path} created') + + click.echo(f'Found device at path {device_path}') + # Step 4: Generate filter + generic_filter = None + cap_filter = None + if ff: + logger.debug(f'ff: {ff}') + if Path(ff).is_file(): + logger.info('Given filter option is a file') + with open(ff, 'r') as f: + cap_filter = f.read().strip() + else: + logger.info('Given filter option is an expression') + cap_filter = ff + else: + if address is not None: + if is_ip_address(address): + generic_filter = 'net' + cap_filter = f'{generic_filter} {address}' + elif is_mac_address(address): + generic_filter = 'ether net' + cap_filter = f'{generic_filter} {address}' + elif not unsafe: + logger.error('Invalid address format') + click.echo('Invalid address format') + return + + logger.info(f'Generic filter {generic_filter}') + click.echo(f'Using filter {cap_filter}') + + # Step 5: prep capture directory + capture_date = datetime.now().strftime('%Y-%m-%d') + capture_base_dir = device_path / f'sniffs/{capture_date}' + capture_base_dir.mkdir(parents=True, exist_ok=True) + + logger.debug(f'Previous captures {capture_base_dir.glob('cap*')}') + capture_count = sum(1 for _ in capture_base_dir.glob('cap*')) + logger.debug(f'Capture count is {capture_count}') + + capture_dir = f'cap{capture_count:04d}-{datetime.now().strftime('%H%M')}' + logger.debug(f'capture_dir: {capture_dir}') + + # Full path + capture_dir_full_path = capture_base_dir / capture_dir + capture_dir_full_path.mkdir(parents=True, exist_ok=True) + + click.echo(f'Files will be placed in {str(capture_dir_full_path)}') + logger.debug(f'successfully created capture directory') + + # Step 6: Prepare capture file names + # Generate UUID for filenames + capture_uuid = str(uuid.uuid4()) + click.echo(f'Capture has id {capture_uuid}') + + pcap_file = f"{canonical_name}_{capture_uuid}.pcap" + pcap_file_full_path = capture_dir_full_path / pcap_file + stdout_log_file = f'stdout_{capture_uuid}.log' + stderr_log_file = f'stderr_{capture_uuid}.log' + + logger.debug(f'Full pcap file path is {pcap_file_full_path}') + logger.info(f'pcap file name is {pcap_file}') + logger.info(f'stdout log file is {stdout_log_file}') + logger.info(f'stderr log file is {stderr_log_file}') + + # Step 7: Build tcpdump command + logger.debug(f'pgid {os.getpgrp()}') + logger.debug(f'ppid {os.getppid()}') + logger.debug(f'(real, effective, saved) user id: {os.getresuid()}') + logger.debug(f'(real, effective, saved) group id: {os.getresgid()}') + + cmd = ['sudo', 'tcpdump'] + + # 7.1 process flags + flags = [] + if print_pacno: + flags.append('-#') + if print_ll: + flags.append('-e') + if monitor_mode: + flags.append('-I') + flags.append('-n') # TODO: Integrate, in case name resolution is wanted! + cmd.extend(flags) + flags_string = " ".join(flags) + logger.debug(f'Flags: {flags_string}') + + # debug interlude + verbosity = ctx.obj['VERBOSITY'] + if verbosity > 0: + verbosity_flag = '-' + for i in range(0, verbosity): + verbosity_flag = verbosity_flag + 'v' + logger.debug(f'verbosity string to pass to tcpdump: {verbosity_flag}') + cmd.append(verbosity_flag) + + # 7.2 generic (i.e. reusable) kw args + generic_kw_args = [] + if count: + generic_kw_args.extend(['-c', str(count)]) + # if mins: + # generic_kw_args.extend(['-G', str(mins * 60)]) TODO: this currently loads to errors with sudo + cmd.extend(generic_kw_args) + generic_kw_args_string = " ".join(generic_kw_args) + logger.debug(f'KW args: {generic_kw_args_string}') + + # 7.3 special kw args (not a priori reusable) + non_generic_kw_args = [] + if interface: + non_generic_kw_args.extend(['-i', interface]) + non_generic_kw_args.extend(['-w', str(pcap_file_full_path)]) + cmd.extend(non_generic_kw_args) + non_generic_kw_args_string = " ".join(non_generic_kw_args) + logger.debug(f'Non transferable (special) kw args: {non_generic_kw_args_string}') + + # 7.4 add filter expression + if cap_filter: + logger.debug(f'cap_filter (not generic): {cap_filter}') + cmd.append(cap_filter) + + full_cmd_string = " ".join(cmd) + + logger.info(f'tcpdump command: {"".join(full_cmd_string)}') + click.echo('Capture setup complete!') + # Step 8: Execute tcpdump command + start_time = datetime.now().strftime("%H:%M:%S") + start = time() + try: + if guided: + click.confirm(f'Execute following command: {full_cmd_string}') + stdout_log_file_abs_path = capture_dir_full_path / stdout_log_file + stderr_log_file_abs_path = capture_dir_full_path / stderr_log_file + stdout_log_file_abs_path.touch(mode=0o777) + stderr_log_file_abs_path.touch(mode=0o777) + with open(stdout_log_file_abs_path, 'w') as out, open(stderr_log_file_abs_path, 'w') as err: + logger.debug(f'\nstdout: {out}.\nstderr: {err}.\n') + + tcp_complete = subprocess.run(cmd, check=True, capture_output=True, text=True) + + out.write(tcp_complete.stdout) + err.write(tcp_complete.stderr) + + #click.echo(f'Mock sniff execution') + click.echo(f"Capture complete. Saved to {pcap_file}") + except subprocess.CalledProcessError as e: + logger.error(f'Failed to capture packets: {e}') + click.echo(f'Failed to capture packets: {e}') + click.echo(f'Check {stderr_log_file} for more info.') + if ctx.obj['DEBUG']: + msg = [f'STDERR log {stderr_log_file} contents:\n'] + with open(capture_dir_full_path / stderr_log_file) as log: + for line in log: + msg.append(line) + + click.echo("\t".join(msg), lvl='e') + # print('DEBUG ACTIVE') + if guided: + click.prompt('Create metadata anyway?') + else: + click.echo('Aborting capture...') + exit() + end_time = datetime.now().strftime("%H:%M:%S") + end = time() + delta = end - start + click.echo(f'tcpdump took {delta:.2f} seconds.') + # Step 9: Register metadata + metadata = { + 'device': canonical_name, + 'device_id': device, + 'capture_id': capture_uuid, + 'capture_date_iso': datetime.now().isoformat(), + 'invoked_command': " ".join(map(str, cmd)), + 'capture_duration': delta, + 'generic_parameters': { + 'flags': flags_string, + 'kwargs': generic_kw_args_string, + 'filter': generic_filter + }, + 'non_generic_parameters': { + 'kwargs': non_generic_kw_args_string, + 'filter': cap_filter + }, + 'features': { + 'interface': interface, + 'address': address + }, + 'resources': { + 'pcap_file': str(pcap_file), + 'stdout_log': str(stdout_log_file), + 'stderr_log': str(stderr_log_file) + }, + 'environment': { + 'capture_dir': capture_dir, + 'database': database, + 'capture_base_dir': str(capture_base_dir), + 'capture_dir_abs_path': str(capture_dir_full_path) + } + } + + click.echo('Ensuring correct ownership of created files.') + username = os.getlogin() + gid = os.getgid() + + # Else there are issues when running with sudo: + try: + subprocess.run(f'sudo chown -R {username}:{username} {device_path}', shell=True) + except OSError as e: + click.echo(f'Some error {e}') + + click.echo(f'Saving metadata.') + metadata_abs_path = capture_dir_full_path / 'capture_metadata.json' + with open(metadata_abs_path, 'w') as f: + json.dump(metadata, f, indent=4) + + click.echo(f'END SNIFF SUBCOMMAND') diff --git a/code/iottb-project/iottb/commands/testbed.py b/code/iottb-project/iottb/commands/testbed.py new file mode 100644 index 0000000..eb26a9f --- /dev/null +++ b/code/iottb-project/iottb/commands/testbed.py @@ -0,0 +1,120 @@ +import click +from pathlib import Path +import logging +from logging.handlers import RotatingFileHandler +import sys +from iottb.models.iottb_config import IottbConfig +from iottb.definitions import DB_NAME, CFG_FILE_PATH + +logger = logging.getLogger(__name__) + + +@click.command() +@click.option('-d', '--dest', type=click.Path(), help='Location to put (new) iottb database') +@click.option('-n', '--name', default=DB_NAME, type=str, help='Name of new database.') +@click.option('--update-default/--no-update-default', default=True, help='If new db should be set as the new default') +@click.pass_context +def init_db(ctx, dest, name, update_default): + logger.info('init-db invoked') + config = ctx.obj['CONFIG'] + logger.debug(f'str(config)') + # Use the default path from config if dest is not provided + known_dbs = config.get_known_databases() + logger.debug(f'Known databases: {known_dbs}') + if name in known_dbs: + dest = config.get_database_location(name) + if Path(dest).joinpath(name).is_dir(): + click.echo(f'A database {name} already exists.') + logger.debug(f'DB {name} exists in {dest}') + click.echo(f'Exiting...') + exit() + logger.debug(f'DB name {name} registered but does not exist.') + if not dest: + logger.info('No dest set, choosing default destination.') + dest = Path(config.default_db_location).parent + + db_path = Path(dest).joinpath(name) + logger.debug(f'Full path for db {str(db_path)}') + # Create the directory if it doesn't exist + db_path.mkdir(parents=True, exist_ok=True) + logger.info(f"mkdir {db_path} successful") + click.echo(f'Created {db_path}') + + # Update configuration + config.set_database_location(name, str(dest)) + if update_default: + config.set_default_database(name, str(dest)) + config.save_config() + logger.info(f"Updated configuration with database {name} at {db_path}") + + +# @click.group('config') +# @click.pass_context +# def cfg(ctx): +# pass +# +# @click.command('set', help='Set the location of a database.') +# @click.argument('database', help='Name of database') +# @click.argument('location', help='Where the database is located (i.e. its parent directory)') +# @click.pass_context +# def set(ctx, key, value): +# click.echo(f'Setting {key} to {value} in config') +# config = ctx.obj['CONFIG'] +# logger.warning('No checks performed!') +# config.set_database_location(key, value) +# config.save_config() + + + + + +@click.command() +@click.option('-d', '--dest', type=click.Path(), help='Location to put (new) iottb database') +@click.option('-n', '--name', default=DB_NAME, type=str, help='Name of new database.') +@click.option('--update-default/--no-update-default', default=True, help='If new db should be set as the new default') +@click.pass_context +def init_db_inactive(ctx, dest, name, update_default): + logger.info('init-db invoked') + config = ctx.obj['CONFIG'] + logger.debug(f'str(config)') + + # Retrieve known databases + known_dbs = config.get_known_databases() + + # Determine destination path + if name in known_dbs: + dest = Path(config.get_database_location(name)) + if dest.joinpath(name).is_dir(): + click.echo(f'A database {name} already exists.') + logger.debug(f'DB {name} exists in {dest}') + click.echo(f'Exiting...') + exit() + logger.debug(f'DB name {name} registered but does not exist.') + elif not dest: + logger.info('No destination set, using default path from config.') + dest = Path(config.default_db_location).parent + + # Ensure destination path is absolute + dest = dest.resolve() + + # Combine destination path with database name + db_path = dest / name + logger.debug(f'Full path for database: {str(db_path)}') + + # Create the directory if it doesn't exist + try: + db_path.mkdir(parents=True, exist_ok=True) + logger.info(f'Directory {db_path} created successfully.') + click.echo(f'Created {db_path}') + except Exception as e: + logger.error(f'Failed to create directory {db_path}: {e}') + click.echo(f'Failed to create directory {db_path}: {e}', err=True) + exit(1) + + # Update configuration + config.set_database_location(name, str(db_path)) + if update_default: + config.set_default_database(name, str(db_path)) + config.save_config() + logger.info(f'Updated configuration with database {name} at {db_path}') + click.echo(f'Updated configuration with database {name} at {db_path}') diff --git a/code/iottb-project/iottb/definitions.py b/code/iottb-project/iottb/definitions.py new file mode 100644 index 0000000..76d594c --- /dev/null +++ b/code/iottb-project/iottb/definitions.py @@ -0,0 +1,48 @@ +import logging +from pathlib import Path + +import click + +APP_NAME = 'iottb' +DB_NAME = 'iottb.db' +CFG_FILE_PATH = str(Path(click.get_app_dir(APP_NAME)).joinpath('iottb.cfg')) +CONSOLE_LOG_FORMATS = { + 0: '%(levelname)s - %(message)s', + 1: '%(levelname)s - %(module)s - %(message)s', + 2: '%(levelname)s - %(module)s - %(funcName)s - %(lineno)d - %(message)s' +} + +LOGFILE_LOG_FORMAT = { + 0: '%(levelname)s - %(asctime)s - %(module)s - %(message)s', + 1: '%(levelname)s - %(asctime)s - %(module)s - %(funcName)s - %(message)s', + 2: '%(levelname)s - %(asctime)s - %(module)s - %(funcName)s - %(lineno)d - %(message)s' +} +MAX_VERBOSITY = len(CONSOLE_LOG_FORMATS) - 1 +assert len(LOGFILE_LOG_FORMAT) == len(CONSOLE_LOG_FORMATS), 'Log formats must be same size' + +LOGLEVEL = logging.DEBUG +LOGDIR = Path.cwd() / 'logs' + +# Characters to just replace +REPLACEMENT_SET_CANONICAL_DEVICE_NAMES = {' ', '_', ',', '!', '@', '#', '$', '%', '^', '&', '*', '(', ')', '+', '=', + '{', '}', '[', ']', + '|', + '\\', ':', ';', '"', "'", '<', '>', '?', '/', '`', '~'} +# Characters to possibly error on +ERROR_SET_CANONICAL_DEVICE_NAMES = {',', '!', '@', '#', '$', '%', '^', '&', '*', '(', ')', '+', '=', '{', '}', '[', ']', + '|', + '\\', ':', ';', '"', "'", '<', '>', '?', '/', '`', '~'} + +DEVICE_METADATA_FILE_NAME = 'device_metadata.json' + +TB_ECHO_STYLES = { + 'w': {'fg': 'yellow', 'bold': True}, + 'i': {'fg': 'blue', 'italic': True}, + 's': {'fg': 'green', 'bold': True}, + 'e': {'fg': 'red', 'bold': True}, + 'header': {'fg': 'bright_cyan', 'bold': True, 'italic': True} +} + +NAME_OF_CAPTURE_DIR = 'sniffs' + + diff --git a/code/iottb-project/iottb/main.py b/code/iottb-project/iottb/main.py new file mode 100644 index 0000000..acba6dd --- /dev/null +++ b/code/iottb-project/iottb/main.py @@ -0,0 +1,77 @@ +import os +import shutil + +import click +from pathlib import Path +import logging + +from iottb.commands.sniff import sniff +from iottb.commands.developer import set_key_in_table_to, rm_cfg, rm_dbs, show_cfg, show_everything + +################################################## +# Import package modules +################################################# +from iottb.utils.logger_config import setup_logging +from iottb import definitions +from iottb.models.iottb_config import IottbConfig +from iottb.commands.testbed import init_db +from iottb.commands.add_device import add_device + +############################################################################ +# Module shortcuts for global definitions +########################################################################### +APP_NAME = definitions.APP_NAME +DB_NAME = definitions.DB_NAME +CFG_FILE_PATH = definitions.CFG_FILE_PATH +# These are (possibly) redundant when defined in definitions.py +# keeping them here until refactored and tested +MAX_VERBOSITY = definitions.MAX_VERBOSITY + +# Logger stuff +loglevel = definitions.LOGLEVEL +logger = logging.getLogger(__name__) + + +@click.group(context_settings=dict(auto_envvar_prefix='IOTTB', show_default=True)) +@click.option('-v', '--verbosity', count=True, type=click.IntRange(0, 3), default=0, is_eager=True, + help='Set verbosity') +@click.option('-d', '--debug', is_flag=True, default=False, is_eager=True, + help='Enable debug mode') +@click.option('--dry-run', is_flag=True, default=True, is_eager=True) +@click.option('--cfg-file', type=click.Path(), + default=Path(click.get_app_dir(APP_NAME)).joinpath('iottb.cfg'), + envvar='IOTTB_CONF_HOME', help='Path to iottb config file') +@click.pass_context +def cli(ctx, verbosity, debug, dry_run, cfg_file): + setup_logging(verbosity, debug) # Setup logging based on the loaded configuration and other options + ctx.ensure_object(dict) # Make sure context is ready for use + logger.info("Starting execution.") + ctx.obj['CONFIG'] = IottbConfig(cfg_file) # Load configuration directly + ctx.meta['FULL_PATH_CONFIG_FILE'] = str(cfg_file) + ctx.meta['DRY_RUN'] = dry_run + logger.debug(f'Verbosity: {verbosity}') + ctx.obj['VERBOSITY'] = verbosity + logger.debug(f'Debug: {debug}') + ctx.obj['DEBUG'] = debug + + +################################################################################## +# Add all subcommands to group here +################################################################################# +# TODO: Is there a way to do this without pylint freaking out? +# noinspection PyTypeChecker +cli.add_command(init_db) +cli.add_command(rm_cfg) +cli.add_command(set_key_in_table_to) +cli.add_command(rm_dbs) +# noinspection PyTypeChecker +cli.add_command(add_device) +cli.add_command(show_cfg) +cli.add_command(sniff) +cli.add_command(show_everything) + + +if __name__ == '__main__': + cli() + for log in Path.cwd().iterdir(): + log.chmod(0o777) diff --git a/code/iottb-project/iottb/models/__init__.py b/code/iottb-project/iottb/models/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/code/iottb-project/iottb/models/database.py b/code/iottb-project/iottb/models/database.py new file mode 100644 index 0000000..63105f2 --- /dev/null +++ b/code/iottb-project/iottb/models/database.py @@ -0,0 +1,6 @@ +class Database: + + def __init__(self, name, path): + self.name = name + self.path = path + self.device_list = [] # List of the canonical names of devices registered in this database diff --git a/code/iottb-project/iottb/models/device_metadata.py b/code/iottb-project/iottb/models/device_metadata.py new file mode 100644 index 0000000..505677a --- /dev/null +++ b/code/iottb-project/iottb/models/device_metadata.py @@ -0,0 +1,44 @@ +import logging +import uuid +from datetime import datetime +import logging +import click + +from iottb.utils.string_processing import make_canonical_name + +logger = logging.getLogger(__name__) + + +class DeviceMetadata: + def __init__(self, device_name, description="", model="", manufacturer="", firmware_version="", device_type="", + supported_interfaces="", companion_applications="", save_to_file=None): + self.device_id = str(uuid.uuid4()) + self.device_name = device_name + cn, aliases = make_canonical_name(device_name) + logger.debug(f'cn, aliases = {cn}, {str(aliases)}') + self.aliases = aliases + self.canonical_name = cn + self.date_added = datetime.now().isoformat() + self.description = description + self.model = model + self.manufacturer = manufacturer + self.current_firmware_version = firmware_version + self.device_type = device_type + self.supported_interfaces = supported_interfaces + self.companion_applications = companion_applications + self.last_metadata_update = datetime.now().isoformat() + if save_to_file is not None: + click.echo('TODO: Implement saving config to file after creation!') + + def add_alias(self, alias: str = ""): + if alias == "": + return + self.aliases.append(alias) + + def get_canonical_name(self): + return self.canonical_name + + def print_attributes(self): + print(f'Printing attribute value pairs in {__name__}') + for attr, value in self.__dict__.items(): + print(f'{attr}: {value}') diff --git a/code/iottb-project/iottb/models/iottb_config.py b/code/iottb-project/iottb/models/iottb_config.py new file mode 100644 index 0000000..25736dc --- /dev/null +++ b/code/iottb-project/iottb/models/iottb_config.py @@ -0,0 +1,124 @@ +import json +from pathlib import Path + +from iottb import definitions +import logging + +logger = logging.getLogger(__name__) + +DB_NAME = definitions.DB_NAME + + +class IottbConfig: + """ Class to handle testbed configuration. + + TODO: Add instead of overwrite Database locations when initializing if a location with valid db + exists. + """ + + @staticmethod + def warn(): + logger.warning(f'DatabaseLocations are DatabaseLocationMap in the class {__name__}') + + def __init__(self, cfg_file=definitions.CFG_FILE_PATH): + logger.info('Initializing Config object') + IottbConfig.warn() + self.cfg_file = Path(cfg_file) + self.default_database = None + self.default_db_location = None + self.db_path_dict = dict() + self.load_config() + + def create_default_config(self): + """Create default iottb config file.""" + logger.info(f'Creating default config file at {self.cfg_file}') + self.default_database = DB_NAME + self.default_db_location = str(Path.home()) + self.db_path_dict = { + DB_NAME: self.default_db_location + } + + defaults = { + 'DefaultDatabase': self.default_database, + 'DefaultDatabasePath': self.default_db_location, + 'DatabaseLocations': self.db_path_dict + } + + try: + self.cfg_file.parent.mkdir(parents=True, exist_ok=True) + with self.cfg_file.open('w') as config_file: + json.dump(defaults, config_file, indent=4) + except IOError as e: + logger.error(f"Failed to create default configuration file at {self.cfg_file}: {e}") + raise RuntimeError(f"Failed to create configuration file: {e}") from e + + def load_config(self): + """Loads or creates default configuration from given file path.""" + logger.info('Loading configuration file') + if not self.cfg_file.is_file(): + logger.info('Config file does not exist.') + self.create_default_config() + else: + logger.info('Config file exists, opening.') + with self.cfg_file.open('r') as config_file: + data = json.load(config_file) + self.default_database = data.get('DefaultDatabase') + self.default_db_location = data.get('DefaultDatabasePath') + self.db_path_dict = data.get('DatabaseLocations', {}) + + def save_config(self): + """Save the current configuration to the config file.""" + data = { + 'DefaultDatabase': self.default_database, + 'DefaultDatabasePath': self.default_db_location, + 'DatabaseLocations': self.db_path_dict + } + try: + with self.cfg_file.open('w') as config_file: + json.dump(data, config_file, indent=4) + except IOError as e: + logger.error(f"Failed to save configuration file at {self.cfg_file}: {e}") + raise RuntimeError(f"Failed to save configuration file: {e}") from e + + def set_default_database(self, name, path): + """Set the default database and its path.""" + self.default_database = name + self.default_db_location = path + self.db_path_dict[name] = path + + def get_default_database_location(self): + return self.default_db_location + + def get_default_database(self): + return self.default_database + + def get_database_location(self, name): + """Get the location of a specific database.""" + return self.db_path_dict.get(name) + + def set_database_location(self, name, path): + """Set the location for a database.""" + logger.debug(f'Type of "path" parameter {type(path)}') + logger.debug(f'String value of "path" parameter {str(path)}') + logger.debug(f'Type of "name" parameter {type(name)}') + logger.debug(f'String value of "name" parameter {str(name)}') + path = Path(path) + name = Path(name) + logger.debug(f'path:name = {path}:{name}') + if path.name == name: + path = path.parent + self.db_path_dict[str(name)] = str(path) + + def get_known_databases(self): + """Get the set of known databases""" + logger.info(f'Getting known databases.') + + return self.db_path_dict.keys() + + def get_know_database_paths(self): + """Get the paths of all known databases""" + logger.info(f'Getting known database paths.') + return self.db_path_dict.values() + + def get_full_default_path(self): + return Path(self.default_db_location) / self.default_database diff --git a/code/iottb-project/iottb/models/sniff_metadata.py b/code/iottb-project/iottb/models/sniff_metadata.py new file mode 100644 index 0000000..9fa5e11 --- /dev/null +++ b/code/iottb-project/iottb/models/sniff_metadata.py @@ -0,0 +1,39 @@ +import json +import logging +import uuid +from datetime import datetime +from pathlib import Path + +logger = logging.getLogger('iottb.sniff') # Log with sniff subcommand + +class CaptureMetadata: + def __init__(self, device_id, capture_dir, interface, address, capture_file, tcpdump_command, tcpdump_stdout, tcpdump_stderr, packet_filter, alias): + self.base_data = { + 'device_id': device_id, + 'capture_id': str(uuid.uuid4()), + 'capture_date': datetime.now().isoformat(), + 'capture_dir': str(capture_dir), + 'capture_file': capture_file, + 'start_time': "", + 'stop_time': "", + 'alias': alias + } + self.features = { + 'interface': interface, + 'device_ip_address': address if address else "No IP Address set", + 'tcpdump_stdout': str(tcpdump_stdout), + 'tcpdump_stderr': str(tcpdump_stderr), + 'packet_filter': packet_filter + } + self.command = tcpdump_command + + def save_to_file(self): + metadata = { + 'base_data': self.base_data, + 'features': self.features, + 'command': self.command + } + metadata_file_path = Path(self.base_data['capture_dir']) / 'metadata.json' + with open(metadata_file_path, 'w') as f: + json.dump(metadata, f, indent=4) + logger.info(f'Metadata saved to {metadata_file_path}') diff --git a/code/iottb-project/iottb/scripts/generate_help.py b/code/iottb-project/iottb/scripts/generate_help.py new file mode 100755 index 0000000..fd4b683 --- /dev/null +++ b/code/iottb-project/iottb/scripts/generate_help.py @@ -0,0 +1,52 @@ +import click +from io import StringIO +import sys + +# Import your CLI app here +from iottb.main import cli + +"""Script to generate the help text and write to file. + + Definitely needs better formatting. + Script is also not very flexible. +""" + + +def get_help_text(command): + """Get the help text for a given command.""" + help_text = StringIO() + with click.Context(command) as ctx: + # chatgpt says this helps: was right + sys_stdout = sys.stdout + sys.stdout = help_text + try: + click.echo(command.get_help(ctx)) + finally: + sys.stdout = sys_stdout + return help_text.getvalue() + + +def write_help_to_file(cli, filename): + """Write help messages of all commands and subcommands to a file.""" + with open(filename, 'w') as f: + # main + f.write(f"Main Command: iottb\n") + f.write(get_help_text(cli)) + f.write("\n\n") + + # go through subcommands + for cmd_name, cmd in cli.commands.items(): + f.write(f"Command: {cmd_name}\n") + f.write(get_help_text(cmd)) + f.write("\n\n") + + # subcommands of subcommands + if isinstance(cmd, click.Group): + for sub_cmd_name, sub_cmd in cmd.commands.items(): + f.write(f"Subcommand: {cmd_name} {sub_cmd_name}\n") + f.write(get_help_text(sub_cmd)) + f.write("\n\n") + + +if __name__ == "__main__": + write_help_to_file(cli, "help_messages.md") diff --git a/code/iottb-project/iottb/scripts/sudo_iottb b/code/iottb-project/iottb/scripts/sudo_iottb new file mode 100644 index 0000000..1f1af62 --- /dev/null +++ b/code/iottb-project/iottb/scripts/sudo_iottb @@ -0,0 +1,4 @@ +#/bin/sh +echo 'Running iottb as sudo' +sudo $(which python) iottb $@ +echo 'Finished executing iottb with sudo' \ No newline at end of file diff --git a/code/iottb-project/iottb/utils/__init__.py b/code/iottb-project/iottb/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/code/iottb-project/iottb/utils/logger_config.py b/code/iottb-project/iottb/utils/logger_config.py new file mode 100644 index 0000000..5cf76ad --- /dev/null +++ b/code/iottb-project/iottb/utils/logger_config.py @@ -0,0 +1,41 @@ +import logging +import sys +from logging.handlers import RotatingFileHandler + +from iottb import definitions +from iottb.definitions import MAX_VERBOSITY, CONSOLE_LOG_FORMATS, APP_NAME, LOGFILE_LOG_FORMAT + +loglevel = definitions.LOGLEVEL + + +def setup_logging(verbosity, debug=loglevel): + """ Setup root logger for iottb """ + log_level = loglevel + handlers = [] + date_format = '%Y-%m-%d %H:%M:%S' + if verbosity > 0: + log_level = logging.WARNING + if verbosity > MAX_VERBOSITY: + verbosity = MAX_VERBOSITY + log_level = logging.INFO + assert verbosity <= MAX_VERBOSITY, f'Verbosity must be <= {MAX_VERBOSITY}' + console_handler = logging.StreamHandler(sys.stdout) + print(str(sys.stdout)) + console_handler.setFormatter(logging.Formatter(CONSOLE_LOG_FORMATS[verbosity], datefmt=date_format)) + console_handler.setLevel(logging.DEBUG) # can keep at debug since it depends on global level? + handlers.append(console_handler) + + if debug: + log_level = logging.DEBUG + + # Logfile logs INFO+, no debugs though + file_handler = RotatingFileHandler(f'{str(definitions.LOGDIR / APP_NAME)}.log', maxBytes=10240, backupCount=5) + file_handler.setFormatter(logging.Formatter(LOGFILE_LOG_FORMAT[verbosity], datefmt=date_format)) + file_handler.setLevel(logging.INFO) + + # finnish root logger setup + handlers.append(file_handler) + # Force this config to be applied to root logger + logging.basicConfig(level=log_level, handlers=handlers, force=True) + + diff --git a/code/iottb-project/iottb/utils/string_processing.py b/code/iottb-project/iottb/utils/string_processing.py new file mode 100644 index 0000000..7b2ae39 --- /dev/null +++ b/code/iottb-project/iottb/utils/string_processing.py @@ -0,0 +1,40 @@ +import re +from iottb import definitions +import logging + +logger = logging.getLogger(__name__) + + +def normalize_string(s, chars_to_replace=None, replacement=None, allow_unicode=False): + pass + + +def make_canonical_name(name): + """ + Normalize the device name to a canonical form: + - Replace the first two occurrences of spaces and transform characters with dashes. + - Remove any remaining spaces and non-ASCII characters. + - Convert to lowercase. + """ + aliases = [name] + logger.info(f'Normalizing name {name}') + + # We first normalize + chars_to_replace = definitions.REPLACEMENT_SET_CANONICAL_DEVICE_NAMES + pattern = re.compile('|'.join(re.escape(char) for char in chars_to_replace)) + norm_name = pattern.sub('-', name) + norm_name = re.sub(r'[^\x00-\x7F]+', '', norm_name) # removes non ascii chars + + aliases.append(norm_name) + # Lower case + norm_name = norm_name.lower() + aliases.append(norm_name) + + # canonical name is only first two parts of resulting string + parts = norm_name.split('-') + canonical_name = canonical_name = '-'.join(parts[:2]) + aliases.append(canonical_name) + aliases = list(set(aliases)) + logger.debug(f'Canonical name: {canonical_name}') + logger.debug(f'Aliases: {aliases}') + return canonical_name, aliases diff --git a/code/iottb-project/iottb/utils/user_interaction.py b/code/iottb-project/iottb/utils/user_interaction.py new file mode 100644 index 0000000..767e286 --- /dev/null +++ b/code/iottb-project/iottb/utils/user_interaction.py @@ -0,0 +1,42 @@ +# iottb/utils/user_interaction.py + +import click +from iottb.definitions import TB_ECHO_STYLES +import sys +import os + + +def tb_echo2(msg: str, lvl='i', log=True): + style = TB_ECHO_STYLES.get(lvl, {}) + click.secho(f'[IOTTB]', **style) + click.secho(f'[IOTTB] \t {msg}', **style) + + +last_prefix = None + + +def tb_echo(msg: str, lvl='i', log=True): + global last_prefix + prefix = f'Testbed [{lvl.upper()}]\n' + + if last_prefix != prefix: + click.secho(prefix, nl=False, **TB_ECHO_STYLES['header']) + last_prefix = prefix + + click.secho(f' {msg}', **TB_ECHO_STYLES[lvl]) + + +def main(): + tb_echo('Info message', 'i') + tb_echo('Warning message', 'w') + tb_echo('Error message', 'e') + tb_echo('Success message', 's') + + +if __name__ == '__main__': + # arrrgggg hacky + current_dir = os.path.dirname(os.path.abspath(__file__)) + project_root = os.path.abspath(os.path.join(current_dir, '../../')) + sys.path.insert(0, project_root) + + main() diff --git a/code/iottb-project/poetry.lock b/code/iottb-project/poetry.lock new file mode 100644 index 0000000..b23fb0e --- /dev/null +++ b/code/iottb-project/poetry.lock @@ -0,0 +1,103 @@ +# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. + +[[package]] +name = "click" +version = "8.1.7" +description = "Composable command line interface toolkit" +optional = false +python-versions = ">=3.7" +files = [ + {file = "click-8.1.7-py3-none-any.whl", hash = "sha256:ae74fb96c20a0277a1d615f1e4d73c8414f5a98db8b799a7931d1582f3390c28"}, + {file = "click-8.1.7.tar.gz", hash = "sha256:ca9853ad459e787e2192211578cc907e7594e294c7ccc834310722b41b9ca6de"}, +] + +[package.dependencies] +colorama = {version = "*", markers = "platform_system == \"Windows\""} + +[[package]] +name = "colorama" +version = "0.4.6" +description = "Cross-platform colored terminal text." +optional = false +python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,>=2.7" +files = [ + {file = "colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6"}, + {file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"}, +] + +[[package]] +name = "iniconfig" +version = "2.0.0" +description = "brain-dead simple config-ini parsing" +optional = false +python-versions = ">=3.7" +files = [ + {file = "iniconfig-2.0.0-py3-none-any.whl", hash = "sha256:b6a85871a79d2e3b22d2d1b94ac2824226a63c6b741c88f7ae975f18b6778374"}, + {file = "iniconfig-2.0.0.tar.gz", hash = "sha256:2d91e135bf72d31a410b17c16da610a82cb55f6b0477d1a902134b24a455b8b3"}, +] + +[[package]] +name = "packaging" +version = "24.1" +description = "Core utilities for Python packages" +optional = false +python-versions = ">=3.8" +files = [ + {file = "packaging-24.1-py3-none-any.whl", hash = "sha256:5b8f2217dbdbd2f7f384c41c628544e6d52f2d0f53c6d0c3ea61aa5d1d7ff124"}, + {file = "packaging-24.1.tar.gz", hash = "sha256:026ed72c8ed3fcce5bf8950572258698927fd1dbda10a5e981cdf0ac37f4f002"}, +] + +[[package]] +name = "pluggy" +version = "1.5.0" +description = "plugin and hook calling mechanisms for python" +optional = false +python-versions = ">=3.8" +files = [ + {file = "pluggy-1.5.0-py3-none-any.whl", hash = "sha256:44e1ad92c8ca002de6377e165f3e0f1be63266ab4d554740532335b9d75ea669"}, + {file = "pluggy-1.5.0.tar.gz", hash = "sha256:2cffa88e94fdc978c4c574f15f9e59b7f4201d439195c3715ca9e2486f1d0cf1"}, +] + +[package.extras] +dev = ["pre-commit", "tox"] +testing = ["pytest", "pytest-benchmark"] + +[[package]] +name = "pytest" +version = "8.2.2" +description = "pytest: simple powerful testing with Python" +optional = false +python-versions = ">=3.8" +files = [ + {file = "pytest-8.2.2-py3-none-any.whl", hash = "sha256:c434598117762e2bd304e526244f67bf66bbd7b5d6cf22138be51ff661980343"}, + {file = "pytest-8.2.2.tar.gz", hash = "sha256:de4bb8104e201939ccdc688b27a89a7be2079b22e2bd2b07f806b6ba71117977"}, +] + +[package.dependencies] +colorama = {version = "*", markers = "sys_platform == \"win32\""} +iniconfig = "*" +packaging = "*" +pluggy = ">=1.5,<2.0" + +[package.extras] +dev = ["argcomplete", "attrs (>=19.2)", "hypothesis (>=3.56)", "mock", "pygments (>=2.7.2)", "requests", "setuptools", "xmlschema"] + +[[package]] +name = "scapy" +version = "2.5.0" +description = "Scapy: interactive packet manipulation tool" +optional = false +python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, <4" +files = [ + {file = "scapy-2.5.0.tar.gz", hash = "sha256:5b260c2b754fd8d409ba83ee7aee294ecdbb2c235f9f78fe90bc11cb6e5debc2"}, +] + +[package.extras] +basic = ["ipython"] +complete = ["cryptography (>=2.0)", "ipython", "matplotlib", "pyx"] +docs = ["sphinx (>=3.0.0)", "sphinx_rtd_theme (>=0.4.3)", "tox (>=3.0.0)"] + +[metadata] +lock-version = "2.0" +python-versions = "^3.12" +content-hash = "10b2c268b0f10db15eab2cca3d2dc9dc25bc60f4b218ebf786fb780fa85557e0" diff --git a/code/iottb-project/pyproject.toml b/code/iottb-project/pyproject.toml new file mode 100644 index 0000000..80acba6 --- /dev/null +++ b/code/iottb-project/pyproject.toml @@ -0,0 +1,21 @@ +[tool.poetry] +name = "iottb" +version = "0.1.0" +description = "IoT Testbed" +authors = ["Sebastian Lenzlinger "] +readme = "README.md" + +[tool.poetry.dependencies] +python = "^3.12" +click = "^8.1" +scapy = "^2.5" + +[tool.poetry.scripts] +iottb = "iottb.main:cli" + +[tool.poetry.group.test.dependencies] +pytest = "^8.2.2" + +[build-system] +requires = ["poetry-core"] +build-backend = "poetry.core.masonry.api" diff --git a/code/iottb-project/tests/__init__.py b/code/iottb-project/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/code/iottb-project/tests/test_make_canonical_name.py b/code/iottb-project/tests/test_make_canonical_name.py new file mode 100644 index 0000000..eac541c --- /dev/null +++ b/code/iottb-project/tests/test_make_canonical_name.py @@ -0,0 +1,23 @@ +from iottb.utils.string_processing import make_canonical_name + +import pytest + + +class TestMakeCanonicalName: + + def test_normalizes_name_with_spaces_to_dashes(self): + name = "Device Name With Spaces" + expected_canonical_name = "device-name" + canonical_name, aliases = make_canonical_name(name) + assert canonical_name == expected_canonical_name + assert "device-name-with-spaces" in aliases + assert "device-name" in aliases + assert "Device Name With Spaces" in aliases + + def test_name_with_no_spaces_or_special_characters(self): + name = "DeviceName123" + expected_canonical_name = "devicename123" + canonical_name, aliases = make_canonical_name(name) + assert canonical_name == expected_canonical_name + assert "DeviceName123" in aliases + assert "devicename123" in aliases diff --git a/code/iottb/scripts/wifi_ctl.sh b/code/iottb/scripts/wifi_ctl.sh new file mode 100644 index 0000000..076d2fd --- /dev/null +++ b/code/iottb/scripts/wifi_ctl.sh @@ -0,0 +1,55 @@ +#!/usr/bin/env bash +# Note, this is not my original work. Source: https://linuxtldr.com/changing-interface-mode/ + +function list_nic_info () { + ip addr show +} + +function enable_monm_iw () { + interface=$1 + sudo ip link set "$interface" down + sudo iw "$interface" set monitor control + sudo ip link set "$interface" up +} + +function disable_monm_iw () { + interface=$1 + sudo ip link set "$interface" down + sudo iw "$interface" set type managed + sudo ip link set "$interface" up +} + +function enable_monm_iwconfig () { + interface=$1 + sudo ifconfig "$interface" down + sudo iwconfig "$interface" mode monitor + sudo ifconfig "$interface" up +} + +function disable_monm_iwconfig () { + interface=$1 + sudo ifconfig "$interface" down + sudo iwconfig "$interface" mode managed + sudo ifconfig "$interface" up +} + +function enable_monm_acng () { + interface=$1 + sudo airmon-ng check + sudo airmon-ng check kill + sudo airmon-ng start "$interface" +} + +function disable_monm_acng () { + interface="${1}mon" + sudo airmon-ng stop "$interface" + sudo systemctl restart NetworkManager +} + +if declare -f "$1" > /dev/null +then + "$@" +else + echo "Unknown function '$1'" >&2 + exit 1 +fi \ No newline at end of file diff --git a/code/iottb/utils/diagram1.py b/code/iottb/utils/diagram1.py new file mode 100644 index 0000000..28a9657 --- /dev/null +++ b/code/iottb/utils/diagram1.py @@ -0,0 +1,29 @@ +import matplotlib.pyplot as plt +import networkx as nx + +# Create the graph +G1 = nx.DiGraph() + +# Add nodes with positions +G1.add_node("IoT Device", pos=(1, 3)) +G1.add_node("AP", pos=(3, 3)) +G1.add_node("Switch (Port Mirroring Enabled)", pos=(5, 3)) +G1.add_node("Gateway Router", pos=(7, 3)) +G1.add_node("Internet", pos=(9, 3)) +G1.add_node("Capture Device", pos=(5, 1)) + +# Add edges +G1.add_edge("IoT Device", "AP") +G1.add_edge("AP", "Switch (Port Mirroring Enabled)") +G1.add_edge("Switch (Port Mirroring Enabled)", "Gateway Router") +G1.add_edge("Gateway Router", "Internet") +G1.add_edge("Switch (Port Mirroring Enabled)", "Capture Device") + +# Draw the graph +pos = nx.get_node_attributes(G1, 'pos') +plt.figure(figsize=(12, 8)) +nx.draw(G1, pos, with_labels=True, node_size=3000, node_color='lightblue', font_size=10, font_weight='bold') +nx.draw_networkx_edge_labels(G1, pos, edge_labels={("Switch (Port Mirroring Enabled)", "Capture Device"): "Mirrored Traffic"}, font_color='red') + +plt.title("IoT Device Connected via AP to Gateway Router via Switch with Port Mirroring Enabled") +plt.show() diff --git a/code/iottb/utils/diagramm2.py b/code/iottb/utils/diagramm2.py new file mode 100644 index 0000000..4e723da --- /dev/null +++ b/code/iottb/utils/diagramm2.py @@ -0,0 +1,27 @@ +import matplotlib.pyplot as plt +import networkx as nx + +# Create the graph +G2 = nx.DiGraph() + +# Add nodes with positions +G2.add_node("IoT Device", pos=(1, 3)) +G2.add_node("Capture Device (Hotspot)", pos=(3, 3)) +G2.add_node("Ethernet Connection", pos=(5, 3)) +G2.add_node("Gateway Router", pos=(7, 3)) +G2.add_node("Internet", pos=(9, 3)) + +# Add edges +G2.add_edge("IoT Device", "Capture Device (Hotspot)") +G2.add_edge("Capture Device (Hotspot)", "Ethernet Connection") +G2.add_edge("Ethernet Connection", "Gateway Router") +G2.add_edge("Gateway Router", "Internet") + +# Draw the graph +pos = nx.get_node_attributes(G2, 'pos') +plt.figure(figsize=(12, 8)) +nx.draw(G2, pos, with_labels=True, node_size=3000, node_color='lightblue', font_size=10, font_weight='bold') +nx.draw_networkx_edge_labels(G2, pos, edge_labels={("Capture Device (Hotspot)", "Ethernet Connection"): "Bridged Traffic"}, font_color='red') + +plt.title("Capture Device Provides Hotspot and Bridges to Ethernet for Internet") +plt.show() diff --git a/notes/journal/Untitled.md b/notes/journal/Untitled.md new file mode 100644 index 0000000..e69de29