HANDIN COMMIT

This commit is contained in:
2024-07-01 00:08:04 +02:00
parent 854fba049d
commit e62914e738
36 changed files with 195 additions and 883 deletions

View File

@@ -1,3 +0,0 @@
__pycache__
.venv
iottb.egg-info

View File

View File

@@ -1,30 +0,0 @@
import argparse
import logging
from pathlib import Path
from .commands.sniff import setup_sniff_parser
from .config import Config
from .utils.file_utils import ensure_directory_exists
def setup_logging():
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(name)s: %(message)s')
def main():
setup_logging()
parser = argparse.ArgumentParser(description='IoT Testbed')
subparsers = parser.add_subparsers()
setup_sniff_parser(subparsers)
args = parser.parse_args()
if hasattr(args, 'func'):
args.func(args)
else:
parser.print_help()
if __name__ == "__main__":
main()

View File

@@ -1,108 +0,0 @@
import subprocess
import re
from datetime import datetime
from pathlib import Path
import logging
from models.capture_metadata import CaptureMetadata
from models.device_metadata import DeviceMetadata
from utils.capture_utils import get_capture_src_folder, make_capture_src_folder
from utils.tcpdump_utils import check_installed
from utils.file_utils import ensure_directory_exists
from config import Config
logger = logging.getLogger('iottb.sniff')
def is_ip_address(address):
ip_pattern = re.compile(r"^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$")
return ip_pattern.match(address) is not None
def is_mac_address(address):
mac_pattern = re.compile(r"^([0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2}$")
return mac_pattern.match(address) is not None
class Sniffer:
def __init__(self, device_id, capture_interface, address=None, safe=True):
#TODO Decide if we want to use the device_id as the device_name (seems unhandy)
self.device_id = device_id
self.capture_interface = capture_interface
self.address = address
self.safe = safe
self.config = Config().load_config()
self.device_path = self.initialize_device(device_id)
self.filter = self.generate_filter()
def initialize_device(self, device_id):
db_path = Path(self.config.get('database_path', '~/iottb.db')).expanduser()
device_path = db_path / device_id
ensure_directory_exists(device_path)
metadata_file = device_path / 'device_metadata.json'
if not metadata_file.exists():
device_metadata = DeviceMetadata(device_name=device_id, device_root_path=device_path)
device_metadata.save_to_file()
return device_path
def get_capture_metadata(self, capture_dir):
metadata = CaptureMetadata(device_id=self.device_id, capture_dir=capture_dir)
metadata.build_capture_file_name()
metadata.interface = self.capture_interface
metadata.device_ip_address = self.address or "No IP Address set"
return metadata
def generate_filter(self):
if not self.address and self.safe:
raise ValueError("Address must be provided in safe mode.")
if is_ip_address(self.address):
return f"host {self.address}"
elif is_mac_address(self.address):
return f"ether host {self.address}"
else:
raise ValueError("Invalid address format.")
def capture(self):
if not check_installed():
print('Please install tcpdump first')
return
capture_dir = make_capture_src_folder(get_capture_src_folder(self.device_path))
metadata = self.get_capture_metadata(capture_dir)
pcap_file = capture_dir / metadata.capture_file
cmd = ['sudo', 'tcpdump', '-i', self.capture_interface, '-w', str(pcap_file)]
if self.filter:
cmd.append(self.filter)
metadata.tcpdump_command = ' '.join(cmd)
print(f'Executing: {metadata.tcpdump_command}')
try:
metadata.start_time = datetime.now().isoformat()
subprocess.run(cmd, check=True)
metadata.stop_time = datetime.now().isoformat()
except subprocess.CalledProcessError as e:
logger.error(f'Failed to capture packets: {e}')
return
metadata.save_to_file()
print(f"Capture complete. Metadata saved to {capture_dir / 'metadata.json'}")
def setup_sniff_parser(subparsers):
parser = subparsers.add_parser('sniff', help='Sniff packets with tcpdump')
parser.add_argument('device_id', help='ID of the device to sniff')
parser.add_argument('-i', '--interface', required=True, help='Network interface to capture on')
parser.add_argument('-a', '--address', help='IP or MAC address to filter packets by')
parser.add_argument('-u', '--unsafe', action='store_true', help='Run in unsafe mode without supplying an address. '
'Highly discouraged.')
parser.set_defaults(func=handle_sniff)
def handle_sniff(args):
sniffer = Sniffer(device_id=args.device_id, capture_interface=args.interface, address=args.address,
safe=not args.unsafe)
sniffer.capture()

