Introduce complete refactoring.

This commit is contained in:
Sebastian Lenzlinger
2024-06-18 03:12:28 +02:00
parent b345474a89
commit 01954bd5a6
56 changed files with 754 additions and 206 deletions

View File

3
code/iottb/.gitignore vendored Normal file
View File

@@ -0,0 +1,3 @@
__pycache__
.venv
iottb.egg-info

View File

@@ -1,108 +1,30 @@
#!/usr/bin/env python3
import argparse
from os import environ
from pathlib import Path
import logging
from iottb.subcommands.add_device import setup_init_device_root_parser
# from iottb.subcommands.capture import setup_capture_parser
from iottb.subcommands.sniff import setup_sniff_parser
from iottb.utils.tcpdump_utils import list_interfaces
from iottb.definitions import IOTTB_HOME_ABS, ReturnCodes
from iottb.logger import setup_logging
from pathlib import Path
logger = logging.getLogger('iottbLogger.__main__')
logger.setLevel(logging.DEBUG)
from .commands.sniff import setup_sniff_parser
from .config import Config
from .utils.file_utils import ensure_directory_exists
######################
# Argparse setup
######################
def setup_argparse():
# create top level parser
root_parser = argparse.ArgumentParser(prog='iottb')
# shared options
root_parser.add_argument('--verbose', '-v', action='count', default=0)
# Group of args w.r.t iottb.db creation
group = root_parser.add_argument_group('database options')
group.add_argument('--db-home', default=Path.home() / 'IoTtb.db')
group.add_argument('--config-home', default=Path.home() / '.config' / 'iottb.conf', type=Path, )
group.add_argument('--user', default=Path.home().stem, type=Path, )
# configure subcommands
subparsers = root_parser.add_subparsers(title='subcommands', required=True, dest='command')
# setup_capture_parser(subparsers)
setup_init_device_root_parser(subparsers)
setup_sniff_parser(subparsers)
# Utility to list interfaces directly with iottb instead of relying on external tooling
interfaces_parser = subparsers.add_parser('list-interfaces', aliases=['li', 'if'],
help='List available network interfaces.')
interfaces_parser.set_defaults(func=list_interfaces)
return root_parser
###
# Where put ?!
###
class IoTdb:
def __init__(self, db_home=Path.home() / 'IoTtb.db', iottb_config=Path.home() / '.conf' / 'iottb.conf',
user=Path.home().stem):
self.db_home = db_home
self.config_home = iottb_config
self.default_filters_home = db_home / 'default_filters'
self.user = user
def create_db(self, mode=0o777, parents=False, exist_ok=False):
logger.info(f'Creating db at {self.db_home}')
try:
self.db_home.mkdir(mode=mode, parents=parents, exist_ok=exist_ok)
except FileExistsError:
logger.error(f'Database path already at {self.db_home} exists and is not a directory')
finally:
logger.debug(f'Leaving finally clause in create_db')
def create_device_tree(self, mode=0o777, parents=False, exist_ok=False):
logger.info(f'Creating device tree at {self.db_home / 'devices'}')
#TODO
def parse_db_config(self):
pass
def parse_iottb_config(self):
pass
def get_known_devices(self):
pass
def iottb_db_exists(db_home=Path.home() / 'IoTtb.db'):
res = db_home.is_dir()
def setup_logging():
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(name)s: %(message)s')
def main():
logger.debug(f'Pre setup_argparse()')
parser = setup_argparse()
logger.debug('Post setup_argparse().')
args = parser.parse_args()
logger.debug(f'Args parsed: {args}')
if args.command:
try:
args.func(args)
except KeyboardInterrupt:
print('Received keyboard interrupt. Exiting...')
exit(1)
except Exception as e:
logger.debug(f'Error in main: {e}')
print(f'Error: {e}')
# create_capture_directory(args.device_name)
if __name__ == '__main__':
setup_logging()
logger.debug("Debug level is working")
logger.info("Info level is working")
logger.warning("Warning level is working")
parser = argparse.ArgumentParser(description='IoT Testbed')
subparsers = parser.add_subparsers()
setup_sniff_parser(subparsers)
args = parser.parse_args()
if hasattr(args, 'func'):
args.func(args)
else:
parser.print_help()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,108 @@
import subprocess
import re
from datetime import datetime
from pathlib import Path
import logging
from models.capture_metadata import CaptureMetadata
from models.device_metadata import DeviceMetadata
from utils.capture_utils import get_capture_src_folder, make_capture_src_folder
from utils.tcpdump_utils import check_installed
from utils.file_utils import ensure_directory_exists
from config import Config
logger = logging.getLogger('iottb.sniff')
def is_ip_address(address):
ip_pattern = re.compile(r"^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$")
return ip_pattern.match(address) is not None
def is_mac_address(address):
mac_pattern = re.compile(r"^([0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2}$")
return mac_pattern.match(address) is not None
class Sniffer:
def __init__(self, device_id, capture_interface, address=None, safe=True):
#TODO Decide if we want to use the device_id as the device_name (seems unhandy)
self.device_id = device_id
self.capture_interface = capture_interface
self.address = address
self.safe = safe
self.config = Config().load_config()
self.device_path = self.initialize_device(device_id)
self.filter = self.generate_filter()
def initialize_device(self, device_id):
db_path = Path(self.config.get('database_path', '~/iottb.db')).expanduser()
device_path = db_path / device_id
ensure_directory_exists(device_path)
metadata_file = device_path / 'device_metadata.json'
if not metadata_file.exists():
device_metadata = DeviceMetadata(device_name=device_id, device_root_path=device_path)
device_metadata.save_to_file()
return device_path
def get_capture_metadata(self, capture_dir):
metadata = CaptureMetadata(device_id=self.device_id, capture_dir=capture_dir)
metadata.build_capture_file_name()
metadata.interface = self.capture_interface
metadata.device_ip_address = self.address or "No IP Address set"
return metadata
def generate_filter(self):
if not self.address and self.safe:
raise ValueError("Address must be provided in safe mode.")
if is_ip_address(self.address):
return f"host {self.address}"
elif is_mac_address(self.address):
return f"ether host {self.address}"
else:
raise ValueError("Invalid address format.")
def capture(self):
if not check_installed():
print('Please install tcpdump first')
return
capture_dir = make_capture_src_folder(get_capture_src_folder(self.device_path))
metadata = self.get_capture_metadata(capture_dir)
pcap_file = capture_dir / metadata.capture_file
cmd = ['sudo', 'tcpdump', '-i', self.capture_interface, '-w', str(pcap_file)]
if self.filter:
cmd.append(self.filter)
metadata.tcpdump_command = ' '.join(cmd)
print(f'Executing: {metadata.tcpdump_command}')
try:
metadata.start_time = datetime.now().isoformat()
subprocess.run(cmd, check=True)
metadata.stop_time = datetime.now().isoformat()
except subprocess.CalledProcessError as e:
logger.error(f'Failed to capture packets: {e}')
return
metadata.save_to_file()
print(f"Capture complete. Metadata saved to {capture_dir / 'metadata.json'}")
def setup_sniff_parser(subparsers):
parser = subparsers.add_parser('sniff', help='Sniff packets with tcpdump')
parser.add_argument('device_id', help='ID of the device to sniff')
parser.add_argument('-i', '--interface', required=True, help='Network interface to capture on')
parser.add_argument('-a', '--address', help='IP or MAC address to filter packets by')
parser.add_argument('-u', '--unsafe', action='store_true', help='Run in unsafe mode without supplying an address. '
'Highly discouraged.')
parser.set_defaults(func=handle_sniff)
def handle_sniff(args):
sniffer = Sniffer(device_id=args.device_id, capture_interface=args.interface, address=args.address,
safe=not args.unsafe)
sniffer.capture()

