2024-05-05 17:54:21 +02:00

126 lines
4.9 KiB
Python

import subprocess
import sys
import uuid
from datetime import datetime
from pathlib import Path

from iottb.definitions import *
from iottb.models.device_metadata_model import DeviceMetadata
from iottb.models.capture_metadata_model import CaptureMetadata
def setup_capture_parser(subparsers):
    """Register the ``sniff`` sub-command and all of its options.

    Args:
        subparsers: Sub-parser collection (the object returned by
            ``ArgumentParser.add_subparsers``) that ``sniff`` is added to.

    The sub-command stores ``handle_capture`` as its ``func`` default, so a
    dispatcher can call ``args.func(args)`` after parsing.
    """
    sniff = subparsers.add_parser('sniff', help='Sniff packets with tcpdump')

    # Metadata options.
    sniff.add_argument("-a", "--ip-address", dest="device_ip",
                       help="IP address of the device to sniff")

    # Options mirroring tcpdump's own flags. They are declared as a table so
    # the registration order (which is also the --help order) is explicit.
    # NOTE(review): several help texts claim the option is "set by default",
    # but a store_true flag defaults to False -- confirm which is intended.
    tcpdump_options = (
        (("-i", "--interface"),
         {"help": "Interface to capture on.",
          "dest": "capture_interface",
          "default": "any"}),
        (("-I", "--monitor-mode"),
         {"help": "Put interface into monitor mode",
          "action": "store_true"}),
        (("-n",),
         {"help": "Deactivate name resolution. Option is set by default.",
          "action": "store_true",
          "dest": "no_name_resolution"}),
        (("-#", "--number"),
         {"help": "Print packet number at beginning of line. Set by default.",
          "action": "store_true"}),
        (("-e",),
         {"help": "Print link layer headers. Option is set by default.",
          "action": "store_true",
          "dest": "print_link_layer"}),
        (("-t",),
         {"action": "count",
          "default": 0,
          "help": "Please see tcpdump manual for details. Unused by default."}),
    )
    tcpdump_group = sniff.add_argument_group('tcpdump arguments')
    for flags, options in tcpdump_options:
        tcpdump_group.add_argument(*flags, **options)
    # TODO: add a --filter option taking a pcap filter expression.

    # Capture size: either a packet count or a duration, never both.
    size_group = sniff.add_mutually_exclusive_group(required=False)
    size_group.add_argument("-c", "--count", type=int, default=0,
                            help="Number of packets to capture.")
    size_group.add_argument("--mins", type=int, default=60,
                            help="Time in minutes to capture.")

    sniff.set_defaults(func=handle_capture)
def cwd_is_device_root_dir() -> bool:
    """Return True if the current working directory contains the device metadata file.

    The file looked for is ``DEVICE_METADATA_FILE`` directly inside
    ``Path.cwd()``; it must be a regular file.
    """
    return Path.cwd().joinpath(DEVICE_METADATA_FILE).is_file()
def get_user_capture_config():
    """Placeholder for collecting a capture configuration from the user.

    Not implemented yet: takes no arguments and always returns ``None``.
    """
    return None
def start_guided_device_root_dir_setup():
    """Start the interactive device-root setup (not implemented yet).

    Raises:
        NotImplementedError: Always. An explicit exception is raised instead
            of ``assert False`` because assertions are removed when Python
            runs with ``-O``, which would make this stub silently succeed.
    """
    raise NotImplementedError("Not implemented")
def handle_metadata():
    """Report a missing device root directory and offer the guided setup.

    Precondition: the current working directory is *not* a device root
    directory (checked with an assertion, as this is a caller contract).

    Returns:
        ``ReturnCodes.SUCCESS`` once the guided setup has returned.

    Raises:
        SystemExit: With ``ReturnCodes.ABORTED`` when the user declines the
            guided setup.
    """
    assert not cwd_is_device_root_dir()
    print(f"Unable to find {DEVICE_METADATA_FILE} in current working directory")
    print("You need to setup a device root directory before using this command")
    response = input("Would you like to be guided through the setup? [y/n]")
    # Tolerate surrounding whitespace in the answer; anything but "y" declines.
    if response.strip().lower() != "y":
        print("'iottb init-device-root --help' for more information.")
        # sys.exit instead of the ``exit`` builtin: the latter is injected by
        # the ``site`` module and is not guaranteed to exist in every run mode.
        sys.exit(ReturnCodes.ABORTED)
    start_guided_device_root_dir_setup()
    # TODO: continue with capture metadata handling once it exists.
    return ReturnCodes.SUCCESS
