import subprocess import re from datetime import datetime from pathlib import Path import logging from models.capture_metadata import CaptureMetadata from models.device_metadata import DeviceMetadata from utils.capture_utils import get_capture_src_folder, make_capture_src_folder from utils.tcpdump_utils import check_installed from utils.file_utils import ensure_directory_exists from config import Config logger = logging.getLogger('iottb.sniff') def is_ip_address(address): ip_pattern = re.compile(r"^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$") return ip_pattern.match(address) is not None def is_mac_address(address): mac_pattern = re.compile(r"^([0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2}$") return mac_pattern.match(address) is not None class Sniffer: def __init__(self, device_id, capture_interface, address=None, safe=True): #TODO Decide if we want to use the device_id as the device_name (seems unhandy) self.device_id = device_id self.capture_interface = capture_interface self.address = address self.safe = safe self.config = Config().load_config() self.device_path = self.initialize_device(device_id) self.filter = self.generate_filter() def initialize_device(self, device_id): db_path = Path(self.config.get('database_path', '~/iottb.db')).expanduser() device_path = db_path / device_id ensure_directory_exists(device_path) metadata_file = device_path / 'device_metadata.json' if not metadata_file.exists(): device_metadata = DeviceMetadata(device_name=device_id, device_root_path=device_path) device_metadata.save_to_file() return device_path def get_capture_metadata(self, capture_dir): metadata = CaptureMetadata(device_id=self.device_id, capture_dir=capture_dir) metadata.build_capture_file_name() metadata.interface = self.capture_interface metadata.device_ip_address = self.address or "No IP Address set" return metadata def generate_filter(self): if not self.address and self.safe: raise ValueError("Address must be provided in safe mode.") if is_ip_address(self.address): return f"host {self.address}" elif is_mac_address(self.address): return f"ether host {self.address}" else: raise ValueError("Invalid address format.") def capture(self): if not check_installed(): print('Please install tcpdump first') return capture_dir = make_capture_src_folder(get_capture_src_folder(self.device_path)) metadata = self.get_capture_metadata(capture_dir) pcap_file = capture_dir / metadata.capture_file cmd = ['sudo', 'tcpdump', '-i', self.capture_interface, '-w', str(pcap_file)] if self.filter: cmd.append(self.filter) metadata.tcpdump_command = ' '.join(cmd) print(f'Executing: {metadata.tcpdump_command}') try: metadata.start_time = datetime.now().isoformat() subprocess.run(cmd, check=True) metadata.stop_time = datetime.now().isoformat() except subprocess.CalledProcessError as e: logger.error(f'Failed to capture packets: {e}') return metadata.save_to_file() print(f"Capture complete. Metadata saved to {capture_dir / 'metadata.json'}") def setup_sniff_parser(subparsers): parser = subparsers.add_parser('sniff', help='Sniff packets with tcpdump') parser.add_argument('device_id', help='ID of the device to sniff') parser.add_argument('-i', '--interface', required=True, help='Network interface to capture on') parser.add_argument('-a', '--address', help='IP or MAC address to filter packets by') parser.add_argument('-u', '--unsafe', action='store_true', help='Run in unsafe mode without supplying an address. ' 'Highly discouraged.') parser.set_defaults(func=handle_sniff) def handle_sniff(args): sniffer = Sniffer(device_id=args.device_id, capture_interface=args.interface, address=args.address, safe=not args.unsafe) sniffer.capture()