18 Commits

Author SHA1 Message Date
Sebastian Lenzlinger
64788a1997 Move unused modules into archive. 2024-05-08 03:06:20 +02:00
Sebastian Lenzlinger
7ffbdda7ea Add test case to add-device subcommand 2024-05-08 02:56:49 +02:00
Sebastian Lenzlinger
2e95bd2fd2 Fix places where quote replacement lead to issues. 2024-05-08 02:52:37 +02:00
Sebastian Lenzlinger
e569eb3e5b Replace all double quotes strings with single quoted strings. 2024-05-08 02:46:14 +02:00
Sebastian Lenzlinger
266a669e5e Add test for device metadata file creation and fixes until test passed. 2024-05-08 02:36:07 +02:00
Sebastian Lenzlinger
a21312ee61 Corrections 2024-05-08 01:38:53 +02:00
Sebastian Lenzlinger
73771be70d Add preliminary implementation to load DeviceMetadata from data. 2024-05-08 00:52:25 +02:00
Sebastian Lenzlinger
27ae736f11 Implement custom to_json function for DeviceMetadata 2024-05-08 00:40:29 +02:00
Sebastian Lenzlinger
cb1ad33cae Implement custom to_json function for CaptureMetadata 2024-05-08 00:40:17 +02:00
Sebastian Lenzlinger
2681ee9a8e Refactor function name to reflect that not using pydantic anymore. 2024-05-08 00:31:25 +02:00
Sebastian Lenzlinger
799414ad39 Remove getters and setters from DeviceMetadata class and change dependencies to use field access. 2024-05-08 00:03:18 +02:00
Sebastian Lenzlinger
6b73530943 Remove getters and setters from CaptureMetadata class and refactor dependencies to use field access. 2024-05-08 00:02:08 +02:00
Sebastian Lenzlinger
798a32b23e Remove ignored file from git. 2024-05-07 23:01:40 +02:00
Sebastian Lenzlinger
11e2c356fa Factor out pydantic. 2024-05-07 22:48:53 +02:00
Sebastian Lenzlinger
95426e0baa Merge branch 'cli-dev' into 'main'
Refactor and add Logger

See merge request dmi-pet/bsc-msc/2024-bsc-sebastian-lenzlinger!6
2024-05-07 19:24:56 +00:00
Sebastian Lenzlinger
347d43dcef Refactor and add Logger 2024-05-07 19:24:56 +00:00
Sebastian Lenzlinger
822b49ed8b Merge branch 'wifi' into 'main'
Add bash file with functions to enable and disable monitor mode using either...

See merge request dmi-pet/bsc-msc/2024-bsc-sebastian-lenzlinger!5
2024-05-02 17:15:51 +00:00
Sebastian Lenzlinger
b0a3fd951d Add bash file with functions to enable and disable monitor mode using either... 2024-05-02 17:15:51 +00:00
33 changed files with 731 additions and 644 deletions

2
.gitignore vendored
View File

@@ -1,3 +1,5 @@
.obsidian
venv
__pycache__
*.log
.idea/

View File

@@ -4,4 +4,7 @@
<option name="format" value="PLAIN" />
<option name="myDocStringFormat" value="Plain" />
</component>
<component name="TestRunnerService">
<option name="PROJECT_TEST_RUNNER" value="py.test" />
</component>
</module>

261
.idea/workspace.xml generated
View File