def get_device_id_from_file(device_metadata_filename: Path) -> str:
    """Load the device metadata file and return the device id stored in it.

    Args:
        device_metadata_filename: Path to the device metadata JSON file.
            A plain string path is accepted as well.

    Returns:
        The ``device_id`` attribute of the object produced by
        ``DeviceMetadata.load_from_json``.

    Raises:
        FileNotFoundError: If the path does not point to an existing file.
            Raised explicitly rather than asserted, because assertions are
            removed when Python runs with ``-O``.
    """
    metadata_path = Path(device_metadata_filename)
    if not metadata_path.is_file():
        raise FileNotFoundError(
            f"Device metadata file '{metadata_path}' does not exist"
        )
    return DeviceMetadata.load_from_json(metadata_path).device_id
def setup_capture_folder():
    """Placeholder for preparing the folder a capture is written to.

    Not implemented yet: takes no arguments and always returns ``None``.
    """
    return None
def run_tcpdum(cmd):
    """Run *cmd* as a child process and wait for it to finish.

    Args:
        cmd: Command line as a list of strings (executed without a shell).

    Returns:
        The exit status of the process, or ``None`` when the wait was
        interrupted with Ctrl-C. The status was previously discarded, which
        made a failed run indistinguishable from a successful one.
    """
    try:
        completed = subprocess.run(cmd)
    except KeyboardInterrupt:
        # Ctrl-C is the regular way to end an open-ended capture, so it is
        # deliberately not treated as an error.
        return None
    return completed.returncode
def generate_capfile_name():
    """Return a unique capture file name.

    The name is the local timestamp formatted as ``YYYYmmdd_HHMMSS``
    immediately followed by the 32 hex digits of a random UUID4.

    Returns:
        The generated name as a string (the original computed the name but
        never returned it, so callers always received ``None``).
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    return timestamp + uuid.uuid4().hex
def handle_capture(args):
    """Build a tcpdump command line from the parsed ``sniff`` arguments and run it.

    Args:
        args: Parsed argument namespace. The attributes read are
            ``device_root``, ``capture_interface``, ``monitor_mode``,
            ``no_name_resolution``, ``number``, ``print_link_layer``, ``t``,
            ``count`` and ``mins``.

    Returns:
        ``ReturnCodes.SUCCESS`` when the capture finished without error.

    Raises:
        SystemExit: With ``ReturnCodes.FAILURE`` when a precondition is not
            met or the capture fails, and with ``ReturnCodes.ABORTED`` on a
            keyboard interrupt.
    """
    # User-facing preconditions are reported explicitly rather than asserted,
    # because assertions are removed when Python runs with ``-O``.
    if getattr(args, 'device_root', None) is None:
        print("Device root directory is required")
        sys.exit(ReturnCodes.FAILURE)

    # NOTE(review): the metadata file is looked up in the current working
    # directory, not in args.device_root -- confirm that this is intended.
    device_metadata_file = Path.cwd() / DEVICE_METADATA_FILE
    # Check for the file *before* trying to load it.
    if not device_metadata_file.is_file():
        print(f"Device metadata file '{device_metadata_file}' does not exist")
        sys.exit(ReturnCodes.FAILURE)
    # Loading the id also validates that the metadata file is readable.
    # TODO: record device_id in the capture metadata.
    device_id = get_device_id_from_file(device_metadata_file)

    # setup_capture_folder takes no parameters, so it is called without any.
    # TODO: pass the device root once the function accepts one.
    setup_capture_folder()

    cmd = ['sudo', 'tcpdump', '-i', args.capture_interface]
    # Boolean switches that map one-to-one onto a tcpdump flag.
    switches = (
        (args.monitor_mode, '-I'),
        (args.no_name_resolution, '-n'),
        (args.number, '-#'),
        (args.print_link_layer, '-e'),
    )
    cmd.extend(flag for enabled, flag in switches if enabled)

    # -t may be repeated (-t, -tt, ...); forward it with the same multiplicity.
    timestamp_level = getattr(args, 't', 0)
    if timestamp_level:
        cmd.append('-' + 't' * timestamp_level)

    if args.count:
        cmd.extend(['-c', str(args.count)])
    # TODO: args.mins (capture duration) is parsed but not enforced yet.

    print('Executing: ' + ' '.join(cmd))
    # TODO maybe dump this into file -> put into device metadata
    try:
        # TODO: persist start/stop time together with the capture metadata.
        start_time = datetime.now().strftime('%H:%M:%S')
        exit_status = run_tcpdum(cmd)
        stop_time = datetime.now().strftime('%H:%M:%S')
    except KeyboardInterrupt:
        print("Received keyboard interrupt.")
        sys.exit(ReturnCodes.ABORTED)
    except Exception as e:
        # Top-level boundary of the sub-command: report and exit. This also
        # covers subprocess.CalledProcessError, which is an Exception subclass.
        print(f"Failed to capture packet: {e}")
        sys.exit(ReturnCodes.FAILURE)

    # Only a non-zero integer status counts as failure; a missing status
    # (None) is treated as a normal end of the capture.
    if exit_status:
        print(f"Failed to capture packet: tcpdump exited with status {exit_status}")
        sys.exit(ReturnCodes.FAILURE)
    return ReturnCodes.SUCCESS