View File

@@ -1 +0,0 @@
{"database_path": "~/.iottb.db", "log_level": "INFO"}

View File

@@ -1,45 +0,0 @@
import json
from pathlib import Path
import logging
logger = logging.getLogger('iottb.config')
class Config:
DEFAULT_CONFIG = {
"database_path": "~/.iottb.db",
"log_level": "INFO"
}
def __init__(self, config_file=None):
self.config_file = Path(config_file or "config.json")
if not self.config_file.exists():
self.create_default_config()
else:
self.config = self.load_config()
def create_default_config(self):
try:
self.save_config(self.DEFAULT_CONFIG)
except (IsADirectoryError, PermissionError) as e:
logger.error(f"Error creating default config: {e}")
raise
def load_config(self):
try:
with open(self.config_file, "r") as file:
return json.load(file)
except IsADirectoryError as e:
logger.error(f"Error loading config: {e}")
raise
except PermissionError as e:
logger.error(f"Error loading config: {e}")
raise
def save_config(self, config):
try:
with open(self.config_file, "w") as f:
json.dump(config, f, indent=2)
except (IsADirectoryError, PermissionError) as e:
logger.error(f"Error saving config: {e}")
raise

View File

View File

@@ -1,29 +0,0 @@
import json
import uuid
from datetime import datetime
from pathlib import Path
class CaptureMetadata:
def __init__(self, device_id, capture_dir):
self.device_id = device_id
self.capture_id = str(uuid.uuid4())
self.capture_date = datetime.now().isoformat()
self.capture_dir = Path(capture_dir)
self.capture_file = ""
self.start_time = ""
self.stop_time = ""
self.tcpdump_command = ""
self.interface = ""
self.device_ip_address = ""
def build_capture_file_name(self):
self.capture_file = f"{self.device_id}_{self.capture_id}.pcap"
def to_dict(self):
return self.__dict__
def save_to_file(self, file_path=None):
file_path = file_path or self.capture_dir / 'metadata.json'
with open(file_path, 'w') as f:
json.dump(self.to_dict(), f, indent=4)

View File

@@ -1,27 +0,0 @@
import json
import uuid
from datetime import datetime
from pathlib import Path
class DeviceMetadata:
def __init__(self, device_name, device_root_path):
self.device_name = device_name
self.device_short_name = device_name.lower().replace(' ', '_')
self.device_id = str(uuid.uuid4())
self.date_created = datetime.now().isoformat()
self.device_root_path = Path(device_root_path)
def to_dict(self):
return self.__dict__
def save_to_file(self):
file_path = self.device_root_path / 'device_metadata.json'
with open(file_path, 'w') as f:
json.dump(self.to_dict(), f, indent=4)
@classmethod
def load_from_file(cls, file_path):
with open(file_path, 'r') as f:
data = json.load(f)
return cls(**data)

View File