@@ -1,261 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="AutoImportSettings">
<option name="autoReloadType" value="SELECTIVE" />
</component>
<component name="ChangeListManager">
<list default="true" id="7a3ac8e1-7fbf-4aa7-9cf9-a51d7ade8503" name="Changes" comment="Start tracking development config files.">
<change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/code/kydcap/subcommands/sniff.py" beforeDir="false" afterPath="$PROJECT_DIR$/code/kydcap/subcommands/sniff.py" afterDir="false" />
</list>
<option name="SHOW_DIALOG" value="false" />
<option name="HIGHLIGHT_CONFLICTS" value="true" />
<option name="HIGHLIGHT_NON_ACTIVE_CHANGELIST" value="false" />
<option name="LAST_RESOLUTION" value="IGNORE" />
</component>
<component name="FileTemplateManagerImpl">
<option name="RECENT_TEMPLATES">
<list>
<option value="Python Script" />
</list>
</option>
</component>
<component name="Git.Settings">
<option name="PUSH_AUTO_UPDATE" value="true" />
<option name="RECENT_BRANCH_BY_REPOSITORY">
<map>
<entry key="$PROJECT_DIR$" value="main" />
</map>
</option>
<option name="RECENT_GIT_ROOT_PATH" value="$PROJECT_DIR$" />
</component>
<component name="ProblemsViewState">
<option name="selectedTabId" value="CurrentFile" />
</component>
<component name="ProjectColorInfo">{
&quot;associatedIndex&quot;: 3
}</component>
<component name="ProjectId" id="2fYAAba0AnH9jx9D0JkB8Xbuv0r" />
<component name="ProjectViewState">
<option name="hideEmptyMiddlePackages" value="true" />
<option name="showLibraryContents" value="true" />
</component>
<component name="PropertiesComponent">{
&quot;keyToString&quot;: {
&quot;ASKED_ADD_EXTERNAL_FILES&quot;: &quot;true&quot;,
&quot;ASKED_MARK_IGNORED_FILES_AS_EXCLUDED&quot;: &quot;true&quot;,
&quot;ASKED_SHARE_PROJECT_CONFIGURATION_FILES&quot;: &quot;true&quot;,
&quot;Python.__init__.executor&quot;: &quot;Run&quot;,
&quot;Python.__main__.executor&quot;: &quot;Run&quot;,
&quot;Python.main.executor&quot;: &quot;Run&quot;,
&quot;RunOnceActivity.ShowReadmeOnStart&quot;: &quot;true&quot;,
&quot;SHARE_PROJECT_CONFIGURATION_FILES&quot;: &quot;true&quot;,
&quot;git-widget-placeholder&quot;: &quot;main&quot;,
&quot;last_opened_file_path&quot;: &quot;/home/slnopriv/projects/2024-bsc-sebastian-lenzlinger/code/kydcap/utils/device_metadata_utils.py&quot;,
&quot;node.js.detected.package.eslint&quot;: &quot;true&quot;,
&quot;node.js.detected.package.tslint&quot;: &quot;true&quot;,
&quot;node.js.selected.package.eslint&quot;: &quot;(autodetect)&quot;,
&quot;node.js.selected.package.tslint&quot;: &quot;(autodetect)&quot;,
&quot;nodejs_package_manager_path&quot;: &quot;npm&quot;,
&quot;settings.editor.selected.configurable&quot;: &quot;com.jetbrains.python.configuration.PyActiveSdkModuleConfigurable&quot;,
&quot;vue.rearranger.settings.migration&quot;: &quot;true&quot;
}
}</component>
<component name="RecentsManager">
<key name="MoveFile.RECENT_KEYS">
<recent name="$PROJECT_DIR$/archive" />
<recent name="$PROJECT_DIR$" />
<recent name="$PROJECT_DIR$/code/misc/archive" />
<recent name="$PROJECT_DIR$/code/misc" />
<recent name="$PROJECT_DIR$/code/kydcap/utils" />
</key>
</component>
<component name="RunManager" selected="Python.__main__">
<configuration name="__init__" type="PythonConfigurationType" factoryName="Python" temporary="true" nameIsGenerated="true">
<module name="2024-bsc-sebastian-lenzlinger" />
<option name="ENV_FILES" value="" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/code/kydcap" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/code/kydcap/__init__.py" />
<option name="PARAMETERS" value="" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
<configuration name="__main__" type="PythonConfigurationType" factoryName="Python" nameIsGenerated="true">
<module name="2024-bsc-sebastian-lenzlinger" />
<option name="ENV_FILES" value="" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/code/kydcap" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/code/kydcap/__main__.py" />
<option name="PARAMETERS" value="init-device-root --dynamic" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
<list>
<item itemvalue="Python.__main__" />
<item itemvalue="Python.__init__" />
</list>
<recent_temporary>
<list>
<item itemvalue="Python.__init__" />
</list>
</recent_temporary>
</component>
<component name="SharedIndexes">
<attachedChunks>
<set>
<option value="bundled-js-predefined-1d06a55b98c1-74d2a5396914-JavaScript-PY-241.14494.241" />
<option value="bundled-python-sdk-0509580d9d50-28c9f5db9ffe-com.jetbrains.pycharm.pro.sharedIndexes.bundled-PY-241.14494.241" />
</set>
</attachedChunks>
</component>
<component name="SpellCheckerSettings" RuntimeDictionaries="0" Folders="0" CustomDictionaries="0" DefaultDictionary="application-level" UseSingleDictionary="true" transferred="true" />
<component name="TaskManager">
<task active="true" id="Default" summary="Default task">
<changelist id="7a3ac8e1-7fbf-4aa7-9cf9-a51d7ade8503" name="Changes" comment="" />
<created>1713967494544</created>
<option name="number" value="Default" />
<option name="presentableId" value="Default" />
<updated>1713967494544</updated>
<workItem from="1713967495566" duration="6927000" />
<workItem from="1714554228183" duration="34000" />
<workItem from="1714554269789" duration="56478000" />
<workItem from="1714616237168" duration="6135000" />
</task>
<task id="LOCAL-00001" summary="Add code for capture testbed. This is a huge commit. End of day sync...">
<option name="closed" value="true" />
<created>1714615532115</created>
<option name="number" value="00001" />
<option name="presentableId" value="LOCAL-00001" />
<option name="project" value="LOCAL" />
<updated>1714615532115</updated>
</task>
<task id="LOCAL-00002" summary="Add some notes.">
<option name="closed" value="true" />
<created>1714615608142</created>
<option name="number" value="00002" />
<option name="presentableId" value="LOCAL-00002" />
<option name="project" value="LOCAL" />
<updated>1714615608142</updated>
</task>
<task id="LOCAL-00003" summary="Update gitignore">
<option name="closed" value="true" />
<created>1714616343905</created>
<option name="number" value="00003" />
<option name="presentableId" value="LOCAL-00003" />
<option name="project" value="LOCAL" />
<updated>1714616343905</updated>
</task>
<task id="LOCAL-00004" summary="Add test module.">
<option name="closed" value="true" />
<created>1714617162903</created>
<option name="number" value="00004" />
<option name="presentableId" value="LOCAL-00004" />
<option name="project" value="LOCAL" />
<updated>1714617162903</updated>
</task>
<task id="LOCAL-00005" summary="Update gitignore again.">
<option name="closed" value="true" />
<created>1714617231842</created>
<option name="number" value="00005" />
<option name="presentableId" value="LOCAL-00005" />
<option name="project" value="LOCAL" />
<updated>1714617231842</updated>
</task>
<task id="LOCAL-00006" summary="Start tracking development config files.">
<option name="closed" value="true" />
<created>1714617266799</created>
<option name="number" value="00006" />
<option name="presentableId" value="LOCAL-00006" />
<option name="project" value="LOCAL" />
<updated>1714617266799</updated>
</task>
<option name="localTasksCounter" value="7" />
<servers />
</component>
<component name="TypeScriptGeneratedFilesManager">
<option name="version" value="3" />
</component>
<component name="Vcs.Log.Tabs.Properties">
<option name="RECENT_FILTERS">
<map>
<entry key="Branch">
<value>
<list>
<RecentGroup>
<option name="FILTER_VALUES">
<option value="HEAD" />
</option>
</RecentGroup>
<RecentGroup>
<option name="FILTER_VALUES">
<option value="devel" />
</option>
</RecentGroup>
</list>
</value>
</entry>
</map>
</option>
<option name="TAB_STATES">
<map>
<entry key="MAIN">
<value>
<State>
<option name="FILTERS">
<map>
<entry key="branch">
<value>
<list>
<option value="HEAD" />
</list>
</value>
</entry>
</map>
</option>
</State>
</value>
</entry>
</map>
</option>
</component>
<component name="VcsManagerConfiguration">
<MESSAGE value="Add code for capture testbed. This is a huge commit. End of day sync..." />
<MESSAGE value="Add some notes." />
<MESSAGE value="Update gitignore" />
<MESSAGE value="Add test module." />
<MESSAGE value="Update gitignore again." />
<MESSAGE value="Start tracking development config files." />
<option name="LAST_COMMIT_MESSAGE" value="Start tracking development config files." />
</component>
<component name="com.intellij.coverage.CoverageDataManagerImpl">
<SUITE FILE_PATH="coverage/2024_bsc_sebastian_lenzlinger$__init__.coverage" NAME="__init__ Coverage Results" MODIFIED="1714619300966" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="false" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/code/kydcap" />
<SUITE FILE_PATH="coverage/2024_bsc_sebastian_lenzlinger$main.coverage" NAME="__main__ Coverage Results" MODIFIED="1714619560177" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="false" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/code/kydcap" />
</component>
</project>

View File

@@ -1,7 +1,7 @@
import json
from pathlib import Path
from kydcap.config import ReturnCodes
from iottb.definitions import ReturnCodes
def set_device_ip_address(ip_addr: str, file_path: Path):
@@ -9,12 +9,12 @@ def set_device_ip_address(ip_addr: str, file_path: Path):
assert file_path.is_file()
with file_path.open('r') as f:
data = json.load(f)
current_ip = data["device_ip_address"]
current_ip = data['device_ip_address']
if current_ip is not None:
print(f"Device IP Address is set to {current_ip}")
response = input(f"Do you want to change the recorded IP address to {ip_addr}? [Y/N] ")
if response.upper() == "N":
print("Aborting change to device IP address")
print(f'Device IP Address is set to {current_ip}')
response = input(f'Do you want to change the recorded IP address to {ip_addr}? [Y/N] ')
if response.upper() == 'N':
print('Aborting change to device IP address')
return ReturnCodes.ABORTED
with file_path.open('w') as f:
json.dump(data, f)
@@ -26,15 +26,13 @@ def set_device_mac_address(mac_addr: str, file_path: Path):
assert file_path.is_file()
with file_path.open('r') as f:
data = json.load(f)
current_mac = data["device_mac_address"]
current_mac = data['device_mac_address']
if current_mac is not None:
print(f"Device MAC Address is set to {current_mac}")
response = input(f"Do you want to change the recorded MAC address to {mac_addr}? [Y/N] ")
if response.upper() == "N":
print("Aborting change to device MAC address")
print(f'Device MAC Address is set to {current_mac}')
response = input(f'Do you want to change the recorded MAC address to {mac_addr}? [Y/N] ')
if response.upper() == 'N':
print('Aborting change to device MAC address')
return ReturnCodes.ABORTED
with file_path.open('w') as f:
json.dump(data, f)
return ReturnCodes.SUCCESS
# TODO finnish for other fields in capture metadata