1
code/iottb/config.json Normal file
View File

@@ -0,0 +1 @@
{"database_path": "~/.iottb.db", "log_level": "INFO"}

45
code/iottb/config.py Normal file
View File

@@ -0,0 +1,45 @@
import json
from pathlib import Path
import logging
logger = logging.getLogger('iottb.config')
class Config:
DEFAULT_CONFIG = {
"database_path": "~/.iottb.db",
"log_level": "INFO"
}
def __init__(self, config_file=None):
self.config_file = Path(config_file or "config.json")
if not self.config_file.exists():
self.create_default_config()
else:
self.config = self.load_config()
def create_default_config(self):
try:
self.save_config(self.DEFAULT_CONFIG)
except (IsADirectoryError, PermissionError) as e:
logger.error(f"Error creating default config: {e}")
raise
def load_config(self):
try:
with open(self.config_file, "r") as file:
return json.load(file)
except IsADirectoryError as e:
logger.error(f"Error loading config: {e}")
raise
except PermissionError as e:
logger.error(f"Error loading config: {e}")
raise
def save_config(self, config):
try:
with open(self.config_file, "w") as f:
json.dump(config, f, indent=2)
except (IsADirectoryError, PermissionError) as e:
logger.error(f"Error saving config: {e}")
raise

View File

@@ -1,41 +0,0 @@
from datetime import datetime
from enum import Flag, unique, global_enum
from pathlib import Path
'''
Defining IOTTB_HOME_ABS here implies that it be immutable.
It is used here so that one could configure it.
But after its used in __man__ this cannot be relied upon.
'''
IOTTB_HOME_ABS = Path().home() / 'IOTTB.db'
# TODO maybe wrap this into class to make it easier to pass along to different objects
# But will need more refactoring
DEVICE_METADATA_FILE = 'device_metadata.json'
CAPTURE_METADATA_FILE = 'capture_metadata.json'
TODAY_DATE_STRING = datetime.now().strftime('%d%b%Y').lower() # TODO convert to function in utils or so
CAPTURE_FOLDER_BASENAME = 'capture_###'
AFFIRMATIVE_USER_RESPONSE = {'yes', 'y', 'true', 'Y', 'Yes', 'YES'}
NEGATIVE_USER_RESPONSE = {'no', 'n', 'N', 'No'}
YES_DEFAULT = AFFIRMATIVE_USER_RESPONSE.union({'', ' '})
NO_DEFAULT = NEGATIVE_USER_RESPONSE.union({'', ' '})
@unique
@global_enum
class ReturnCodes(Flag):
SUCCESS = 0
ABORTED = 1
FAILURE = 2
UNKNOWN = 3
FILE_NOT_FOUND = 4
FILE_ALREADY_EXISTS = 5
INVALID_ARGUMENT = 6
INVALID_ARGUMENT_VALUE = 7
def iottb_home_abs():
return None

View File

@@ -1,35 +0,0 @@
import logging
import sys
import os
from logging.handlers import RotatingFileHandler
def setup_logging():
# Ensure the logs directory exists
log_directory = 'logs'
if not os.path.exists(log_directory):
os.makedirs(log_directory)
# Create handlers
file_handler = RotatingFileHandler(os.path.join(log_directory, 'iottb.log'), maxBytes=1048576, backupCount=5)
console_handler = logging.StreamHandler(sys.stdout)
# Create formatters and add it to handlers
file_fmt = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
console_fmt = logging.Formatter(
'%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s')
file_handler.setFormatter(file_fmt)
console_handler.setFormatter(console_fmt)
# Get the root logger and add handlers
root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG)
root_logger.addHandler(file_handler)
root_logger.addHandler(console_handler)
# Prevent propagation to the root logger to avoid duplicate logs
root_logger.propagate = False
setup_logging()

View File

@@ -0,0 +1,29 @@
import json
import uuid
from datetime import datetime
from pathlib import Path
class CaptureMetadata:
def __init__(self, device_id, capture_dir):
self.device_id = device_id
self.capture_id = str(uuid.uuid4())
self.capture_date = datetime.now().isoformat()
self.capture_dir = Path(capture_dir)
self.capture_file = ""
self.start_time = ""
self.stop_time = ""
self.tcpdump_command = ""
self.interface = ""
self.device_ip_address = ""
def build_capture_file_name(self):
self.capture_file = f"{self.device_id}_{self.capture_id}.pcap"
def to_dict(self):
return self.__dict__
def save_to_file(self, file_path=None):
file_path = file_path or self.capture_dir / 'metadata.json'
with open(file_path, 'w') as f:
json.dump(self.to_dict(), f, indent=4)

View File

