"""`sniff` sub-command: capture packets for a device by running tcpdump."""
import subprocess
import sys
from pathlib import Path

from kydcap.config import *
from kydcap.models.device_metadata_model import DeviceMetadata


def setup_sniff_parser(subparsers):
    """Register the ``sniff`` sub-command and its options on *subparsers*.

    Args:
        subparsers: The object returned by ``ArgumentParser.add_subparsers()``.

    The created parser stores ``handle_sniff`` as its ``func`` default, so the
    caller can dispatch via ``args.func(args)``.
    """
    parser = subparsers.add_parser('sniff', help='Sniff packets with tcpdump')

    # metadata args
    # NOTE(review): the trailing "=" in "--ip-address=" / "--interface=" is
    # part of the registered option string. "--interface eth0" and
    # "--interface=eth0" are only accepted through argparse's prefix
    # (abbreviation) matching. Left unchanged to keep the CLI surface stable;
    # consider registering the plain "--ip-address" / "--interface" forms.
    parser.add_argument("-a", "--ip-address=",
                        help="IP address of the device to sniff",
                        dest="device_ip")

    # tcpdump args
    # NOTE(review): several help texts below say the option is "set by
    # default", but every store_true flag defaults to False. Confirm which of
    # the two is intended.
    parser_sniff_tcpdump = parser.add_argument_group('tcpdump arguments')
    parser_sniff_tcpdump.add_argument("-i", "--interface=",
                                      help="Interface to capture on.",
                                      dest="capture_interface", default="any")
    parser_sniff_tcpdump.add_argument("-I", "--monitor-mode",
                                      help="Put interface into monitor mode",
                                      action="store_true")
    parser_sniff_tcpdump.add_argument("-n",
                                      help="Deactivate name resolution. Option is set by default.",
                                      action="store_true",
                                      dest="no_name_resolution")
    parser_sniff_tcpdump.add_argument("-#", "--number",
                                      help="Print packet number at beginning of line. Set by default.",
                                      action="store_true")
    parser_sniff_tcpdump.add_argument("-e",
                                      help="Print link layer headers. Option is set by default.",
                                      action="store_true",
                                      dest="print_link_layer")
    parser_sniff_tcpdump.add_argument("-t", action="count", default=0,
                                      help="Please see tcpdump manual for details. Unused by default.")
    # TODO: add a "--filter" option taking a pcap filter expression (the
    # previous draft of it was commented out and syntactically broken).

    # shared args
    cap_size_group = parser.add_mutually_exclusive_group(required=False)
    cap_size_group.add_argument("-c", "--count", type=int,
                                help="Number of packets to capture.", default=0)
    cap_size_group.add_argument("--mins", type=int,
                                help="Time in minutes to capture.", default=60)

    parser.set_defaults(func=handle_sniff)


def cwd_is_device_root_dir() -> bool:
    """Return True if ``DEVICE_METADATA_FILE`` exists in the current working directory."""
    device_metadata_file = Path.cwd() / DEVICE_METADATA_FILE
    return device_metadata_file.exists()


def start_guided_device_root_dir_setup():
    """Placeholder for the interactive device-root setup.

    Raises:
        NotImplementedError: Always; the guided setup does not exist yet.
    """
    # An explicit raise (instead of ``assert False``) still fires when Python
    # runs with -O, which strips assert statements.
    raise NotImplementedError("Not implemented")


def handle_metadata():
    """Tell the user the device root directory is missing and offer guided setup.

    Must only be called when the current working directory is *not* a device
    root directory. Answering anything other than "y" terminates the process
    with ``ReturnCodes.ABORTED``.

    Returns:
        ``ReturnCodes.SUCCESS`` once the guided setup call has returned.
    """
    assert not cwd_is_device_root_dir()
    print(f"Unable to find {DEVICE_METADATA_FILE} in current working directory")
    print("You need to setup a device root directory before using this command")
    response = input("Would you like to be guided through the setup? [y/n]")
    if response.lower() == "y":
        start_guided_device_root_dir_setup()
    else:
        print("'kydcap init-device-root --help' for more information.")
        # sys.exit instead of the site-provided exit(), which is not
        # guaranteed to exist (e.g. "python -S" or frozen builds).
        sys.exit(ReturnCodes.ABORTED)
    # TODO: obtain the device id via handle_capture_metadata() once needed.
    return ReturnCodes.SUCCESS