View File

@@ -2,7 +2,7 @@ import json
from datetime import datetime
from pathlib import Path
from kydcap.config import ReturnCodes
from iottb.definitions import ReturnCodes
def update_firmware_version(version: str, file_path: Path):
@@ -47,3 +47,5 @@ def update_device_type(device_type: str, file_path: Path):
with file_path.open('w') as file:
json.dump(metadata, file)
return ReturnCodes.SUCCESS

View File

@@ -2,31 +2,31 @@ def setup_sniff_tcpdump_parser(parser_sniff):
# arguments which will be passed to tcpdump
parser_sniff_tcpdump = parser_sniff.add_argument_group('tcpdump arguments')
# TODO: tcpdump_parser.add_argument('-c', '--count', re)
parser_sniff_tcpdump.add_argument("-a", "--ip-address=", help="IP address of the device to sniff", dest="device_ip")
parser_sniff_tcpdump.add_argument("-i", "--interface=", help="Interface of the capture device.", dest="capture_interface",default="")
parser_sniff_tcpdump.add_argument("-I", "--monitor-mode", help="Put interface into monitor mode",
action="store_true")
parser_sniff_tcpdump.add_argument("-n", help="Deactivate name resolution. Option is set by default.",
action="store_true")
parser_sniff_tcpdump.add_argument("-#", "--number",
help="Print packet number at beginning of line. Set by default.",
action="store_true")
parser_sniff_tcpdump.add_argument("-e", help="Print link layer headers. Option is set by default.",
action="store_true")
parser_sniff_tcpdump.add_argument("-t", action="count", default=0,
help="Please see tcpdump manual for details. Unused by default.")
parser_sniff_tcpdump.add_argument('-a', '--ip-address=', help='IP address of the device to sniff', dest='device_ip')
parser_sniff_tcpdump.add_argument('-i', '--interface=', help='Interface of the capture device.', dest='capture_interface',default='')
parser_sniff_tcpdump.add_argument('-I', '--monitor-mode', help='Put interface into monitor mode',
action='store_true')
parser_sniff_tcpdump.add_argument('-n', help='Deactivate name resolution. Option is set by default.',
action='store_true')
parser_sniff_tcpdump.add_argument('-#', '--number',
help='Print packet number at beginning of line. Set by default.',
action='store_true')
parser_sniff_tcpdump.add_argument('-e', help='Print link layer headers. Option is set by default.',
action='store_true')
parser_sniff_tcpdump.add_argument('-t', action='count', default=0,
help='Please see tcpdump manual for details. Unused by default.')
def setup_sniff_parser(subparsers):
# create parser for "sniff" command
parser_sniff = subparsers.add_parser("sniff", help="Start tcpdump capture.")
# create parser for 'sniff' command
parser_sniff = subparsers.add_parser('sniff', help='Start tcpdump capture.')
setup_sniff_tcpdump_parser(parser_sniff)
setup_pcap_filter_parser(parser_sniff)
cap_size_group = parser_sniff.add_mutually_exclusive_group(required=True)
cap_size_group.add_argument("-c", "--count", type=int, help="Number of packets to capture.", default=0)
cap_size_group.add_argument("--mins", type=int, help="Time in minutes to capture.", default=60)
cap_size_group.add_argument('-c', '--count', type=int, help='Number of packets to capture.', default=0)
cap_size_group.add_argument('--mins', type=int, help='Time in minutes to capture.', default=60)
def setup_pcap_filter_parser(parser_sniff):
parser_pcap_filter = parser_sniff.add_argument_parser("pcap-filter expression")
parser_pcap_filter = parser_sniff.add_argument_parser('pcap-filter expression')
pass

View File

@@ -15,5 +15,5 @@ class Metadata:
def create_metadata(filename, unique_id, device_details):
date_string = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
meta_filename = f"meta_{date_string}_{unique_id}.json"
date_string = datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
meta_filename = f'meta_{date_string}_{unique_id}.json'

View File

@@ -3,38 +3,38 @@ from pathlib import Path
from pydantic import BaseModel
from kydcap.models.device_metadata_model import DeviceMetadata
from kydcap.config import DEVICE_METADATA_FILE
from iottb.models.device_metadata_model import DeviceMetadata
from iottb.definitions import DEVICE_METADATA_FILE
def write_device_metadata_to_file(metadata: DeviceMetadata, device_path: Path):
"""Write the device metadata to a JSON file in the specified directory."""
meta_file_path = device_path / "meta.json"
'''Write the device metadata to a JSON file in the specified directory.'''
meta_file_path = device_path / 'meta.json'
meta_file_path.write_text(metadata.json(indent=2))
def confirm_device_metadata(metadata: DeviceMetadata) -> bool:
"""Display device metadata for user confirmation."""
'''Display device metadata for user confirmation.'''
print(metadata.json(indent=2))
return input("Confirm device metadata? (y/n): ").strip().lower() == 'y'
return input('Confirm device metadata? (y/n): ').strip().lower() == 'y'
def get_device_metadata_from_user() -> DeviceMetadata:
"""Prompt the user to enter device details and return a populated DeviceMetadata object."""
device_name = input("Device name: ")
device_short_name = device_name.lower().replace(" ", "-")
'''Prompt the user to enter device details and return a populated DeviceMetadata object.'''
device_name = input('Device name: ')
device_short_name = device_name.lower().replace(' ', '-')
return DeviceMetadata(device_name=device_name, device_short_name=device_short_name)
def initialize_device_root_dir(device_name: str) -> Path:
"""Create and return the path for the device directory."""
'''Create and return the path for the device directory.'''
device_path = Path.cwd() / device_name
device_path.mkdir(exist_ok=True)
return device_path
def write_metadata(metadata: BaseModel, device_name: str):
"""Write device metadata to a JSON file."""
'''Write device metadata to a JSON file.'''
meta_path = Path.cwd() / device_name / DEVICE_METADATA_FILE
meta_path.parent.mkdir(parents=True, exist_ok=True)
with meta_path.open('w') as f:
@@ -42,19 +42,19 @@ def write_metadata(metadata: BaseModel, device_name: str):
def get_device_metadata(file_path: Path) -> DeviceMetadata | None:
"""Fetch device metadata from a JSON file."""
'''Fetch device metadata from a JSON file.'''
if dev_metadata_exists(file_path):
with file_path.open('r') as f:
device_metadata_json = json.load(f)
try:
device_metadata = DeviceMetadata.model_validate_json(device_metadata_json)
device_metadata = DeviceMetadata.from_json(device_metadata_json)
return device_metadata
except ValueError as e:
print(f"Validation error for device metadata: {e}")
print(f'Validation error for device metadata: {e}')
else:
# TODO Decide what to do (e.g. search for file etc)
print(f"No device metadata at {file_path}")
print(f'No device metadata at {file_path}')
return None