@@ -1,106 +0,0 @@
import json
import uuid
from datetime import datetime
from pathlib import Path
from typing import Optional
from iottb.definitions import ReturnCodes, CAPTURE_METADATA_FILE
from iottb.models.device_metadata_model import DeviceMetadata
import logging
logger = logging.getLogger('iottbLogger.capture_metadata_model')
logger.setLevel(logging.DEBUG)
class CaptureMetadata:
# Required Fields
device_metadata: DeviceMetadata
device_id: str
capture_dir: Path
capture_file: str
# Statistics
start_time: str
stop_time: str
# tcpdump
packet_count: Optional[int]
pcap_filter: str = ''
tcpdump_command: str = ''
interface: str = ''
# Optional Fields
device_ip_address: str = 'No IP Address set'
device_mac_address: Optional[str] = None
app: Optional[str] = None
app_version: Optional[str] = None
firmware_version: Optional[str] = None
def __init__(self, device_metadata: DeviceMetadata, capture_dir: Path):
logger.info(f'Creating CaptureMetadata model from DeviceMetadata: {device_metadata}')
self.device_metadata = device_metadata
self.capture_id = str(uuid.uuid4())
self.capture_date = datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower()
self.capture_dir = capture_dir
assert capture_dir.is_dir(), f'Capture directory {capture_dir} does not exist'
def build_capture_file_name(self):
logger.info(f'Building capture file name')
if self.app is None:
logger.debug(f'No app specified')
prefix = "iphone-14" #self.device_metadata.device_short_name
else:
logger.debug(f'App specified: {self.app}')
assert str(self.app).strip() not in {'', ' '}, f'app is not a valid name: {self.app}'
prefix = self.app.lower().replace(' ', '_')
# assert self.capture_dir is not None, f'{self.capture_dir} does not exist'
filename = f'{prefix}_{str(self.capture_id)}.pcap'
logger.debug(f'Capture file name: {filename}')
self.capture_file = filename
def save_capture_metadata_to_json(self, file_path: Path = Path(CAPTURE_METADATA_FILE)):
assert self.capture_dir.is_dir(), f'capture_dir is not a directory: {self.capture_dir}'
if file_path.is_file():
print(f'File {file_path} already exists, update instead.')
return ReturnCodes.FILE_ALREADY_EXISTS
metadata = self.to_json(indent=2)
with file_path.open('w') as file:
json.dump(metadata, file)
return ReturnCodes.SUCCESS
def to_json(self, indent=2):
# TODO: Where to validate data?
logger.info(f'Converting CaptureMetadata to JSON')
data = {}
# List of fields from CaptureData class, if fields[key]==True, then it is a required field
fields = {
'capture_id': True, #
'device_id': True,
'capture_dir': True,
'capture_file': False,
'capture_date': False,
'start_time': True,
'stop_time': True,
'packet_count': False,
'pcap_filter': False,
'tcpdump_command': False,
'interface': False,
'device_ip_address': False,
'device_mac_address': False,
'app': False,
'app_version': False,
'firmware_version': False
}
for field, is_mandatory in fields.items():
value = getattr(self, field, None)
if value not in [None, ''] or is_mandatory:
if value in [None, ''] and is_mandatory:
raise ValueError(f'Field {field} is required and cannot be empty.')
data[field] = str(value) if not isinstance(value, str) else value
logger.debug(f'Capture metadata: {data}')
return json.dumps(data, indent=indent)

View File

@@ -0,0 +1,27 @@
import json
import uuid
from datetime import datetime
from pathlib import Path
class DeviceMetadata:
def __init__(self, device_name, device_root_path):
self.device_name = device_name
self.device_short_name = device_name.lower().replace(' ', '_')
self.device_id = str(uuid.uuid4())
self.date_created = datetime.now().isoformat()
self.device_root_path = Path(device_root_path)
def to_dict(self):
return self.__dict__
def save_to_file(self):
file_path = self.device_root_path / 'device_metadata.json'
with open(file_path, 'w') as f:
json.dump(self.to_dict(), f, indent=4)
@classmethod
def load_from_file(cls, file_path):
with open(file_path, 'r') as f:
data = json.load(f)
return cls(**data)

View File

@@ -1,114 +0,0 @@
import json
import uuid
from datetime import datetime
from pathlib import Path
from typing import Optional, List
# iottb modules
from iottb.definitions import ReturnCodes, DEVICE_METADATA_FILE
import logging
logger = logging.getLogger('iottbLogger.device_metadata_model')
logger.setLevel(logging.DEBUG)
# 3rd party libs
IMMUTABLE_FIELDS = {'device_name', 'device_short_name', 'device_id', 'date_created'}
class DeviceMetadata:
# Required fields
device_name: str
device_short_name: str
device_id: str
date_created: str
device_root_path: Path
# Optional Fields
aliases: Optional[List[str]] = None
device_type: Optional[str] = None
device_serial_number: Optional[str] = None
device_firmware_version: Optional[str] = None
date_updated: Optional[str] = None
capture_files: Optional[List[str]] = []
def __init__(self, device_name: str, device_root_path: Path):
self.device_name = device_name
self.device_short_name = device_name.lower().replace(' ', '_')
self.device_id = str(uuid.uuid4())
self.date_created = datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower()
self.device_root_path = device_root_path
if not self.device_root_path or not self.device_root_path.is_dir():
logger.error(f'Invalid device root path: {device_root_path}')
raise ValueError(f'Invalid device root path: {device_root_path}')
logger.debug(f'Device name: {device_name}')
logger.debug(f'Device short_name: {self.device_short_name}')
logger.debug(f'Device root dir: {device_root_path}')
logger.info(f'Initialized DeviceMetadata model: {device_name}')
@classmethod
def load_from_json(cls, device_file_path: Path):
logger.info(f'Loading DeviceMetadata from JSON file: {device_file_path}')
assert device_file_path.is_file(), f'{device_file_path} is not a file'
assert device_file_path.name == DEVICE_METADATA_FILE, f'{device_file_path} is not a {DEVICE_METADATA_FILE}'
device_meta_filename = device_file_path
with device_meta_filename.open('r') as file:
metadata_json = json.load(file)
metadata_model_obj = cls.from_json(metadata_json)
return metadata_model_obj
def save_to_json(self, file_path: Path):
logger.info(f'Saving DeviceMetadata to JSON file: {file_path}')
if file_path.is_file():
print(f'File {file_path} already exists, update instead.')
return ReturnCodes.FILE_ALREADY_EXISTS
metadata = self.to_json(indent=2)
with file_path.open('w') as file:
json.dump(metadata, file)
return ReturnCodes.SUCCESS
@classmethod
def from_json(cls, metadata_json):
if isinstance(metadata_json, dict):
return DeviceMetadata(**metadata_json)
def to_json(self, indent=2):
# TODO: atm almost exact copy as in CaptureMetadata
data = {}
fields = {
'device_name': True,
'device_short_name': True,
'device_id': True,
'date_created': True,
'device_root_path': True,
'aliases': False,
'device_type': False,
'device_serial_number': False,
'device_firmware_version': False,
'date_updated': False,
'capture_files': False,
}
for field, is_mandatory in fields.items():
value = getattr(self, field, None)
if value not in [None, ''] or is_mandatory:
if value in [None, ''] and is_mandatory:
logger.debug(f'Mandatory field {field}: {value}')
raise ValueError(f'Field {field} is required and cannot be empty.')
data[field] = str(value) if not isinstance(value, str) else value
logger.debug(f'Device metadata: {data}')
return json.dumps(data, indent=indent)
def dir_contains_device_metadata(dir_path: Path):
if not dir_path.is_dir():
return False
else:
meta_file_path = dir_path / DEVICE_METADATA_FILE
print(f'Device metadata file path {str(meta_file_path)}')
if not meta_file_path.is_file():
return False
else:
return True