@@ -1,18 +0,0 @@
[build-system]
requires = ["setuptools>=42", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "iottb"
version = "0.1.0"
authors = [{name = "Sebastian Lenzlinger", email = "sebastian.lenzlinger@unibas.ch"}]
description = "Automation Tool for Capturing Network packets of IoT devices."
requires-python = ">=3.8"
dependencies = []
[tool.setuptools.packages.find]
where = ["."]
exclude = ["tests*", "docs*"]
[project.scripts]
iottb = "iottb.__main__:main"

View File

@@ -1,55 +0,0 @@
#!/usr/bin/env bash
# Note, this is not my original work. Source: https://linuxtldr.com/changing-interface-mode/
function list_nic_info () {
ip addr show
}
function enable_monm_iw () {
interface=$1
sudo ip link set "$interface" down
sudo iw "$interface" set monitor control
sudo ip link set "$interface" up
}
function disable_monm_iw () {
interface=$1
sudo ip link set "$interface" down
sudo iw "$interface" set type managed
sudo ip link set "$interface" up
}
function enable_monm_iwconfig () {
interface=$1
sudo ifconfig "$interface" down
sudo iwconfig "$interface" mode monitor
sudo ifconfig "$interface" up
}
function disable_monm_iwconfig () {
interface=$1
sudo ifconfig "$interface" down
sudo iwconfig "$interface" mode managed
sudo ifconfig "$interface" up
}
function enable_monm_acng () {
interface=$1
sudo airmon-ng check
sudo airmon-ng check kill
sudo airmon-ng start "$interface"
}
function disable_monm_acng () {
interface="${1}mon"
sudo airmon-ng stop "$interface"
sudo systemctl restart NetworkManager
}
if declare -f "$1" > /dev/null
then
"$@"
else
echo "Unknown function '$1'" >&2
exit 1
fi

View File

@@ -1,15 +0,0 @@
{
"device_id": "",
"capture_id": "",
"capture_date": "",
"capture_file": "",
"start_time": "",
"stop_time": "",
"capture_duration": "",
"interfaces": "",
"device_ip_address": "",
"device_mac_address": "",
"contacted_ip_address": [],
"device_firmware_version": "",
"campanion_app": ""
}

View File

@@ -1,4 +0,0 @@
{
"database_path": "~/.iottb.db",
"log_level": "INFO"
}

View File

@@ -1,14 +0,0 @@
{
"device_id": "",
"device_name": "",
"device_short_name": "",
"date_created": "",
"description": "",
"model": "",
"manufacturer": "",
"firmware_version": "",
"device_type": "",
"supported_interfaces": "",
"companion_applications": "",
"last_metadata_update": ""
}

View File

@@ -1,43 +0,0 @@
import json
from pathlib import Path
from unittest import mock
from config import Config
import unittest
class TestConfig(unittest.TestCase):
def test_creates_new_config_file_if_not_exists(self):
config_path = Path("test_config.json")
if config_path.exists():
config_path.unlink()
config = Config(config_file=config_path)
self.assertTrue(config_path.exists())
config_path.unlink()
def test_writes_default_configuration_to_config_file(self):
config_path = Path("test_config.json")
if config_path.exists():
config_path.unlink()
config = Config(config_file=config_path)
with open(config_path, "r") as f:
data = json.load(f)
self.assertEqual(data, {"database_path": "~/.iottb.db", "log_level": "INFO"})
config_path.unlink()
@unittest.mock.patch("builtins.open", side_effect=PermissionError)
def test_config_file_path_not_writable(self, mock_open):
config_path = Path("test_config.json")
with self.assertRaises(PermissionError):
config = Config(config_file=config_path)
config.create_default_config()
def test_config_file_path_is_directory(self):
config_dir = Path("test_config_dir")
config_dir.mkdir(exist_ok=True)
with self.assertRaises(IsADirectoryError):
config = Config(config_file=config_dir)
config.create_default_config()
config_dir.rmdir()

View File

@@ -1,38 +0,0 @@
from pathlib import Path
# Generated by CodiumAI
import unittest
from utils.file_utils import ensure_directory_exists
class TestEnsureDirectoryExists(unittest.TestCase):
# creates directory if it does not exist
def test_creates_directory_if_not_exists(self):
path = Path('/tmp/testdir')
if path.exists():
path.rmdir()
ensure_directory_exists(path)
self.assertTrue(path.exists())
path.rmdir()
# does not create directory if it already exists
def test_does_not_create_directory_if_exists(self):
path = Path('/tmp/testdir')
path.mkdir(exist_ok=True)
ensure_directory_exists(path)
self.assertTrue(path.exists())
path.rmdir()
# path is a symbolic link
def test_path_is_a_symbolic_link(self):
target_dir = Path('/tmp/targetdir')
symlink_path = Path('/tmp/symlinkdir')
target_dir.mkdir(exist_ok=True)
symlink_path.symlink_to(target_dir)
ensure_directory_exists(symlink_path)
self.assertTrue(symlink_path.exists())
self.assertTrue(symlink_path.is_symlink())
symlink_path.unlink()
target_dir.rmdir()

View File

@@ -1,62 +0,0 @@
from commands.sniff import is_ip_address
import unittest
class TestIsIpAddress(unittest.TestCase):
def test_valid_ipv4_address_all_octets_in_range(self):
self.assertTrue(is_ip_address("192.168.1.1"))
self.assertTrue(is_ip_address("0.0.0.0"))
self.assertTrue(is_ip_address("255.255.255.255"))
def test_ipv4_address_with_leading_zeros(self):
self.assertTrue(is_ip_address("192.168.001.001"))
self.assertTrue(is_ip_address("0.0.0.0"))
self.assertTrue(is_ip_address("255.255.255.255"))
def test_ipv4_address_mixed_single_double_digit_octets(self):
self.assertTrue(is_ip_address("192.168.1.01"))
self.assertTrue(is_ip_address("0.0.0.0"))
self.assertTrue(is_ip_address("255.255.255.255"))
def test_ipv4_address_maximum_values_in_octets(self):
self.assertTrue(is_ip_address("255.255.255.255"))
self.assertTrue(is_ip_address("0.0.0.0"))
self.assertTrue(is_ip_address("192.168.1.1"))
def test_ipv4_address_minimum_values_in_octets(self):
self.assertTrue(is_ip_address("0.0.0.0"))
self.assertTrue(is_ip_address("192.168.1.1"))
self.assertTrue(is_ip_address("255.255.255.255"))
def test_ipv4_address_more_than_four_octets_invalid(self):
self.assertFalse(is_ip_address("192.168.1.1.1"))
self.assertFalse(is_ip_address("0.0.0.0.0"))
self.assertFalse(is_ip_address("255.255.255.255.255"))
def test_ipv4_address_fewer_than_four_octets_invalid(self):
self.assertFalse(is_ip_address("192.168.1"))
self.assertFalse(is_ip_address("0.0"))
self.assertFalse(is_ip_address("255"))
def test_ipv4_address_non_numeric_characters_invalid(self):
self.assertFalse(is_ip_address("192.a.b.c"))
self.assertFalse(is_ip_address("0.x.y.z"))
self.assertFalse(is_ip_address("255.q.w.e"))
def test_ipv4_address_octets_out_of_range_invalid(self):
self.assertFalse(is_ip_address("256.256.256.256"))
self.assertFalse(is_ip_address("300.300.300.300"))
self.assertFalse(is_ip_address("999.999.999.999"))
def test_ipv4_address_empty_string_invalid(self):
self.assertFalse(is_ip_address(""))
self.assertFalse(is_ip_address(" "))
self.assertFalse(is_ip_address(None))

View File

@@ -1,64 +0,0 @@
from commands.sniff import is_mac_address
import unittest
class TestIsMacAddress(unittest.TestCase):
def test_valid_mac_address_lowercase(self):
self.assertTrue(is_mac_address("aa:bb:cc:dd:ee:ff"))
self.assertFalse(is_mac_address("192.168.1.1"))
self.assertFalse(is_mac_address("aa:bb:cc:dd:ee:ff:gg"))
def test_valid_mac_address_uppercase(self):
self.assertTrue(is_mac_address("AA:BB:CC:DD:EE:FF"))
self.assertFalse(is_mac_address("10.0.0.1"))
self.assertFalse(is_mac_address("AA:BB:CC:DD:EE"))
def test_valid_mac_address_mixed_case(self):
self.assertTrue(is_mac_address("Aa:Bb:Cc:Dd:Ee:Ff"))
self.assertFalse(is_mac_address("172.16.0.1"))
self.assertFalse(is_mac_address("Aa:Bb:Cc:Dd:Ee:Ff:Gg"))
def test_valid_mac_address_digits(self):
self.assertTrue(is_mac_address("00:11:22:33:44:55"))
self.assertFalse(is_mac_address("8.8.8.8"))
self.assertFalse(is_mac_address("00:11:22:33:44"))
# returns False for an empty string
def test_empty_string(self):
self.assertFalse(is_mac_address(""))
self.assertFalse(is_mac_address(":"))
def test_invalid_characters(self):
self.assertFalse(is_mac_address("gh:ij:kl:mn:op:qr"))
self.assertFalse(is_mac_address("192.168.0.256"))
self.assertFalse(is_mac_address("ghij::klmn::opqr"))
# returns False for a MAC address with incorrect length
def test_incorrect_length(self):
self.assertFalse(is_mac_address("aa:bb:cc"))
self.assertFalse(is_mac_address("10.0.0.256"))
self.assertFalse(is_mac_address("aa::bb::cc::dd::ee::ff::gg"))
# returns False for a MAC address with missing colons
def test_missing_colons(self):
self.assertFalse(is_mac_address("aabbccddeeff"))
self.assertFalse(is_mac_address("127.0.0.1"))
self.assertFalse(is_mac_address("aabbccddeeffgg"))
# returns False for a MAC address with extra colons
def test_extra_colons(self):
self.assertFalse(is_mac_address("aa::bb::cc::dd::ee::ff"))
self.assertFalse(is_mac_address("192.168.1.256"))
self.assertFalse(is_mac_address("aa::bb::cc::dd::ee::ff::gg"))
# returns False for a MAC address with spaces
def test_spaces_in_mac(self):
self.assertFalse(is_mac_address("aa bb cc dd ee ff"))
self.assertFalse(is_mac_address("8.8.4.4"))
self.assertFalse(is_mac_address("aa bb cc dd ee ff gg"))

View File

@@ -1,20 +0,0 @@
from pathlib import Path
from datetime import datetime
def get_capture_src_folder(device_path):
today_str = datetime.now().strftime('%Y-%m-%d')
capture_base_path = device_path / today_str
capture_base_path.mkdir(parents=True, exist_ok=True)
existing_captures = [d for d in capture_base_path.iterdir() if d.is_dir()]
nth_capture = len(existing_captures) + 1
capture_dir = capture_base_path / f'capture_{nth_capture}'
capture_dir.mkdir(parents=True, exist_ok=True)
return capture_dir
def make_capture_src_folder(capture_src_folder):
capture_src_folder.mkdir(parents=True, exist_ok=True)
return capture_src_folder

View File

@@ -1,29 +0,0 @@
import matplotlib.pyplot as plt
import networkx as nx
# Create the graph
G1 = nx.DiGraph()
# Add nodes with positions
G1.add_node("IoT Device", pos=(1, 3))
G1.add_node("AP", pos=(3, 3))
G1.add_node("Switch (Port Mirroring Enabled)", pos=(5, 3))
G1.add_node("Gateway Router", pos=(7, 3))
G1.add_node("Internet", pos=(9, 3))
G1.add_node("Capture Device", pos=(5, 1))
# Add edges
G1.add_edge("IoT Device", "AP")
G1.add_edge("AP", "Switch (Port Mirroring Enabled)")
G1.add_edge("Switch (Port Mirroring Enabled)", "Gateway Router")
G1.add_edge("Gateway Router", "Internet")
G1.add_edge("Switch (Port Mirroring Enabled)", "Capture Device")
# Draw the graph
pos = nx.get_node_attributes(G1, 'pos')
plt.figure(figsize=(12, 8))
nx.draw(G1, pos, with_labels=True, node_size=3000, node_color='lightblue', font_size=10, font_weight='bold')
nx.draw_networkx_edge_labels(G1, pos, edge_labels={("Switch (Port Mirroring Enabled)", "Capture Device"): "Mirrored Traffic"}, font_color='red')
plt.title("IoT Device Connected via AP to Gateway Router via Switch with Port Mirroring Enabled")
plt.show()

View File

@@ -1,27 +0,0 @@
import matplotlib.pyplot as plt
import networkx as nx
# Create the graph
G2 = nx.DiGraph()
# Add nodes with positions
G2.add_node("IoT Device", pos=(1, 3))
G2.add_node("Capture Device (Hotspot)", pos=(3, 3))
G2.add_node("Ethernet Connection", pos=(5, 3))
G2.add_node("Gateway Router", pos=(7, 3))
G2.add_node("Internet", pos=(9, 3))
# Add edges
G2.add_edge("IoT Device", "Capture Device (Hotspot)")
G2.add_edge("Capture Device (Hotspot)", "Ethernet Connection")
G2.add_edge("Ethernet Connection", "Gateway Router")
G2.add_edge("Gateway Router", "Internet")
# Draw the graph
pos = nx.get_node_attributes(G2, 'pos')
plt.figure(figsize=(12, 8))
nx.draw(G2, pos, with_labels=True, node_size=3000, node_color='lightblue', font_size=10, font_weight='bold')
nx.draw_networkx_edge_labels(G2, pos, edge_labels={("Capture Device (Hotspot)", "Ethernet Connection"): "Bridged Traffic"}, font_color='red')
plt.title("Capture Device Provides Hotspot and Bridges to Ethernet for Internet")
plt.show()

View File

@@ -1,19 +0,0 @@
import json
from pathlib import Path
def load_json_template(template_path):
with open(template_path, 'r') as f:
return json.load(f)
def save_json(data, file_path):
with open(file_path, 'w') as f:
json.dump(data, f, indent=4)
def ensure_directory_exists(path):
path = Path(path)
if not path.exists():
path.mkdir(parents=True, exist_ok=True)
return path

View File

@@ -1,9 +0,0 @@
import subprocess
def check_installed():
try:
subprocess.run(['tcpdump', '--version'], check=True, capture_output=True)
return True
except subprocess.CalledProcessError:
return False