Clean Slate (or not)
This commit is contained in:
@@ -47,22 +47,22 @@ def check_iottb_env():
|
||||
response = input('Do you want to create it now? [y/N]')
|
||||
logger.debug(f'response: {response}')
|
||||
if response.lower() != 'y':
|
||||
logger.debug(f'Not creating "{environ['IOTTB_HOME']}"')
|
||||
logger.debug(f'Not setting "IOTTB_HOME"')
|
||||
print('TODO')
|
||||
print("Aborting execution...")
|
||||
return ReturnCodes.ABORTED
|
||||
else:
|
||||
print(f'Creating "{environ['IOTTB_HOME']}"')
|
||||
print(f'Setting environment variable IOTTB_HOME""')
|
||||
Path(IOTTB_HOME_ABS).mkdir(parents=True,
|
||||
exist_ok=False) # Should always work since in 'not exist' code path
|
||||
return ReturnCodes.OK
|
||||
return ReturnCodes.SUCCESS
|
||||
logger.info(f'"{IOTTB_HOME_ABS}" exists.')
|
||||
# TODO: Check that it is a valid iottb dir or can we say it is valid by definition if?
|
||||
return ReturnCodes.OK
|
||||
return ReturnCodes.SUCCESS
|
||||
|
||||
|
||||
def main():
|
||||
if check_iottb_env() != ReturnCodes.OK:
|
||||
if check_iottb_env() != ReturnCodes.SUCCESS:
|
||||
exit(ReturnCodes.ABORTED)
|
||||
parser = setup_argparse()
|
||||
args = parser.parse_args()
|
||||
|
||||
@@ -12,11 +12,11 @@ from iottb.logger import logger
|
||||
class CaptureMetadata:
|
||||
# Required Fields
|
||||
device_metadata: DeviceMetadata
|
||||
capture_id: str = lambda: str(uuid.uuid4())
|
||||
|
||||
device_id: str
|
||||
capture_dir: Path
|
||||
capture_file: str
|
||||
capture_date: str = lambda: datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower()
|
||||
|
||||
|
||||
# Statistics
|
||||
start_time: str
|
||||
@@ -39,7 +39,8 @@ class CaptureMetadata:
|
||||
def __init__(self, device_metadata: DeviceMetadata, capture_dir: Path):
|
||||
logger.info(f'Creating CaptureMetadata model from DeviceMetadata: {device_metadata}')
|
||||
self.device_metadata = device_metadata
|
||||
|
||||
self.capture_id = str(uuid.uuid4())
|
||||
self.capture_date = datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower()
|
||||
self.capture_dir = capture_dir
|
||||
assert capture_dir.is_dir(), f'Capture directory {capture_dir} does not exist'
|
||||
|
||||
@@ -47,7 +48,7 @@ class CaptureMetadata:
|
||||
logger.info(f'Building capture file name')
|
||||
if self.app is None:
|
||||
logger.debug(f'No app specified')
|
||||
prefix = self.device_metadata.device_short_name
|
||||
prefix = "iphone-14" #self.device_metadata.device_short_name
|
||||
else:
|
||||
logger.debug(f'App specified: {self.app}')
|
||||
assert str(self.app).strip() not in {'', ' '}, f'app is not a valid name: {self.app}'
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import logging
|
||||
import os
|
||||
import pathlib
|
||||
|
||||
from iottb import definitions
|
||||
@@ -10,9 +11,11 @@ logger.setLevel(logging.INFO) # Since module currently passes all tests
|
||||
|
||||
|
||||
def setup_init_device_root_parser(subparsers):
|
||||
#assert os.environ['IOTTB_HOME'] is not None, "IOTTB_HOME environment variable is not set"
|
||||
parser = subparsers.add_parser('add-device', aliases=['add-device-root', 'add'],
|
||||
help='Initialize a folder for a device.')
|
||||
parser.add_argument('--root_dir', type=pathlib.Path, default=pathlib.Path.cwd())
|
||||
parser.add_argument('--root_dir', type=pathlib.Path,
|
||||
default=definitions.IOTTB_HOME_ABS) # TODO: Refactor code to not use this or handle iottb here
|
||||
group = parser.add_mutually_exclusive_group()
|
||||
group.add_argument('--guided', action='store_true', help='Guided setup', default=False)
|
||||
group.add_argument('--name', action='store', type=str, help='name of device')
|
||||
@@ -20,14 +23,12 @@ def setup_init_device_root_parser(subparsers):
|
||||
|
||||
|
||||
def handle_add(args):
|
||||
# TODO: This whole function should be refactored into using the fact that IOTTB_HOME is set, and the dir exists
|
||||
logger.info(f'Add device handler called with args {args}')
|
||||
|
||||
args.root_dir.mkdir(parents=True,
|
||||
exist_ok=True) # else metadata.save_to_file will fail TODO: unclear what to assume
|
||||
|
||||
if args.guided:
|
||||
logger.debug('begin guided setup')
|
||||
metadata = guided_setup(args.root_dir)
|
||||
metadata = guided_setup(args.root_dir) # TODO refactor to use IOTTB_HOME
|
||||
logger.debug('guided setup complete')
|
||||
else:
|
||||
logger.debug('Setup through passed args: setup')
|
||||
@@ -36,7 +37,7 @@ def handle_add(args):
|
||||
return ReturnCodes.ERROR
|
||||
metadata = DeviceMetadata(args.name, args.root_dir)
|
||||
|
||||
file_path = args.root_dir / DEVICE_METADATA_FILE
|
||||
file_path = args.root_dir / DEVICE_METADATA_FILE # TODO IOTTB_HOME REFACTOR
|
||||
if file_path.exists():
|
||||
print('Directory already contains a metadata file. Aborting.')
|
||||
return ReturnCodes.ABORTED
|
||||
|
||||
@@ -2,11 +2,13 @@ import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
from iottb.definitions import *
|
||||
from iottb.logger import logger
|
||||
from iottb.models.capture_metadata_model import CaptureMetadata
|
||||
from iottb.models.device_metadata_model import DeviceMetadata, dir_contains_device_metadata
|
||||
from iottb.utils.capture_utils import get_capture_src_folder, make_capture_src_folder
|
||||
from iottb.utils.tcpdump_utils import check_installed
|
||||
|
||||
|
||||
def setup_capture_parser(subparsers):
|
||||
parser = subparsers.add_parser('sniff', help='Sniff packets with tcpdump')
|
||||
# metadata args
|
||||
@@ -33,7 +35,7 @@ def setup_capture_parser(subparsers):
|
||||
help='Please see tcpdump manual for details. Unused by default.')
|
||||
|
||||
cap_size_group = parser.add_mutually_exclusive_group(required=False)
|
||||
cap_size_group.add_argument('-c', '--count', type=int, help='Number of packets to capture.', default=1000)
|
||||
cap_size_group.add_argument('-c', '--count', type=int, help='Number of packets to capture.', default=10)
|
||||
cap_size_group.add_argument('--mins', type=int, help='Time in minutes to capture.', default=1)
|
||||
|
||||
parser.set_defaults(func=handle_capture)
|
||||
@@ -88,6 +90,7 @@ def handle_capture(args):
|
||||
assert args.device_root is not None, f'Device root directory is required'
|
||||
assert dir_contains_device_metadata(args.device_root), f'Device metadata file \'{args.device_root}\' does not exist'
|
||||
# get device metadata
|
||||
logger.info(f'Device root directory: {args.device_root}')
|
||||
if args.safe and not dir_contains_device_metadata(args.device_root):
|
||||
print(f'Supplied folder contains no device metadata. '
|
||||
f'Please setup a device root directory before using this command')
|
||||
@@ -98,6 +101,7 @@ def handle_capture(args):
|
||||
else:
|
||||
name = input('Please enter a device name: ')
|
||||
args.device_root.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
device_data = DeviceMetadata(name, args.device_root)
|
||||
# start constructing environment for capture
|
||||
capture_dir = get_capture_src_folder(args.device_root)
|
||||
@@ -152,7 +156,7 @@ def build_tcpdump_args(args, cmd, capture_metadata: CaptureMetadata):
|
||||
|
||||
capture_metadata.build_capture_file_name()
|
||||
cmd.append('-w')
|
||||
cmd.append(capture_metadata.capture_file)
|
||||
cmd.append(str(capture_metadata.capture_dir) + "/" + capture_metadata.capture_file)
|
||||
|
||||
if args.safe:
|
||||
cmd.append(f'host {args.device_ip}') # if not specified, filter 'any' implied by tcpdump
|
||||
@@ -160,7 +164,6 @@ def build_tcpdump_args(args, cmd, capture_metadata: CaptureMetadata):
|
||||
|
||||
return cmd
|
||||
|
||||
|
||||
# def capture_file_cmd(args, cmd, capture_dir, capture_metadata: CaptureMetadata):
|
||||
# capture_file_prefix = capture_metadata.get_device_metadata().get_device_short_name()
|
||||
# if args.app_name is not None:
|
||||
|
||||
Reference in New Issue
Block a user