View File

@@ -3,14 +3,16 @@ requires = ["setuptools>=42", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = 'iottb'
version = '0.1.0'
name = "iottb"
version = "0.1.0"
authors = [{name = "Sebastian Lenzlinger", email = "sebastian.lenzlinger@unibas.ch"}]
description = "Automation Tool for Capturing Network packets of IoT devices."
requires-python = ">=3.8"
dependencies = []
[tool.setuptools]
packages = ["iottb"]
[tool.setuptools.packages.find]
where = ["."]
exclude = ["tests*", "docs*"]
[project.scripts]
iottb = "iottb.__main__:main"
iottb = "iottb.__main__:main"

View File

@@ -1,77 +0,0 @@
import logging
import os
import pathlib
from iottb import definitions
from iottb.definitions import DEVICE_METADATA_FILE, ReturnCodes
from iottb.models.device_metadata_model import DeviceMetadata
# logger.setLevel(logging.INFO) # Since module currently passes all tests
logger = logging.getLogger('iottbLogger.add_device')
logger.setLevel(logging.INFO)
def setup_init_device_root_parser(subparsers):
#assert os.environ['IOTTB_HOME'] is not None, "IOTTB_HOME environment variable is not set"
parser = subparsers.add_parser('add-device', aliases=['add-device-root', 'add'],
help='Initialize a folder for a device.')
parser.add_argument('--root_dir', type=pathlib.Path,
default=definitions.IOTTB_HOME_ABS) # TODO: Refactor code to not use this or handle iottb here
group = parser.add_mutually_exclusive_group()
group.add_argument('--guided', action='store_true', help='Guided setup', default=False)
group.add_argument('--name', action='store', type=str, help='name of device')
parser.set_defaults(func=handle_add)
def handle_add(args):
# TODO: This whole function should be refactored into using the fact that IOTTB_HOME is set, and the dir exists
logger.info(f'Add device handler called with args {args}')
if args.guided:
logger.debug('begin guided setup')
metadata = guided_setup(args.root_dir) # TODO refactor to use IOTTB_HOME
logger.debug('guided setup complete')
else:
logger.debug('Setup through passed args: setup')
if not args.name:
logger.error('No device name specified with unguided setup.')
return ReturnCodes.ERROR
metadata = DeviceMetadata(args.name, args.root_dir)
file_path = args.root_dir / DEVICE_METADATA_FILE # TODO IOTTB_HOME REFACTOR
if file_path.exists():
print('Directory already contains a metadata file. Aborting.')
return ReturnCodes.ABORTED
serialized_metadata = metadata.to_json()
response = input(f'Confirm device metadata: {serialized_metadata} [y/N]')
logger.debug(f'response: {response}')
if response not in definitions.AFFIRMATIVE_USER_RESPONSE:
print('Adding device aborted by user.')
return ReturnCodes.ABORTED
logger.debug(f'Device metadata file {file_path}')
if metadata.save_to_json(file_path) == ReturnCodes.FILE_ALREADY_EXISTS:
logger.error('File exists after checking, which should not happen.')
return ReturnCodes.ABORTED
print('Device metadata successfully created.')
return ReturnCodes.SUCCESS
def configure_metadata():
pass
def guided_setup(device_root) -> DeviceMetadata:
logger.info('Guided setup')
response = 'N'
device_name = ''
while response.upper() == 'N':
device_name = input('Please enter name of device: ')
response = input(f'Confirm device name: {device_name} [y/N] ')
if device_name == '' or device_name is None:
print('Name cannot be empty')
logger.warning('Name cannot be empty')
logger.debug(f'Response is {response}')
logger.debug(f'Device name is {device_name}')
return DeviceMetadata(device_name, device_root)

View File

@@ -1,179 +0,0 @@
import subprocess
from pathlib import Path
from iottb.definitions import *
import logging
from iottb.models.capture_metadata_model import CaptureMetadata
from iottb.models.device_metadata_model import DeviceMetadata, dir_contains_device_metadata
from iottb.utils.capture_utils import get_capture_src_folder, make_capture_src_folder
from iottb.utils.tcpdump_utils import check_installed
logger = logging.getLogger('iottbLogger.capture')
logger.setLevel(logging.DEBUG)
def setup_capture_parser(subparsers):
parser = subparsers.add_parser('sniff', help='Sniff packets with tcpdump')
# metadata args
parser.add_argument('-a', '--ip-address', help='IP address of the device to sniff', dest='device_ip')
# tcpdump args
parser.add_argument('device_root', help='Root folder for device to sniff',
type=Path, default=Path.cwd())
parser.add_argument('-s', '--safe', help='Ensure correct device root folder before sniffing', action='store_true')
parser.add_argument('--app', help='Application name to sniff', dest='app_name', default=None)
parser_sniff_tcpdump = parser.add_argument_group('tcpdump arguments')
parser_sniff_tcpdump.add_argument('-i', '--interface', help='Interface to capture on.', dest='capture_interface',
required=True)
parser_sniff_tcpdump.add_argument('-I', '--monitor-mode', help='Put interface into monitor mode',
action='store_true')
parser_sniff_tcpdump.add_argument('-n', help='Deactivate name resolution. True by default.',
action='store_true', dest='no_name_resolution')
parser_sniff_tcpdump.add_argument('-#', '--number',
help='Print packet number at beginning of line. True by default.',
action='store_true')
parser_sniff_tcpdump.add_argument('-e', help='Print link layer headers. True by default.',
action='store_true', dest='print_link_layer')
parser_sniff_tcpdump.add_argument('-t', action='count', default=0,
help='Please see tcpdump manual for details. Unused by default.')
cap_size_group = parser.add_mutually_exclusive_group(required=False)
cap_size_group.add_argument('-c', '--count', type=int, help='Number of packets to capture.', default=10)
cap_size_group.add_argument('--mins', type=int, help='Time in minutes to capture.', default=1)
parser.set_defaults(func=handle_capture)
def cwd_is_device_root_dir() -> bool:
device_metadata_file = Path.cwd() / DEVICE_METADATA_FILE
return device_metadata_file.is_file()
def start_guided_device_root_dir_setup():
assert False, 'Not implemented'
def handle_metadata():
assert not cwd_is_device_root_dir()
print(f'Unable to find {DEVICE_METADATA_FILE} in current working directory')
print('You need to setup a device root directory before using this command')
response = input('Would you like to be guided through the setup? [y/n]')
if response.lower() == 'y':
start_guided_device_root_dir_setup()
else:
print('\'iottb init-device-root --help\' for more information.')
exit(ReturnCodes.ABORTED)
# device_id = handle_capture_metadata()
return ReturnCodes.SUCCESS
def get_device_metadata_from_file(device_metadata_filename: Path) -> str:
assert device_metadata_filename.is_file(), f'Device metadata file f"{device_metadata_filename}" does not exist'
device_metadata = DeviceMetadata.load_from_json(device_metadata_filename)
return device_metadata
def run_tcpdump(cmd):
# TODO: Maybe specify files for stout and stderr
try:
p = subprocess.run(cmd, capture_output=True, text=True, check=True)
if p.returncode != 0:
print(f'Error running tcpdump {p.stderr}')
# TODO add logging
else:
print(f'tcpdump run successfully\n: {p.stdout}')
except KeyboardInterrupt:
pass
def handle_capture(args):
if not check_installed():
print('Please install tcpdump first')
exit(ReturnCodes.ABORTED)
assert args.device_root is not None, f'Device root directory is required'
assert dir_contains_device_metadata(args.device_root), f'Device metadata file \'{args.device_root}\' does not exist'
# get device metadata
logger.info(f'Device root directory: {args.device_root}')
if args.safe and not dir_contains_device_metadata(args.device_root):
print(f'Supplied folder contains no device metadata. '
f'Please setup a device root directory before using this command')
exit(ReturnCodes.ABORTED)
elif dir_contains_device_metadata(args.device_root):
device_metadata_filename = args.device_root / DEVICE_METADATA_FILE
device_data = DeviceMetadata.load_from_json(device_metadata_filename)
else:
name = input('Please enter a device name: ')
args.device_root.mkdir(parents=True, exist_ok=True)
device_data = DeviceMetadata(name, args.device_root)
# start constructing environment for capture
capture_dir = get_capture_src_folder(args.device_root)
make_capture_src_folder(capture_dir)
capture_metadata = CaptureMetadata(device_data, capture_dir)
capture_metadata.interface = args.capture_interface
cmd = ['sudo', 'tcpdump', '-i', args.capture_interface]
cmd = build_tcpdump_args(args, cmd, capture_metadata)
capture_metadata.tcpdump_command = cmd
print('Executing: ' + ' '.join(cmd))
# run capture
try:
start_time = datetime.now().strftime('%H:%M:%S')
run_tcpdump(cmd)
stop_time = datetime.now().strftime('%H:%M:%S')
capture_metadata.start_time = start_time
capture_metadata.stop_time = stop_time
except KeyboardInterrupt:
print('Received keyboard interrupt.')
exit(ReturnCodes.ABORTED)
except subprocess.CalledProcessError as e:
print(f'Failed to capture packet: {e}')
exit(ReturnCodes.FAILURE)
except Exception as e:
print(f'Failed to capture packet: {e}')
exit(ReturnCodes.FAILURE)
return ReturnCodes.SUCCESS
def build_tcpdump_args(args, cmd, capture_metadata: CaptureMetadata):
if args.monitor_mode:
cmd.append('-I')
if args.no_name_resolution:
cmd.append('-n')
if args.number:
cmd.append('-#')
if args.print_link_layer:
cmd.append('-e')
if args.count:
cmd.append('-c')
cmd.append(str(args.count))
elif args.mins:
assert False, 'Unimplemented option'
if args.app_name is not None:
capture_metadata.app = args.app_name
capture_metadata.build_capture_file_name()
cmd.append('-w')
cmd.append(str(capture_metadata.capture_dir) + "/" + capture_metadata.capture_file)
if args.safe:
cmd.append(f'host {args.device_ip}') # if not specified, filter 'any' implied by tcpdump
capture_metadata.device_id = args.device_ip
return cmd
# def capture_file_cmd(args, cmd, capture_dir, capture_metadata: CaptureMetadata):
# capture_file_prefix = capture_metadata.get_device_metadata().get_device_short_name()
# if args.app_name is not None:
# capture_file_prefix = args.app_name
# capture_metadata.set_app(args.app_name)
# capfile_name = capture_file_prefix + '_' + str(capture_metadata.get_capture_id()) + '.pcap'
# capture_metadata.set_capture_file(capfile_name)
# capfile_abs_path = capture_dir / capfile_name
# capture_metadata.set_capture_file(capfile_name)
# cmd.append('-w')
# cmd.append(str(capfile_abs_path))

View File

@@ -1,63 +0,0 @@
import subprocess
import logging
logger = logging.getLogger('iottbLogger.capture')
logger.setLevel(logging.DEBUG)
class Sniffer:
def __init__(self):
pass
def setup_sniff_parser(subparsers):
parser = subparsers.add_parser('sniff', help='Sniff packets with tcpdump')
# metadata args
parser.add_argument('-a', '--addr', help='IP or MAC address of IoT device')
# tcpdump args
parser.add_argument('--app', help='Application name to sniff', default=None)
parser_sniff_tcpdump = parser.add_argument_group('tcpdump arguments')
parser_sniff_tcpdump.add_argument('-i', '--interface', help='Interface to capture on.', dest='capture_interface',
required=True)
parser_sniff_tcpdump.add_argument('-I', '--monitor-mode', help='Put interface into monitor mode',
action='store_true')
parser_sniff_tcpdump.add_argument('-n', help='Deactivate name resolution. True by default.',
action='store_true', dest='no_name_resolution')
parser_sniff_tcpdump.add_argument('-#', '--number',
help='Print packet number at beginning of line. True by default.',
action='store_true')
parser_sniff_tcpdump.add_argument('-e', help='Print link layer headers. True by default.',
action='store_true', dest='print_link_layer')
parser_sniff_tcpdump.add_argument('-t', action='count', default=0,
help='Please see tcpdump manual for details. Unused by default.')
cap_size_group = parser.add_mutually_exclusive_group(required=False)
cap_size_group.add_argument('-c', '--count', type=int, help='Number of packets to capture.', default=10)
cap_size_group.add_argument('--mins', type=int, help='Time in minutes to capture.', default=1)
parser.set_defaults(func=sniff)
def parse_addr(addr):
#TODO Implement
pass
def sniff(args):
if args.addr is None:
print('You must supply either a MAC or IP(v4) address to use this tool!')
logger.info("Exiting on account of missing MAC/IP.")
exit(1)
else:
(type, value) = parse_addr(args.addr)
#TODO Get this party started
def sniff_tcpdump(args, filter):
pass
def sniff_mitmproxy(args, filter):
pass
def sniff_raw(cmd,args):
pass

View File

@@ -0,0 +1,15 @@
{
"device_id": "",
"capture_id": "",
"capture_date": "",
"capture_file": "",
"start_time": "",
"stop_time": "",
"capture_duration": "",
"interfaces": "",
"device_ip_address": "",
"device_mac_address": "",
"contacted_ip_address": [],
"device_firmware_version": "",
"campanion_app": ""
}

View File

@@ -0,0 +1,4 @@
{
"database_path": "~/.iottb.db",
"log_level": "INFO"
}

View File

@@ -0,0 +1,14 @@
{
"device_id": "",
"device_name": "",
"device_short_name": "",
"date_created": "",
"description": "",
"model": "",
"manufacturer": "",
"firmware_version": "",
"device_type": "",
"supported_interfaces": "",
"companion_applications": "",
"last_metadata_update": ""
}

View File

@@ -0,0 +1,43 @@
import json
from pathlib import Path
from unittest import mock
from config import Config
import unittest
class TestConfig(unittest.TestCase):
def test_creates_new_config_file_if_not_exists(self):
config_path = Path("test_config.json")
if config_path.exists():
config_path.unlink()
config = Config(config_file=config_path)
self.assertTrue(config_path.exists())
config_path.unlink()
def test_writes_default_configuration_to_config_file(self):
config_path = Path("test_config.json")
if config_path.exists():
config_path.unlink()
config = Config(config_file=config_path)
with open(config_path, "r") as f:
data = json.load(f)
self.assertEqual(data, {"database_path": "~/.iottb.db", "log_level": "INFO"})
config_path.unlink()
@unittest.mock.patch("builtins.open", side_effect=PermissionError)
def test_config_file_path_not_writable(self, mock_open):
config_path = Path("test_config.json")
with self.assertRaises(PermissionError):
config = Config(config_file=config_path)
config.create_default_config()
def test_config_file_path_is_directory(self):
config_dir = Path("test_config_dir")
config_dir.mkdir(exist_ok=True)
with self.assertRaises(IsADirectoryError):
config = Config(config_file=config_dir)
config.create_default_config()
config_dir.rmdir()

View File

@@ -0,0 +1,38 @@
from pathlib import Path
# Generated by CodiumAI
import unittest
from utils.file_utils import ensure_directory_exists
class TestEnsureDirectoryExists(unittest.TestCase):
# creates directory if it does not exist
def test_creates_directory_if_not_exists(self):
path = Path('/tmp/testdir')
if path.exists():
path.rmdir()
ensure_directory_exists(path)
self.assertTrue(path.exists())
path.rmdir()
# does not create directory if it already exists
def test_does_not_create_directory_if_exists(self):
path = Path('/tmp/testdir')
path.mkdir(exist_ok=True)
ensure_directory_exists(path)
self.assertTrue(path.exists())
path.rmdir()
# path is a symbolic link
def test_path_is_a_symbolic_link(self):
target_dir = Path('/tmp/targetdir')
symlink_path = Path('/tmp/symlinkdir')
target_dir.mkdir(exist_ok=True)
symlink_path.symlink_to(target_dir)
ensure_directory_exists(symlink_path)
self.assertTrue(symlink_path.exists())
self.assertTrue(symlink_path.is_symlink())
symlink_path.unlink()
target_dir.rmdir()

View File

@@ -0,0 +1,62 @@
from commands.sniff import is_ip_address
import unittest
class TestIsIpAddress(unittest.TestCase):
def test_valid_ipv4_address_all_octets_in_range(self):
self.assertTrue(is_ip_address("192.168.1.1"))
self.assertTrue(is_ip_address("0.0.0.0"))
self.assertTrue(is_ip_address("255.255.255.255"))
def test_ipv4_address_with_leading_zeros(self):
self.assertTrue(is_ip_address("192.168.001.001"))
self.assertTrue(is_ip_address("0.0.0.0"))
self.assertTrue(is_ip_address("255.255.255.255"))
def test_ipv4_address_mixed_single_double_digit_octets(self):
self.assertTrue(is_ip_address("192.168.1.01"))
self.assertTrue(is_ip_address("0.0.0.0"))
self.assertTrue(is_ip_address("255.255.255.255"))
def test_ipv4_address_maximum_values_in_octets(self):
self.assertTrue(is_ip_address("255.255.255.255"))
self.assertTrue(is_ip_address("0.0.0.0"))
self.assertTrue(is_ip_address("192.168.1.1"))
def test_ipv4_address_minimum_values_in_octets(self):
self.assertTrue(is_ip_address("0.0.0.0"))
self.assertTrue(is_ip_address("192.168.1.1"))
self.assertTrue(is_ip_address("255.255.255.255"))
def test_ipv4_address_more_than_four_octets_invalid(self):
self.assertFalse(is_ip_address("192.168.1.1.1"))
self.assertFalse(is_ip_address("0.0.0.0.0"))
self.assertFalse(is_ip_address("255.255.255.255.255"))
def test_ipv4_address_fewer_than_four_octets_invalid(self):
self.assertFalse(is_ip_address("192.168.1"))
self.assertFalse(is_ip_address("0.0"))
self.assertFalse(is_ip_address("255"))
def test_ipv4_address_non_numeric_characters_invalid(self):
self.assertFalse(is_ip_address("192.a.b.c"))
self.assertFalse(is_ip_address("0.x.y.z"))
self.assertFalse(is_ip_address("255.q.w.e"))
def test_ipv4_address_octets_out_of_range_invalid(self):
self.assertFalse(is_ip_address("256.256.256.256"))
self.assertFalse(is_ip_address("300.300.300.300"))
self.assertFalse(is_ip_address("999.999.999.999"))
def test_ipv4_address_empty_string_invalid(self):
self.assertFalse(is_ip_address(""))
self.assertFalse(is_ip_address(" "))
self.assertFalse(is_ip_address(None))

View File

@@ -0,0 +1,64 @@
from commands.sniff import is_mac_address
import unittest
class TestIsMacAddress(unittest.TestCase):
def test_valid_mac_address_lowercase(self):
self.assertTrue(is_mac_address("aa:bb:cc:dd:ee:ff"))
self.assertFalse(is_mac_address("192.168.1.1"))
self.assertFalse(is_mac_address("aa:bb:cc:dd:ee:ff:gg"))
def test_valid_mac_address_uppercase(self):
self.assertTrue(is_mac_address("AA:BB:CC:DD:EE:FF"))
self.assertFalse(is_mac_address("10.0.0.1"))
self.assertFalse(is_mac_address("AA:BB:CC:DD:EE"))
def test_valid_mac_address_mixed_case(self):
self.assertTrue(is_mac_address("Aa:Bb:Cc:Dd:Ee:Ff"))
self.assertFalse(is_mac_address("172.16.0.1"))
self.assertFalse(is_mac_address("Aa:Bb:Cc:Dd:Ee:Ff:Gg"))
def test_valid_mac_address_digits(self):
self.assertTrue(is_mac_address("00:11:22:33:44:55"))
self.assertFalse(is_mac_address("8.8.8.8"))
self.assertFalse(is_mac_address("00:11:22:33:44"))
# returns False for an empty string
def test_empty_string(self):
self.assertFalse(is_mac_address(""))
self.assertFalse(is_mac_address(":"))
def test_invalid_characters(self):
self.assertFalse(is_mac_address("gh:ij:kl:mn:op:qr"))
self.assertFalse(is_mac_address("192.168.0.256"))
self.assertFalse(is_mac_address("ghij::klmn::opqr"))
# returns False for a MAC address with incorrect length
def test_incorrect_length(self):
self.assertFalse(is_mac_address("aa:bb:cc"))
self.assertFalse(is_mac_address("10.0.0.256"))
self.assertFalse(is_mac_address("aa::bb::cc::dd::ee::ff::gg"))
# returns False for a MAC address with missing colons
def test_missing_colons(self):
self.assertFalse(is_mac_address("aabbccddeeff"))
self.assertFalse(is_mac_address("127.0.0.1"))
self.assertFalse(is_mac_address("aabbccddeeffgg"))
# returns False for a MAC address with extra colons
def test_extra_colons(self):
self.assertFalse(is_mac_address("aa::bb::cc::dd::ee::ff"))
self.assertFalse(is_mac_address("192.168.1.256"))
self.assertFalse(is_mac_address("aa::bb::cc::dd::ee::ff::gg"))
# returns False for a MAC address with spaces
def test_spaces_in_mac(self):
self.assertFalse(is_mac_address("aa bb cc dd ee ff"))
self.assertFalse(is_mac_address("8.8.4.4"))
self.assertFalse(is_mac_address("aa bb cc dd ee ff gg"))

View File

@@ -1,44 +1,20 @@
import uuid
from pathlib import Path
from iottb.models.device_metadata_model import dir_contains_device_metadata
from iottb.utils.utils import get_iso_date
from datetime import datetime
def get_capture_uuid():
return str(uuid.uuid4())
def get_capture_src_folder(device_path):
today_str = datetime.now().strftime('%Y-%m-%d')
capture_base_path = device_path / today_str
capture_base_path.mkdir(parents=True, exist_ok=True)
existing_captures = [d for d in capture_base_path.iterdir() if d.is_dir()]
nth_capture = len(existing_captures) + 1
capture_dir = capture_base_path / f'capture_{nth_capture}'
capture_dir.mkdir(parents=True, exist_ok=True)
return capture_dir
def get_capture_date_folder(device_root: Path):
today_iso = get_iso_date()
today_folder = device_root / today_iso
if dir_contains_device_metadata(device_root):
if not today_folder.is_dir():
try:
today_folder.mkdir()
except FileExistsError:
print(f'Folder {today_folder} already exists')
return today_folder
raise FileNotFoundError(f'Given path {device_root} is not a device root directory')
def get_capture_src_folder(device_folder: Path):
assert device_folder.is_dir(), f'Given path {device_folder} is not a folder'
today_iso = get_iso_date()
max_sequence_number = 1
for d in device_folder.iterdir():
if d.is_dir() and d.name.startswith(f'{today_iso}_capture_'):
name = d.name
num = int(name.split('_')[2])
max_sequence_number = max(max_sequence_number, num)
next_sequence_number = max_sequence_number + 1
return device_folder.joinpath(f'{today_iso}_capture_{next_sequence_number:03}')
def make_capture_src_folder(capture_src_folder: Path):
try:
capture_src_folder.mkdir()
except FileExistsError:
print(f'Folder {capture_src_folder} already exists')
finally:
return capture_src_folder
def make_capture_src_folder(capture_src_folder):
capture_src_folder.mkdir(parents=True, exist_ok=True)
return capture_src_folder

View File

@@ -0,0 +1,19 @@
import json
from pathlib import Path
def load_json_template(template_path):
with open(template_path, 'r') as f:
return json.load(f)
def save_json(data, file_path):
with open(file_path, 'w') as f:
json.dump(data, f, indent=4)
def ensure_directory_exists(path):
path = Path(path)
if not path.exists():
path.mkdir(parents=True, exist_ok=True)
return path

View File

@@ -1,41 +1,9 @@
import ipaddress
import shutil
import subprocess
from typing import Optional
def check_installed() -> bool:
"""Check if tcpdump is installed and available on the system path."""
return shutil.which('tcpdump') is not None
def ensure_installed():
"""Ensure that tcpdump is installed, raise an error if not."""
if not check_installed():
raise RuntimeError('tcpdump is not installed. Please install it to continue.')
def list_interfaces(args) -> str:
"""List available network interfaces using tcpdump."""
ensure_installed()
def check_installed():
try:
result = subprocess.run(['tcpdump', '--list-interfaces'], capture_output=True, text=True, check=True)
print(result.stdout)
except subprocess.CalledProcessError as e:
print(f'Failed to list interfaces: {e}')
return ''
def is_valid_ipv4(ip: str) -> bool:
try:
ipaddress.IPv4Address(ip)
subprocess.run(['tcpdump', '--version'], check=True, capture_output=True)
return True
except ValueError:
except subprocess.CalledProcessError:
return False
def str_to_ipv4(ip: str) -> (bool, Optional[ipaddress]):
try:
address = ipaddress.IPv4Address(ip)
return address == ipaddress.IPv4Address(ip), address
except ipaddress.AddressValueError:
return False, None

View File

@@ -1,18 +0,0 @@
import uuid
from datetime import datetime
from iottb.definitions import TODAY_DATE_STRING, DEVICE_METADATA_FILE, CAPTURE_METADATA_FILE
from pathlib import Path
def get_iso_date():
return datetime.now().strftime('%Y-%m-%d')
def subfolder_exists(parent: Path, child: str):
return parent.joinpath(child).exists()
def generate_unique_string_with_prefix(prefix: str):
return prefix + '_' + str(uuid.uuid4())

View File

@@ -1,6 +0,0 @@
interface=wlp0s20f0u1
dhcp-range=192.168.1.2,192.168.1.250,12h
# Gateway
dhcp-option=3,192.168.1.1
# Dns server addr
dhcp-option=6,192.168.1.1

View File

@@ -1,6 +0,0 @@
#!
# Run as root
#
sysctl -w net.ipv4.conf.all.forwarding=1
sysctl -w net.ipv6.conf.all.forwading=1

View File

@@ -1,8 +0,0 @@
interface=wlp0s20f0u1
driver=nl80211
ssid=t3u
hw_mode=g
channel=11
macaddr_acl=0
auth_algs=1
ignore_broadcast_ssid=0

View File

@@ -1,12 +0,0 @@
interface=wlp0s20f0u1
driver=nl80211
ssid=t3u
hw_mode=g
channel=11
macaddr_acl=0
auth_algs=1
ignore_broadcast_ssid=0
wpa=2
wpa_passphrase=11help22help33
wpa_key_mgmt=WPA-PSK
rsn_pairwise=CCMP

View File

@@ -1,35 +0,0 @@
#!/bin/bash
# DISCLAIMER! THIS CODE HAS BEEN TAKEN FROM:
# https://nims11.wordpress.com/2012/04/27/hostapd-the-linux-way-to-create-virtual-wifi-access-point/
# Usage: ./initSoftAP
########### Initial wifi interface configuration #############
ip link set $1 down
ip addr flush dev $1
ip link set $1 up
ip addr add 10.0.0.1/24 dev $1
# If you still use ifconfig for some reason, replace the above lines with the following
# ifconfig $1 up 10.0.0.1 netmask 255.255.255.0
sleep 2
###########
########### Start dnsmasq ##########
if [ -z "$(ps -e | grep dnsmasq)" ]
then
dnsmasq
fi
###########
########### Enable NAT ############
iptables -t nat -A POSTROUTING -o $2 -j MASQUERADE
iptables -A FORWARD -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
iptables -A FORWARD -i $1 -o $2 -j ACCEPT
#Thanks to lorenzo
#Uncomment the line below if facing problems while sharing PPPoE, see lorenzo's comment for more details
#iptables -I FORWARD -p tcp --tcp-flags SYN,RST SYN -j TCPMSS --clamp-mss-to-pmtu
sysctl -w net.ipv4.ip_forward=1
###########
########## Start hostapd ###########
hostapd $PWD/hostapd.conf ## TODO! either put config in normal place
#killall dnsmasq

View File

@@ -1,36 +0,0 @@
#!/bin/bash
# DISCLAIMER! THIS CODE HAS BEEN TAKEN FROM:
# https://nims11.wordpress.com/2012/04/27/hostapd-the-linux-way-to-create-virtual-wifi-access-point/
# Usage: ./initSoftAP
########### Initial wifi interface configuration #############
ip link set $1 down
ip addr flush dev $1
ip link set $1 up
ip addr add 10.0.0.1/24 dev $1
# If you still use ifconfig for some reason, replace the above lines with the following
# ifconfig $1 up 10.0.0.1 netmask 255.255.255.0
sleep 2
###########
########### Start dnsmasq ##########
if [ -z "$(ps -e | grep dnsmasq)" ]
then
dnsmasq
fi
###########
########### Enable NAT ############
nft add table nat
nft -- add chain nat prerouting { type nat hook prerouting priority -100 \; }
nft add chain nat postrouting { type nat hook postrouting priority 100 \; }
nft add rule nat postrouting oifname wlp44s0 wlp masquerade
#Thanks to lorenzo
#Uncomment the line below if facing problems while sharing PPPoE, see lorenzo's comment for more details
#iptables -I FORWARD -p tcp --tcp-flags SYN,RST SYN -j TCPMSS --clamp-mss-to-pmtu
sysctl -w net.ipv4.ip_forward=1
###########
########## Start hostapd ###########
hostapd $PWD/hostapd.conf ## TODO! either put config in normal place
#killall dnsmasq

View File

@@ -1,22 +0,0 @@
#! /bin/env bash
TYPE="wifi"
IFNAME="wlp0s20f0u1"
CONNAME="T3UminiConn"
SSID="T3Umini"
BAND="bg"
CHAN=1
KMGMT="wpa-psk"
PSK=11223344
nmcli con add type wifi ifname wlp0s20f0u1 mode ap con-name WIFI_AP_TEST ssid MY_AP_TEST &&
nmcli con modify WIFI_AP_TEST 802-11-wireless.band bg &&
nmcli con modify WIFI_AP_TEST 802-11-wireless.channel 1 &&
nmcli con modify WIFI_AP_TEST 802-11-wireless-security.key-mgmt wpa-psk &&
nmcli con modify WIFI_AP_TEST 802-11-wireless-security.pairwise ccmp &&
nmcli con modify WIFI_AP_TEST 802-11-wireless-security.psk 11223344 &&
nmcli con modify WIFI_AP_TEST ipv4.method shared && nmcli con up WIFI_AP_TEST
' nmcli con modify WIFI_AP_TEST 802-11-wireless-security.proto rsn &&
nmcli con modify WIFI_AP_TEST 802-11-wireless-security.group ccmp && NOT USED FOR APPLE`

View File

View File

View File

@@ -1,15 +0,0 @@
import pytest
import tempfile
from pathlib import Path
@pytest.fixture(scope='session')
def tmp_dir():
with tempfile.TemporaryDirectory() as tmp_dir:
yield Path(tmp_dir)
@pytest.fixture
def mock_device_metadata_json_(tmp_dir):
with tempfile.TemporaryDirectory() as tmp_dir:
pass

View File

@@ -1,47 +0,0 @@
import sys
import unittest
from io import StringIO
from unittest.mock import patch, MagicMock
from pathlib import Path
from iottb.definitions import DEVICE_METADATA_FILE
import shutil
from iottb.__main__ import main
class TestDeviceMetadataFileCreation(unittest.TestCase):
def setUp(self):
self.test_dir = Path('/tmp/iottbtest/test_add_device')
self.test_dir.mkdir(parents=True, exist_ok=True)
# self.captured_output = StringIO()
# sys.stdout = self.captured_output
def tearDown(self):
# shutil.rmtree(str(self.test_dir))
for item in self.test_dir.iterdir():
if item.is_dir():
item.rmdir()
else:
item.unlink()
self.test_dir.rmdir()
# sys.stdout = sys.__stdout__
@patch('builtins.input', side_effect=['iPhone 14', 'y', 'y'])
def test_guided_device_setup(self, mock_input):
sys.argv = ['__main__.py', 'add', '--root_dir', str(self.test_dir), '--guided']
main()
expected_file = self.test_dir / DEVICE_METADATA_FILE
self.assertTrue(expected_file.exists()), f'Expected file not created: {expected_file}'
@patch('builtins.input', side_effect=['y']) # need mock_input else wont work
def test_device_setup(self, mock_input):
sys.argv = ['__main__.py', 'add', '--root_dir', str(self.test_dir), '--name', 'iPhone 14']
main()
expected_file = self.test_dir / DEVICE_METADATA_FILE
self.assertTrue(expected_file.exists()), f'Expected file not created: {expected_file}'
def test_add_when_file_exists(self):
# TODO
pass
if __name__ == '__main__':
unittest.main()

View File

@@ -1,2 +0,0 @@
def test_save_to_json():
assert False