38
code/iottb/__main__.py Normal file
View File

@@ -0,0 +1,38 @@
#!/usr/bin/env python3
import argparse
from iottb.subcommands.capture import setup_capture_parser
from iottb.subcommands.add_device import setup_init_device_root_parser
######################
# Argparse setup
######################
def setup_argparse():
# create top level parser
root_parser = argparse.ArgumentParser(prog='iottb')
subparsers = root_parser.add_subparsers(title='subcommands', required=True, dest='command')
setup_capture_parser(subparsers)
setup_init_device_root_parser(subparsers)
return root_parser
def main():
parser = setup_argparse()
args = parser.parse_args()
print(args)
if args.command:
try:
args.func(args)
except KeyboardInterrupt:
print('Received keyboard interrupt. Exiting...')
exit(1)
except Exception as e:
print(f'Error: {e}')
# create_capture_directory(args.device_name)
if __name__ == '__main__':
main()

26
code/iottb/definitions.py Normal file
View File

@@ -0,0 +1,26 @@
from datetime import datetime
from enum import Flag, unique, global_enum
DEVICE_METADATA_FILE = 'device_metadata.json'
CAPTURE_METADATA_FILE = 'capture_metadata.json'
TODAY_DATE_STRING = datetime.now().strftime('%d%b%Y').lower() # TODO convert to function in utils or so
CAPTURE_FOLDER_BASENAME = 'capture_###'
AFFIRMATIVE_USER_RESPONSE = {'yes', 'y', 'true', 'Y', 'Yes', 'YES'}
NEGATIVE_USER_RESPONSE = {'no', 'n', 'N', 'No'}
YES_DEFAULT = AFFIRMATIVE_USER_RESPONSE.union({'', ' '})
NO_DEFAULT = NEGATIVE_USER_RESPONSE.union({'', ' '})
@unique
@global_enum
class ReturnCodes(Flag):
SUCCESS = 0
ABORTED = 1
FAILURE = 2
UNKNOWN = 3
FILE_NOT_FOUND = 4
FILE_ALREADY_EXISTS = 5
INVALID_ARGUMENT = 6
INVALID_ARGUMENT_VALUE = 7

28
code/iottb/logger.py Normal file
View File

@@ -0,0 +1,28 @@
import logging
import sys
from logging.handlers import RotatingFileHandler
def setup_logging():
logger_obj = logging.getLogger('iottbLogger')
logger_obj.setLevel(logging.DEBUG)
file_handler = RotatingFileHandler('iottb.log')
console_handler = logging.StreamHandler(sys.stdout)
file_handler.setLevel(logging.INFO)
console_handler.setLevel(logging.DEBUG)
file_fmt = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
console_fmt = logging.Formatter('%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s')
file_handler.setFormatter(file_fmt)
console_handler.setFormatter(console_fmt)
logger_obj.addHandler(file_handler)
logger_obj.addHandler(console_handler)
return logger_obj
logger = setup_logging()

View File

@@ -0,0 +1,102 @@
import json
import uuid
from datetime import datetime
from pathlib import Path
from typing import Optional
from iottb.definitions import ReturnCodes, CAPTURE_METADATA_FILE
from iottb.models.device_metadata_model import DeviceMetadata
from iottb.logger import logger
class CaptureMetadata:
# Required Fields
device_metadata: DeviceMetadata
capture_id: str = lambda: str(uuid.uuid4())
device_id: str
capture_dir: Path
capture_file: str
capture_date: str = lambda: datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower()
# Statistics
start_time: str
stop_time: str
# tcpdump
packet_count: Optional[int]
pcap_filter: str = ''
tcpdump_command: str = ''
interface: str = ''
# Optional Fields
device_ip_address: str = 'No IP Address set'
device_mac_address: Optional[str] = None
app: Optional[str] = None
app_version: Optional[str] = None
firmware_version: Optional[str] = None
def __init__(self, device_metadata: DeviceMetadata, capture_dir: Path):
logger.info(f'Creating CaptureMetadata model from DeviceMetadata: {device_metadata}')
self.device_metadata = device_metadata
self.capture_dir = capture_dir
assert capture_dir.is_dir(), f'Capture directory {capture_dir} does not exist'
def build_capture_file_name(self):
logger.info(f'Building capture file name')
if self.app is None:
logger.debug(f'No app specified')
prefix = self.device_metadata.device_short_name
else:
logger.debug(f'App specified: {self.app}')
assert str(self.app).strip() not in {'', ' '}, f'app is not a valid name: {self.app}'
prefix = self.app.lower().replace(' ', '_')
# assert self.capture_dir is not None, f'{self.capture_dir} does not exist'
filename = f'{prefix}_{str(self.capture_id)}.pcap'
logger.debug(f'Capture file name: {filename}')
self.capture_file = filename
def save_capture_metadata_to_json(self, file_path: Path = Path(CAPTURE_METADATA_FILE)):
assert self.capture_dir.is_dir(), f'capture_dir is not a directory: {self.capture_dir}'
if file_path.is_file():
print(f'File {file_path} already exists, update instead.')
return ReturnCodes.FILE_ALREADY_EXISTS
metadata = self.to_json(indent=2)
with file_path.open('w') as file:
json.dump(metadata, file)
return ReturnCodes.SUCCESS
def to_json(self, indent=2):
# TODO: Where to validate data?
logger.info(f'Converting CaptureMetadata to JSON')
data = {}
# List of fields from CaptureData class, if fields[key]==True, then it is a required field
fields = {
'capture_id': True, #
'device_id': True,
'capture_dir': True,
'capture_file': False,
'capture_date': False,
'start_time': True,
'stop_time': True,
'packet_count': False,
'pcap_filter': False,
'tcpdump_command': False,
'interface': False,
'device_ip_address': False,
'device_mac_address': False,
'app': False,
'app_version': False,
'firmware_version': False
}
for field, is_mandatory in fields.items():
value = getattr(self, field, None)
if value not in [None, ''] or is_mandatory:
if value in [None, ''] and is_mandatory:
raise ValueError(f'Field {field} is required and cannot be empty.')
data[field] = str(value) if not isinstance(value, str) else value
logger.debug(f'Capture metadata: {data}')
return json.dumps(data, indent=indent)

View File