def handle_capture_metadata():
    """Load the device metadata file from the current directory and return its ``device_id``."""
    device_metadata_json = Path.cwd() / DEVICE_METADATA_FILE
    device_metadata = DeviceMetadata.load_from_json(device_metadata_json)
    return device_metadata.device_id


def handle_date_dir():
    """Placeholder; currently does nothing."""
    pass


def run_tcpdum(cmd):
    """Run *cmd* (an argv list) and wait for it to finish.

    Args:
        cmd: Program name followed by its arguments, one list element each.

    Returns:
        The ``subprocess.CompletedProcess`` of the finished command. A
        non-zero exit status does not raise here; callers decide how to
        treat it.
    """
    return subprocess.run(cmd)


def _build_tcpdump_cmd(args):
    """Translate the parsed ``sniff`` arguments into a tcpdump argv list."""
    # "sudo" and "tcpdump" must be separate argv elements: without a shell,
    # the first element is looked up verbatim as the executable name.
    cmd = ['sudo', 'tcpdump', '-i', args.capture_interface]
    if args.monitor_mode:
        cmd.append('-I')
    if args.no_name_resolution:
        cmd.append('-n')
    if args.number:
        cmd.append('-#')
    if args.print_link_layer:
        cmd.append('-e')
    # "-t" is a counted flag; forward it with the same multiplicity
    # (-t, -tt, ...). getattr keeps namespaces without "t" working.
    t_count = getattr(args, 't', 0)
    if t_count:
        cmd.append('-' + 't' * t_count)
    if args.count:
        cmd.extend(['-c', str(args.count)])
    # TODO: "--mins" is parsed but not enforced yet; the capture runs until
    # the packet count is reached or the user interrupts it.
    return cmd


def handle_sniff(args):
    """Entry point of the ``sniff`` sub-command.

    If the current working directory is a device root directory, builds the
    tcpdump command from *args*, prints it and runs it. Otherwise delegates
    to ``handle_metadata``.

    Args:
        args: Namespace produced by the parser from ``setup_sniff_parser``.

    Returns:
        ``ReturnCodes.SUCCESS`` after a successful capture, or the return
        value of ``handle_metadata``.

    Exits the process with ``ReturnCodes.ABORTED`` on a keyboard interrupt
    and with ``ReturnCodes.FAILURE`` if tcpdump cannot be started or ends
    with a non-zero exit status.
    """
    if not cwd_is_device_root_dir():
        return handle_metadata()

    handle_date_dir()
    cmd = _build_tcpdump_cmd(args)
    print('Executing: ' + ' '.join(cmd))
    # TODO maybe dump this into file -> put into device metadata
    # (including the capture start/stop time)
    try:
        # check_returncode() raises CalledProcessError on a non-zero exit
        # status, so a failing tcpdump is no longer reported as success.
        run_tcpdum(cmd).check_returncode()
    except KeyboardInterrupt:
        print("Received keyboard interrupt.")
        sys.exit(ReturnCodes.ABORTED)
    except subprocess.CalledProcessError as e:
        print(f"Failed to capture packet: {e}")
        sys.exit(ReturnCodes.FAILURE)
    except Exception as e:
        # Top-level boundary of the CLI handler: report and exit instead of
        # showing a traceback (e.g. sudo/tcpdump missing on this system).
        print(f"Failed to capture packet: {e}")
        sys.exit(ReturnCodes.FAILURE)
    return ReturnCodes.SUCCESS