This commit is contained in:
Sebastian Lenzlinger
2024-06-28 20:43:23 +02:00
parent 87dd548a42
commit 5ccb9daca0
10 changed files with 253 additions and 12 deletions

View File

@@ -1,10 +1,12 @@
from pathlib import Path
import logging
import click
from iottb.main import DB_NAME, logger, CFG_FILE_PATH
from iottb.definitions import DB_NAME, CFG_FILE_PATH
from iottb.models.iottb_config import IottbConfig
logger = logging.getLogger(__name__)
@click.command()
@click.option('--file', default=DB_NAME)

View File

@@ -1,8 +1,133 @@
import click
import subprocess
import json
from pathlib import Path
import logging
from iottb import definitions
import re
from datetime import datetime
from iottb.definitions import APP_NAME, CFG_FILE_PATH
from iottb.models.iottb_config import IottbConfig
from iottb.utils.string_processing import make_canonical_name
# Setup logger
logger = logging.getLogger('iottb.sniff')
def is_ip_address(address):
ip_pattern = re.compile(r"^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$")
return ip_pattern.match(address) is not None
def is_mac_address(address):
mac_pattern = re.compile(r"^([0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2}$")
return mac_pattern.match(address) is not None
def load_config(cfg_file):
"""Loads configuration from the given file path."""
with open(cfg_file, 'r') as config_file:
return json.load(config_file)
def validate_sniff(ctx, param, value):
logger.info('Validating sniff...')
if ctx.params.get('unsafe') and not value:
return None
if not ctx.params.get('unsafe') and not value:
raise click.BadParameter('Address is required unless --unsafe is set.')
return value
@click.command('sniff', help='Sniff packets with tcpdump')
@click.argument('device')
@click.option('-i', '--interface', callback=validate_sniff, help='Network interface to capture on',
envvar='IOTTB_CAPTURE_INTERFACE')
@click.option('-a', '--address', callback=validate_sniff, help='IP or MAC address to filter packets by',
envvar='IOTTB_CAPTURE_ADDRESS')
@click.option('--db', '--database', type=click.Path(exists=True, file_okay=False), envvar='IOTTB_DB',
help='Database of device. Only needed if not current default.')
@click.option('--unsafe', is_flag=True, default=False, envvar='IOTTB_UNSAFE', is_eager=True,
help='Disable checks for otherwise required options')
@click.option('--guided', is_flag=True, default=False)
def sniff(device, interface, address, db, unsafe, guided):
""" Sniff packets from a device """
logger.info('sniff command invoked')
# Step1: Load Config
config = IottbConfig(Path(CFG_FILE_PATH))
logger.debug(f'Config loaded: {config}')
# Step2: determine relevant database
database = db if db else config.default_database
path = config.default_db_location[database]
full_db_path = Path(path) / database
logger.debug(f'Full db path is {str(path)}')
# Check if it exists
assert full_db_path.is_dir(), "DB unexpectedly missing"
canonical_name = make_canonical_name(device)
click.echo(f'Using canonical device name {canonical_name}')
if not database_path:
logger.error('No default database path found in configuration')
click.echo('No default database path found in configuration')
return
# Verify device directory
device_path = Path(database_path) / device
if not device_path.exists():
logger.error(f'Device path {device_path} does not exist')
click.echo(f'Device path {device_path} does not exist')
return
# Generate filter
if not unsafe:
if is_ip_address(address):
packet_filter = f"host {address}"
elif is_mac_address(address):
packet_filter = f"ether host {address}"
else:
logger.error('Invalid address format')
click.echo('Invalid address format')
return
else:
packet_filter = None
# Prepare capture directory
capture_dir = device_path / 'captures' / datetime.now().strftime('%Y%m%d_%H%M%S')
capture_dir.mkdir(parents=True, exist_ok=True)
# Prepare capture file
pcap_file = capture_dir / f"{device}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.pcap"
# Build tcpdump command
cmd = ['sudo', 'tcpdump', '-i', interface, '-w', str(pcap_file)]
if packet_filter:
cmd.append(packet_filter)
logger.info(f'Executing: {" ".join(cmd)}')
# Execute tcpdump
try:
subprocess.run(cmd, check=True)
click.echo(f"Capture complete. Saved to {pcap_file}")
except subprocess.CalledProcessError as e:
logger.error(f'Failed to capture packets: {e}')
click.echo(f'Failed to capture packets: {e}')
@click.command('sniff', help='Sniff packets with tcpdump')
@click.argument('device')
@click.option('-i', '--interface', required=False, help='Network interface to capture on', envvar='IOTTB_CAPTURE_INTERFACE')
@click.option('-a', '--address', required=True, help='IP or MAC address to filter packets by', envvar='IOTTB_CAPTURE_ADDRESS')
@click.option('--db', '--database', type=click.Path(exists=True, file_okay=False), envvar='IOTTB_DB',
help='Database of device. Only needed if not current default.')
@click.option('--unsafe', is_flag=True, default=False, envvar='IOTTB_UNSAFE',
help='Disable checks for otherwise required options')
@click.option('--guided', is_flag=True)
def sniff2(device, interface, address, cfg_file):
""" Sniff packets from a device """
logger.info('sniff command invoked')
# Step 1: Load Config
# Dependency: Config file must exist
config = IottbConfig(Path(CFG_FILE_PATH))
logger.debug(f'Config loaded: {config}')

View File

@@ -2,6 +2,7 @@ import click
from pathlib import Path
import logging
from iottb.commands.sniff import sniff
from iottb.commands.developer import set_key_in_table_to, rm_cfg, rm_dbs, show_cfg
##################################################
# Import package modules
@@ -41,7 +42,7 @@ def cli(ctx, verbosity, debug, cfg_file):
ctx.ensure_object(dict) # Make sure context is ready for use
logger.info("Starting execution.")
ctx.obj['CONFIG'] = IottbConfig(cfg_file) # Load configuration directly
# ctx.meta['FULL_PATH_CONFIG_FILE'] =
ctx.meta['FULL_PATH_CONFIG_FILE'] = str(cfg_file)
##################################################################################
@@ -55,6 +56,7 @@ cli.add_command(rm_dbs)
# noinspection PyTypeChecker
cli.add_command(add_device)
cli.add_command(show_cfg)
cli.add_command(sniff)
if __name__ == '__main__':
cli(auto_envvar_prefix='IOTTB', show_default=True, show_envvars=True)

View File

@@ -10,7 +10,7 @@ DB_NAME = definitions.DB_NAME
class IottbConfig:
""" Class to handle operations on the testbed configuration.
""" Class to handle testbed configuration.
TODO: Add instead of overwrite Database locations when initializing if a location with valid db
exists.

View File

@@ -0,0 +1,4 @@
import logging
logger = logging.getLogger('iottb.sniff') # Log with sniff subcommand

View File

@@ -1,10 +1,7 @@
import json
import click
from pathlib import Path
import logging
from logging.handlers import RotatingFileHandler
import sys
from iottb.models.iottb_config import IottbConfig
from logging.handlers import RotatingFileHandler
from iottb import definitions
from iottb.definitions import MAX_VERBOSITY, CONSOLE_LOG_FORMATS, APP_NAME, LOGFILE_LOG_FORMAT

View File

@@ -0,0 +1,7 @@
import click
import logging
from iottb import definitions
class IottbPrompt:
pass