@@ -0,0 +1,111 @@
import json
import uuid
from datetime import datetime
from pathlib import Path
from typing import Optional, List
# iottb modules
from iottb.definitions import ReturnCodes, DEVICE_METADATA_FILE
from iottb.logger import logger
# 3rd party libs
IMMUTABLE_FIELDS = {'device_name', 'device_short_name', 'device_id', 'date_created'}
class DeviceMetadata:
# Required fields
device_name: str
device_short_name: str
device_id: str
date_created: str
device_root_path: Path
# Optional Fields
aliases: Optional[List[str]] = None
device_type: Optional[str] = None
device_serial_number: Optional[str] = None
device_firmware_version: Optional[str] = None
date_updated: Optional[str] = None
capture_files: Optional[List[str]] = []
def __init__(self, device_name: str, device_root_path: Path):
self.device_name = device_name
self.device_short_name = device_name.lower().replace(' ', '_')
self.device_id = str(uuid.uuid4())
self.date_created = datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower()
self.device_root_path = device_root_path
if not self.device_root_path or not self.device_root_path.is_dir():
logger.error(f'Invalid device root path: {device_root_path}')
raise ValueError(f'Invalid device root path: {device_root_path}')
logger.debug(f'Device name: {device_name}')
logger.debug(f'Device short_name: {self.device_short_name}')
logger.debug(f'Device root dir: {device_root_path}')
logger.info(f'Initialized DeviceMetadata model: {device_name}')
@classmethod
def load_from_json(cls, device_file_path: Path):
logger.info(f'Loading DeviceMetadata from JSON file: {device_file_path}')
assert device_file_path.is_file(), f'{device_file_path} is not a file'
assert device_file_path.name == DEVICE_METADATA_FILE, f'{device_file_path} is not a {DEVICE_METADATA_FILE}'
device_meta_filename = device_file_path
with device_meta_filename.open('r') as file:
metadata_json = json.load(file)
metadata_model_obj = cls.from_json(metadata_json)
return metadata_model_obj
def save_to_json(self, file_path: Path):
logger.info(f'Saving DeviceMetadata to JSON file: {file_path}')
if file_path.is_file():
print(f'File {file_path} already exists, update instead.')
return ReturnCodes.FILE_ALREADY_EXISTS
metadata = self.to_json(indent=2)
with file_path.open('w') as file:
json.dump(metadata, file)
return ReturnCodes.SUCCESS
@classmethod
def from_json(cls, metadata_json):
if isinstance(metadata_json, dict):
return DeviceMetadata(**metadata_json)
def to_json(self, indent=2):
# TODO: atm almost exact copy as in CaptureMetadata
data = {}
fields = {
'device_name': True,
'device_short_name': True,
'device_id': True,
'date_created': True,
'device_root_path': True,
'aliases': False,
'device_type': False,
'device_serial_number': False,
'device_firmware_version': False,
'date_updated': False,
'capture_files': False,
}
for field, is_mandatory in fields.items():
value = getattr(self, field, None)
if value not in [None, ''] or is_mandatory:
if value in [None, ''] and is_mandatory:
logger.debug(f'Mandatory field {field}: {value}')
raise ValueError(f'Field {field} is required and cannot be empty.')
data[field] = str(value) if not isinstance(value, str) else value
logger.debug(f'Device metadata: {data}')
return json.dumps(data, indent=indent)
def dir_contains_device_metadata(dir_path: Path):
if not dir_path.is_dir():
return False
else:
meta_file_path = dir_path / DEVICE_METADATA_FILE
print(f'Device metadata file path {str(meta_file_path)}')
if not meta_file_path.is_file():
return False
else:
return True

View File

@@ -0,0 +1,75 @@
import logging
import pathlib
from iottb import definitions
from iottb.definitions import DEVICE_METADATA_FILE
from iottb.logger import logger
from iottb.models.device_metadata_model import DeviceMetadata
from archive.device_metadata_utils import *
logger.setLevel(logging.INFO) # Since module currently passes all tests
def setup_init_device_root_parser(subparsers):
parser = subparsers.add_parser('add-device', aliases=['add-device-root', 'add'])
parser.add_argument('--root_dir', type=pathlib.Path, default=pathlib.Path.cwd())
group = parser.add_mutually_exclusive_group()
group.add_argument('--guided', action='store_true', help='Guided setup', default=False)
group.add_argument('--name', action='store', type=str, help='name of device')
parser.set_defaults(func=handle_add)
def handle_add(args):
logger.info(f'Add device handler called with args {args}')
args.root_dir.mkdir(parents=True, exist_ok=True) # else metadata.save_to_file will fail TODO: unclear what to assume
if args.guided:
logger.debug('begin guided setup')
metadata = guided_setup(args.root_dir)
logger.debug('guided setup complete')
else:
logger.debug('Setup through passed args: setup')
if not args.name:
logger.error('No device name specified with unguided setup.')
return ReturnCodes.ERROR
metadata = DeviceMetadata(args.name, args.root_dir)
file_path = args.root_dir / DEVICE_METADATA_FILE
if file_path.exists():
print('Directory already contains a metadata file. Aborting.')
return ReturnCodes.ABORTED
serialized_metadata = metadata.to_json()
response = input(f'Confirm device metadata: {serialized_metadata} [y/N]')
logger.debug(f'response: {response}')
if response not in definitions.AFFIRMATIVE_USER_RESPONSE:
print('Adding device aborted by user.')
return ReturnCodes.ABORTED
logger.debug(f'Device metadata file {file_path}')
if metadata.save_to_json(file_path) == ReturnCodes.FILE_ALREADY_EXISTS:
logger.error('File exists after checking, which should not happen.')
return ReturnCodes.ABORTED
print('Device metadata successfully created.')
return ReturnCodes.SUCCESS
def configure_metadata():
pass
def guided_setup(device_root) -> DeviceMetadata:
logger.info('Guided setup')
response = 'N'
device_name = ''
while response.upper() == 'N':
device_name = input('Please enter name of device: ')
response = input(f'Confirm device name: {device_name} [y/N] ')
if device_name == '' or device_name is None:
print('Name cannot be empty')
logger.warning('Name cannot be empty')
logger.debug(f'Response is {response}')
logger.debug(f'Device name is {device_name}')
return DeviceMetadata(device_name, device_root)

View File

