Compare commits

...

No commits in common. "main" and "refactor" have entirely different histories.

182 changed files with 369 additions and 33706 deletions

5
.gitignore vendored

@ -1,13 +1,11 @@
 __pycache__
 .venv
 iottb.egg-info
-.idea/
+.idea
 *.log
 logs/
 *.pyc
 .obsidian
-dist/
-build/
 # Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio, WebStorm and Rider
 # Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
@ -35,4 +33,3 @@ build/
 .idea/**/dbnavigator.xml
 .private/
-*.pcap


@ -1,31 +1,9 @@
-# IOTTB
-Hello! This is the README file that accompanies the GitLab repository for your Bachelor or Master thesis. You'll need to update this README as you work on your thesis to reflect relevant information about your thesis.
-[[_TOC_]]
-## Organization of the repository
-- **code** folder: holds the source code.
-- **data** folder: holds no relevant data for this thesis; the files in here were used for debugging and testing.
-- **thesis** folder: contains the LaTeX sources + PDF of the final thesis.
-- **presentation** folder: contains the PDF and sources of the presentation.
-- **literature** used can be found in the **thesis** folder's .bib file or in the **presentation** folder's .bib file.
-- **notes** folder: various notes and the beginnings of a wiki.
-- `iottb` is the Python testbed as a single executable (including the Python interpreter) which should be able to run on Linux machines.
-## Description
-In this thesis I design an automation testbed for IoT devices.
-The main result is the software `iottb`, which automates some aspects of experimenting with IoT devices.
-Currently, it implements a database guided by the FAIR principles of open data and wraps tcpdump so that metadata is stored.
-## Usage
-For more info see `code/iottb-project/README.md`,
-as well as the examples in the thesis write-up at `thesis/BScThesisUnibas_main-5.pdf`.
-In general:
-```bash
-iottb --help # Most general overview
-iottb <subcommand> --help
-```
-## License
-The code is licensed under a BSD 3-clause license, a copy of which is provided in the file `code/iottb-project/LICENSE`.
+# Iottb
+## Basic Invocation
+## Configuration
+### Env Vars
+- IOTTB_CONF_HOME
+By setting this variable you control where the basic iottb application
+configuration should be looked for


@ -1,38 +0,0 @@
import json
from pathlib import Path
from iottb.definitions import ReturnCodes
def set_device_ip_address(ip_addr: str, file_path: Path):
    assert ip_addr is not None
    assert file_path.is_file()
    with file_path.open('r') as f:
        data = json.load(f)
    current_ip = data.get('device_ip_address')
    if current_ip is not None:
        print(f'Device IP Address is set to {current_ip}')
        response = input(f'Do you want to change the recorded IP address to {ip_addr}? [Y/N] ')
        if response.upper() == 'N':
            print('Aborting change to device IP address')
            return ReturnCodes.ABORTED
    # Record the new address before writing the file back out (previously the value was never set)
    data['device_ip_address'] = ip_addr
    with file_path.open('w') as f:
        json.dump(data, f)
    return ReturnCodes.SUCCESS


def set_device_mac_address(mac_addr: str, file_path: Path):
    assert mac_addr is not None
    assert file_path.is_file()
    with file_path.open('r') as f:
        data = json.load(f)
    current_mac = data.get('device_mac_address')
    if current_mac is not None:
        print(f'Device MAC Address is set to {current_mac}')
        response = input(f'Do you want to change the recorded MAC address to {mac_addr}? [Y/N] ')
        if response.upper() == 'N':
            print('Aborting change to device MAC address')
            return ReturnCodes.ABORTED
    # Record the new address before writing the file back out (previously the value was never set)
    data['device_mac_address'] = mac_addr
    with file_path.open('w') as f:
        json.dump(data, f)
    return ReturnCodes.SUCCESS


@ -1,51 +0,0 @@
import json
from datetime import datetime
from pathlib import Path
from iottb.definitions import ReturnCodes
def update_firmware_version(version: str, file_path: Path):
assert file_path.is_file()
with file_path.open('r') as file:
metadata = json.load(file)
metadata['device_firmware_version'] = version
metadata['date_updated'] = datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower()
with file_path.open('w') as file:
json.dump(metadata, file)
return ReturnCodes.SUCCESS
def add_capture_file_reference(capture_file_reference: str, file_path: Path):
assert file_path.is_file()
with file_path.open('r') as file:
metadata = json.load(file)
metadata['capture_files'] = capture_file_reference
metadata['date_updated'] = datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower()
with file_path.open('w') as file:
json.dump(metadata, file)
return ReturnCodes.SUCCESS
def update_device_serial_number(device_id: str, file_path: Path):
assert file_path.is_file()
with file_path.open('r') as file:
metadata = json.load(file)
metadata['device_id'] = device_id
metadata['date_updated'] = datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower()
with file_path.open('w') as file:
json.dump(metadata, file)
return ReturnCodes.SUCCESS
def update_device_type(device_type: str, file_path: Path):
assert file_path.is_file()
with file_path.open('r') as file:
metadata = json.load(file)
metadata['device_type'] = device_type
metadata['date_updated'] = datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower()
with file_path.open('w') as file:
json.dump(metadata, file)
return ReturnCodes.SUCCESS


@ -1,59 +0,0 @@
def setup_sniff_tcpdump_parser(parser_sniff):
# arguments which will be passed to tcpdump
parser_sniff_tcpdump = parser_sniff.add_argument_group('tcpdump arguments')
# TODO: tcpdump_parser.add_argument('-c', '--count', re)
    parser_sniff_tcpdump.add_argument('-a', '--ip-address', help='IP address of the device to sniff', dest='device_ip')
    parser_sniff_tcpdump.add_argument('-i', '--interface', help='Interface of the capture device.', dest='capture_interface', default='')
parser_sniff_tcpdump.add_argument('-I', '--monitor-mode', help='Put interface into monitor mode',
action='store_true')
parser_sniff_tcpdump.add_argument('-n', help='Deactivate name resolution. Option is set by default.',
action='store_true')
parser_sniff_tcpdump.add_argument('-#', '--number',
help='Print packet number at beginning of line. Set by default.',
action='store_true')
parser_sniff_tcpdump.add_argument('-e', help='Print link layer headers. Option is set by default.',
action='store_true')
parser_sniff_tcpdump.add_argument('-t', action='count', default=0,
help='Please see tcpdump manual for details. Unused by default.')
def setup_sniff_parser(subparsers):
# create parser for 'sniff' command
parser_sniff = subparsers.add_parser('sniff', help='Start tcpdump capture.')
setup_sniff_tcpdump_parser(parser_sniff)
setup_pcap_filter_parser(parser_sniff)
cap_size_group = parser_sniff.add_mutually_exclusive_group(required=True)
cap_size_group.add_argument('-c', '--count', type=int, help='Number of packets to capture.', default=0)
cap_size_group.add_argument('--mins', type=int, help='Time in minutes to capture.', default=60)
def setup_pcap_filter_parser(parser_sniff):
    # argparse has no add_argument_parser(); an argument group is the closest fit for grouping filter options
    parser_pcap_filter = parser_sniff.add_argument_group('pcap-filter expression')
    return parser_pcap_filter
def check_iottb_env():
    # This makes the option '--root-dir' obsolescent.  # TODO: How to streamline this?
try:
iottb_home = environ['IOTTB_HOME'] # TODO WARN implicit declaration of env var name!
except KeyError:
        logger.error(f"Environment variable 'IOTTB_HOME' is not set. "
                     f"Setting it to '{IOTTB_HOME_ABS}'")
        environ['IOTTB_HOME'] = str(IOTTB_HOME_ABS)  # os.environ values must be strings, not Path objects
finally:
if not Path(IOTTB_HOME_ABS).exists():
print(f'"{IOTTB_HOME_ABS}" does not exist.')
response = input('Do you want to create it now? [y/N]')
logger.debug(f'response: {response}')
if response.lower() != 'y':
logger.debug(f'Not setting "IOTTB_HOME"')
print('TODO')
print("Aborting execution...")
return ReturnCodes.ABORTED
else:
                print(f'Setting environment variable IOTTB_HOME to "{IOTTB_HOME_ABS}"')
Path(IOTTB_HOME_ABS).mkdir(parents=True,
exist_ok=False) # Should always work since in 'not exist' code path
return ReturnCodes.SUCCESS
logger.info(f'"{IOTTB_HOME_ABS}" exists.')
# TODO: Check that it is a valid iottb dir or can we say it is valid by definition if?
return ReturnCodes.SUCCESS


@ -1,107 +0,0 @@
#!/usr/bin/env python3
import argparse
from os import environ
from pathlib import Path
import logging
from archive.iottb.subcommands.add_device import setup_init_device_root_parser
# from iottb.subcommands.capture import setup_capture_parser
from iottb.subcommands.sniff import setup_sniff_parser
from iottb.utils.tcpdump_utils import list_interfaces
from iottb.logger import setup_logging
logger = logging.getLogger('iottbLogger.__main__')
logger.setLevel(logging.DEBUG)
######################
# Argparse setup
######################
def setup_argparse():
# create top level parser
root_parser = argparse.ArgumentParser(prog='iottb')
# shared options
root_parser.add_argument('--verbose', '-v', action='count', default=0)
root_parser.add_argument('--script-mode', action='store_true', help='Run in script mode (non-interactive)')
# Group of args w.r.t iottb.db creation
group = root_parser.add_argument_group('database options')
group.add_argument('--db-home', default=Path.home() / 'IoTtb.db')
group.add_argument('--config-home', default=Path.home() / '.config' / 'iottb.conf', type=Path, )
group.add_argument('--user', default=Path.home().stem, type=Path, )
# configure subcommands
subparsers = root_parser.add_subparsers(title='subcommands', required=True, dest='command')
# setup_capture_parser(subparsers)
setup_init_device_root_parser(subparsers)
setup_sniff_parser(subparsers)
# Utility to list interfaces directly with iottb instead of relying on external tooling
interfaces_parser = subparsers.add_parser('list-interfaces', aliases=['li', 'if'],
help='List available network interfaces.')
interfaces_parser.set_defaults(func=list_interfaces)
return root_parser
###
# Where put ?!
###
class IoTdb:
def __init__(self, db_home=Path.home() / 'IoTtb.db', iottb_config=Path.home() / '.conf' / 'iottb.conf',
user=Path.home().stem):
self.db_home = db_home
self.config_home = iottb_config
self.default_filters_home = db_home / 'default_filters'
self.user = user
def create_db(self, mode=0o777, parents=False, exist_ok=False):
logger.info(f'Creating db at {self.db_home}')
try:
self.db_home.mkdir(mode=mode, parents=parents, exist_ok=exist_ok)
except FileExistsError:
logger.error(f'Database path already at {self.db_home} exists and is not a directory')
finally:
logger.debug(f'Leaving finally clause in create_db')
    def create_device_tree(self, mode=0o777, parents=False, exist_ok=False):
        logger.info(f'Creating device tree at {self.db_home / "devices"}')  # avoid nesting same-style quotes in the f-string
        # TODO
def parse_db_config(self):
pass
def parse_iottb_config(self):
pass
def get_known_devices(self):
pass
def iottb_db_exists(db_home=Path.home() / 'IoTtb.db'):
    # Return the result (it was previously computed and discarded)
    return db_home.is_dir()
def main():
logger.debug(f'Pre setup_argparse()')
parser = setup_argparse()
logger.debug('Post setup_argparse().')
args = parser.parse_args()
logger.debug(f'Args parsed: {args}')
if args.command:
try:
args.func(args)
except KeyboardInterrupt:
print('Received keyboard interrupt. Exiting...')
exit(1)
except Exception as e:
logger.debug(f'Error in main: {e}')
print(f'Error: {e}')
# create_capture_directory(args.device_name)
if __name__ == '__main__':
setup_logging()
logger.debug("Debug level is working")
logger.info("Info level is working")
logger.warning("Warning level is working")
main()


@ -1,41 +0,0 @@
from datetime import datetime
from enum import IntEnum, unique, global_enum
from pathlib import Path
'''
Defining IOTTB_HOME_ABS here implies that it is immutable.
It is defined here so that one could configure it,
but after it is used in __main__ this can no longer be relied upon.
'''
IOTTB_HOME_ABS = Path().home() / 'IOTTB.db'
# TODO maybe wrap this into class to make it easier to pass along to different objects
# But will need more refactoring
DEVICE_METADATA_FILE = 'device_metadata.json'
CAPTURE_METADATA_FILE = 'capture_metadata.json'
TODAY_DATE_STRING = datetime.now().strftime('%d%b%Y').lower() # TODO convert to function in utils or so
CAPTURE_FOLDER_BASENAME = 'capture_###'
AFFIRMATIVE_USER_RESPONSE = {'yes', 'y', 'true', 'Y', 'Yes', 'YES'}
NEGATIVE_USER_RESPONSE = {'no', 'n', 'N', 'No'}
YES_DEFAULT = AFFIRMATIVE_USER_RESPONSE.union({'', ' '})
NO_DEFAULT = NEGATIVE_USER_RESPONSE.union({'', ' '})
@unique
@global_enum
class ReturnCodes(IntEnum):  # sequential status codes, not bit flags
SUCCESS = 0
ABORTED = 1
FAILURE = 2
UNKNOWN = 3
FILE_NOT_FOUND = 4
FILE_ALREADY_EXISTS = 5
INVALID_ARGUMENT = 6
INVALID_ARGUMENT_VALUE = 7
def iottb_home_abs():
return None


@ -1,35 +0,0 @@
import logging
import sys
import os
from logging.handlers import RotatingFileHandler
def setup_logging():
# Ensure the logs directory exists
log_directory = 'logs'
if not os.path.exists(log_directory):
os.makedirs(log_directory)
# Create handlers
file_handler = RotatingFileHandler(os.path.join(log_directory, 'iottb.log'), maxBytes=1048576, backupCount=5)
console_handler = logging.StreamHandler(sys.stdout)
# Create formatters and add it to handlers
file_fmt = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
console_fmt = logging.Formatter(
'%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s')
file_handler.setFormatter(file_fmt)
console_handler.setFormatter(console_fmt)
# Get the root logger and add handlers
    root_logger = logging.getLogger()
    if root_logger.handlers:
        return  # already configured; avoid attaching duplicate handlers when called more than once
    root_logger.setLevel(logging.DEBUG)
root_logger.addHandler(file_handler)
root_logger.addHandler(console_handler)
# Prevent propagation to the root logger to avoid duplicate logs
root_logger.propagate = False
setup_logging()


@ -1,106 +0,0 @@
import json
import uuid
from datetime import datetime
from pathlib import Path
from typing import Optional
from iottb.definitions import ReturnCodes, CAPTURE_METADATA_FILE
from iottb.models.device_metadata_model import DeviceMetadata
import logging
logger = logging.getLogger('iottbLogger.capture_metadata_model')
logger.setLevel(logging.DEBUG)
class CaptureMetadata:
# Required Fields
device_metadata: DeviceMetadata
device_id: str
capture_dir: Path
capture_file: str
# Statistics
start_time: str
stop_time: str
# tcpdump
packet_count: Optional[int]
pcap_filter: str = ''
tcpdump_command: str = ''
interface: str = ''
# Optional Fields
device_ip_address: str = 'No IP Address set'
device_mac_address: Optional[str] = None
app: Optional[str] = None
app_version: Optional[str] = None
firmware_version: Optional[str] = None
def __init__(self, device_metadata: DeviceMetadata, capture_dir: Path):
logger.info(f'Creating CaptureMetadata model from DeviceMetadata: {device_metadata}')
self.device_metadata = device_metadata
self.capture_id = str(uuid.uuid4())
self.capture_date = datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower()
self.capture_dir = capture_dir
assert capture_dir.is_dir(), f'Capture directory {capture_dir} does not exist'
def build_capture_file_name(self):
logger.info(f'Building capture file name')
        if self.app is None:
            logger.debug('No app specified')
            # Fall back to the device short name instead of the leftover hard-coded debug value
            prefix = self.device_metadata.device_short_name
else:
logger.debug(f'App specified: {self.app}')
assert str(self.app).strip() not in {'', ' '}, f'app is not a valid name: {self.app}'
prefix = self.app.lower().replace(' ', '_')
# assert self.capture_dir is not None, f'{self.capture_dir} does not exist'
filename = f'{prefix}_{str(self.capture_id)}.pcap'
logger.debug(f'Capture file name: {filename}')
self.capture_file = filename
def save_capture_metadata_to_json(self, file_path: Path = Path(CAPTURE_METADATA_FILE)):
assert self.capture_dir.is_dir(), f'capture_dir is not a directory: {self.capture_dir}'
if file_path.is_file():
print(f'File {file_path} already exists, update instead.')
return ReturnCodes.FILE_ALREADY_EXISTS
        metadata = self.to_json(indent=2)
        with file_path.open('w') as file:
            # to_json() already returns a JSON string; json.dump() here would double-encode it
            file.write(metadata)
return ReturnCodes.SUCCESS
def to_json(self, indent=2):
# TODO: Where to validate data?
logger.info(f'Converting CaptureMetadata to JSON')
data = {}
# List of fields from CaptureData class, if fields[key]==True, then it is a required field
fields = {
'capture_id': True, #
'device_id': True,
'capture_dir': True,
'capture_file': False,
'capture_date': False,
'start_time': True,
'stop_time': True,
'packet_count': False,
'pcap_filter': False,
'tcpdump_command': False,
'interface': False,
'device_ip_address': False,
'device_mac_address': False,
'app': False,
'app_version': False,
'firmware_version': False
}
for field, is_mandatory in fields.items():
value = getattr(self, field, None)
if value not in [None, ''] or is_mandatory:
if value in [None, ''] and is_mandatory:
raise ValueError(f'Field {field} is required and cannot be empty.')
data[field] = str(value) if not isinstance(value, str) else value
logger.debug(f'Capture metadata: {data}')
return json.dumps(data, indent=indent)


@ -1,114 +0,0 @@
import json
import uuid
from datetime import datetime
from pathlib import Path
from typing import Optional, List
# iottb modules
from iottb.definitions import ReturnCodes, DEVICE_METADATA_FILE
import logging
logger = logging.getLogger('iottbLogger.device_metadata_model')
logger.setLevel(logging.DEBUG)
# 3rd party libs
IMMUTABLE_FIELDS = {'device_name', 'device_short_name', 'device_id', 'date_created'}
class DeviceMetadata:
# Required fields
device_name: str
device_short_name: str
device_id: str
date_created: str
device_root_path: Path
# Optional Fields
aliases: Optional[List[str]] = None
device_type: Optional[str] = None
device_serial_number: Optional[str] = None
device_firmware_version: Optional[str] = None
date_updated: Optional[str] = None
capture_files: Optional[List[str]] = []
def __init__(self, device_name: str, device_root_path: Path):
self.device_name = device_name
self.device_short_name = device_name.lower().replace(' ', '_')
self.device_id = str(uuid.uuid4())
self.date_created = datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower()
self.device_root_path = device_root_path
if not self.device_root_path or not self.device_root_path.is_dir():
logger.error(f'Invalid device root path: {device_root_path}')
raise ValueError(f'Invalid device root path: {device_root_path}')
logger.debug(f'Device name: {device_name}')
logger.debug(f'Device short_name: {self.device_short_name}')
logger.debug(f'Device root dir: {device_root_path}')
logger.info(f'Initialized DeviceMetadata model: {device_name}')
@classmethod
def load_from_json(cls, device_file_path: Path):
logger.info(f'Loading DeviceMetadata from JSON file: {device_file_path}')
assert device_file_path.is_file(), f'{device_file_path} is not a file'
assert device_file_path.name == DEVICE_METADATA_FILE, f'{device_file_path} is not a {DEVICE_METADATA_FILE}'
device_meta_filename = device_file_path
with device_meta_filename.open('r') as file:
metadata_json = json.load(file)
metadata_model_obj = cls.from_json(metadata_json)
return metadata_model_obj
def save_to_json(self, file_path: Path):
logger.info(f'Saving DeviceMetadata to JSON file: {file_path}')
if file_path.is_file():
print(f'File {file_path} already exists, update instead.')
return ReturnCodes.FILE_ALREADY_EXISTS
        metadata = self.to_json(indent=2)
        with file_path.open('w') as file:
            # to_json() already returns a JSON string; write it directly to avoid double encoding
            file.write(metadata)
return ReturnCodes.SUCCESS
@classmethod
def from_json(cls, metadata_json):
if isinstance(metadata_json, dict):
return DeviceMetadata(**metadata_json)
def to_json(self, indent=2):
# TODO: atm almost exact copy as in CaptureMetadata
data = {}
fields = {
'device_name': True,
'device_short_name': True,
'device_id': True,
'date_created': True,
'device_root_path': True,
'aliases': False,
'device_type': False,
'device_serial_number': False,
'device_firmware_version': False,
'date_updated': False,
'capture_files': False,
}
for field, is_mandatory in fields.items():
value = getattr(self, field, None)
if value not in [None, ''] or is_mandatory:
if value in [None, ''] and is_mandatory:
logger.debug(f'Mandatory field {field}: {value}')
raise ValueError(f'Field {field} is required and cannot be empty.')
data[field] = str(value) if not isinstance(value, str) else value
logger.debug(f'Device metadata: {data}')
return json.dumps(data, indent=indent)
def dir_contains_device_metadata(dir_path: Path):
    if not dir_path.is_dir():
        return False
    meta_file_path = dir_path / DEVICE_METADATA_FILE
    print(f'Device metadata file path {str(meta_file_path)}')
    return meta_file_path.is_file()


@ -1,77 +0,0 @@
import logging
import os
import pathlib
from iottb import definitions
from iottb.definitions import DEVICE_METADATA_FILE, ReturnCodes
from iottb.models.device_metadata_model import DeviceMetadata
# logger.setLevel(logging.INFO) # Since module currently passes all tests
logger = logging.getLogger('iottbLogger.add_device')
logger.setLevel(logging.INFO)
def setup_init_device_root_parser(subparsers):
#assert os.environ['IOTTB_HOME'] is not None, "IOTTB_HOME environment variable is not set"
parser = subparsers.add_parser('add-device', aliases=['add-device-root', 'add'],
help='Initialize a folder for a device.')
parser.add_argument('--root_dir', type=pathlib.Path,
default=definitions.IOTTB_HOME_ABS) # TODO: Refactor code to not use this or handle iottb here
group = parser.add_mutually_exclusive_group()
group.add_argument('--guided', action='store_true', help='Guided setup', default=False)
group.add_argument('--name', action='store', type=str, help='name of device')
parser.set_defaults(func=handle_add)
def handle_add(args):
# TODO: This whole function should be refactored into using the fact that IOTTB_HOME is set, and the dir exists
logger.info(f'Add device handler called with args {args}')
if args.guided:
logger.debug('begin guided setup')
metadata = guided_setup(args.root_dir) # TODO refactor to use IOTTB_HOME
logger.debug('guided setup complete')
else:
logger.debug('Setup through passed args: setup')
        if not args.name:
            logger.error('No device name specified with unguided setup.')
            return ReturnCodes.FAILURE  # ReturnCodes defines FAILURE, not ERROR
metadata = DeviceMetadata(args.name, args.root_dir)
file_path = args.root_dir / DEVICE_METADATA_FILE # TODO IOTTB_HOME REFACTOR
if file_path.exists():
print('Directory already contains a metadata file. Aborting.')
return ReturnCodes.ABORTED
serialized_metadata = metadata.to_json()
response = input(f'Confirm device metadata: {serialized_metadata} [y/N]')
logger.debug(f'response: {response}')
if response not in definitions.AFFIRMATIVE_USER_RESPONSE:
print('Adding device aborted by user.')
return ReturnCodes.ABORTED
logger.debug(f'Device metadata file {file_path}')
if metadata.save_to_json(file_path) == ReturnCodes.FILE_ALREADY_EXISTS:
logger.error('File exists after checking, which should not happen.')
return ReturnCodes.ABORTED
print('Device metadata successfully created.')
return ReturnCodes.SUCCESS
def configure_metadata():
pass
def guided_setup(device_root) -> DeviceMetadata:
logger.info('Guided setup')
response = 'N'
device_name = ''
while response.upper() == 'N':
device_name = input('Please enter name of device: ')
response = input(f'Confirm device name: {device_name} [y/N] ')
if device_name == '' or device_name is None:
print('Name cannot be empty')
logger.warning('Name cannot be empty')
logger.debug(f'Response is {response}')
logger.debug(f'Device name is {device_name}')
return DeviceMetadata(device_name, device_root)


@ -1,179 +0,0 @@
import subprocess
from pathlib import Path
from iottb.definitions import *
import logging
from iottb.models.capture_metadata_model import CaptureMetadata
from iottb.models.device_metadata_model import DeviceMetadata, dir_contains_device_metadata
from iottb.utils.capture_utils import get_capture_src_folder, make_capture_src_folder
from iottb.utils.tcpdump_utils import check_installed
logger = logging.getLogger('iottbLogger.capture')
logger.setLevel(logging.DEBUG)
def setup_capture_parser(subparsers):
parser = subparsers.add_parser('sniff', help='Sniff packets with tcpdump')
# metadata args
parser.add_argument('-a', '--ip-address', help='IP address of the device to sniff', dest='device_ip')
# tcpdump args
parser.add_argument('device_root', help='Root folder for device to sniff',
type=Path, default=Path.cwd())
parser.add_argument('-s', '--safe', help='Ensure correct device root folder before sniffing', action='store_true')
parser.add_argument('--app', help='Application name to sniff', dest='app_name', default=None)
parser_sniff_tcpdump = parser.add_argument_group('tcpdump arguments')
parser_sniff_tcpdump.add_argument('-i', '--interface', help='Interface to capture on.', dest='capture_interface',
required=True)
parser_sniff_tcpdump.add_argument('-I', '--monitor-mode', help='Put interface into monitor mode',
action='store_true')
parser_sniff_tcpdump.add_argument('-n', help='Deactivate name resolution. True by default.',
action='store_true', dest='no_name_resolution')
parser_sniff_tcpdump.add_argument('-#', '--number',
help='Print packet number at beginning of line. True by default.',
action='store_true')
parser_sniff_tcpdump.add_argument('-e', help='Print link layer headers. True by default.',
action='store_true', dest='print_link_layer')
parser_sniff_tcpdump.add_argument('-t', action='count', default=0,
help='Please see tcpdump manual for details. Unused by default.')
cap_size_group = parser.add_mutually_exclusive_group(required=False)
cap_size_group.add_argument('-c', '--count', type=int, help='Number of packets to capture.', default=10)
cap_size_group.add_argument('--mins', type=int, help='Time in minutes to capture.', default=1)
parser.set_defaults(func=handle_capture)
def cwd_is_device_root_dir() -> bool:
device_metadata_file = Path.cwd() / DEVICE_METADATA_FILE
return device_metadata_file.is_file()
def start_guided_device_root_dir_setup():
assert False, 'Not implemented'
def handle_metadata():
assert not cwd_is_device_root_dir()
print(f'Unable to find {DEVICE_METADATA_FILE} in current working directory')
print('You need to setup a device root directory before using this command')
response = input('Would you like to be guided through the setup? [y/n]')
if response.lower() == 'y':
start_guided_device_root_dir_setup()
else:
print('\'iottb init-device-root --help\' for more information.')
exit(ReturnCodes.ABORTED)
# device_id = handle_capture_metadata()
return ReturnCodes.SUCCESS
def get_device_metadata_from_file(device_metadata_filename: Path) -> DeviceMetadata:
    assert device_metadata_filename.is_file(), f'Device metadata file "{device_metadata_filename}" does not exist'
    device_metadata = DeviceMetadata.load_from_json(device_metadata_filename)
    return device_metadata
def run_tcpdump(cmd):
# TODO: Maybe specify files for stout and stderr
try:
        p = subprocess.run(cmd, capture_output=True, text=True, check=True)
        # check=True raises CalledProcessError on a non-zero exit, so reaching this point means tcpdump succeeded
        print(f'tcpdump ran successfully:\n{p.stdout}')
except KeyboardInterrupt:
pass
def handle_capture(args):
if not check_installed():
print('Please install tcpdump first')
exit(ReturnCodes.ABORTED)
    assert args.device_root is not None, 'Device root directory is required'
    # Do not assert that the metadata file exists here; the --safe check and the fallback branch below handle that case.
# get device metadata
logger.info(f'Device root directory: {args.device_root}')
if args.safe and not dir_contains_device_metadata(args.device_root):
print(f'Supplied folder contains no device metadata. '
f'Please setup a device root directory before using this command')
exit(ReturnCodes.ABORTED)
elif dir_contains_device_metadata(args.device_root):
device_metadata_filename = args.device_root / DEVICE_METADATA_FILE
device_data = DeviceMetadata.load_from_json(device_metadata_filename)
else:
name = input('Please enter a device name: ')
args.device_root.mkdir(parents=True, exist_ok=True)
device_data = DeviceMetadata(name, args.device_root)
# start constructing environment for capture
capture_dir = get_capture_src_folder(args.device_root)
make_capture_src_folder(capture_dir)
capture_metadata = CaptureMetadata(device_data, capture_dir)
capture_metadata.interface = args.capture_interface
cmd = ['sudo', 'tcpdump', '-i', args.capture_interface]
cmd = build_tcpdump_args(args, cmd, capture_metadata)
capture_metadata.tcpdump_command = cmd
print('Executing: ' + ' '.join(cmd))
# run capture
try:
start_time = datetime.now().strftime('%H:%M:%S')
run_tcpdump(cmd)
stop_time = datetime.now().strftime('%H:%M:%S')
capture_metadata.start_time = start_time
capture_metadata.stop_time = stop_time
except KeyboardInterrupt:
print('Received keyboard interrupt.')
exit(ReturnCodes.ABORTED)
except subprocess.CalledProcessError as e:
print(f'Failed to capture packet: {e}')
exit(ReturnCodes.FAILURE)
except Exception as e:
print(f'Failed to capture packet: {e}')
exit(ReturnCodes.FAILURE)
return ReturnCodes.SUCCESS
def build_tcpdump_args(args, cmd, capture_metadata: CaptureMetadata):
if args.monitor_mode:
cmd.append('-I')
if args.no_name_resolution:
cmd.append('-n')
if args.number:
cmd.append('-#')
if args.print_link_layer:
cmd.append('-e')
if args.count:
cmd.append('-c')
cmd.append(str(args.count))
elif args.mins:
assert False, 'Unimplemented option'
if args.app_name is not None:
capture_metadata.app = args.app_name
capture_metadata.build_capture_file_name()
cmd.append('-w')
cmd.append(str(capture_metadata.capture_dir) + "/" + capture_metadata.capture_file)
if args.safe:
cmd.append(f'host {args.device_ip}') # if not specified, filter 'any' implied by tcpdump
        capture_metadata.device_ip_address = args.device_ip  # record the IP here; device_id comes from the device metadata
return cmd
# def capture_file_cmd(args, cmd, capture_dir, capture_metadata: CaptureMetadata):
# capture_file_prefix = capture_metadata.get_device_metadata().get_device_short_name()
# if args.app_name is not None:
# capture_file_prefix = args.app_name
# capture_metadata.set_app(args.app_name)
# capfile_name = capture_file_prefix + '_' + str(capture_metadata.get_capture_id()) + '.pcap'
# capture_metadata.set_capture_file(capfile_name)
# capfile_abs_path = capture_dir / capfile_name
# capture_metadata.set_capture_file(capfile_name)
# cmd.append('-w')
# cmd.append(str(capfile_abs_path))


@ -1,63 +0,0 @@
import subprocess
import logging
logger = logging.getLogger('iottbLogger.capture')
logger.setLevel(logging.DEBUG)
class Sniffer:
def __init__(self):
pass
def setup_sniff_parser(subparsers):
parser = subparsers.add_parser('sniff', help='Sniff packets with tcpdump')
# metadata args
parser.add_argument('-a', '--addr', help='IP or MAC address of IoT device')
# tcpdump args
parser.add_argument('--app', help='Application name to sniff', default=None)
parser_sniff_tcpdump = parser.add_argument_group('tcpdump arguments')
parser_sniff_tcpdump.add_argument('-i', '--interface', help='Interface to capture on.', dest='capture_interface',
required=True)
parser_sniff_tcpdump.add_argument('-I', '--monitor-mode', help='Put interface into monitor mode',
action='store_true')
parser_sniff_tcpdump.add_argument('-n', help='Deactivate name resolution. True by default.',
action='store_true', dest='no_name_resolution')
parser_sniff_tcpdump.add_argument('-#', '--number',
help='Print packet number at beginning of line. True by default.',
action='store_true')
parser_sniff_tcpdump.add_argument('-e', help='Print link layer headers. True by default.',
action='store_true', dest='print_link_layer')
parser_sniff_tcpdump.add_argument('-t', action='count', default=0,
help='Please see tcpdump manual for details. Unused by default.')
cap_size_group = parser.add_mutually_exclusive_group(required=False)
cap_size_group.add_argument('-c', '--count', type=int, help='Number of packets to capture.', default=10)
cap_size_group.add_argument('--mins', type=int, help='Time in minutes to capture.', default=1)
parser.set_defaults(func=sniff)
def parse_addr(addr):
#TODO Implement
pass
def sniff(args):
if args.addr is None:
print('You must supply either a MAC or IP(v4) address to use this tool!')
logger.info("Exiting on account of missing MAC/IP.")
exit(1)
else:
(type, value) = parse_addr(args.addr)
#TODO Get this party started
def sniff_tcpdump(args, filter):
pass
def sniff_mitmproxy(args, filter):
pass
def sniff_raw(cmd,args):
pass


@ -1,44 +0,0 @@
import uuid
from pathlib import Path
from iottb.models.device_metadata_model import dir_contains_device_metadata
from iottb.utils.utils import get_iso_date
def get_capture_uuid():
return str(uuid.uuid4())
def get_capture_date_folder(device_root: Path):
today_iso = get_iso_date()
today_folder = device_root / today_iso
if dir_contains_device_metadata(device_root):
if not today_folder.is_dir():
try:
today_folder.mkdir()
except FileExistsError:
print(f'Folder {today_folder} already exists')
return today_folder
raise FileNotFoundError(f'Given path {device_root} is not a device root directory')
def get_capture_src_folder(device_folder: Path):
assert device_folder.is_dir(), f'Given path {device_folder} is not a folder'
today_iso = get_iso_date()
    max_sequence_number = 0  # so the first capture of the day becomes _001
for d in device_folder.iterdir():
if d.is_dir() and d.name.startswith(f'{today_iso}_capture_'):
name = d.name
num = int(name.split('_')[2])
max_sequence_number = max(max_sequence_number, num)
next_sequence_number = max_sequence_number + 1
return device_folder.joinpath(f'{today_iso}_capture_{next_sequence_number:03}')
def make_capture_src_folder(capture_src_folder: Path):
try:
capture_src_folder.mkdir()
except FileExistsError:
print(f'Folder {capture_src_folder} already exists')
finally:
return capture_src_folder


@ -1,41 +0,0 @@
import ipaddress
import shutil
import subprocess
from typing import Optional, Tuple
def check_installed() -> bool:
"""Check if tcpdump is installed and available on the system path."""
return shutil.which('tcpdump') is not None
def ensure_installed():
"""Ensure that tcpdump is installed, raise an error if not."""
if not check_installed():
raise RuntimeError('tcpdump is not installed. Please install it to continue.')
def list_interfaces(args) -> str:
"""List available network interfaces using tcpdump."""
ensure_installed()
    try:
        result = subprocess.run(['tcpdump', '--list-interfaces'], capture_output=True, text=True, check=True)
        print(result.stdout)
        return result.stdout  # return the listing so callers can use it, as the str annotation promises
    except subprocess.CalledProcessError as e:
        print(f'Failed to list interfaces: {e}')
        return ''
def is_valid_ipv4(ip: str) -> bool:
try:
ipaddress.IPv4Address(ip)
return True
except ValueError:
return False
def str_to_ipv4(ip: str) -> Tuple[bool, Optional[ipaddress.IPv4Address]]:
    try:
        address = ipaddress.IPv4Address(ip)
        return True, address  # the round-trip comparison was always True; return the parsed address directly
except ipaddress.AddressValueError:
return False, None


@ -1,18 +0,0 @@
import uuid
from datetime import datetime
from iottb.definitions import TODAY_DATE_STRING, DEVICE_METADATA_FILE, CAPTURE_METADATA_FILE
from pathlib import Path
def get_iso_date():
return datetime.now().strftime('%Y-%m-%d')
def subfolder_exists(parent: Path, child: str):
return parent.joinpath(child).exists()
def generate_unique_string_with_prefix(prefix: str):
return prefix + '_' + str(uuid.uuid4())


@ -1,19 +0,0 @@
import json
import uuid
from datetime import datetime
class Metadata:
def __init__(self, name):
self.device = name
self.timestamp = datetime.now().timestamp()
self.capture_id = uuid.uuid4().hex
self.capture_mode = ... # TODO: eg. promiscuous/monitor/other
self.host_ip = ...
self.host_mac = ...
self.protocol = ...
def create_metadata(filename, unique_id, device_details):
    # datetime is imported via "from datetime import datetime", so call datetime.now() directly
    date_string = datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
    meta_filename = f'meta_{date_string}_{unique_id}.json'
    return meta_filename


@ -1,69 +0,0 @@
import json
from pathlib import Path
from pydantic import BaseModel
from iottb.models.device_metadata_model import DeviceMetadata
from iottb.definitions import DEVICE_METADATA_FILE
def write_device_metadata_to_file(metadata: DeviceMetadata, device_path: Path):
'''Write the device metadata to a JSON file in the specified directory.'''
meta_file_path = device_path / 'meta.json'
meta_file_path.write_text(metadata.json(indent=2))
def confirm_device_metadata(metadata: DeviceMetadata) -> bool:
'''Display device metadata for user confirmation.'''
print(metadata.json(indent=2))
return input('Confirm device metadata? (y/n): ').strip().lower() == 'y'
def get_device_metadata_from_user() -> DeviceMetadata:
'''Prompt the user to enter device details and return a populated DeviceMetadata object.'''
device_name = input('Device name: ')
device_short_name = device_name.lower().replace(' ', '-')
return DeviceMetadata(device_name=device_name, device_short_name=device_short_name)
def initialize_device_root_dir(device_name: str) -> Path:
'''Create and return the path for the device directory.'''
device_path = Path.cwd() / device_name
device_path.mkdir(exist_ok=True)
return device_path
def write_metadata(metadata: BaseModel, device_name: str):
'''Write device metadata to a JSON file.'''
meta_path = Path.cwd() / device_name / DEVICE_METADATA_FILE
meta_path.parent.mkdir(parents=True, exist_ok=True)
with meta_path.open('w') as f:
json.dump(metadata.dict(), f, indent=4)
def get_device_metadata(file_path: Path) -> DeviceMetadata | None:
'''Fetch device metadata from a JSON file.'''
if dev_metadata_exists(file_path):
with file_path.open('r') as f:
device_metadata_json = json.load(f)
try:
device_metadata = DeviceMetadata.from_json(device_metadata_json)
return device_metadata
except ValueError as e:
print(f'Validation error for device metadata: {e}')
else:
# TODO Decide what to do (e.g. search for file etc)
print(f'No device metadata at {file_path}')
return None
def search_device_metadata(filename=DEVICE_METADATA_FILE, start_dir=Path.cwd(), max_parents=3) -> Path:
pass # TODO
def dev_metadata_exists(file_path: Path) -> bool:
    return file_path.is_file()


@ -1,6 +0,0 @@
interface=wlp0s20f0u1
dhcp-range=192.168.1.2,192.168.1.250,12h
# Gateway
dhcp-option=3,192.168.1.1
# Dns server addr
dhcp-option=6,192.168.1.1


@ -1,6 +0,0 @@
#!/bin/bash
# Run as root
#
sysctl -w net.ipv4.conf.all.forwarding=1
sysctl -w net.ipv6.conf.all.forwarding=1


@ -1,8 +0,0 @@
interface=wlp0s20f0u1
driver=nl80211
ssid=t3u
hw_mode=g
channel=11
macaddr_acl=0
auth_algs=1
ignore_broadcast_ssid=0


@ -1,12 +0,0 @@
interface=wlp0s20f0u1
driver=nl80211
ssid=t3u
hw_mode=g
channel=11
macaddr_acl=0
auth_algs=1
ignore_broadcast_ssid=0
wpa=2
wpa_passphrase=11help22help33
wpa_key_mgmt=WPA-PSK
rsn_pairwise=CCMP


@ -1,35 +0,0 @@
#!/bin/bash
# DISCLAIMER! THIS CODE HAS BEEN TAKEN FROM:
# https://nims11.wordpress.com/2012/04/27/hostapd-the-linux-way-to-create-virtual-wifi-access-point/
# Usage: ./initSoftAP
########### Initial wifi interface configuration #############
ip link set $1 down
ip addr flush dev $1
ip link set $1 up
ip addr add 10.0.0.1/24 dev $1
# If you still use ifconfig for some reason, replace the above lines with the following
# ifconfig $1 up 10.0.0.1 netmask 255.255.255.0
sleep 2
###########
########### Start dnsmasq ##########
if [ -z "$(ps -e | grep dnsmasq)" ]
then
dnsmasq
fi
###########
########### Enable NAT ############
iptables -t nat -A POSTROUTING -o $2 -j MASQUERADE
iptables -A FORWARD -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
iptables -A FORWARD -i $1 -o $2 -j ACCEPT
#Thanks to lorenzo
#Uncomment the line below if facing problems while sharing PPPoE, see lorenzo's comment for more details
#iptables -I FORWARD -p tcp --tcp-flags SYN,RST SYN -j TCPMSS --clamp-mss-to-pmtu
sysctl -w net.ipv4.ip_forward=1
###########
########## Start hostapd ###########
hostapd $PWD/hostapd.conf ## TODO! either put config in normal place
#killall dnsmasq
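The Usage comment above omits the script's two positional parameters: `$1` is the wireless interface that becomes the access point and `$2` is the upstream interface used for NAT. A hypothetical invocation, assuming root privileges and placeholder interface names, might look like this:
```bash
# $1 = AP interface (the USB WiFi adapter from hostapd.conf), $2 = upstream interface used for NAT
sudo ./initSoftAP wlp0s20f0u1 eth0
```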


@ -1,36 +0,0 @@
#!/bin/bash
# DISCLAIMER! THIS CODE HAS BEEN TAKEN FROM:
# https://nims11.wordpress.com/2012/04/27/hostapd-the-linux-way-to-create-virtual-wifi-access-point/
# Usage: ./initSoftAP
########### Initial wifi interface configuration #############
ip link set $1 down
ip addr flush dev $1
ip link set $1 up
ip addr add 10.0.0.1/24 dev $1
# If you still use ifconfig for some reason, replace the above lines with the following
# ifconfig $1 up 10.0.0.1 netmask 255.255.255.0
sleep 2
###########
########### Start dnsmasq ##########
if [ -z "$(ps -e | grep dnsmasq)" ]
then
dnsmasq
fi
###########
########### Enable NAT ############
nft add table nat
nft -- add chain nat prerouting { type nat hook prerouting priority -100 \; }
nft add chain nat postrouting { type nat hook postrouting priority 100 \; }
nft add rule nat postrouting oifname wlp44s0 masquerade
#Thanks to lorenzo
#Uncomment the line below if facing problems while sharing PPPoE, see lorenzo's comment for more details
#iptables -I FORWARD -p tcp --tcp-flags SYN,RST SYN -j TCPMSS --clamp-mss-to-pmtu
sysctl -w net.ipv4.ip_forward=1
###########
########## Start hostapd ###########
hostapd $PWD/hostapd.conf ## TODO! either put config in normal place
#killall dnsmasq


@ -1,22 +0,0 @@
#! /bin/env bash
TYPE="wifi"
IFNAME="wlp0s20f0u1"
CONNAME="T3UminiConn"
SSID="T3Umini"
BAND="bg"
CHAN=1
KMGMT="wpa-psk"
PSK=11223344
nmcli con add type wifi ifname wlp0s20f0u1 mode ap con-name WIFI_AP_TEST ssid MY_AP_TEST &&
nmcli con modify WIFI_AP_TEST 802-11-wireless.band bg &&
nmcli con modify WIFI_AP_TEST 802-11-wireless.channel 1 &&
nmcli con modify WIFI_AP_TEST 802-11-wireless-security.key-mgmt wpa-psk &&
nmcli con modify WIFI_AP_TEST 802-11-wireless-security.pairwise ccmp &&
nmcli con modify WIFI_AP_TEST 802-11-wireless-security.psk 11223344 &&
nmcli con modify WIFI_AP_TEST ipv4.method shared && nmcli con up WIFI_AP_TEST
# Not used for Apple devices:
# nmcli con modify WIFI_AP_TEST 802-11-wireless-security.proto rsn &&
# nmcli con modify WIFI_AP_TEST 802-11-wireless-security.group ccmp


@ -1,16 +0,0 @@
[build-system]
requires = ["setuptools>=42", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = 'iottb'
version = '0.1.0'
authors = [{name = "Sebastian Lenzlinger", email = "sebastian.lenzlinger@unibas.ch"}]
description = "Automation Tool for Capturing Network packets of IoT devices."
requires-python = ">=3.8"
[tool.setuptools]
packages = ["iottb"]
[project.scripts]
iottb = "iottb.__main__:main"


@ -1,15 +0,0 @@
import pytest
import tempfile
from pathlib import Path
@pytest.fixture(scope='session')
def tmp_dir():
with tempfile.TemporaryDirectory() as tmp_dir:
yield Path(tmp_dir)
@pytest.fixture
def mock_device_metadata_json_(tmp_dir):
with tempfile.TemporaryDirectory() as tmp_dir:
pass


@ -1,47 +0,0 @@
import sys
import unittest
from io import StringIO
from unittest.mock import patch, MagicMock
from pathlib import Path
from iottb.definitions import DEVICE_METADATA_FILE
import shutil
from iottb.__main__ import main
class TestDeviceMetadataFileCreation(unittest.TestCase):
def setUp(self):
self.test_dir = Path('/tmp/iottbtest/test_add_device')
self.test_dir.mkdir(parents=True, exist_ok=True)
# self.captured_output = StringIO()
# sys.stdout = self.captured_output
def tearDown(self):
# shutil.rmtree(str(self.test_dir))
for item in self.test_dir.iterdir():
if item.is_dir():
item.rmdir()
else:
item.unlink()
self.test_dir.rmdir()
# sys.stdout = sys.__stdout__
@patch('builtins.input', side_effect=['iPhone 14', 'y', 'y'])
def test_guided_device_setup(self, mock_input):
sys.argv = ['__main__.py', 'add', '--root_dir', str(self.test_dir), '--guided']
main()
expected_file = self.test_dir / DEVICE_METADATA_FILE
        self.assertTrue(expected_file.exists(), f'Expected file not created: {expected_file}')
@patch('builtins.input', side_effect=['y']) # need mock_input else wont work
def test_device_setup(self, mock_input):
sys.argv = ['__main__.py', 'add', '--root_dir', str(self.test_dir), '--name', 'iPhone 14']
main()
expected_file = self.test_dir / DEVICE_METADATA_FILE
        self.assertTrue(expected_file.exists(), f'Expected file not created: {expected_file}')
def test_add_when_file_exists(self):
# TODO
pass
if __name__ == '__main__':
unittest.main()


@ -1,2 +0,0 @@
def test_save_to_json():
assert False


@ -1,28 +0,0 @@
BSD 3-Clause License
Copyright (c) 2024, Sebastian Lenzlinger
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
3. Neither the name of the copyright holder nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


@ -1,82 +0,0 @@
# Iottb
## Installation
There are a few different ways to install `iottb`.
On Linux, to install into a user's local bin directory using Poetry or pip:
- Move into the project root (`cd path/to/iottb-project`), so that you are in the directory that contains the `pyproject.toml` file.
```bash
poetry install  # Poetry installs the project itself in editable mode by default
# or with pip
pip install -e .
```
Currently, this is the recommended method.
Alternatively install with pip into any activated environment:
```bash
pip install -r requirements.txt
```
It is possible to build a single executable for your machine, which you can then put on your PATH, using PyInstaller.
1. Install pyinstaller
```bash
pip install pyinstaller
```
2. Make the executable
```bash
pyinstaller --onefile --name iottb --distpath ~/opt iottb/main.py
```
This lets you run it as `iottb`, provided `~/opt` is a directory on your PATH.
An executable which should be able to run on Linux is included in the repo.
## Basic Invocation
```bash
Usage: iottb [OPTIONS] COMMAND [ARGS]...
Options:
-v, --verbosity Set verbosity [default: 0; 0<=x<=3]
-d, --debug Enable debug mode
--cfg-file PATH Path to iottb config file [default:
/home/seb/.config/iottb/iottb.cfg]
--help Show this message and exit.
--dry-run BOOLEAN currently NOT USED! [default: True]
Commands:
add-device Add a device to a database
init-db
sniff Sniff packets with tcpdump
Debugging Commands:
show-all Show everything: configuration, databases, and...
show-cfg Show the current configuration context
```
## Usage Examples
### Initializing a database
Before devices can be added and packet captures performed, there must be a database.
Initialize a database with default values at the default location:
```bash
iottb init-db
```
### Adding a device
Typically, captures are performed for a specific device. To add a device to the current default database:
```bash
iottb add-device 'Echo Dot 2'
```
if the device is named 'Echo Dot 2'. It will be given the canonical name 'echo-dot'; this name should be used when performing
captures with `iottb`.
### Performing captures/sniffing traffic
```bash
iottb sniff -a <ipv4-addr or mac-addr> 'echo-dot'
```
to sniff traffic of the previously added device 'Echo Dot 2', which received the canonical name 'echo-dot'.
You can get the subcommand-specific help text by adding the `--help` option.
## Configuration
### Env Vars
- IOTTB_CONF_HOME
By setting this variable you control where the basic iottb application
configuration is looked for.
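For example, a minimal sketch (assuming a POSIX shell; the directory below is only a placeholder):
```bash
# Point iottb at a non-default configuration directory (illustrative path)
export IOTTB_CONF_HOME="$HOME/.config/iottb-alt"
# Subsequent invocations, e.g. the debugging command listed above, read their configuration from there
iottb show-cfg
```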
## License
This project is licensed under a BSD 3-clause License, a copy of which is provided in the file `code/iottb-project/LICENSE`.


@ -1,110 +0,0 @@
Usage: iottb [OPTIONS] COMMAND [ARGS]...
Options:
-v, --verbosity Set verbosity [default: 0; 0<=x<=3]
-d, --debug Enable debug mode
--dry-run [default: True]
--cfg-file PATH Path to iottb config file [default:
/home/seb/.config/iottb/iottb.cfg]
--help Show this message and exit.
Commands:
add-device Add a device to a database
init-db
rm-cfg Removes the cfg file from the filesystem.
rm-dbs Removes ALL(!) databases from the filesystem if...
set-key-in-table-to Edit config or metadata files.
show-all Show everything: configuration, databases, and...
show-cfg Show the current configuration context
sniff Sniff packets with tcpdump
Usage: iottb init-db [OPTIONS]
Options:
-d, --dest PATH Location to put (new) iottb database
-n, --name TEXT Name of new database. [default: iottb.db]
--update-default / --no-update-default
If new db should be set as the new default
[default: update-default]
--help Show this message and exit.
Usage: iottb add-device [OPTIONS]
Add a device to a database
Options:
--dev, --device-name TEXT The name of the device to be added. If this
string contains spaces or other special
characters normalization is
performed to derive a canonical name [required]
--db, --database DIRECTORY Database in which to add this device. If not
specified use default from config. [env var:
IOTTB_DB]
--guided Add device interactively [env var:
IOTTB_GUIDED_ADD]
--help Show this message and exit.
Usage: iottb sniff [OPTIONS] [TCPDUMP-ARGS] [DEVICE]
Sniff packets with tcpdump
Options:
Testbed sources:
--db, --database TEXT Database of device. Only needed if not current
default. [env var: IOTTB_DB]
--app TEXT Companion app being used during capture
Runtime behaviour:
--unsafe Disable checks for otherwise required options.
[env var: IOTTB_UNSAFE]
--guided [env var: IOTTB_GUIDED]
--pre TEXT Script to be executed before main command is
started.
--post TEXT Script to be executed upon completion of main
command.
Tcpdump options:
-i, --interface TEXT Network interface to capture on.If not specified
tcpdump tries to find and appropriate one. [env
var: IOTTB_CAPTURE_INTERFACE]
-a, --address TEXT IP or MAC address to filter packets by. [env var:
IOTTB_CAPTURE_ADDRESS]
-I, --monitor-mode Put interface into monitor mode.
--ff TEXT tcpdump filter as string or file path. [env var:
IOTTB_CAPTURE_FILTER]
-#, --print-pacno Print packet number at beginning of line. True by
default. [default: True]
-e, --print-ll Print link layer headers. True by default.
-c, --count INTEGER Number of packets to capture. [default: 1000]
--help Show this message and exit.
Utility Commands mostly for development
Usage: iottb rm-cfg [OPTIONS]
Removes the cfg file from the filesystem.
This is mostly a utility during development. Once non-standard database
locations are implemented, deleting this would lead to iottb not being able
to find them anymore.
Options:
--yes Confirm the action without prompting.
--help Show this message and exit.
Usage: iottb rm-dbs [OPTIONS]
Removes ALL(!) databases from the filesystem if they're empty.
Development utility currently unfit for use.
Options:
--yes Confirm the action without prompting.
--help Show this message and exit.
Usage: iottb show-cfg [OPTIONS]
Show the current configuration context
Options:
--cfg-file PATH Path to the config file [default:
/home/seb/.config/iottb/iottb.cfg]
-pp Pretty Print
--help Show this message and exit.
Usage: iottb show-all [OPTIONS]
Show everything: configuration, databases, and device metadata
Options:
--help Show this message and exit.


@ -1,38 +0,0 @@
Usage: iottb [OPTIONS] COMMAND [ARGS]...
Options:
-v, --verbosity Set verbosity [default: 0; 0<=x<=3]
-d, --debug Enable debug mode
--dry-run [default: True]
--cfg-file PATH Path to iottb config file [default:
/home/seb/.config/iottb/iottb.cfg]
--help Show this message and exit.
Commands:
add-device Add a device to a database
init-db
rm-cfg Removes the cfg file from the filesystem.
rm-dbs Removes ALL(!) databases from the filesystem if...
set-key-in-table-to Edit config or metadata files.
show-all Show everything: configuration, databases, and...
show-cfg Show the current configuration context
sniff Sniff packets with tcpdump
Usage: iottb [OPTIONS] COMMAND [ARGS]...
Options:
-v, --verbosity Set verbosity [default: 0; 0<=x<=3]
-d, --debug Enable debug mode
--dry-run [default: True]
--cfg-file PATH Path to iottb config file [default:
/home/seb/.config/iottb/iottb.cfg]
--help Show this message and exit.
Commands:
add-device Add a device to a database
init-db
rm-cfg Removes the cfg file from the filesystem.
rm-dbs Removes ALL(!) databases from the filesystem if...
set-key-in-table-to Edit config or metadata files.
show-all Show everything: configuration, databases, and...
show-cfg Show the current configuration context
sniff Sniff packets with tcpdump


@ -1,142 +0,0 @@
# Main Command: `iottb`
Usage: `iottb [OPTIONS] COMMAND [ARGS]...`
Options:
-v, --verbosity Set verbosity [0<=x<=3]
-d, --debug Enable debug mode
--dry-run
--cfg-file PATH Path to iottb config file
--help Show this message and exit.
Commands:
add-device Add a device to a database
init-db
rm-cfg Removes the cfg file from the filesystem.
rm-dbs Removes ALL(!) databases from the filesystem if...
set-key-in-table-to Edit config or metadata files.
show-all Show everything: configuration, databases, and...
show-cfg Show the current configuration context
sniff Sniff packets with tcpdump
Command: init-db
Usage: [OPTIONS]
Options:
-d, --dest PATH Location to put (new) iottb database
-n, --name TEXT Name of new database.
--update-default / --no-update-default
If new db should be set as the new default
--help Show this message and exit.
Command: rm-cfg
Usage: [OPTIONS]
Removes the cfg file from the filesystem.
This is mostly a utility during development. Once non-standard database
locations are implemented, deleting this would lead to iottb not being able
to find them anymore.
Options:
--yes Confirm the action without prompting.
--help Show this message and exit.
Command: set-key-in-table-to
Usage: [OPTIONS]
Edit config or metadata files. TODO: Implement
Options:
--file TEXT
--table TEXT
--key TEXT
--value TEXT
--help Show this message and exit.
Command: rm-dbs
Usage: [OPTIONS]
Removes ALL(!) databases from the filesystem if they're empty.
Development utility currently unfit for use.
Options:
--yes Confirm the action without prompting.
--help Show this message and exit.
Command: add-device
Usage: [OPTIONS]
Add a device to a database
Options:
--dev, --device-name TEXT The name of the device to be added. If this
string contains spaces or other special
characters normalization is
performed to derive a canonical name [required]
--db, --database DIRECTORY Database in which to add this device. If not
specified use default from config. [env var:
IOTTB_DB]
--guided Add device interactively [env var:
IOTTB_GUIDED_ADD]
--help Show this message and exit.
Command: show-cfg
Usage: [OPTIONS]
Show the current configuration context
Options:
--cfg-file PATH Path to the config file
-pp Pretty Print
--help Show this message and exit.
Command: sniff
Usage: [OPTIONS] [TCPDUMP-ARGS] [DEVICE]
Sniff packets with tcpdump
Options:
Testbed sources:
--db, --database TEXT Database of device. Only needed if not current
default. [env var: IOTTB_DB]
--app TEXT Companion app being used during capture
Runtime behaviour:
--unsafe Disable checks for otherwise required options.
[env var: IOTTB_UNSAFE]
--guided [env var: IOTTB_GUIDED]
--pre PATH Script to be executed before the main command is
started.
Tcpdump options:
-i, --interface TEXT Network interface to capture on. If not specified
tcpdump tries to find an appropriate one. [env
var: IOTTB_CAPTURE_INTERFACE]
-a, --address TEXT IP or MAC address to filter packets by. [env var:
IOTTB_CAPTURE_ADDRESS]
-I, --monitor-mode Put interface into monitor mode.
--ff TEXT tcpdump filter as string or file path. [env var:
IOTTB_CAPTURE_FILTER]
-#, --print-pacno Print packet number at beginning of line. True by
default.
-e, --print-ll Print link layer headers. True by default.
-c, --count INTEGER Number of packets to capture.
--help Show this message and exit.
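Putting the options above together, a sketch of an invocation might look as follows; the interface, address, and packet count are placeholders, and `echo-dot` is the canonical device name used in the README examples:
```bash
# Capture 500 packets from the device registered as 'echo-dot',
# filtering on its address and capturing on an explicit interface
iottb sniff -i wlan0 -a 192.168.1.42 -c 500 echo-dot
```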
Command: show-all
Usage: [OPTIONS]
Show everything: configuration, databases, and device metadata
Options:
--help Show this message and exit.


@ -1,199 +0,0 @@
import json
import sys
import click
from pathlib import Path
import logging
import re
from iottb import definitions
from iottb.models.device_metadata import DeviceMetadata
from iottb.models.iottb_config import IottbConfig
from iottb.definitions import CFG_FILE_PATH, TB_ECHO_STYLES
logger = logging.getLogger(__name__)
def prompt_for_device_details():
device_details = {}
aliases = []
while True:
click.echo("\nEnter the details for the new device:")
click.echo("1. Device Name")
click.echo("2. Description")
click.echo("3. Model")
click.echo("4. Manufacturer")
click.echo("5. Current Firmware Version")
click.echo("6. Device Type")
click.echo("7. Supported Interfaces")
click.echo("8. Companion Applications")
click.echo("9. Add Alias")
click.echo("10. Finish and Save")
choice = click.prompt("Choose an option", type=int)
if choice == 1:
device_details['device_name'] = click.prompt("Enter the device name")
elif choice == 2:
device_details['description'] = click.prompt("Enter the description")
elif choice == 3:
device_details['model'] = click.prompt("Enter the model")
elif choice == 4:
device_details['manufacturer'] = click.prompt("Enter the manufacturer")
elif choice == 5:
device_details['firmware_version'] = click.prompt("Enter the current firmware version")
elif choice == 6:
device_details['device_type'] = click.prompt("Enter the device type")
elif choice == 7:
device_details['supported_interfaces'] = click.prompt("Enter the supported interfaces")
elif choice == 8:
device_details['companion_applications'] = click.prompt("Enter the companion applications")
elif choice == 9:
alias = click.prompt("Enter an alias")
aliases.append(alias)
elif choice == 10:
break
else:
click.echo("Invalid choice. Please try again.")
device_details['aliases'] = aliases
return device_details
def confirm_and_add_device(device_details, db_path):
click.echo("\nDevice metadata:")
for key, value in device_details.items():
click.echo(f"{key.replace('_', ' ').title()}: {value}")
confirm = click.confirm("Do you want to add this device with above metadata?")
if confirm:
device_name = device_details.get('device_name')
if not device_name:
click.echo("Device name is required. Exiting...")
return
device_metadata = DeviceMetadata(**device_details)
device_dir = db_path / device_metadata.canonical_name
if device_dir.exists():
click.echo(f"Device {device_name} already exists in the database.")
click.echo("Exiting...")
return
try:
device_dir.mkdir(parents=True, exist_ok=True)
metadata_path = device_dir / definitions.DEVICE_METADATA_FILE_NAME
device_metadata.save_metadata_to_file(metadata_path)
click.echo(f"Successfully added device {device_name} to database.")
except OSError as e:
click.echo(f"Error trying to create device directory: {e}")
click.echo("Exiting...")
else:
click.echo("Operation cancelled. Exiting...")
def add_device_guided(cfg, db):
logger.info('Adding device interactively')
# logger.debug(f'Parameters: {params}. value: {value}')
databases = cfg.db_path_dict
if not databases:
click.echo('No databases found in config file.')
return
click.echo('Available Databases:')
last = 0
for i, db_name in enumerate(databases.keys(), start=1):
click.echo(f'[{i}] {db_name}')
last = i if last < i else last
db_choice = click.prompt(f'Select the database to add the new device to (1 - {last}, 0 to quit)',
type=int, default=1)
if 1 <= db_choice <= last:
selected_db = list(databases.keys())[db_choice - 1]
click.confirm(f'Use {selected_db}?', abort=True)
db_path = Path(databases[selected_db]) / selected_db
logger.debug(f'DB Path {str(db_path)}')
device_details = prompt_for_device_details()
confirm_and_add_device(device_details, db_path)
elif db_choice == 0:
click.echo(f'Quitting...')
else:
click.echo(f'{db_choice} is not a valid choice. Please rerun command and select a valid database.')
@click.command('add-device', help='Add a device to a database')
@click.argument('device', type=str, default="")
@click.option('--db', '--database', type=str,
envvar='IOTTB_DB', show_envvar=True, default="",
help='Database in which to add this device. If not specified use default from config.')
@click.option('--guided', is_flag=True,
help='Add device interactively')
def add_device(device, db, guided):
"""Add a new device to a database
Device name must be supplied unless in an interactive setup.
Database is taken from config by default.
If this device name contains spaces or other special characters normalization is performed to derive a canonical name.
"""
logger.info('add-device invoked')
# Step 1: Load Config
# Dependency: Config file must exist
config = IottbConfig(Path(CFG_FILE_PATH))
logger.debug(f'Config loaded: {config}')
# If guided flag set, continue with guided add and leave
if guided:
click.echo('Guided option set. Continuing with guided add.')
add_device_guided(config, db)
logger.info('Finished guided device add.')
return
# Step 2: Load database
# dependency: Database folder must exist
if db != "":
database = db
path = config.db_path_dict[database]
logger.debug(f'Resolved (path, db) {path}, {database}')
else:
path = config.default_db_location
database = config.default_database
logger.debug(f'Default (path, db) {path}, {database}')
click.secho(f'Using database {database}')
full_db_path = Path(path) / database
if not full_db_path.is_dir():
logger.warning(f'No database at {database}')
click.echo(f'No database found at {full_db_path}')
click.echo(
f'You need to initialize the testbed before you add devices!')
click.echo(
f'To initialize the testbed in the default location run "iottb init-db"')
click.echo('Exiting...')
sys.exit()
# Ensure a device name was passed as argument
if device == "":
click.echo("Device name cannot be an empty string. Exiting...", lvl='w')
return
# Step 3: Check if device already exists in database
# dependency: DeviceMetadata object
device_metadata = DeviceMetadata(device_name=device)
device_dir = full_db_path / device_metadata.canonical_name
# Check if device is already registered
if device_dir.exists():
logger.warning(f'Device directory {device_dir} already exists.')
click.echo(f'Device {device} already exists in the database.')
click.echo('Exiting...')
sys.exit()
try:
device_dir.mkdir()
except OSError as e:
logger.error(f'Error trying to create device {e}')
click.echo('Exiting...')
sys.exit()
# Step 4: Save metadata into device_dir
metadata_path = device_dir / definitions.DEVICE_METADATA_FILE_NAME
with metadata_path.open('w') as metadata_file:
json.dump(device_metadata.__dict__, metadata_file, indent=4)
click.echo(f'Successfully added device {device} to database')
logger.debug(f'Added device {device} to database {database}. Full path of metadata {metadata_path}')
logger.info(f'Metadata for {device} {device_metadata.print_attributes()}')

View File

@ -1,347 +0,0 @@
import json
import logging
import os
import re
import subprocess
import sys
import uuid
from datetime import datetime
from pathlib import Path
from time import time
import click
from click_option_group import optgroup
from iottb.utils.string_processing import make_canonical_name
# Setup logger
logger = logging.getLogger('iottb.sniff')
def is_ip_address(address):
ip_pattern = re.compile(r"^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$")
return ip_pattern.match(address) is not None
def is_mac_address(address):
mac_pattern = re.compile(r"^([0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2}$")
return mac_pattern.match(address) is not None
def load_config(cfg_file):
"""Loads configuration from the given file path."""
with open(cfg_file, 'r') as config_file:
return json.load(config_file)
def validate_sniff(ctx, param, value):
logger.info('Validating sniff...')
if ctx.params.get('unsafe') and not value:
return None
if not ctx.params.get('unsafe') and not value:
raise click.BadParameter('Address is required unless --unsafe is set.')
if not is_ip_address(value) and not is_mac_address(value):
raise click.BadParameter('Address must be a valid IP address or MAC address.')
return value
def run_pre(pre):
subprocess.run(pre, shell=True)
logger.debug(f'finished {pre}')
def run_post(post):
subprocess.run(post, shell=True)
logger.debug(f'finished {post}')
@click.command('sniff', help='Sniff packets with tcpdump')
@optgroup.group('Testbed sources')
@optgroup.option('--db', '--database', type=str, envvar='IOTTB_DB', show_envvar=True,
help='Database of device. Only needed if not current default.')
@optgroup.option('--app', type=str, help='Companion app being used during capture', required=False)
@optgroup.group('Runtime behaviour')
@optgroup.option('--unsafe', is_flag=True, default=False, envvar='IOTTB_UNSAFE', is_eager=True,
help='Disable checks for otherwise required options.\n', show_envvar=True)
@optgroup.option('--guided', is_flag=True, default=False, envvar='IOTTB_GUIDED', show_envvar=True)
@optgroup.option('--pre', help='Script to be executed before main command is started.')
@optgroup.option('--post', help='Script to be executed upon completion of main command.')
@optgroup.group('Tcpdump options')
@optgroup.option('-i', '--interface',
help='Network interface to capture on. ' +
'If not specified tcpdump tries to find an appropriate one.\n', show_envvar=True,
envvar='IOTTB_CAPTURE_INTERFACE')
@optgroup.option('-a', '--address', callback=validate_sniff,
help='IP or MAC address to filter packets by.\n', show_envvar=True,
envvar='IOTTB_CAPTURE_ADDRESS')
@optgroup.option('-I', '--monitor-mode', help='Put interface into monitor mode.\n', is_flag=True)
@optgroup.option('--ff', type=str, envvar='IOTTB_CAPTURE_FILTER', show_envvar=True,
help='tcpdump filter as string or file path.')
@optgroup.option('-#', '--print-pacno', is_flag=True, default=True,
help='Print packet number at beginning of line. True by default.\n')
@optgroup.option('-e', '--print-ll', is_flag=True, default=False,
help='Print link layer headers. Off by default.')
@optgroup.option('-c', '--count', type=int, help='Number of packets to capture.', default=1000)
# @optgroup.option('--mins', type=int, help='Time in minutes to capture.', default=1)
@click.argument('tcpdump-args', nargs=-1, required=False, metavar='[TCPDUMP-ARGS]')
@click.argument('device', required=False)
@click.pass_context
def sniff(ctx, device, interface, print_pacno, ff, count, monitor_mode, print_ll, address, db, unsafe, guided,
app, tcpdump_args, pre, post, **params):
""" Sniff packets from a device """
logger.info('sniff command invoked')
# Step 0: run pre script:
if pre:
click.echo(f'Running pre command {pre}')
run_pre(pre)
# Step1: Load Config
config = ctx.obj['CONFIG']
logger.debug(f'Config loaded: {config}')
# Step2: determine relevant database
database = db if db else config.default_database
path = config.db_path_dict[database]
full_db_path = Path(path) / database
logger.debug(f'Full db path is {str(full_db_path)}')
# 2.2: Check if it exists
if not full_db_path.is_dir():
logger.error('DB unexpectedly missing')
click.echo('DB unexpectedly missing')
return
canonical_name, aliases = make_canonical_name(device)
click.echo(f'Using canonical device name {canonical_name}')
device_path = full_db_path / canonical_name
# Step 3: now the device
if not device_path.exists():
if not unsafe:
logger.error(f'Device path {device_path} does not exist')
click.echo(f'Device path {device_path} does not exist')
return
else:
device_path.mkdir(parents=True, exist_ok=True)
logger.info(f'Device path {device_path} created')
click.echo(f'Found device at path {device_path}')
# Step 4: Generate filter
generic_filter = None
cap_filter = None
if ff:
logger.debug(f'ff: {ff}')
if Path(ff).is_file():
logger.info('Given filter option is a file')
with open(ff, 'r') as f:
cap_filter = f.read().strip()
else:
logger.info('Given filter option is an expression')
cap_filter = ff
else:
if address is not None:
if is_ip_address(address):
generic_filter = 'host'
cap_filter = f'{generic_filter} {address}'
elif is_mac_address(address):
generic_filter = 'ether host'
cap_filter = f'{generic_filter} {address}'
elif not unsafe:
logger.error('Invalid address format')
click.echo('Invalid address format')
return
logger.info(f'Generic filter {generic_filter}')
click.echo(f'Using filter {cap_filter}')
# Step 5: prep capture directory
capture_date = datetime.now().strftime('%Y-%m-%d')
capture_base_dir = device_path / f'sniffs/{capture_date}'
capture_base_dir.mkdir(parents=True, exist_ok=True)
logger.debug(f'Previous captures {list(capture_base_dir.glob("cap*"))}')
capture_count = sum(1 for _ in capture_base_dir.glob('cap*'))
logger.debug(f'Capture count is {capture_count}')
capture_dir = f'cap{capture_count:04d}-{datetime.now().strftime("%H%M")}'
logger.debug(f'capture_dir: {capture_dir}')
# Full path
capture_dir_full_path = capture_base_dir / capture_dir
capture_dir_full_path.mkdir(parents=True, exist_ok=True)
click.echo(f'Files will be placed in {str(capture_dir_full_path)}')
logger.debug(f'successfully created capture directory')
# Step 6: Prepare capture file names
# Generate UUID for filenames
capture_uuid = str(uuid.uuid4())
click.echo(f'Capture has id {capture_uuid}')
pcap_file = f"{canonical_name}_{capture_uuid}.pcap"
pcap_file_full_path = capture_dir_full_path / pcap_file
stdout_log_file = f'stdout_{capture_uuid}.log'
stderr_log_file = f'stderr_{capture_uuid}.log'
logger.debug(f'Full pcap file path is {pcap_file_full_path}')
logger.info(f'pcap file name is {pcap_file}')
logger.info(f'stdout log file is {stdout_log_file}')
logger.info(f'stderr log file is {stderr_log_file}')
# Step 7: Build tcpdump command
logger.debug(f'pgid {os.getpgrp()}')
logger.debug(f'ppid {os.getppid()}')
logger.debug(f'(real, effective, saved) user id: {os.getresuid()}')
logger.debug(f'(real, effective, saved) group id: {os.getresgid()}')
cmd = ['sudo', 'tcpdump']
# 7.1 process flags
flags = []
if print_pacno:
flags.append('-#')
if print_ll:
flags.append('-e')
if monitor_mode:
flags.append('-I')
flags.append('-n') # TODO: Integrate, in case name resolution is wanted!
cmd.extend(flags)
flags_string = " ".join(flags)
logger.debug(f'Flags: {flags_string}')
# debug interlude
verbosity = ctx.obj['VERBOSITY']
if verbosity > 0:
verbosity_flag = '-'
for i in range(0, verbosity):
verbosity_flag = verbosity_flag + 'v'
logger.debug(f'verbosity string to pass to tcpdump: {verbosity_flag}')
cmd.append(verbosity_flag)
# 7.2 generic (i.e. reusable) kw args
generic_kw_args = []
if count:
generic_kw_args.extend(['-c', str(count)])
# if mins:
# generic_kw_args.extend(['-G', str(mins * 60)]) TODO: this currently loads to errors with sudo
cmd.extend(generic_kw_args)
generic_kw_args_string = " ".join(generic_kw_args)
logger.debug(f'KW args: {generic_kw_args_string}')
# 7.3 special kw args (not a priori reusable)
non_generic_kw_args = []
if interface:
non_generic_kw_args.extend(['-i', interface])
non_generic_kw_args.extend(['-w', str(pcap_file_full_path)])
cmd.extend(non_generic_kw_args)
non_generic_kw_args_string = " ".join(non_generic_kw_args)
logger.debug(f'Non transferable (special) kw args: {non_generic_kw_args_string}')
# 7.4 add filter expression
if cap_filter:
logger.debug(f'cap_filter (not generic): {cap_filter}')
cmd.append(cap_filter)
full_cmd_string = " ".join(cmd)
logger.info(f'tcpdump command: {full_cmd_string}')
click.echo('Capture setup complete!')
# Step 8: Execute tcpdump command
start_time = datetime.now().strftime("%H:%M:%S")
start = time()
try:
if guided:
click.confirm(f'Execute following command: {full_cmd_string}')
stdout_log_file_abs_path = capture_dir_full_path / stdout_log_file
stderr_log_file_abs_path = capture_dir_full_path / stderr_log_file
stdout_log_file_abs_path.touch(mode=0o777)
stderr_log_file_abs_path.touch(mode=0o777)
with open(stdout_log_file_abs_path, 'w') as out, open(stderr_log_file_abs_path, 'w') as err:
logger.debug(f'\nstdout: {out}.\nstderr: {err}.\n')
tcp_complete = subprocess.run(cmd, check=True, capture_output=True, text=True)
out.write(tcp_complete.stdout)
err.write(tcp_complete.stderr)
# click.echo(f'Mock sniff execution')
click.echo(f"Capture complete. Saved to {pcap_file}")
except subprocess.CalledProcessError as e:
logger.error(f'Failed to capture packets: {e}')
click.echo(f'Failed to capture packets: {e}')
click.echo(f'Check {stderr_log_file} for more info.')
if ctx.obj['DEBUG']:
msg = [f'STDERR log {stderr_log_file} contents:\n']
with open(capture_dir_full_path / stderr_log_file) as log:
for line in log:
msg.append(line)
click.echo("\t".join(msg), lvl='e')
# print('DEBUG ACTIVE')
if guided:
click.confirm('Create metadata anyway?', abort=True)
else:
click.echo('Aborting capture...')
sys.exit()
end_time = datetime.now().strftime("%H:%M:%S")
end = time()
delta = end - start
click.echo(f'tcpdump took {delta:.2f} seconds.')
# Step 9: Register metadata
metadata = {
'device': canonical_name,
'device_id': device,
'capture_id': capture_uuid,
'capture_date_iso': datetime.now().isoformat(),
'invoked_command': " ".join(map(str, cmd)),
'capture_duration': delta,
'generic_parameters': {
'flags': flags_string,
'kwargs': generic_kw_args_string,
'filter': generic_filter
},
'non_generic_parameters': {
'kwargs': non_generic_kw_args_string,
'filter': cap_filter
},
'features': {
'interface': interface,
'address': address
},
'resources': {
'pcap_file': str(pcap_file),
'stdout_log': str(stdout_log_file),
'stderr_log': str(stderr_log_file),
'pre': str(pre),
'post': str(post)
},
'environment': {
'capture_dir': capture_dir,
'database': database,
'capture_base_dir': str(capture_base_dir),
'capture_dir_abs_path': str(capture_dir_full_path)
}
}
click.echo('Ensuring correct ownership of created files.')
username = os.getlogin()
gid = os.getgid()
# Else there are issues when running with sudo:
try:
subprocess.run(f'sudo chown -R {username}:{username} {device_path}', shell=True)
except OSError as e:
click.echo(f'Some error {e}')
click.echo(f'Saving metadata.')
metadata_abs_path = capture_dir_full_path / 'capture_metadata.json'
with open(metadata_abs_path, 'w') as f:
json.dump(metadata, f, indent=4)
click.echo(f'END SNIFF SUBCOMMAND')
if post:
click.echo(f'Running post script {post}')
run_post(post)

View File

@ -1,70 +0,0 @@
import click
from pathlib import Path
import logging
from logging.handlers import RotatingFileHandler
import sys
from iottb.models.iottb_config import IottbConfig
from iottb.definitions import DB_NAME, CFG_FILE_PATH
logger = logging.getLogger(__name__)
@click.command()
@click.option('-d', '--dest', type=click.Path(exists=True, file_okay=False, dir_okay=True),
help='Location to put (new) iottb database')
@click.option('-n', '--name', default=DB_NAME, type=str,
help='Name of new database.')
@click.option('--update-default/--no-update-default', default=True,
help='If new db should be set as the new default')
@click.pass_context
def init_db(ctx, dest, name, update_default):
logger.info('init-db invoked')
config = ctx.obj['CONFIG']
logger.debug(str(config))
# Use the default path from config if dest is not provided
known_dbs = config.get_known_databases()
logger.debug(f'Known databases: {known_dbs}')
if name in known_dbs:
dest = config.get_database_location(name)
if Path(dest).joinpath(name).is_dir():
click.echo(f'A database {name} already exists.')
logger.debug(f'DB {name} exists in {dest}')
click.echo(f'Exiting...')
sys.exit()
logger.debug(f'DB name {name} registered but does not exist.')
if not dest:
logger.info('No dest set, choosing default destination.')
dest = Path(config.default_db_location)
db_path = Path(dest).joinpath(name)
logger.debug(f'Full path for db {str(db_path)}')
# Create the directory if it doesn't exist
db_path.mkdir(parents=True, exist_ok=True)
logger.info(f"mkdir {db_path} successful")
click.echo(f'Created {db_path}')
# Update configuration
config.set_database_location(name, str(dest))
if update_default:
config.set_default_database(name, str(dest))
config.save_config()
logger.info(f"Updated configuration with database {name} at {db_path}")
# @click.group('config')
# @click.pass_context
# def cfg(ctx):
# pass
#
# @click.command('set', help='Set the location of a database.')
# @click.argument('database', help='Name of database')
# @click.argument('location', help='Where the database is located (i.e. its parent directory)')
# @click.pass_context
# def set(ctx, key, value):
# click.echo(f'Setting {key} to {value} in config')
# config = ctx.obj['CONFIG']
# logger.warning('No checks performed!')
# config.set_database_location(key, value)
# config.save_config()

View File

@ -1,50 +0,0 @@
import json
import logging
import uuid
from datetime import datetime
import logging
import click
from iottb.utils.string_processing import make_canonical_name
logger = logging.getLogger(__name__)
class DeviceMetadata:
def __init__(self, device_name, description="", model="", manufacturer="", firmware_version="", device_type="",
supported_interfaces="", companion_applications="", save_to_file=None, aliases=None):
self.device_id = str(uuid.uuid4())
self.device_name = device_name
cn, default_aliases = make_canonical_name(device_name)
logger.debug(f'cn, default aliases = {cn}, {str(default_aliases)}')
self.aliases = default_aliases if aliases is None else default_aliases + aliases
self.canonical_name = cn
self.date_added = datetime.now().isoformat()
self.description = description
self.model = model
self.manufacturer = manufacturer
self.current_firmware_version = firmware_version
self.device_type = device_type
self.supported_interfaces = supported_interfaces
self.companion_applications = companion_applications
self.last_metadata_update = datetime.now().isoformat()
if save_to_file is not None:
click.echo('TODO: Implement saving config to file after creation!')
def add_alias(self, alias: str = ""):
if alias == "":
return
self.aliases.append(alias)
def get_canonical_name(self):
return self.canonical_name
def print_attributes(self):
print(f'Printing attribute value pairs in {__name__}')
for attr, value in self.__dict__.items():
print(f'{attr}: {value}')
def save_metadata_to_file(self, metadata_path):
with open(metadata_path, 'w') as metadata_file:
json.dump(self.__dict__, metadata_file, indent=4)
click.echo(f'Metadata saved to {metadata_path}')

View File

@ -1,39 +0,0 @@
import json
import logging
import uuid
from datetime import datetime
from pathlib import Path
logger = logging.getLogger('iottb.sniff') # Log with sniff subcommand
class CaptureMetadata:
def __init__(self, device_id, capture_dir, interface, address, capture_file, tcpdump_command, tcpdump_stdout, tcpdump_stderr, packet_filter, alias):
self.base_data = {
'device_id': device_id,
'capture_id': str(uuid.uuid4()),
'capture_date': datetime.now().isoformat(),
'capture_dir': str(capture_dir),
'capture_file': capture_file,
'start_time': "",
'stop_time': "",
'alias': alias
}
self.features = {
'interface': interface,
'device_ip_address': address if address else "No IP Address set",
'tcpdump_stdout': str(tcpdump_stdout),
'tcpdump_stderr': str(tcpdump_stderr),
'packet_filter': packet_filter
}
self.command = tcpdump_command
def save_to_file(self):
metadata = {
'base_data': self.base_data,
'features': self.features,
'command': self.command
}
metadata_file_path = Path(self.base_data['capture_dir']) / 'metadata.json'
with open(metadata_file_path, 'w') as f:
json.dump(metadata, f, indent=4)
logger.info(f'Metadata saved to {metadata_file_path}')

View File

@ -1,74 +0,0 @@
from pathlib import Path
import click
from io import StringIO
import sys
from iottb import DOCS_FOLDER
# Import your CLI app here
from iottb.main import cli
"""Script to generate the help text and write to file.
Definitely needs better formatting.
Script is also not very flexible.
"""
def get_help_text(command):
"""Get the help text for a given command."""
help_text = StringIO()
with click.Context(command) as ctx:
# chatgpt says this helps: was right
sys_stdout = sys.stdout
sys.stdout = help_text
try:
click.echo(command.get_help(ctx))
finally:
sys.stdout = sys_stdout
return help_text.getvalue()
def write_help_to_file(cli, filename):
"""Write help messages of all commands and subcommands to a file."""
with open(filename, 'w+') as f:
# main
f.write(f"Main Command: iottb\n")
f.write(get_help_text(cli))
f.write("\n\n")
# go through subcommands
for cmd_name, cmd in cli.commands.items():
f.write(f"Command: {cmd_name}\n")
f.write(get_help_text(cmd))
f.write("\n\n")
# subcommands of subcommands
if isinstance(cmd, click.Group):
for sub_cmd_name, sub_cmd in cmd.commands.items():
f.write(f"Subcommand: {cmd_name} {sub_cmd_name}\n")
f.write(get_help_text(sub_cmd))
f.write("\n\n")
def manual():
commands = [
'init-db',
'add-device',
'sniff'
]
dev_commands = [
'show-all',
'rm-dbs',
'show-cfg',
'show-all'
]
if __name__ == "__main__":
from iottb import DOCS_FOLDER
print('Must be in project root for this to work properly!')
print(f'CWD is {str(Path.cwd())}')
DOCS_FOLDER.mkdir(exist_ok=True)
write_help_to_file(cli, str(DOCS_FOLDER / "help_messages.md"))
print(f'Wrote help_messages.md to {str(DOCS_FOLDER / "help_messages.md")}')

View File

@ -1,4 +0,0 @@
#!/bin/sh
echo 'Running iottb as sudo'
sudo "$(which python)" iottb "$@"
echo 'Finished executing iottb with sudo'

View File

@ -1,9 +0,0 @@
click-option-group==0.5.6 ; python_version >= "3.12" and python_version < "4" \
--hash=sha256:38a26d963ee3ad93332ddf782f9259c5bdfe405e73408d943ef5e7d0c3767ec7 \
--hash=sha256:97d06703873518cc5038509443742b25069a3c7562d1ea72ff08bfadde1ce777
click==8.1.7 ; python_version >= "3.12" and python_version < "4.0" \
--hash=sha256:ae74fb96c20a0277a1d615f1e4d73c8414f5a98db8b799a7931d1582f3390c28 \
--hash=sha256:ca9853ad459e787e2192211578cc907e7594e294c7ccc834310722b41b9ca6de
colorama==0.4.6 ; python_version >= "3.12" and python_version < "4.0" and platform_system == "Windows" \
--hash=sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44 \
--hash=sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6

View File

@ -1,5 +1,3 @@
-from pathlib import Path
from iottb import definitions
import logging
from iottb.utils.user_interaction import tb_echo
@ -11,6 +9,3 @@ log_dir = definitions.LOGDIR
# Ensure logs dir exists before new handlers are registered in main.py
if not log_dir.is_dir():
log_dir.mkdir()
-DOCS_FOLDER = Path.cwd() / 'docs'

View File

@ -0,0 +1,89 @@
import json
import click
from pathlib import Path
import logging
import re
from iottb import definitions
from iottb.models.device_metadata import DeviceMetadata
from iottb.models.iottb_config import IottbConfig
from iottb.definitions import CFG_FILE_PATH, TB_ECHO_STYLES
logger = logging.getLogger(__name__)
def add_device_guided(ctx, cn, db):
click.echo('TODO: Implement')
logger.info('Adding device interactively')
#logger.debug(f'Parameters: {params}. value: {value}')
@click.command('add-device', help='Add a device to a database')
@click.option('--dev', '--device-name', type=str, required=True,
help='The name of the device to be added. If this string contains spaces or other special characters \
normalization is performed to derive a canonical name')
@click.option('--db', '--database', type=click.Path(exists=True, file_okay=False, dir_okay=True, path_type=Path),
envvar='IOTTB_DB', show_envvar=True,
help='Database in which to add this device. If not specified use default from config.')
@click.option('--guided', is_flag=True, default=False, show_default=True, envvar='IOTTB_GUIDED_ADD', show_envvar=True,
help='Add device interactively')
def add_device(dev, db, guided):
"""Add a new device to a database
Device name must be supplied unless in an interactive setup. Database is taken from config by default.
"""
logger.info('add-device invoked')
# Step 1: Load Config
# Dependency: Config file must exist
config = IottbConfig(Path(CFG_FILE_PATH))
logger.debug(f'Config loaded: {config}')
# Step 2: Load database
# dependency: Database folder must exist
if db:
database = db.name
path = db.parent
logger.debug(f'Resolved (path, db) {path}, {database}')
else:
path = config.default_db_location
database = config.default_database
logger.debug(f'Default (path, db) {path}, {database}')
click.secho(f'Using database {database}')
full_db_path = Path(path) / database
if not full_db_path.is_dir():
logger.warning(f'No database at {database}')
click.echo(f'Could not find a database.')
click.echo(f'You need to initialize the testbed before you add devices!')
click.echo(f'To initialize the testbed in the default location run "iottb init-db"')
click.echo('Exiting...')
exit()
# Step 3: Check if device already exists in database
# dependency: DeviceMetadata object
device_metadata = DeviceMetadata(device_name=dev)
device_dir = full_db_path / device_metadata.canonical_name
# Check if device is already registered
if device_dir.exists():
logger.warning(f'Device directory {device_dir} already exists.')
click.echo(f'Device {dev} already exists in the database.')
click.echo('Exiting...')
exit()
try:
device_dir.mkdir()
except OSError as e:
logger.error(f'Error trying to create device {e}')
click.echo('Exiting...')
exit()
# Step 4: Save metadata into device_dir
metadata_path = device_dir / definitions.DEVICE_METADATA_FILE_NAME
with metadata_path.open('w') as metadata_file:
json.dump(device_metadata.__dict__, metadata_file, indent=4)
click.echo(f'Successfully added device {dev} to database')
logger.debug(f'Added device {dev} to database {database}. Full path of metadata {metadata_path}')
logger.info(f'Metadata for {dev} {device_metadata.print_attributes()}')

View File

@ -2,7 +2,6 @@ from pathlib import Path
import logging
import click
-from iottb import tb_echo
from iottb.definitions import DB_NAME, CFG_FILE_PATH
from iottb.models.iottb_config import IottbConfig
@ -95,17 +94,12 @@ def show_everything(ctx):
click.echo(f"Default Database: {config.default_database}")
click.echo(f"Default Database Path: {config.default_db_location}")
click.echo("Database Locations:")
-everything_dict = {}
-for db_name, db_path in config.db_path_dict.items():
-click.echo(f" - {db_name}: {db_path}")
for db_name, db_path in config.db_path_dict.items():
full_db_path = Path(db_path) / db_name
+click.echo(f" - {db_name}: {full_db_path}")
if full_db_path.is_dir():
-click.echo(f"\nContents of {full_db_path}:")
+click.echo(f"Contents of {db_name} at {full_db_path}:")
-flag = True
for item in full_db_path.iterdir():
-flag = False
if item.is_file():
click.echo(f" - {item.name}")
try:
@ -121,10 +115,9 @@ def show_everything(ctx):
click.echo(f" - {subitem.name}")
elif subitem.is_dir():
click.echo(f" - {subitem.name}/")
-if flag:
-tb_echo(f'\t EMPTY')
else:
click.echo(f" {full_db_path} is not a directory")
+warnstyle = {'fg': 'red', 'bold': True}
+click.secho('Developer command used', **warnstyle)

View File

@ -0,0 +1,100 @@
import click
from pathlib import Path
import logging
from logging.handlers import RotatingFileHandler
import sys
from iottb.models.iottb_config import IottbConfig
from iottb.definitions import DB_NAME
logger = logging.getLogger(__name__)
@click.command()
@click.option('-d', '--dest', type=click.Path(), help='Location to put (new) iottb database')
@click.option('-n', '--name', default=DB_NAME, type=str, help='Name of new database.')
@click.option('--update-default/--no-update-default', default=True, help='If new db should be set as the new default')
@click.pass_context
def init_db(ctx, dest, name, update_default):
logger.info('init-db invoked')
config = ctx.obj['CONFIG']
logger.debug(str(config))
# Use the default path from config if dest is not provided
known_dbs = config.get_known_databases()
logger.debug(f'Known databases: {known_dbs}')
if name in known_dbs:
dest = config.get_database_location(name)
if Path(dest).joinpath(name).is_dir():
click.echo(f'A database {name} already exists.')
logger.debug(f'DB {name} exists in {dest}')
click.echo(f'Exiting...')
exit()
logger.debug(f'DB name {name} registered but does not exist.')
if not dest:
logger.info('No dest set, choosing default destination.')
dest = Path(config.default_db_location).parent
db_path = Path(dest).joinpath(name)
logger.debug(f'Full path for db {str(db_path)}')
# Create the directory if it doesn't exist
db_path.mkdir(parents=True, exist_ok=True)
logger.info(f"mkdir {db_path} successful")
click.echo(f'Created {db_path}')
# Update configuration
config.set_database_location(name, str(dest))
if update_default:
config.set_default_database(name, str(dest))
config.save_config()
logger.info(f"Updated configuration with database {name} at {db_path}")
@click.command()
@click.option('-d', '--dest', type=click.Path(), help='Location to put (new) iottb database')
@click.option('-n', '--name', default=DB_NAME, type=str, help='Name of new database.')
@click.option('--update-default/--no-update-default', default=True, help='If new db should be set as the new default')
@click.pass_context
def init_db_inactive(ctx, dest, name, update_default):
logger.info('init-db invoked')
config = ctx.obj['CONFIG']
logger.debug(str(config))
# Retrieve known databases
known_dbs = config.get_known_databases()
# Determine destination path
if name in known_dbs:
dest = Path(config.get_database_location(name))
if dest.joinpath(name).is_dir():
click.echo(f'A database {name} already exists.')
logger.debug(f'DB {name} exists in {dest}')
click.echo(f'Exiting...')
exit()
logger.debug(f'DB name {name} registered but does not exist.')
elif not dest:
logger.info('No destination set, using default path from config.')
dest = Path(config.default_db_location).parent
# Ensure destination path is absolute
dest = dest.resolve()
# Combine destination path with database name
db_path = dest / name
logger.debug(f'Full path for database: {str(db_path)}')
# Create the directory if it doesn't exist
try:
db_path.mkdir(parents=True, exist_ok=True)
logger.info(f'Directory {db_path} created successfully.')
click.echo(f'Created {db_path}')
except Exception as e:
logger.error(f'Failed to create directory {db_path}: {e}')
click.echo(f'Failed to create directory {db_path}: {e}', err=True)
exit(1)
# Update configuration
config.set_database_location(name, str(db_path))
if update_default:
config.set_default_database(name, str(db_path))
config.save_config()
logger.info(f'Updated configuration with database {name} at {db_path}')
click.echo(f'Updated configuration with database {name} at {db_path}')

iottb/commands/sniff.py Normal file
View File

@ -0,0 +1,133 @@
import click
import subprocess
import json
from pathlib import Path
import logging
import re
from datetime import datetime
from iottb.definitions import APP_NAME, CFG_FILE_PATH
from iottb.models.iottb_config import IottbConfig
from iottb.utils.string_processing import make_canonical_name
# Setup logger
logger = logging.getLogger('iottb.sniff')
def is_ip_address(address):
ip_pattern = re.compile(r"^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$")
return ip_pattern.match(address) is not None
def is_mac_address(address):
mac_pattern = re.compile(r"^([0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2}$")
return mac_pattern.match(address) is not None
def load_config(cfg_file):
"""Loads configuration from the given file path."""
with open(cfg_file, 'r') as config_file:
return json.load(config_file)
def validate_sniff(ctx, param, value):
logger.info('Validating sniff...')
if ctx.params.get('unsafe') and not value:
return None
if not ctx.params.get('unsafe') and not value:
raise click.BadParameter('Address is required unless --unsafe is set.')
return value
@click.command('sniff', help='Sniff packets with tcpdump')
@click.argument('device')
@click.option('-i', '--interface', callback=validate_sniff, help='Network interface to capture on',
envvar='IOTTB_CAPTURE_INTERFACE')
@click.option('-a', '--address', callback=validate_sniff, help='IP or MAC address to filter packets by',
envvar='IOTTB_CAPTURE_ADDRESS')
@click.option('--db', '--database', type=click.Path(exists=True, file_okay=False), envvar='IOTTB_DB',
help='Database of device. Only needed if not current default.')
@click.option('--unsafe', is_flag=True, default=False, envvar='IOTTB_UNSAFE', is_eager=True,
help='Disable checks for otherwise required options')
@click.option('--guided', is_flag=True, default=False)
def sniff(device, interface, address, db, unsafe, guided):
""" Sniff packets from a device """
logger.info('sniff command invoked')
# Step1: Load Config
config = IottbConfig(Path(CFG_FILE_PATH))
logger.debug(f'Config loaded: {config}')
# Step2: determine relevant database
database = db if db else config.default_database
path = config.db_path_dict[database]
full_db_path = Path(path) / database
logger.debug(f'Full db path is {str(full_db_path)}')
# Check if it exists
assert full_db_path.is_dir(), "DB unexpectedly missing"
canonical_name, aliases = make_canonical_name(device)
click.echo(f'Using canonical device name {canonical_name}')
if not path:
logger.error('No default database path found in configuration')
click.echo('No default database path found in configuration')
return
# Verify device directory
device_path = full_db_path / canonical_name
if not device_path.exists():
logger.error(f'Device path {device_path} does not exist')
click.echo(f'Device path {device_path} does not exist')
return
# Generate filter
if not unsafe:
if is_ip_address(address):
packet_filter = f"host {address}"
elif is_mac_address(address):
packet_filter = f"ether host {address}"
else:
logger.error('Invalid address format')
click.echo('Invalid address format')
return
else:
packet_filter = None
# Prepare capture directory
capture_dir = device_path / 'captures' / datetime.now().strftime('%Y%m%d_%H%M%S')
capture_dir.mkdir(parents=True, exist_ok=True)
# Prepare capture file
pcap_file = capture_dir / f"{device}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.pcap"
# Build tcpdump command
cmd = ['sudo', 'tcpdump', '-i', interface, '-w', str(pcap_file)]
if packet_filter:
cmd.append(packet_filter)
logger.info(f'Executing: {" ".join(cmd)}')
# Execute tcpdump
try:
subprocess.run(cmd, check=True)
click.echo(f"Capture complete. Saved to {pcap_file}")
except subprocess.CalledProcessError as e:
logger.error(f'Failed to capture packets: {e}')
click.echo(f'Failed to capture packets: {e}')
@click.command('sniff', help='Sniff packets with tcpdump')
@click.argument('device')
@click.option('-i', '--interface', required=False, help='Network interface to capture on', envvar='IOTTB_CAPTURE_INTERFACE')
@click.option('-a', '--address', required=True, help='IP or MAC address to filter packets by', envvar='IOTTB_CAPTURE_ADDRESS')
@click.option('--db', '--database', type=click.Path(exists=True, file_okay=False), envvar='IOTTB_DB',
help='Database of device. Only needed if not current default.')
@click.option('--unsafe', is_flag=True, default=False, envvar='IOTTB_UNSAFE',
help='Disable checks for otherwise required options')
@click.option('--guided', is_flag=True)
def sniff2(device, interface, address, cfg_file):
""" Sniff packets from a device """
logger.info('sniff command invoked')
# Step 1: Load Config
# Dependency: Config file must exist
config = IottbConfig(Path(CFG_FILE_PATH))
logger.debug(f'Config loaded: {config}')

View File

@ -42,7 +42,3 @@ TB_ECHO_STYLES = {
'e': {'fg': 'red', 'bold': True},
'header': {'fg': 'bright_cyan', 'bold': True, 'italic': True}
}
-NAME_OF_CAPTURE_DIR = 'sniffs'

View File

@ -1,19 +1,16 @@
-import sys
import click
from pathlib import Path
import logging
from iottb.commands.sniff import sniff
from iottb.commands.developer import set_key_in_table_to, rm_cfg, rm_dbs, show_cfg, show_everything
##################################################
# Import package modules
#################################################
from iottb.utils.logger_config import setup_logging
from iottb import definitions
from iottb.models.iottb_config import IottbConfig
-from iottb.commands.testbed import init_db
+from iottb.commands.initialize_testbed import init_db
from iottb.commands.add_device import add_device
############################################################################
@ -31,34 +28,26 @@ loglevel = definitions.LOGLEVEL
logger = logging.getLogger(__name__)
-@click.group(context_settings=dict(auto_envvar_prefix='IOTTB', show_default=True))
+@click.group()
-@click.option('-v', '--verbosity', count=True, type=click.IntRange(0, 3), default=0, is_eager=True,
+@click.option('-v', '--verbosity', count=True, type=click.IntRange(0, 3), default=0,
help='Set verbosity')
-@click.option('-d', '--debug', is_flag=True, default=False, is_eager=True,
+@click.option('-d', '--debug', is_flag=True, default=False,
help='Enable debug mode')
-@click.option('--dry-run', is_flag=False, default=True, is_eager=True, help='NOT USED!')
@click.option('--cfg-file', type=click.Path(),
default=Path(click.get_app_dir(APP_NAME)).joinpath('iottb.cfg'),
envvar='IOTTB_CONF_HOME', help='Path to iottb config file')
@click.pass_context
-def cli(ctx, verbosity, debug, dry_run, cfg_file):
+def cli(ctx, verbosity, debug, cfg_file):
-# Setup logging based on the loaded configuration and other options
-setup_logging(verbosity, debug)
+setup_logging(verbosity, debug)  # Setup logging based on the loaded configuration and other options
ctx.ensure_object(dict)  # Make sure context is ready for use
logger.info("Starting execution.")
ctx.obj['CONFIG'] = IottbConfig(cfg_file)  # Load configuration directly
ctx.meta['FULL_PATH_CONFIG_FILE'] = str(cfg_file)
-ctx.meta['DRY_RUN'] = dry_run
-logger.debug(f'Verbosity: {verbosity}')
-ctx.obj['VERBOSITY'] = verbosity
-logger.debug(f'Debug: {debug}')
-ctx.obj['DEBUG'] = debug
##################################################################################
# Add all subcommands to group here
#################################################################################
-# TODO: Is there a way to do this without pylint freaking out?
# noinspection PyTypeChecker
cli.add_command(init_db)
cli.add_command(rm_cfg)
@ -69,9 +58,5 @@ cli.add_command(add_device)
cli.add_command(show_cfg)
cli.add_command(sniff)
cli.add_command(show_everything)
if __name__ == '__main__':
-cli()
+cli(auto_envvar_prefix='IOTTB', show_default=True, show_envvars=True)
-for log in Path.cwd().iterdir():
-log.chmod(0o777)

View File

@ -10,13 +10,12 @@ logger = logging.getLogger(__name__)
class DeviceMetadata:
-def __init__(self, device_name, description="", model="",
-             manufacturer="", firmware_version="", device_type="",
-             supported_interfaces="", companion_applications="",
-             save_to_file=None):
+def __init__(self, device_name, description="", model="", manufacturer="", firmware_version="", device_type="",
+             supported_interfaces="", companion_applications="", save_to_file=None):
self.device_id = str(uuid.uuid4())
self.device_name = device_name
cn, aliases = make_canonical_name(device_name)
+logger.debug(f'cn, aliases = {cn}, {str(aliases)}')
self.aliases = aliases
self.canonical_name = cn
self.date_added = datetime.now().isoformat()

View File

@ -0,0 +1,4 @@
import logging
logger = logging.getLogger('iottb.sniff') # Log with sniff subcommand

View File

@ -34,7 +34,7 @@ def make_canonical_name(name):
parts = norm_name.split('-')
canonical_name = '-'.join(parts[:2])
aliases.append(canonical_name)
-aliases = list(set(aliases))
logger.debug(f'Canonical name: {canonical_name}')
logger.debug(f'Aliases: {aliases}')
-return canonical_name, aliases
+return canonical_name, list(set(aliases))

View File

@ -1,34 +0,0 @@
IoT.db/
├── Device1/
│ ├── Rawdata/
│ │ ├── measurement#D1#1/
│ │ │ ├── capfile
│ │ │ └── meta
│ │ └── measurement#D1#2/
│ │ └── ...
│ ├── Experiments/
│ │ ├── exp1#D1/
│ │ │ └── files etc
│ │ └── exp2#D1/
│ │ └── ...
│ └── Device 1 (Fixed) metadata
├── Device2/
│ ├── Rawdata/
│ │ ├── measurement#D2#1/
│ │ │ ├── capfile
│ │ │ └── meta
│ │ └── ...
│ ├── Experiments/
│ │ ├── exp1#d2/
│ │ │ └── ...
│ │ └── ...
│ └── Device 2 fixed metadata
└── .../
├── .../
│ ├── ..
│ └── ..
├── .../
│ ├── .../
│ │ └── ...
│ └── ...
└── ...


View File

@ -1,51 +0,0 @@
Reasoning is that experiments might want data from measurements of multiple
devices.
IoT.db2/
├── Devices/
│ ├── Dev1/
│ │ ├── devmeta
│ │ └── Measurements/
│ │ ├── m1/
│ │ │ ├── raw
│ │ │ ├── meta
│ │ │ └── spec
│ │ └── m2/
│ │ └── ...
│ ├── Dev2/
│ │ ├── devmeta
│ │ └── Measurements/
│ │ ├── m1/
│ │ │ ├── raw
│ │ │ ├── meta
│ │ │ └── spec
│ │ ├── m2/
│ │ │ └── ...
│ │ ├── m3/
│ │ │ └── ...
│ │ └── ...
│ └── Dev3/
│ └── ....
└── Experiments/(Or projects? Or cleaned data)
├── E1/
│ ├── involved measurements
│ ├── filters/ feature extraction algo etc.
│ └── etcetc...
├── E2/
│ ├── .....
│ ├── ..
│ ├── ...
│ └── ..
└── ....
IoT.db3/
├── Measurements/
│ ├── m1/ (Specification of device in this substructure)
│ │ ├── follows from above
│ │ └── ...
│ ├── m2
│ └── ....
└── Experiments/
├── e1/
│ ├── follows from above
│ └── ...
├── e2
└── ...

View File

@ -1,15 +0,0 @@
Like IoTdb but has no opinion on experiments
IoT.db4/
├── Dev1/
│ ├── Measurements (basically raw data)/
│ │ ├── m1/
│ │ │ └── ....
│ │ └── m2/
│ │ └── ....
│ └── Cleaned?/Features extracted?/Merged?/
│ └── -- Where to put clean data?
├── Dev2/
│ └── Measurements/
│ └── ...
└── Algos/Scripts?/
└── ..

View File

@ -1,92 +0,0 @@
# Testbed
- What is a testbed?
- "[...] wissenschaftliche Plattform für Experimente" german [Wikipedia](https://de.wikipedia.org/wiki/Testbed)
- What is a "Platform"?
- Example [ORBIT](https://www.orbit-lab.org/) Testbed as wireless network emulator (software I guess) + computing resources. Essence of offered service: Predictable environment. What is tested: Applications and protocols.
- [APE](https://apetestbed.sourceforge.net/) "APE testbed is short for **Ad hoc Protocol Evaluation testbed**." But also ["What exactly is APE"](https://apetestbed.sourceforge.net/#What_exactly_is_APE): "There is no clear definition of what a testbed is or what it comprises. APE, however, can be seen as containing two things:
- An encapsulated execution environment, or more specifically, a small Linux distribution.
- Tools for post testrun data analysis."
- [DES-Testbed](https://www.des-testbed.net) Freie Universität Berlin. Random assortment of sometimes empty(?!) posts to a sort of bulletin board.
## IoT Automation Testbed
#### From Abstract:
In this project, the student designs a testbed for the **automated analysis** of the **privacy implications** of IoT devices, paying particular attention to features that support reproducibility.
#### From Project description:
To study the privacy and security aspects of IoT devices **_systematically_** and **_reproducibly_**, we need an easy-to-use testbed that _automates_ the **_process of experimenting_** with **_IoT devices_**.
**Automation recipes**:
Automate important aspects of experiments, in particular:
- Data Collection
- Analysis (= Experiment in most places)
**FAIR data storage**
making data
- Findable
- Accessible
- Interoperable
- Reusable
### Implications/Open questions
#### FAIR Data Storage
1. Who are the stakeholders? What is the scope of "FAIRness"?
1. PersonalDB? --> [X], Tiny scope, $\lnot$ FAIR almost by definition. Would only be a tool/suggestion on layout.
2. ProjectDB? --> [X], no, probably a project _uses_ a testbed
3. Research Group --> Focuses on **F a IR**. Accessibility _per se_ not an issue. Findability -> by machine AND human. Interoperable --> specs may rely on local/uni/group idiosyncrasies.
4. AcademicDB --> (Strict) subset of 3. Consider field-specific standards. Must start discerning between public/non-public parts of db/testbed. One may unwittingly leak privacy information: like location, OS of capture host, usernames, absolute file paths etc. See [here](https://www.netresec.com/?page=Blog&month=2013-02&post=Forensics-of-Chinese-MITM-on-GitHub) and [pcapng.com](https://pcapng.com/) under "Metadata Block Types"
5. Public DB --> (Strict) Subset of 4.
2. Seems like something between 3. and 4. Some type of repository. Full Fledged DB? Probably unnecessary. Mix text + low spec like sqlite? Could still be tracked by git probably.
3. Interoperability $\cap$ Automation recipes --> Recipes built from and depend only on widely available, platform-independent tools.
4. Accessibility $\cap$ Autorec --> Built from and only depend on tools which are 1. widely available and 2. (have a permissive license OR have an equivalent with a permissive license). Human: Documentation.
5. Reusable $\cap$ Autorec --> Modular tools, and accessible (license, etc.) dependencies (e.g. experiment-specific scripts).
6. Findable $\cap$ Autorec --> Must assume that the recipe is found and selected manually by the researcher.
7. Interoperable --> Collected data (measurements) across different devices/experiments must follow a schema which is meaningful for all of them.
#### Usage paths/ Workflows:
Data Collection --> Deposit in FAIR repository
Primary Experiment --> Define spec. Write script/code --> Access FAIR repo for data. Possibly access FAIR repo for predefined scripts --> Where do the results go? A results "repo".
Replication Experiment --> Choose experiment/benchmark script from testbed --> Execute --> Publish (produces a replication result, i.e. same "schema" as the primary experiment)
Replication Experiment Variant --> Choose experiment/benchmark. Add additional processing and input --> run --> possibly publish
How to define static vs dynamic aspect of experiment?
Haven't even thought about encryption/decryption specifics....
But also could go like this:
First design analysis/experiment --> Collect data --> data cleaned according to testbed scripts --> #TODO
Get new device and want to perform some predefined tests --> first need to collect data
For _some_ device (unknown if data already exists) want to perform test _T_ --> run script with device spec as input -> Script checks if data already available; If not, perform data collection first -> run analysis on data --> publish results to results/benchmark repo of device; if was new device, open new results branch for that device and publish initial results. _Primary Experiment_ with data collection.
Types of Experiments:
"Full Stack": Data Collection + Analysis
"Model Test": Data Access (+ Sampling) + Model (Or complete Workflow). Test subject: Model
"Replicaton Experiment": _secondary_ data collection + testbed model + quality criteria? Test Subject: Collection scheme + analysis model = result
"Exploratory Collection + Analysis": aka unsupervised #TODO
**Note**:
#TODO What types of metadata are of interest. Are metadata simple, minimal compute features. Complicated extracted/computed features? Where do we draw the line.
#TODO Say for the same devices. When is data merged, when not? I.e. under what conditions can datasets automatically be enlarged? How is this tracked as to not tamper with reproducibility?
### Reproducibility:
What are we trying to reproduce?
What are possible results from experiments/tests?
Types of artifacts:
Static:
Raw data.
Labeled Data.
Computational/Instructive:
Supervised Training. Input: Labeled Data + Learning algo. Output: Model.
Model Applicability Test: Input: unlabeled data + model. Output: Prediction/Label
Feature Extraction: (raw, labeled?) data + extraction algo. Output: Labeled Dataset.
New Feature Test: labeled data + feature extraction algo + learning algo. Output: Model + Model Verification -> Usability of new features... ( #todo this case exemplifies why we need modularity: we want to apply/compose new "feature extraction algo" e.g. to all those devices where applicable and train new models and verify "goodness" of new features per device/dataset etc.... )
### data collection and cleaning (and features):
How uniform is the schema of data we want to collect across the IoT spectrum? Per device? Say two (possibly unrelated) datasets happen to share the same schema, can we just merge them, say, even if one set is from a VR headset and another from a roomba?
Is the schema always the same, e.g. (Timestamp, src ip, dst ip, (mac ports? or unused features), payload?, protocols?)?
If testbed data contains uniform data --> only "one" extraction algo and dataset schema = all relevant features
Alternatively, testbed data is heterogeneous --> feature extraction defines interoperability/mergeability of datasets.
Training Algo: Flexible schema, output only usable on data with same schema(?)
Model Eval: Schema fixed, eval data must have correct schema
Say a project output is a model which retrieves privacy-relevant information from the network traffic of an IoT device. #TODO how to guarantee applicability to other devices? What are the needs in the aftermath? Apply the same model to other data? What if the raw data schemas match, but the labels are incompatible?
#todo schema <-> applicable privacy metric matching
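To make the findability point concrete: every capture directory already carries a `capture_metadata.json` with keys such as `device` and `resources.pcap_file` (see the sniff command), so the database can be queried with standard tools. A minimal sketch, assuming the database lives at `~/iottb.db` and the canonical device name is `amazon-echo` (both placeholders):

```bash
# list the pcap files of all captures recorded for one device
find ~/iottb.db -name capture_metadata.json \
  -exec jq -r 'select(.device == "amazon-echo") | .resources.pcap_file' {} +
```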

View File

@ -1,11 +0,0 @@
### Completed:
- All Devices unpacked except [[xiaomi tv stick]].
- [[ledvance led strip]] won't enter pairing mode.
- [[echodot]] is setup and works.
- [[mi 360 home security camera]] needs microsd card.
## Plan for this week:
- Get microsd card
- MAINLY: Get AP working or find other way to capture traffic.
## Misc.:
Much time lost resetting the router. [[ledvance led strip]] will only connect to 2.4 GHz networks.
If the laptop is connected to the internet via ethernet, then I can create an AP, but the iPhone won't connect to it. The IoT devices do connect, though.

View File

@ -1,4 +0,0 @@
- Bought two USB Wifi Adapters (Completes [[TODO1]]):
- tp-link AC1300 Archer T3U (Mini Wireless MU-MIMO USB Adapter).
- tp-link AC1300 Archer T3U Plus (High Gain Wireless Dual Band USB Adapter)

View File

@ -1,12 +0,0 @@
Plan: Set up the Wi-Fi adapter to capture the Amazon Echo Dot.
Flow for setting up Access Point:
1. Setup Access Point
2. Configure Routing/Bridge or similar so IoT device can access internet.
Tried the [linux-wifi-hotspot](https://github.com/lakinduakash/linux-wifi-hotspot) repo. Running it makes the AP visible to the iPhone, but the issue is the IP address. Need to configure a DHCP server or manually assign an address.
Problem: the Wi-Fi adapter in monitor mode sees nothing.
Neither adapter has a driver for modern macOS.
The Archer T3U is using the rtw_8822bu driver from the kernel; this supports Mac.
Decided to go down the hostapd route.

View File

@ -1,119 +0,0 @@
Example [hostapd.conf](http://w1.fi/cgit/hostap/plain/hostapd/hostapd.conf)
Simple article for basic setup [here](https://medium.com/p/3c18760e6f7e)
The AP can be started and an iPhone manages to connect. Now I must 1. ensure WPA2 or WPA3 is used and 2. enable IP masquerading for an internet connection. Then I should finally be able to set up devices properly and start sniffing traffic.
# 1st attempt AP setup
### Config files
File:`/etc/dnsmasq.d/dhcp-for-ap.conf`
Content:
```config
interface=wlp0s20f0u1
dhcp-range=10.0.0.3,10.0.0.20,12h
```
**BEWARE**: The above must be loaded into `/etc/dnsmasq.conf` with a line like `conf-file=/etc/dnsmasq.d/dhcp-for-ap.conf` or `conf-dir=/etc/dnsmasq.d/,*.conf`; see [here](https://wiki.archlinux.org/title/Dnsmasq#Configuration)
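A quick way to check that the include actually gets picked up; this is a hedged sketch, assuming dnsmasq runs as a systemd service (not necessarily what was done here):
```bash
# --test parses the config (including conf-file/conf-dir includes) and reports syntax errors
sudo dnsmasq --test
sudo systemctl restart dnsmasq
```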
Other configs in `code/` directory.
## Used commands
See `code/` dir commit `devel@299912e` .
## Sanity Check
```bash
$ sudo hostapd ./hostapd.conf
# Output upon trying to connect with iPhone
wlp0s20f0u1: interface state UNINITIALIZED->ENABLED
wlp0s20f0u1: AP-ENABLED
wlp0s20f0u1: STA f2:10:60:95:28:05 IEEE 802.11: authenticated
wlp0s20f0u1: STA f2:10:60:95:28:05 IEEE 802.11: authenticated
wlp0s20f0u1: STA f2:10:60:95:28:05 IEEE 802.11: associated (aid 1)
wlp0s20f0u1: AP-STA-CONNECTED f2:10:60:95:28:05
wlp0s20f0u1: STA f2:10:60:95:28:05 RADIUS: starting accounting session 9C7F40AA0385E2B2
wlp0s20f0u1: STA f2:10:60:95:28:05 WPA: pairwise key handshake completed (RSN)
wlp0s20f0u1: EAPOL-4WAY-HS-COMPLETED f2:10:60:95:28:05
```
Connection established, but no internet, as expected.
## Test
*Input*
```bash
sudo ./initSwAP wlp
```
*Output*
```
net.ipv4.ip_forward = 1
wlp0s20f0u1: interface state UNINITIALIZED->ENABLED
wlp0s20f0u1: AP-ENABLED
wlp0s20f0u1: STA f2:10:60:95:28:05 IEEE 802.11: authenticated
wlp0s20f0u1: STA f2:10:60:95:28:05 IEEE 802.11: associated (aid 1)
wlp0s20f0u1: AP-STA-CONNECTED f2:10:60:95:28:05
wlp0s20f0u1: STA f2:10:60:95:28:05 RADIUS: starting accounting session C77A903F5D15F3B3
wlp0s20f0u1: STA f2:10:60:95:28:05 WPA: pairwise key handshake completed (RSN)
wlp0s20f0u1: EAPOL-4WAY-HS-COMPLETED f2:10:60:95:28:05
```
Unfortunately, still no internet connection.
## Analysis
Had forgotten to import the dhcp config file.
**Changes**: Added the dnsmasq dhcp config and changed wpa=3 to wpa=2 so that only WPA2 is used -> now the iPhone doesn't warn about security.
Unfortunately, still no internet connection can be established.
## Today's 2nd attempt at establishing an internet connection
__Remarks/Observations:__
- The iPhone connects to the AP and receives IP address `169.254.196.21` with subnet mask `255.255.0.0`.
- That IP is a reserved, non-routable link-local address -> thus it seems the iPhone did not get an address from the dhcp server.
- Could the firewall be the problem? TODO -> iptables rules for dns and dhcp
- Maybe I need to set a static IP first, etc., as mentioned [here](https://woshub.com/create-wi-fi-access-point-hotspot-linux/)
```bash
# nano /etc/network/interfaces
auto wlp0s20f0u1
iface wlp0s20f0u1 inet static
address 10.10.0.1
netmask 255.255.255.0
```
- `/etc/network/interfaces` doesn't exist on my machine... (a possible Fedora equivalent is sketched right below)
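On Fedora the same static address could presumably be set imperatively with `ip` instead; a sketch, assuming the AP interface is still `wlp0s20f0u1` (untested here):
```bash
# Equivalent of the Debian-style stanza above, applied directly to the interface
sudo ip addr add 10.10.0.1/24 dev wlp0s20f0u1
sudo ip link set wlp0s20f0u1 up
```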
### Some configs to remember for later
dnsmasq:
```
#interface=wlp0s20f0u1
listen-address=10.0.0.2
dhcp-range=10.0.0.3,10.0.0.20,12h
dhcp-option=3,192.168.1.1
dhcp-option=6,192.168.1.1
domain-needed
bogus-priv
filterwin2k
server=1.1.1.1
no-hosts
```
Maybe need to enable ipv6 forwarding?
```
net.ipv4.ip_forward = 1
net.ipv4.conf.all.forwarding = 1
net.ipv6.conf.all.forwarding = 1
```
Flushing iptables: `iptables -F` flushes all tables. For more see [archwiki/iptables/Reset Rules](https://wiki.archlinux.org/title/Iptables#Resetting_rules)
- `sudo systemctl status iptables` says there is no such service unit!? -> Fedora uses [[firewalld]], which _is_ reported as running.
#### Firewalld exploring
```bash
sudo firewall-cmd --get-active-zones
# Output:
# FedoraWorkstation (default)
# interfaces: wlp44s0
```
### Steps taken after restarting with [[firewalld]]
1. Followed the steps in chapters 2.3.3 and 2.4 [here](https://wiki.archlinux.org/title/Internet_sharing#Enable_packet_forwarding). This should have enabled masquerading and set the ports to ACCEPT for dns and dhcp (a hedged sketch of the corresponding `firewall-cmd` calls follows below).
2. Firewalld does not seem to be powerful enough.
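As a hedged sketch, the archwiki steps roughly correspond to the following `firewall-cmd` calls; the zone and interface names are taken from the outputs above, and these are not necessarily the exact commands that were run:
```bash
# Allow NAT out of the default zone and open dhcp/dns for AP clients
sudo firewall-cmd --zone=FedoraWorkstation --add-masquerade
sudo firewall-cmd --zone=FedoraWorkstation --add-service=dhcp
sudo firewall-cmd --zone=FedoraWorkstation --add-service=dns
# Put the AP interface into that zone as well
sudo firewall-cmd --zone=FedoraWorkstation --add-interface=wlp0s20f0u1
# Persist the runtime changes once they work
sudo firewall-cmd --runtime-to-permanent
```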
### nftables
* #TODO : What is the source of this info?!
Overview of a common configuration and packet flow:
A host acting as a simple firewall and gateway may define only a small number of nft chains, each matching a kernel hook:
- a prerouting chain, for all newly arrived IP traffic
- an input chain, for traffic addressed to the local host itself
- an output chain, for traffic originating from the local host itself
- a forward chain, for packets the host is asked to simply pass from one network to another
- a postrouting chain, for all IP traffic leaving the firewall
For configuration convenience and by convention, the input, output, and forward chains are grouped into a filter table. Most rules in setups like this attach to the forward chain.
If NAT is required, the convention is to create a nat table to hold the prerouting and postrouting chains. Source-NAT rules (where the packet's source is rewritten) attach to the postrouting chain, and destination-NAT rules (where the packet's destination is rewritten) attach to the prerouting chain.
Packet flow is straightforward: only one chain attaches to each hook, and the first accept or drop rule a packet matches wins.
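A minimal sketch of that conventional layout as `nft` commands, assuming the AP interface is `wlp0s20f0u1` and the uplink is `wlp44s0` (names taken from elsewhere in these notes, not from a verified working setup):
```bash
# nat table with a postrouting chain: source-NAT everything leaving via the uplink
sudo nft add table ip nat
sudo nft 'add chain ip nat postrouting { type nat hook postrouting priority 100 ; }'
sudo nft add rule ip nat postrouting oifname "wlp44s0" masquerade
# filter table with a forward chain: let AP clients reach the uplink
# (assumes net.ipv4.ip_forward=1, as set earlier in these notes)
sudo nft add table ip filter
sudo nft 'add chain ip filter forward { type filter hook forward priority 0 ; policy accept ; }'
sudo nft add rule ip filter forward iifname "wlp0s20f0u1" oifname "wlp44s0" accept
```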

View File

@ -1,10 +0,0 @@
First success using the mac mini.
Could record some data from the amazon echo.
Set up a guest network on the router without any security; this enabled some capture, since no keys had to be configured and no handshakes captured (which would be an issue without any channel control).
Issue: channel hopping -> missing a lot of traffic.
To avoid channel hopping: somehow fix the channel on the router.
By leaving out any authentication/security config in hostapd.conf one can create an unsecured AP (on the USB WiFi card) on my linux machine too. Having an open-auth AP seems fine for this use case.
In the end this seems to be the way. For doing experiments we want to record all traffic, and we cannot lose traffic just because we are not connected. This is why we'd want an access point we control fully; we don't want to rely on some other router. But even then there would still be much manual config (channel, making an open-access vlan or whatever).
Essentially we need to know the channel exactly and don't want to deal with any more cryptography than we must. So, ideally, we can create an AP on a laptop or local computer using a low-cost WiFi adapter. (Since we are talking about testing IoT devices we must rely on wireless internet, since this is how virtually all of them work.) We should be able to configure that device to be an AP. Then we need to forward to whatever interface the experiment computer uses for internet access.
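A hedged sketch of what such a minimal open-auth hostapd.conf could look like; the interface, SSID and channel are illustrative, and this is not the exact file used here:
```bash
# Write a minimal open (no WPA) AP config and start hostapd with it
cat > hostapd-open.conf <<'EOF'
interface=wlp0s20f0u1
ssid=iottb-open
hw_mode=g
channel=6
auth_algs=1
EOF
sudo hostapd ./hostapd-open.conf
```
Leaving out any `wpa=` line is what makes the network open, which also removes the handshake/key problems mentioned above.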

View File

@ -1,7 +0,0 @@
New promising setup:
- Raspberry Pi 5
- Wired connection to the router for internet
- Can very easily create a WiFi network and also connect to it (tested from an iPhone 13); a hedged nmcli sketch follows after this list
- Can capture on the WiFi card while still providing internet access to the iPhone
- Sanity test: opening the YouTube app on the iPhone produces a large flow of QUIC packets, likely from the video that starts autoplaying.
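For reference, a hedged sketch of how such an AP-plus-wired-uplink setup can be created with NetworkManager; the interface name and credentials are illustrative, and this is not necessarily how the Pi was configured:
```bash
# Assumed: wlan0 is the Pi's WiFi card; the wired uplink is already managed by NetworkManager
sudo nmcli device wifi hotspot ifname wlan0 ssid iottb-lab password "change-me"
# The hotspot connection (named "Hotspot" by default) uses the shared IPv4 method,
# which brings up DHCP and NAT towards the uplink
nmcli connection show Hotspot | grep ipv4.method
```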

View File

@ -1,177 +0,0 @@
# Commands to remember + sample output
Used commands: [[nmcli]], [[iw]], [[grep]], [[sed]]
Resources: [Capturing Wireless LAN Packets in Monitor Mode with iw](https://sandilands.info/sgordon/capturing-wifi-in-monitor-mode-with-iw)
Foreign BSSIDs have been anonymized by replacing them with `XX:XX:XX:XX:XX:XX`.
## [[nmcli]]
Useful for getting the channel needed to set up monitor mode properly.
### `nmcli dev wifi`
```
IN-USE BSSID SSID MODE CHAN RATE SIGNAL BARS SECURITY
XX:XX:XX:XX:XX:XX FRITZ!Box 5490 PB Infra 6 195 Mbit/s 75 ▂▄▆_ WPA2
* 4C:1B:86:D1:06:7B LenbrO Infra 100 540 Mbit/s 67 ▂▄▆_ WPA2
4C:1B:86:D1:06:7C LenbrO Infra 6 260 Mbit/s 64 ▂▄▆_ WPA2
B8:BE:F4:4D:48:17 LenbrO Infra 1 130 Mbit/s 62 ▂▄▆_ WPA
XX:XX:XX:XX:XX:XX -- Infra 6 260 Mbit/s 60 ▂▄▆_ WPA2
XX:XX:XX:XX:XX:XX FRITZ!Box 5490 PB Infra 60 405 Mbit/s 37 ▂▄__ WPA2
XX:XX:XX:XX:XX:XX FRITZ!Box Fon WLAN 7360 BP Infra 1 130 Mbit/s 34 ▂▄__ WPA1 WPA2
XX:XX:XX:XX:XX:XX FRITZ!Box 5490 PB Infra 6 195 Mbit/s 34 ▂▄__ WPA2
XX:XX:XX:XX:XX:XX Sunrise_Wi-Fi_09FB29 Infra 7 540 Mbit/s 34 ▂▄__ WPA2 WPA3
XX:XX:XX:XX:XX:XX Madchenband Infra 11 260 Mbit/s 34 ▂▄__ WPA2
XX:XX:XX:XX:XX:XX LenbrO Infra 36 270 Mbit/s 34 ▂▄__ WPA2
XX:XX:XX:XX:XX:XX FibreBox_X6-01EF47 Infra 1 260 Mbit/s 32 ▂▄__ WPA2
XX:XX:XX:XX:XX:XX -- Infra 11 260 Mbit/s 32 ▂▄__ WPA2
XX:XX:XX:XX:XX:XX EEG-04666 Infra 1 405 Mbit/s 30 ▂___ WPA2
XX:XX:XX:XX:XX:XX Salt_2GHz_8A9170 Infra 11 260 Mbit/s 29 ▂___ WPA2
XX:XX:XX:XX:XX:XX -- Infra 11 260 Mbit/s 24 ▂___ WPA2
XX:XX:XX:XX:XX:XX FRITZ!Box 5490 PB Infra 60 405 Mbit/s 19 ▂___ WPA2
```
### `nmcli -t dev wifi`
```
XX\:XX\:XX\:XX\:XX\:XX:FRITZ!Box 5490 PB:Infra:6:195 Mbit/s:79:▂▄▆_:WPA2
:XX\:XX\:XX\:XX\:XX\:XX::Infra:6:260 Mbit/s:75:▂▄▆_:WPA2
:4C\:1B\:86\:D1\:06\:7C:LenbrO:Infra:6:260 Mbit/s:74:▂▄▆_:WPA2
*:4C\:1B\:86\:D1\:06\:7B:LenbrO:Infra:100:540 Mbit/s:72:▂▄▆_:WPA2
:B8\:BE\:F4\:4D\:48\:17:LenbrO:Infra:1:130 Mbit/s:65:▂▄▆_:WPA2
:XX\:XX\:XX\:XX\:XX\:XX:Sunrise_Wi-Fi_09FB29:Infra:7:540 Mbit/s:52:▂▄__:WPA2 WPA3
:XX\:XX\:XX\:XX\:XX\:XX:FRITZ!Box 5490 PB:Infra:60:405 Mbit/s:50:▂▄__:WPA2
:XX\:XX\:XX\:XX\:XX\:XX:FRITZ!Box Fon WLAN 7360 BP:Infra:1:130 Mbit/s:47:▂▄__:WPA1 WPA2
:XX\:XX\:XX\:XX\:XX\:XX:FRITZ!Box 5490 PB:Infra:6:195 Mbit/s:45:▂▄__:WPA2
:XX\:XX\:XX\:XX\:XX\:XX:Zentrum der Macht:Infra:1:195 Mbit/s:44:▂▄__:WPA2
:XX\:XX\:XX\:XX\:XX\:XX:FibreBox_X6-01EF47:Infra:1:260 Mbit/s:42:▂▄__:WPA2
:XX\:XX\:XX\:XX\:XX\:XX:Madchenband:Infra:11:260 Mbit/s:40:▂▄__:WPA2
:XX\:XX\:XX\:XX\:XX\:XX:LenbrO:Infra:36:270 Mbit/s:37:▂▄__:WPA2
:XX\:XX\:XX\:XX\:XX\:XX::Infra:11:260 Mbit/s:34:▂▄__:WPA2
:XX\:XX\:XX\:XX\:XX\:XX:EEG-04666:Infra:1:405 Mbit/s:30:▂___:WPA2
:XX\:XX\:XX\:XX\:XX\:XX:Salt_2GHz_8A9170:Infra:11:260 Mbit/s:29:▂___:WPA2
:XX\:XX\:XX\:XX\:XX\:XX:FRITZ!Box 5490 PB:Infra:60:405 Mbit/s:27:▂___:WPA2
:XX\:XX\:XX\:XX\:XX\:XX:Madchenband2.0:Infra:100:540 Mbit/s:25:▂___:WPA2
:XX\:XX\:XX\:XX\:XX\:XX::Infra:11:260 Mbit/s:24:▂___:WPA2
:XX\:XX\:XX\:XX\:XX\:XX:FibreBox_X6-01EF47:Infra:44:540 Mbit/s:20:▂___:WPA2
```
## [[iw]]
### `iw dev`
Useful for listing interfaces and seeing which hardware they correspond to.
We can use that to create a monitor interface with an easier-to-remember name.
```
phy#1
Unnamed/non-netdev interface
wdev 0x100000002
addr 3c:21:9c:f2:e4:00
type P2P-device
Interface wlp44s0
ifindex 5
wdev 0x100000001
addr e6:bf:0c:3c:47:ba
ssid LenbrO
type managed
channel 100 (5500 MHz), width: 80 MHz, center1: 5530 MHz
txpower 22.00 dBm
multicast TXQ:
qsz-byt qsz-pkt flows drops marks overlmt hashcol tx-bytes tx-packets
0 0 0 0 0 0 0 0 0
phy#0
Interface mon0
ifindex 7
wdev 0x2
addr a8:42:a1:8b:f4:e3
type monitor
channel 6 (2437 MHz), width: 20 MHz (no HT), center1: 2437 MHz
txpower 20.00 dBm
Interface wlp0s20f0u6
ifindex 4
wdev 0x1
addr a8:42:a1:8b:f4:e3
type monitor
channel 6 (2437 MHz), width: 20 MHz (no HT), center1: 2437 MHz
txpower 20.00 dBm
multicast TXQ:
qsz-byt qsz-pkt flows drops marks overlmt hashcol tx-bytes tx-packets
0 0 0 0 0 0 0 0 0
```
Here, `phy#1` is my laptop's built-in WiFi card, and `phy#0` is a WiFi USB adapter.
### `iw [phy phy<index> | phy#<index>] info | fgrep monitor -B 10`
```
➜ iw phy phy0 info | fgrep monitor -B 10
* CMAC-256 (00-0f-ac:13)
* GMAC-128 (00-0f-ac:11)
* GMAC-256 (00-0f-ac:12)
Available Antennas: TX 0x3 RX 0x3
Configured Antennas: TX 0x3 RX 0x3
Supported interface modes:
* IBSS
* managed
* AP
* AP/VLAN
* monitor
--
* register_beacons
* start_p2p_device
* set_mcast_rate
* connect
* disconnect
* set_qos_map
* set_multicast_to_unicast
* set_sar_specs
software interface modes (can always be added):
* AP/VLAN
* monitor
```
We can do better:
### `iw phy#0 info | grep monitor`
```
* monitor
* monitor
```
Concise, but we possibly need more context to be sure.
### `iw phy phy0 info | sed -n '/software interface modes/,/monitor/p'`
More concise, but with good context; this assumes monitor mode only needs to appear among the software interface modes.
```
software interface modes (can always be added):
* AP/VLAN
* monitor
```
### Getting a monitor interface
```
iw phy#0 interface add mon0 type monitor
```
Add an interface with an easy name to the WiFi hardware and make it a monitor interface. Check again with `iw dev` to make sure it is really in monitor mode. If there is another interface on the same phy, it must be taken down or deleted, e.g. with
```
iw dev <phy#0 other interface> del # or
ip link set <phy#0 other interface> down
```
Then to enable `mon0` interface,
```
ip link set mon0 up
```
To effectively capture packets, we should set the interface to the correct frequency. For this we get the channel, e.g. via the above-mentioned `nmcli dev wifi`. We can see that the BSSID I am connected to (marked with `*`) is on channel 100. We can also see that there is another BSSID belonging to the same SSID with an interface on channel 6, i.e. it is running one interface in 2.4 GHz (802.11b/g/n/ax/be) and one in 5 GHz (802.11a/h/n/ac/ax/be). We choose which channel to tune our `mon0` interface to, then we can look up the corresponding center frequency on [wikipedia (List of WLAN channels)](https://en.wikipedia.org/wiki/List_of_WLAN_channels). E.g. for channel 6 (i.e. the 2.4 GHz radio) we see that the center frequency is 2437 MHz. We set our interface to that frequency:
```
iw dev mon0 set freq 2437
```
Now double-check that the interface is in monitor mode and tuned to the correct frequency:
```
iw dev mon0 info
```
Should give an output like
```
Interface mon0
ifindex 7
wdev 0x2
addr a8:42:a1:8b:f4:e3
type monitor
wiphy 0
channel 6 (2437 MHz), width: 20 MHz (no HT), center1: 2437 MHz
txpower 20.00 dBm
```
This concludes preparing the wifi card for packet capture in monitor mode.
### [remarks]
- `sudo` is probably required for these commands.
- These network tools are what is available on Fedora 40 with Linux kernel 6.8.8 (`uname -r`). Other operating systems might still be using older versions of these commands, which are being phased out. For a table of how they match up, see [this](https://www.tecmint.com/deprecated-linux-networking-commands-and-their-replacements/) article (July 2023), according to which the old commands are even deprecated in recent Debian and Ubuntu releases.
- If something is not working, run `rfkill list` to check whether the device is blocked. If it is, run `rfkill unblock 0`, where `0` is the same index used above and represents `phy0`/`phy#0`.
- To ensure that [[NetworkManager]] is not managing your card, run `nmcli device set wlp0s20f0u6 managed no` if the interface is called `wlp0s20f0u6`. Check with `nmcli dev`; the STATE should be "unmanaged".
- See the resources on how to put the interface/WiFi hardware back into managed mode if you need the card for personal use.
# Important
Monitor mode is actually completely useless unless we can observe the EAPOL handshake. That means the WiFi AP should be using WPA/WPA2 with a PSK, and we also need to know the SSID and passphrase. So it is still better if we can set up an environment where we can simply do port mirroring on the WiFi router, or set ourselves up in AP mode; but then we need to be able to bridge to the internet somehow, which I haven't managed reliably. Some testing on the Raspberry Pi seemed to work, but the Raspberry Pi sometimes goes to sleep, so the AP goes down, which means the IoT device loses connection.
If we happen to know the MAC address we need, then in wireshark we can filter with `wlan.addr == [MAC]`. In tcpdump we can use the filter `wlan host [MAC]`.
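For example, a capture limited to one device's MAC might look like this; the interface name and MAC address are illustrative placeholders:
```bash
# mon0 = the monitor-mode interface prepared above; MAC address is a placeholder
sudo tcpdump -i mon0 -w echodot_monitor.pcap wlan host aa:bb:cc:dd:ee:ff
```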

View File

@ -1,15 +0,0 @@
# `IOTTB_HOME`
I introduced the environment variable `IOTTB_HOME` into the code. It is used to configure where the root of an iottb database is. #TODO this means that some code needs refactoring, but I think it will streamline the code. The path in `IOTTB_HOME` shall be used to define the database root. Then, all the code handling adding devices and running captures can rely on the fact that a canonical home exists. Unfortunately I've hard-coded quite a bit of ad-hoc configuration to use `Path.cwd()`, i.e. the current working directory, by default. So there will be some refactoring involved in switching over to using `IOTTB_HOME`'s value as the default path.
# Adding Functionality
## Quick and dirty capture
I want to have a mode which just takes a command and runs it directly with its arguments.
The question is whether to only allow a preconfigured list of commands or, in principle, to allow any command to be passed and write the output. I tend toward providing a subcommand for each utility we want to support. The question is what to do about syntax errors of those commands. Maybe the thing to do is to only write a file into the db if the command runs successfully.
### Refactoring the tcpdump capture
With the above idea it would also be possible to refactor or completely rewrite how tcpdump is called. But the command has a lot of options, and maybe it's better to also offer some guidance to users via `-h`, e.g. so that only the needed and correct filters are entered. Choosing the wrong filter could make the capture useless, and one might only see that after the capture has completed.
## Converting pcap to csv
I want an option such that one can automatically convert a capture's resulting file into a CSV. I will probably focus on tcpdump for now, since other tools like [[mitmproxy]] have different output files.
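One way this conversion could be scripted, as a hedged sketch using tshark; the field selection is illustrative and would need to match whatever schema the testbed settles on:
```bash
# Convert a pcap into a flat CSV; requires tshark (the Wireshark CLI)
tshark -r capture.pcap -T fields \
  -e frame.time_epoch -e ip.src -e ip.dst -e frame.protocols -e frame.len \
  -E header=y -E separator=, -E quote=d > capture.csv
```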
## Defining Experiment
I want a pair of commands that 1. provide a guided cli interface to define an experiment and 2. run that experiment -> Here the [Collective Knowledge Framework](https://github.com/mlcommons/ck) might actually come in handy. They already have tooling for setting up and defining aspects of experiments so that they become reproducible. So maybe one part of `iottb` as a tool would be to write the correct json files into the directory, containing the information on how the command was run. Caveat: not all option values matter for reproduction; often only whether an option was used (flagging options) or that it was used at all is relevant (e.g. an ip address was used in the filter, but the specific value of the ip is of no use for reproducing). Also, the Collective Knowledge tooling relies on very common ML algos/frameworks and static data, so maybe this only comes into play after a capture has been done. So maybe a feature extraction tool (see [[further considerations#Usage paths/ Workflows]]) should create the data and build the database separately.
#remark The tcpdump filter could also be exported into an environment variable? But then again, what would be the use of defining such a convention; one could just use the raw-capture idea for tcpdump, too.

View File

@ -1,13 +0,0 @@
`iottb sniff`:
- min: nothing
- min meaningful: interface
- min useful: IP/MAC address of the device
- good: IP/MAC, device type
- better:
`iottb device`
- `add`: add a new device config
`iottb db`
- `init`: initialize the device database
- `add`: add a device
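Purely illustrative invocations of the sketched interface; the subcommand and flag names here are hypothetical, not the implemented CLI:
```bash
# Hypothetical usage matching the sketch above (flag names are made up)
iottb db init
iottb device add --name echodot
iottb sniff --interface mon0 --device echodot
```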

Some files were not shown because too many files have changed in this diff.