Refactor various names.
This commit is contained in:
0
code/iottb/subcommands/__init__.py
Normal file
0
code/iottb/subcommands/__init__.py
Normal file
117
code/iottb/subcommands/capture.py
Normal file
117
code/iottb/subcommands/capture.py
Normal file
@@ -0,0 +1,117 @@
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
from iottb.definitions import *
|
||||
from iottb.models.device_metadata_model import DeviceMetadata
|
||||
|
||||
|
||||
def setup_capture_parser(subparsers):
|
||||
parser = subparsers.add_parser('sniff', help='Sniff packets with tcpdump')
|
||||
# metadata args
|
||||
parser.add_argument("-a", "--ip-address", help="IP address of the device to sniff", dest="device_ip")
|
||||
# tcpdump args
|
||||
parser_sniff_tcpdump = parser.add_argument_group('tcpdump arguments')
|
||||
parser_sniff_tcpdump.add_argument("-i", "--interface", help="Interface to capture on.", dest="capture_interface",
|
||||
default="any")
|
||||
parser_sniff_tcpdump.add_argument("-I", "--monitor-mode", help="Put interface into monitor mode",
|
||||
action="store_true")
|
||||
parser_sniff_tcpdump.add_argument("-n", help="Deactivate name resolution. Option is set by default.",
|
||||
action="store_true", dest="no_name_resolution")
|
||||
parser_sniff_tcpdump.add_argument("-#", "--number",
|
||||
help="Print packet number at beginning of line. Set by default.",
|
||||
action="store_true")
|
||||
parser_sniff_tcpdump.add_argument("-e", help="Print link layer headers. Option is set by default.",
|
||||
action="store_true", dest="print_link_layer")
|
||||
parser_sniff_tcpdump.add_argument("-t", action="count", default=0,
|
||||
help="Please see tcpdump manual for details. Unused by default.")
|
||||
# parser_sniff_tcpdump.add_argument("--filter",type=str,default="ip help=f"pcap filter expression. \
|
||||
# Defaults is '{default}'")
|
||||
# shared args
|
||||
cap_size_group = parser.add_mutually_exclusive_group(required=False)
|
||||
cap_size_group.add_argument("-c", "--count", type=int, help="Number of packets to capture.", default=0)
|
||||
cap_size_group.add_argument("--mins", type=int, help="Time in minutes to capture.", default=60)
|
||||
parser.set_defaults(func=handle_capture)
|
||||
# return parser
|
||||
# parser.add_default(func=handle_sniff(args=sniff_args))
|
||||
|
||||
|
||||
def cwd_is_device_root_dir() -> bool:
|
||||
device_metadata_file = Path.cwd() / DEVICE_METADATA_FILE
|
||||
return device_metadata_file.exists()
|
||||
|
||||
|
||||
def get_user_capture_config():
|
||||
pass
|
||||
|
||||
def start_guided_device_root_dir_setup():
|
||||
assert False, "Not implemented"
|
||||
|
||||
|
||||
def handle_metadata():
|
||||
assert not cwd_is_device_root_dir()
|
||||
print(f"Unable to find {DEVICE_METADATA_FILE} in current working directory")
|
||||
print("You need to setup a device root directory before using this command")
|
||||
response = input("Would you like to be guided through the setup? [y/n]")
|
||||
if response.lower() == "y":
|
||||
start_guided_device_root_dir_setup()
|
||||
else:
|
||||
print("'iottb init-device-root --help' for more information.")
|
||||
exit(ReturnCodes.ABORTED)
|
||||
# device_id = handle_capture_metadata()
|
||||
return ReturnCodes.SUCCESS
|
||||
|
||||
|
||||
def handle_capture_metadata():
|
||||
device_metadata_json = Path.cwd() / DEVICE_METADATA_FILE
|
||||
device_metadata = DeviceMetadata.load_from_json(device_metadata_json)
|
||||
device_id = device_metadata.device_id
|
||||
return device_id
|
||||
|
||||
|
||||
def handle_date_dir():
|
||||
pass
|
||||
|
||||
|
||||
def run_tcpdum(cmd):
|
||||
subprocess.run(cmd)
|
||||
|
||||
|
||||
def handle_capture(args):
|
||||
if cwd_is_device_root_dir():
|
||||
handle_date_dir()
|
||||
cmd = ['sudo tcpdump', '-i', args.capture_interface]
|
||||
if args.monitor_mode:
|
||||
cmd.append('-I')
|
||||
if args.no_name_resolution:
|
||||
cmd.append('-n')
|
||||
if args.number:
|
||||
cmd.append('-#')
|
||||
if args.print_link_layer:
|
||||
cmd.append('-e')
|
||||
if args.count:
|
||||
cmd.append('-c')
|
||||
cmd.append(str(args.count))
|
||||
elif args.mins:
|
||||
pass
|
||||
print('Executing: ' + ' '.join(cmd))
|
||||
# TODO maybe dump this into file -> put into device metadata
|
||||
try:
|
||||
start_time = datetime.now().strftime('%H:%M:%S')
|
||||
run_tcpdum(cmd)
|
||||
stop_time = datetime.now().strftime('%H:%M:%S')
|
||||
except KeyboardInterrupt:
|
||||
print("Received keyboard interrupt.")
|
||||
exit(ReturnCodes.ABORTED)
|
||||
except subprocess.CalledProcessError as e:
|
||||
print(f"Failed to capture packet: {e}")
|
||||
exit(ReturnCodes.FAILURE)
|
||||
except Exception as e:
|
||||
print(f"Failed to capture packet: {e}")
|
||||
exit(ReturnCodes.FAILURE)
|
||||
|
||||
return ReturnCodes.SUCCESS
|
||||
else:
|
||||
handle_metadata()
|
||||
|
||||
|
||||
|
||||
0
code/iottb/subcommands/config.py
Normal file
0
code/iottb/subcommands/config.py
Normal file
40
code/iottb/subcommands/init.py
Normal file
40
code/iottb/subcommands/init.py
Normal file
@@ -0,0 +1,40 @@
|
||||
import pathlib
|
||||
|
||||
from iottb.definitions import DEVICE_METADATA_FILE
|
||||
from iottb.models.device_metadata_model import DeviceMetadata
|
||||
|
||||
|
||||
def setup_init_iottb_root_parser(subparsers):
|
||||
parser = subparsers.add_parser("init", aliases=["idr"])
|
||||
parser.add_argument("--root_dir", type=pathlib.Path, default=pathlib.Path.cwd())
|
||||
group = parser.add_mutually_exclusive_group()
|
||||
group.add_argument("--dynamic", action="store_true", help="enable guided setup", default=False)
|
||||
group.add_argument("-n", "--name", action="store", type=str, help="name of device")
|
||||
parser.set_defaults(func=handle_idr)
|
||||
|
||||
|
||||
def handle_idr(args):
|
||||
print("Entered iottb initialize-device-root")
|
||||
root_dir = args.root_dir
|
||||
device_name = None
|
||||
if args.dynamic:
|
||||
response = "N"
|
||||
while response == "N":
|
||||
name = input("Please enter name of device: ")
|
||||
# TODO extended config for other fields like apps, firmware etc.
|
||||
response = input(f"Confirm device name: {name} [y/N]")
|
||||
device_name = name
|
||||
else:
|
||||
device_name = args.name
|
||||
root_dir.mkdir(parents=True, exist_ok=True)
|
||||
root_dir.chdir()
|
||||
dev_metadata_model = DeviceMetadata(device_name)
|
||||
file_path = root_dir / device_name / DEVICE_METADATA_FILE
|
||||
assert not file_path.exists(), f"{file_path} already exists"
|
||||
if args.dynamic:
|
||||
response = input(f"Confirm device metadata: {dev_metadata_model.model_dump()} [y/N]")
|
||||
if response.lower() != "y":
|
||||
assert False, "TODO implement dynamic setup"
|
||||
code = dev_metadata_model.save_to_json(file_path)
|
||||
print(f"Device metadata saved to {file_path}")
|
||||
return code
|
||||
Reference in New Issue
Block a user