@@ -0,0 +1,170 @@
import subprocess
from pathlib import Path
from iottb.definitions import *
from iottb.models.capture_metadata_model import CaptureMetadata
from iottb.models.device_metadata_model import DeviceMetadata, dir_contains_device_metadata
from iottb.utils.capture_utils import get_capture_src_folder, make_capture_src_folder
def setup_capture_parser(subparsers):
parser = subparsers.add_parser('sniff', help='Sniff packets with tcpdump')
# metadata args
parser.add_argument('-a', '--ip-address', help='IP address of the device to sniff', dest='device_ip')
# tcpdump args
parser.add_argument('device_root', help='Root folder for device to sniff',
type=Path, default=Path.cwd())
parser.add_argument('-s', '--safe', help='Ensure correct device root folder before sniffing', action='store_true')
parser.add_argument('--app', help='Application name to sniff', dest='app_name', default=None)
parser_sniff_tcpdump = parser.add_argument_group('tcpdump arguments')
parser_sniff_tcpdump.add_argument('-i', '--interface', help='Interface to capture on.', dest='capture_interface',
required=True)
parser_sniff_tcpdump.add_argument('-I', '--monitor-mode', help='Put interface into monitor mode',
action='store_true')
parser_sniff_tcpdump.add_argument('-n', help='Deactivate name resolution. True by default.',
action='store_true', dest='no_name_resolution')
parser_sniff_tcpdump.add_argument('-#', '--number',
help='Print packet number at beginning of line. True by default.',
action='store_true')
parser_sniff_tcpdump.add_argument('-e', help='Print link layer headers. True by default.',
action='store_true', dest='print_link_layer')
parser_sniff_tcpdump.add_argument('-t', action='count', default=0,
help='Please see tcpdump manual for details. Unused by default.')
cap_size_group = parser.add_mutually_exclusive_group(required=False)
cap_size_group.add_argument('-c', '--count', type=int, help='Number of packets to capture.', default=1000)
cap_size_group.add_argument('--mins', type=int, help='Time in minutes to capture.', default=1)
parser.set_defaults(func=handle_capture)
def cwd_is_device_root_dir() -> bool:
device_metadata_file = Path.cwd() / DEVICE_METADATA_FILE
return device_metadata_file.is_file()
def start_guided_device_root_dir_setup():
assert False, 'Not implemented'
def handle_metadata():
assert not cwd_is_device_root_dir()
print(f'Unable to find {DEVICE_METADATA_FILE} in current working directory')
print('You need to setup a device root directory before using this command')
response = input('Would you like to be guided through the setup? [y/n]')
if response.lower() == 'y':
start_guided_device_root_dir_setup()
else:
print('\'iottb init-device-root --help\' for more information.')
exit(ReturnCodes.ABORTED)
# device_id = handle_capture_metadata()
return ReturnCodes.SUCCESS
def get_device_metadata_from_file(device_metadata_filename: Path) -> str:
assert device_metadata_filename.is_file(), f'Device metadata file f"{device_metadata_filename}" does not exist'
device_metadata = DeviceMetadata.load_from_json(device_metadata_filename)
return device_metadata
def run_tcpdump(cmd):
# TODO: Maybe specify files for stout and stderr
try:
p = subprocess.run(cmd, capture_output=True, text=True, check=True)
if p.returncode != 0:
print(f'Error running tcpdump {p.stderr}')
else:
print(f'tcpdump run successfully\n: {p.stdout}')
except KeyboardInterrupt:
pass
def handle_capture(args):
assert args.device_root is not None, f'Device root directory is required'
assert dir_contains_device_metadata(args.device_root), f'Device metadata file \'{args.device_root}\' does not exist'
# get device metadata
if args.safe and not dir_contains_device_metadata(args.device_root):
print(f'Supplied folder contains no device metadata. '
f'Please setup a device root directory before using this command')
exit(ReturnCodes.ABORTED)
elif dir_contains_device_metadata(args.device_root):
device_metadata_filename = args.device_root / DEVICE_METADATA_FILE
device_data = DeviceMetadata.load_from_json(device_metadata_filename)
else:
name = input('Please enter a device name: ')
args.device_root.mkdir(parents=True, exist_ok=True)
device_data = DeviceMetadata(name, args.device_root)
# start constructing environment for capture
capture_dir = get_capture_src_folder(args.device_root)
make_capture_src_folder(capture_dir)
capture_metadata = CaptureMetadata(device_data, capture_dir)
capture_metadata.interface = args.capture_interface
cmd = ['sudo', 'tcpdump', '-i', args.capture_interface]
cmd = build_tcpdump_args(args, cmd, capture_metadata)
capture_metadata.tcpdump_command = cmd
print('Executing: ' + ' '.join(cmd))
# run capture
try:
start_time = datetime.now().strftime('%H:%M:%S')
run_tcpdump(cmd)
stop_time = datetime.now().strftime('%H:%M:%S')
capture_metadata.start_time = start_time
capture_metadata.stop_time = stop_time
except KeyboardInterrupt:
print('Received keyboard interrupt.')
exit(ReturnCodes.ABORTED)
except subprocess.CalledProcessError as e:
print(f'Failed to capture packet: {e}')
exit(ReturnCodes.FAILURE)
except Exception as e:
print(f'Failed to capture packet: {e}')
exit(ReturnCodes.FAILURE)
return ReturnCodes.SUCCESS
def build_tcpdump_args(args, cmd, capture_metadata: CaptureMetadata):
if args.monitor_mode:
cmd.append('-I')
if args.no_name_resolution:
cmd.append('-n')
if args.number:
cmd.append('-#')
if args.print_link_layer:
cmd.append('-e')
if args.count:
cmd.append('-c')
cmd.append(str(args.count))
elif args.mins:
assert False, 'Unimplemented option'
if args.app_name is not None:
capture_metadata.app = args.app_name
capture_metadata.build_capture_file_name()
cmd.append('-w')
cmd.append(capture_metadata.capture_file)
if args.safe:
cmd.append(f'host {args.device_ip}') # if not specified, filter 'any' implied by tcpdump
capture_metadata.device_id = args.device_ip
return cmd
# def capture_file_cmd(args, cmd, capture_dir, capture_metadata: CaptureMetadata):
# capture_file_prefix = capture_metadata.get_device_metadata().get_device_short_name()
# if args.app_name is not None:
# capture_file_prefix = args.app_name
# capture_metadata.set_app(args.app_name)
# capfile_name = capture_file_prefix + '_' + str(capture_metadata.get_capture_id()) + '.pcap'
# capture_metadata.set_capture_file(capfile_name)
# capfile_abs_path = capture_dir / capfile_name
# capture_metadata.set_capture_file(capfile_name)
# cmd.append('-w')
# cmd.append(str(capfile_abs_path))

View File

@@ -0,0 +1,44 @@
import uuid
from pathlib import Path
from iottb.models.device_metadata_model import dir_contains_device_metadata
from iottb.utils.utils import get_iso_date
def get_capture_uuid():
return str(uuid.uuid4())
def get_capture_date_folder(device_root: Path):
today_iso = get_iso_date()
today_folder = device_root / today_iso
if dir_contains_device_metadata(device_root):
if not today_folder.is_dir():
try:
today_folder.mkdir()
except FileExistsError:
print(f'Folder {today_folder} already exists')
return today_folder
raise FileNotFoundError(f'Given path {device_root} is not a device root directory')
def get_capture_src_folder(device_folder: Path):
assert device_folder.is_dir(), f'Given path {device_folder} is not a folder'
today_iso = get_iso_date()
max_sequence_number = 1
for d in device_folder.iterdir():
if d.is_dir() and d.name.startswith(f'{today_iso}_capture_'):
name = d.name
num = int(name.split('_')[2])
max_sequence_number = max(max_sequence_number, num)
next_sequence_number = max_sequence_number + 1
return device_folder.joinpath(f'{today_iso}_capture_{next_sequence_number:03}')
def make_capture_src_folder(capture_src_folder: Path):
try:
capture_src_folder.mkdir()
except FileExistsError:
print(f'Folder {capture_src_folder} already exists')
finally:
return capture_src_folder

View File

