2024-06-18 03:12:40 +02:00

109 lines
4.2 KiB
Python

import subprocess
import re
from datetime import datetime
from pathlib import Path
import logging
from models.capture_metadata import CaptureMetadata
from models.device_metadata import DeviceMetadata
from utils.capture_utils import get_capture_src_folder, make_capture_src_folder
from utils.tcpdump_utils import check_installed
from utils.file_utils import ensure_directory_exists
from config import Config
logger = logging.getLogger('iottb.sniff')
def is_ip_address(address):
ip_pattern = re.compile(r"^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$")
return ip_pattern.match(address) is not None
def is_mac_address(address):
mac_pattern = re.compile(r"^([0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2}$")
return mac_pattern.match(address) is not None
class Sniffer:
def __init__(self, device_id, capture_interface, address=None, safe=True):
#TODO Decide if we want to use the device_id as the device_name (seems unhandy)
self.device_id = device_id
self.capture_interface = capture_interface
self.address = address
self.safe = safe
self.config = Config().load_config()
self.device_path = self.initialize_device(device_id)
self.filter = self.generate_filter()
def initialize_device(self, device_id):
db_path = Path(self.config.get('database_path', '~/iottb.db')).expanduser()
device_path = db_path / device_id
ensure_directory_exists(device_path)
metadata_file = device_path / 'device_metadata.json'
if not metadata_file.exists():
device_metadata = DeviceMetadata(device_name=device_id, device_root_path=device_path)
device_metadata.save_to_file()
return device_path
def get_capture_metadata(self, capture_dir):
metadata = CaptureMetadata(device_id=self.device_id, capture_dir=capture_dir)
metadata.build_capture_file_name()
metadata.interface = self.capture_interface
metadata.device_ip_address = self.address or "No IP Address set"
return metadata
def generate_filter(self):
if not self.address and self.safe:
raise ValueError("Address must be provided in safe mode.")
if is_ip_address(self.address):
return f"host {self.address}"
elif is_mac_address(self.address):
return f"ether host {self.address}"
else:
raise ValueError("Invalid address format.")
def capture(self):
if not check_installed():
print('Please install tcpdump first')
return
capture_dir = make_capture_src_folder(get_capture_src_folder(self.device_path))
metadata = self.get_capture_metadata(capture_dir)
pcap_file = capture_dir / metadata.capture_file
cmd = ['sudo', 'tcpdump', '-i', self.capture_interface, '-w', str(pcap_file)]
if self.filter:
cmd.append(self.filter)
metadata.tcpdump_command = ' '.join(cmd)
print(f'Executing: {metadata.tcpdump_command}')
try:
metadata.start_time = datetime.now().isoformat()
subprocess.run(cmd, check=True)
metadata.stop_time = datetime.now().isoformat()
except subprocess.CalledProcessError as e:
logger.error(f'Failed to capture packets: {e}')
return
metadata.save_to_file()
print(f"Capture complete. Metadata saved to {capture_dir / 'metadata.json'}")
def setup_sniff_parser(subparsers):
parser = subparsers.add_parser('sniff', help='Sniff packets with tcpdump')
parser.add_argument('device_id', help='ID of the device to sniff')
parser.add_argument('-i', '--interface', required=True, help='Network interface to capture on')
parser.add_argument('-a', '--address', help='IP or MAC address to filter packets by')
parser.add_argument('-u', '--unsafe', action='store_true', help='Run in unsafe mode without supplying an address. '
'Highly discouraged.')
parser.set_defaults(func=handle_sniff)
def handle_sniff(args):
sniffer = Sniffer(device_id=args.device_id, capture_interface=args.interface, address=args.address,
safe=not args.unsafe)
sniffer.capture()