@@ -1,5 +1,7 @@
import ipaddress
import shutil
import subprocess
from typing import Optional
def check_installed() -> bool:
@@ -10,7 +12,7 @@ def check_installed() -> bool:
def ensure_installed():
"""Ensure that tcpdump is installed, raise an error if not."""
if not check_installed():
raise RuntimeError("tcpdump is not installed. Please install it to continue.")
raise RuntimeError('tcpdump is not installed. Please install it to continue.')
def list_interfaces() -> str:
@@ -20,9 +22,20 @@ def list_interfaces() -> str:
result = subprocess.run(['tcpdump', '--list-interfaces'], capture_output=True, text=True, check=True)
return result.stdout
except subprocess.CalledProcessError as e:
print(f"Failed to list interfaces: {e}")
return ""
print(f'Failed to list interfaces: {e}')
return ''
def start_tcpdump():
return None
def is_valid_ipv4(ip: str) -> bool:
try:
ipaddress.IPv4Address(ip)
return True
except ValueError:
return False
def str_to_ipv4(ip: str) -> (bool, Optional[ipaddress]):
try:
address = ipaddress.IPv4Address(ip)
return address == ipaddress.IPv4Address(ip), address
except ipaddress.AddressValueError:
return False, None

18
code/iottb/utils/utils.py Normal file
View File

@@ -0,0 +1,18 @@
import uuid
from datetime import datetime
from iottb.definitions import TODAY_DATE_STRING, DEVICE_METADATA_FILE, CAPTURE_METADATA_FILE
from pathlib import Path
def get_iso_date():
return datetime.now().strftime('%Y-%m-%d')
def subfolder_exists(parent: Path, child: str):
return parent.joinpath(child).exists()
def generate_unique_string_with_prefix(prefix: str):
return prefix + '_' + str(uuid.uuid4())

View File

@@ -1,40 +0,0 @@
#!/usr/bin/env python3
import argparse
from kydcap.subcommands.sniff import setup_sniff_parser
from kydcap.subcommands.initialize_device_root_dir import setup_init_root_dir_parser
CAP_DIR_PREFIX = ...
######################
# Argparse setup
######################
def setup_argparse():
# create top level parser
root_parser = argparse.ArgumentParser(prog="kydcap")
subparsers = root_parser.add_subparsers(title="subcommands", required=True, dest="command")
setup_sniff_parser(subparsers)
setup_init_root_dir_parser(subparsers)
return root_parser
def main():
parser = setup_argparse()
args = parser.parse_args()
print(args)
if args.command:
try:
args.func(args)
except KeyboardInterrupt:
print("Received keyboard interrupt. Exiting...")
exit(1)
except Exception as e:
print(f"Error: {e}")
# create_capture_directory(args.device_name)
if __name__ == "__main__":
main()

View File

@@ -1,20 +0,0 @@
from datetime import datetime
from enum import Flag, unique, global_enum
DEVICE_METADATA_FILE = "device-metadata.json"
CAPTURE_METADATA_FILE = "capture-metadata.json"
TODAY_DATE_STRING = datetime.now().strftime("%d%b%Y").lower()
@unique
@global_enum
class ReturnCodes(Flag):
SUCCESS = 0
ABORTED = 1
FAILURE = 2
UNKNOWN = 3
FILE_NOT_FOUND = 4
FILE_ALREADY_EXISTS = 5
INVALID_ARGUMENT = 6
INVALID_ARGUMENT_VALUE = 7

View File

@@ -1,47 +0,0 @@
import json
import uuid
from datetime import datetime
from pathlib import Path
from typing import Optional, Any
from pydantic import BaseModel, Field
from kydcap.config import ReturnCodes
class KydcapCaptureMetadata(BaseModel):
# Required Fields
device_id: str
capture_id: uuid.UUID = Field(default_factory=lambda: str(uuid.uuid4()))
capture_date: str = Field(default_factory=lambda: datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower())
# Statistics
start_time: str
stop_time: str
packet_count: Optional[int]
# Optional Fields
device_ip_address: Optional[str] = None
device_mac_address: Optional[str] = None
app: Optional[str] = None
app_version: Optional[str] = None
firmware_version: Optional[str] = None
def __init__(self, device_id: str, start_time: str, stop_time: str, /, **data: Any):
super().__init__(**data) # Pycharms orders
assert isinstance(device_id, str)
assert isinstance(start_time, str)
assert isinstance(stop_time, str)
self.device_id = device_id
self.start_time = start_time
self.stop_time = stop_time
def save_to_json(self, file_path: Path):
if file_path.is_file():
print(f"File {file_path} already exists, update instead.")
return ReturnCodes.FILE_ALREADY_EXISTS
metadata = self.model_dump_json(indent=2)
with file_path.open('w') as file:
json.dump(metadata, file)
return ReturnCodes.SUCCESS

View File

@@ -1,66 +0,0 @@
import json
import uuid
from datetime import datetime
from pathlib import Path
from typing import Optional, List, Any
# kydcap modules
from kydcap.config import ReturnCodes
# 3rd party libs
from pydantic import BaseModel, Field
IMMUTABLE_FIELDS = {"device_name", "device_short_name", "device_id", "date_created"}
class DeviceMetadata(BaseModel):
# Required fields
device_name: str
device_short_name: str
device_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
date_created: str = Field(default_factory=lambda: datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower())
# Optional Fields
device_type: Optional[str] = None
device_serial_number: Optional[str] = None
device_firmware_version: Optional[str] = None
date_updated: Optional[str] = None
capture_files: Optional[List[str]] = []
def __init__(self, device_name: str, /, **data: Any):
super().__init__(**data)
self.device_name = device_name
self.device_short_name = device_name.lower().replace(" ", "_")
@classmethod
def load_from_json(cls, file_path: Path):
assert file_path.is_file()
with file_path.open('r') as file:
metadata_json = json.load(file)
metadata_model_obj = cls.model_validate_json(metadata_json)
return metadata_model_obj
def save_to_json(self, file_path: Path):
if file_path.is_file():
print(f"File {file_path} already exists, update instead.")
return ReturnCodes.FILE_ALREADY_EXISTS
metadata = self.model_dump_json(indent=2)
with file_path.open('w') as file:
json.dump(metadata, file)
return ReturnCodes.SUCCESS
@classmethod
def update_metadata_in_json(cls, file_path: Path, **kwargs):
# TODO Maybe not needed at all.
assert file_path.is_file()
for field in IMMUTABLE_FIELDS:
if field in kwargs:
print(f"Field {field} is immutable")
return ReturnCodes.IMMUTABLE
metadata = cls.load_from_json(file_path)
for field, value in kwargs.items():
if field in metadata.model_fields_set:
setattr(metadata, field, value)
metadata.date_updated = datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower()
pass

View File

@@ -1,40 +0,0 @@
import pathlib
from kydcap.config import DEVICE_METADATA_FILE
from kydcap.models.device_metadata_model import DeviceMetadata
def setup_init_root_dir_parser(subparsers):
parser = subparsers.add_parser("init-device-root", aliases=["idr"])
parser.add_argument("--root_dir", type=pathlib.Path, default=pathlib.Path.cwd())
group = parser.add_mutually_exclusive_group()
group.add_argument("--dynamic", action="store_true", help="enable guided setup", default=False)
group.add_argument("-n", "--name", action="store", type=str, help="name of device")
parser.set_defaults(func=handle_idr)
def handle_idr(args):
print("Entered kydcap initialize-device-root")
root_dir = args.root_dir
device_name = None
if args.dynamic:
response = "N"
while response == "N":
name = input("Please enter name of device: ")
# TODO extended config for other fields like apps, firmware etc.
response = input(f"Confirm device name: {name} [y/N]")
device_name = name
else:
device_name = args.name
root_dir.mkdir(parents=True, exist_ok=True)
root_dir.chdir()
dev_metadata_model = DeviceMetadata(device_name)
file_path = root_dir / device_name / DEVICE_METADATA_FILE
assert not file_path.exists(), f"{file_path} already exists"
if args.dynamic:
response = input(f"Confirm device metadata: {dev_metadata_model.model_dump()} [y/N]")
if response.lower() != "y":
assert False, "TODO implement dynamic setup"
code = dev_metadata_model.save_to_json(file_path)
print(f"Device metadata saved to {file_path}")
return code

View File

@@ -1,111 +0,0 @@
import subprocess
from pathlib import Path
from kydcap.config import *
from kydcap.models.device_metadata_model import DeviceMetadata
def setup_sniff_parser(subparsers):
parser = subparsers.add_parser('sniff', help='Sniff packets with tcpdump')
# metadata args
parser.add_argument("-a", "--ip-address=", help="IP address of the device to sniff", dest="device_ip")
# tcpdump args
parser_sniff_tcpdump = parser.add_argument_group('tcpdump arguments')
parser_sniff_tcpdump.add_argument("-i", "--interface=", help="Interface to capture on.", dest="capture_interface",
default="any")
parser_sniff_tcpdump.add_argument("-I", "--monitor-mode", help="Put interface into monitor mode",
action="store_true")
parser_sniff_tcpdump.add_argument("-n", help="Deactivate name resolution. Option is set by default.",
action="store_true", dest="no_name_resolution")
parser_sniff_tcpdump.add_argument("-#", "--number",
help="Print packet number at beginning of line. Set by default.",
action="store_true")
parser_sniff_tcpdump.add_argument("-e", help="Print link layer headers. Option is set by default.",
action="store_true", dest="print_link_layer")
parser_sniff_tcpdump.add_argument("-t", action="count", default=0,
help="Please see tcpdump manual for details. Unused by default.")
# parser_sniff_tcpdump.add_argument("--filter",type=str,default="ip help=f"pcap filter expression. \
# Defaults is '{default}'")
# shared args
cap_size_group = parser.add_mutually_exclusive_group(required=False)
cap_size_group.add_argument("-c", "--count", type=int, help="Number of packets to capture.", default=0)
cap_size_group.add_argument("--mins", type=int, help="Time in minutes to capture.", default=60)
parser.set_defaults(func=handle_sniff)
# return parser
# parser.add_default(func=handle_sniff(args=sniff_args))
def cwd_is_device_root_dir() -> bool:
device_metadata_file = Path.cwd() / DEVICE_METADATA_FILE
return device_metadata_file.exists()
def start_guided_device_root_dir_setup():
assert False, "Not implemented"
def handle_metadata():
assert not cwd_is_device_root_dir()
print(f"Unable to find {DEVICE_METADATA_FILE} in current working directory")
print("You need to setup a device root directory before using this command")
response = input("Would you like to be guided through the setup? [y/n]")
if response.lower() == "y":
start_guided_device_root_dir_setup()
else:
print("'kydcap init-device-root --help' for more information.")
exit(ReturnCodes.ABORTED)
# device_id = handle_capture_metadata()
return ReturnCodes.SUCCESS
def handle_capture_metadata():
device_metadata_json = Path.cwd() / DEVICE_METADATA_FILE
device_metadata = DeviceMetadata.load_from_json(device_metadata_json)
device_id = device_metadata.device_id
return device_id
def handle_date_dir():
pass
def run_tcpdum(cmd):
subprocess.run(cmd)
def handle_sniff(args):
if cwd_is_device_root_dir():
handle_date_dir()
cmd = ['sudo tcpdump', '-i', args.capture_interface]
if args.monitor_mode:
cmd.append('-I')
if args.no_name_resolution:
cmd.append('-n')
if args.number:
cmd.append('-#')
if args.print_link_layer:
cmd.append('-e')
if args.count:
cmd.append('-c')
cmd.append(str(args.count))
elif args.mins:
pass
print('Executing: ' + ' '.join(cmd))
# TODO maybe dump this into file -> put into device metadata
try:
start_time = datetime.now().strftime('%H:%M:%S')
run_tcpdum(cmd)
stop_time = datetime.now().strftime('%H:%M:%S')
except KeyboardInterrupt:
print("Received keyboard interrupt.")
exit(ReturnCodes.ABORTED)
except subprocess.CalledProcessError as e:
print(f"Failed to capture packet: {e}")
exit(ReturnCodes.FAILURE)
except Exception as e:
print(f"Failed to capture packet: {e}")
exit(ReturnCodes.FAILURE)
return ReturnCodes.SUCCESS
else:
handle_metadata()

View File

@@ -0,0 +1,44 @@
import sys
import unittest
from io import StringIO
from unittest.mock import patch, MagicMock
from pathlib import Path
from iottb.definitions import DEVICE_METADATA_FILE
import shutil
from iottb.__main__ import main
class TestDeviceMetadataFileCreation(unittest.TestCase):
def setUp(self):
self.test_dir = Path('/tmp/iottbtest/test_add_device')
self.test_dir.mkdir(parents=True, exist_ok=True)
# self.captured_output = StringIO()
# sys.stdout = self.captured_output
def tearDown(self):
# shutil.rmtree(str(self.test_dir))
for item in self.test_dir.iterdir():
if item.is_dir():
item.rmdir()
else:
item.unlink()
self.test_dir.rmdir()
# sys.stdout = sys.__stdout__
@patch('builtins.input', side_effect=['iPhone 14', 'y', 'y'])
def test_guided_device_setup(self, mock_input):
sys.argv = ['__main__.py', 'add', '--root_dir', str(self.test_dir), '--guided']
main()
expected_file = self.test_dir / DEVICE_METADATA_FILE
self.assertTrue(expected_file.exists()), f'Expected file not created: {expected_file}'
@patch('builtins.input', side_effect=['y']) # need mock_input else wont work
def test_device_setup(self, mock_input):
sys.argv = ['__main__.py', 'add', '--root_dir', str(self.test_dir), '--name', 'iPhone 14']
main()
expected_file = self.test_dir / DEVICE_METADATA_FILE
self.assertTrue(expected_file.exists()), f'Expected file not created: {expected_file}'
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,2 @@
def test_save_to_json():
assert False

View File

@@ -1,6 +1,2 @@
import json
from pathlib import Path
from unittest.mock import mock_open, patch
import pytest
from kydcap.utils.capture_metadata_utils import set_device_ip_address