Compare commits

...

78 Commits

Author SHA1 Message Date
Sebastian Lenzlinger
04200ee09b Add usage examples to README.md. 2024-07-17 13:02:14 +02:00
Sebastian Lenzlinger
62da103253 Change language of Readmes 2024-07-17 12:43:23 +02:00
Sebastian Lenzlinger
7515939a41 Change location of License 2024-07-17 12:42:11 +02:00
Sebastian Lenzlinger
481a300cac Change location of License 2024-07-17 12:40:39 +02:00
Sebastian Lenzlinger
519e1e9730 Add presentation sources 2024-07-17 12:31:54 +02:00
Sebastian Lenzlinger
c4fc38ce8e Add presentation sources 2024-07-17 12:31:37 +02:00
Sebastian Lenzlinger
4eeb0aa55d Make root README useful 2024-07-17 12:28:31 +02:00
Sebastian Lenzlinger
bbc092d747 Relocate iottb executable 2024-07-17 12:17:38 +02:00
Sebastian Lenzlinger
1a531f1047 Update Readme 2024-07-17 12:14:13 +02:00
Sebastian Lenzlinger
c6c8cfb223 Import sys and modify exit invocation to be able to create single file executable of iottb 2024-07-17 12:10:57 +02:00
Sebastian Lenzlinger
78c155208e Add install instructions to Readme 2024-07-17 11:58:16 +02:00
Sebastian Lenzlinger
f4cad751a2 Revert changes since there was actually no issue in code, just with the installation on local machine. 2024-07-17 11:42:29 +02:00
Sebastian Lenzlinger
ccbdf0e9ed Add newly generated poetry lockfile. 2024-07-17 11:15:45 +02:00
Sebastian Lenzlinger
e6d52cf3a1 Remove poetry lockfile before generating new one. 2024-07-17 11:15:03 +02:00
Sebastian Lenzlinger
45e300ca3a Merge branch 'main' of gitlab.com:dmi-pet/bsc-msc/2024-bsc-sebastian-lenzlinger 2024-07-17 11:13:26 +02:00
Sebastian Lenzlinger
ae93234eb6 Prelim fix for wrong db path generation 2024-07-17 11:13:06 +02:00
Sebastian Lenzlinger
6a9d9c65f9 Add presentation slides presented versions. 2024-07-17 09:00:58 +00:00
Sebastian Lenzlinger
97d0238cc2 Merge branch 'main' of gitlab.com:dmi-pet/bsc-msc/2024-bsc-sebastian-lenzlinger 2024-07-17 10:57:55 +02:00
Sebastian Lenzlinger
b063eb0972 Add signed 'wissenschaftliche Redlichkeit' 2024-07-17 10:57:07 +02:00
Sebastian Lenzlinger
9e8c17fd0d Add Preliminere Presentation Slides 2024-07-11 04:15:18 +00:00
Sebastian Lenzlinger
c5b8ea42e7 Add guided device add functionality. 2024-07-10 16:52:27 +02:00
Sebastian Lenzlinger
1b5c324c50 Remove JetBrains config files 2024-07-10 12:40:35 +00:00
7daca7f3ad Add missing argument for add_device parameter list. 2024-07-01 19:22:09 +02:00
da05edb71a Update add-device command. Make "device" a positional argument instead of a kw option 2024-07-01 19:10:41 +02:00
b3f0f7a3ed Add missing dependency 'click-option-group' 2024-07-01 18:29:51 +02:00
a7a2809228 Remove conflicting paths before merge' 2024-07-01 18:10:50 +02:00
89d93121d6 idea config 2024-07-01 18:09:02 +02:00
38c93a2cb1 Unpack thesis sources. 2024-07-01 03:43:45 +02:00
9ca84861b3 More acceptible thesis state. 2024-07-01 03:43:22 +02:00
41053b8e9a Handin thesis. Please consider a later handin than this which is worked on as I commit. 2024-07-01 00:13:01 +02:00
e62914e738 HANDIN COMMIT 2024-07-01 00:08:04 +02:00
854fba049d Make help message generation robust. 2024-06-30 00:37:18 +02:00
Sebastian Lenzlinger
de30d7a4af Merge branch 'experiment2' into 'main'
Merge experiment2 into main

See merge request dmi-pet/bsc-msc/2024-bsc-sebastian-lenzlinger!9
2024-06-29 22:12:51 +00:00
Sebastian Lenzlinger
6405b8f62e Merge branch 'main' into 'experiment2'
# Conflicts:
#   .gitignore
#   code/iottb-project/iottb/commands/add_device.py
#   code/iottb-project/iottb/commands/developer.py
#   code/iottb-project/iottb/commands/sniff.py
#   code/iottb-project/iottb/definitions.py
#   code/iottb-project/iottb/main.py
#   code/iottb-project/iottb/models/sniff_metadata.py
#   code/iottb-project/iottb/utils/string_processing.py
#   code/iottb-project/pyproject.toml
2024-06-29 22:12:34 +00:00
Sebastian Lenzlinger
d9d3f66fc8 Hopefully successfully integrate proper repo. 2024-06-30 00:02:59 +02:00
a5325fc35f Another try 2024-06-29 00:43:47 +02:00
Sebastian Lenzlinger
929b40983c add LICENSE 2024-06-23 13:14:23 +00:00
Sebastian Lenzlinger
01954bd5a6 Introduce complete refactoring. 2024-06-18 03:12:40 +02:00
Sebastian Lenzlinger
b345474a89 eod sync 2024-06-12 23:07:09 +02:00
Sebastian Lenzlinger
bf33dfe3a8 Why am I so slow???????????? Just sync commit. Slowly but surely getting allong with this refactoring. 2024-06-12 22:33:08 +02:00
Sebastian Lenzlinger
7d9095f113 SYNC 2024-06-12 20:01:59 +02:00
Sebastian Lenzlinger
ae82bd3a67 Fix debugger 2024-06-12 17:33:42 +02:00
Sebastian Lenzlinger
f22b06ad14 Small corrections to list_interfaces(). Also make pyproject.toml usable. 2024-06-12 16:51:17 +02:00
Sebastian Lenzlinger
5196c2e129 Ensure no arguments are passed to list_interfaces() 2024-06-12 16:47:45 +02:00
Sebastian Lenzlinger
2b782bbdca Cleanup 2024-06-12 16:04:32 +02:00
Sebastian Lenzlinger
e5ece09c33 Clean Slate (or not) 2024-06-12 13:31:49 +02:00
Sebastian Lenzlinger
f82b45a91e Add journal entry for 25. May 2024. 2024-05-15 18:17:20 +02:00
Sebastian Lenzlinger
0751e26172 Rename folder to use new date format. 2024-05-15 18:16:41 +02:00
Sebastian Lenzlinger
7b2ef173eb Introduce environment variable 'IOTTB_HOME' to represent the root of the iottb database tree. 2024-05-15 17:17:23 +02:00
Sebastian Lenzlinger
6a7ab04e1c Rename journal files into more ISO friendly date format for better sorting. 2024-05-15 17:14:58 +02:00
Sebastian Lenzlinger
7eff5ef555 Update gitignore 2024-05-08 19:13:01 +02:00
Sebastian Lenzlinger
880bbdf50c Journal entry on setting up wifi. Resource for implementing the functionality into iottb. 2024-05-08 19:09:43 +02:00
Sebastian Lenzlinger
18cdd1dd46 Add help text to subcommands.
This way they appear better in the console.
2024-05-08 03:53:17 +02:00
Sebastian Lenzlinger
3bbbbe52c0 Add command to list available NIC names. 2024-05-08 03:48:35 +02:00
Sebastian Lenzlinger
1f43b11177 Remove deprecated import from archive. 2024-05-08 03:47:09 +02:00
Sebastian Lenzlinger
8b7ad05ad6 Ensure tcpdump is installed before running capture command. 2024-05-08 03:45:55 +02:00
Sebastian Lenzlinger
7badc83530 SYNC May, 8th 2024. 2024-05-08 03:25:41 +02:00
Sebastian Lenzlinger
5c031d8157 Merge branch 'refs/heads/dev-logger'
# Conflicts:
#	code/iottb/logger.py
#	code/iottb/models/capture_metadata_model.py
#	code/iottb/models/device_metadata_model.py
#	code/iottb/subcommands/add_device.py
2024-05-08 03:11:23 +02:00
Sebastian Lenzlinger
7465819ef4 Remove unused file from main branch 2024-05-08 03:08:41 +02:00
Sebastian Lenzlinger
901133c84c Merge branch 'refs/heads/cli-dev'
# Conflicts:
#	.idea/workspace.xml
#	code/iottb/logger.py
#	code/iottb/models/capture_metadata_model.py
#	code/iottb/models/device_metadata_model.py
#	code/iottb/subcommands/add_device.py
2024-05-08 03:07:16 +02:00
Sebastian Lenzlinger
64788a1997 Move unused modules into archive. 2024-05-08 03:06:20 +02:00
Sebastian Lenzlinger
7ffbdda7ea Add test case to add-device subcommand 2024-05-08 02:56:49 +02:00
Sebastian Lenzlinger
2e95bd2fd2 Fix places where quote replacement lead to issues. 2024-05-08 02:52:37 +02:00
Sebastian Lenzlinger
e569eb3e5b Replace all double quotes strings with single quoted strings. 2024-05-08 02:46:14 +02:00
Sebastian Lenzlinger
266a669e5e Add test for device metadata file creation and fixes until test passed. 2024-05-08 02:36:07 +02:00
Sebastian Lenzlinger
a21312ee61 Corrections 2024-05-08 01:38:53 +02:00
Sebastian Lenzlinger
73771be70d Add preliminary implementation to load DeviceMetadata from data. 2024-05-08 00:52:25 +02:00
Sebastian Lenzlinger
27ae736f11 Implement custom to_json function for DeviceMetadata 2024-05-08 00:40:29 +02:00
Sebastian Lenzlinger
cb1ad33cae Implement custom to_json function for CaptureMetadata 2024-05-08 00:40:17 +02:00
Sebastian Lenzlinger
2681ee9a8e Refactor function name to reflect that not using pydantic anymore. 2024-05-08 00:31:25 +02:00
Sebastian Lenzlinger
799414ad39 Remove getters and setters from DeviceMetadata class and change dependencies to use field access. 2024-05-08 00:03:18 +02:00
Sebastian Lenzlinger
6b73530943 Remove getters and setters from CaptureMetadata class and refactor dependencies to use field access. 2024-05-08 00:02:08 +02:00
Sebastian Lenzlinger
2efa0d9e7b Merge remote-tracking branch 'refs/remotes/origin/main' 2024-05-07 23:04:59 +02:00
Sebastian Lenzlinger
798a32b23e Remove ignored file from git. 2024-05-07 23:01:40 +02:00
Sebastian Lenzlinger
00aa20a8af Merge branch 'dev-logger' into 'main'
Factor out pydantic.

See merge request dmi-pet/bsc-msc/2024-bsc-sebastian-lenzlinger!7
2024-05-07 20:50:15 +00:00
Sebastian Lenzlinger
11e2c356fa Factor out pydantic. 2024-05-07 22:48:53 +02:00
Sebastian Lenzlinger
95426e0baa Merge branch 'cli-dev' into 'main'
Refactor and add Logger

See merge request dmi-pet/bsc-msc/2024-bsc-sebastian-lenzlinger!6
2024-05-07 19:24:56 +00:00
Sebastian Lenzlinger
347d43dcef Refactor and add Logger 2024-05-07 19:24:56 +00:00
165 changed files with 11232 additions and 933 deletions

39
.gitignore vendored
View File

@ -1,3 +1,38 @@
.obsidian
venv
__pycache__ __pycache__
.venv
iottb.egg-info
.idea/
*.log
logs/
*.pyc
.obsidian
dist/
build/
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio, WebStorm and Rider
# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
# User-specific stuff
.idea/**/workspace.xml
.idea/**/tasks.xml
.idea/**/usage.statistics.xml
.idea/**/dictionaries
.idea/**/shelf
# AWS User-specific
.idea/**/aws.xml
# Generated files
.idea/**/contentModel.xml
# Sensitive or high-churn files
.idea/**/dataSources/
.idea/**/dataSources.ids
.idea/**/dataSources.local.xml
.idea/**/sqlDataSources.xml
.idea/**/dynamic.xml
.idea/**/uiDesigner.xml
.idea/**/dbnavigator.xml
.private/
*.pcap

View File

@ -1,10 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<module version="4">
<component name="PyDocumentationSettings">
<option name="format" value="PLAIN" />
<option name="myDocStringFormat" value="Plain" />
</component>
<component name="TestRunnerService">
<option name="PROJECT_TEST_RUNNER" value="py.test" />
</component>
</module>

View File

@ -1,17 +0,0 @@
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="DuplicatedCode" enabled="true" level="WEAK WARNING" enabled_by_default="true">
<Languages>
<language minSize="67" name="Python" />
</Languages>
</inspection_tool>
<inspection_tool class="PyPep8NamingInspection" enabled="true" level="WEAK WARNING" enabled_by_default="true">
<option name="ignoredErrors">
<list>
<option value="N806" />
</list>
</option>
</inspection_tool>
</profile>
</component>

View File

@ -1,6 +0,0 @@
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>

7
.idea/misc.xml generated
View File

@ -1,7 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Black">
<option name="sdkName" value="Python 3.12 (pythonProject)" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.12 (2024-bsc-sebastian-lenzlinger)" project-jdk-type="Python SDK" />
</project>

6
.idea/vcs.xml generated
View File

@ -1,6 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="" vcs="Git" />
</component>
</project>

14
.idea/webResources.xml generated
View File

@ -1,14 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="WebResourcesPaths">
<contentEntries>
<entry url="file://$PROJECT_DIR$">
<entryData>
<resourceRoots>
<path value="file://$PROJECT_DIR$/data" />
</resourceRoots>
</entryData>
</entry>
</contentEntries>
</component>
</project>

308
.idea/workspace.xml generated
View File

@ -1,308 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="AutoImportSettings">
<option name="autoReloadType" value="SELECTIVE" />
</component>
<component name="ChangeListManager">
<list default="true" id="7a3ac8e1-7fbf-4aa7-9cf9-a51d7ade8503" name="Changes" comment="UNTESTED REFACTORING:&#10;Move more functionality into Metadata Model classes to ensure data is available and better passable between functions.">
<change afterPath="$PROJECT_DIR$/code/iottb/logger.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/code/iottb/models/capture_metadata_model.py" beforeDir="false" afterPath="$PROJECT_DIR$/code/iottb/models/capture_metadata_model.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/code/iottb/models/device_metadata_model.py" beforeDir="false" afterPath="$PROJECT_DIR$/code/iottb/models/device_metadata_model.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/code/iottb/subcommands/add_device.py" beforeDir="false" afterPath="$PROJECT_DIR$/code/iottb/subcommands/add_device.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/code/iottb/subcommands/capture.py" beforeDir="false" afterPath="$PROJECT_DIR$/code/iottb/subcommands/capture.py" afterDir="false" />
</list>
<option name="SHOW_DIALOG" value="false" />
<option name="HIGHLIGHT_CONFLICTS" value="true" />
<option name="HIGHLIGHT_NON_ACTIVE_CHANGELIST" value="false" />
<option name="LAST_RESOLUTION" value="IGNORE" />
</component>
<component name="FileTemplateManagerImpl">
<option name="RECENT_TEMPLATES">
<list>
<option value="Python Script" />
</list>
</option>
</component>
<component name="Git.Settings">
<option name="PUSH_AUTO_UPDATE" value="true" />
<option name="RECENT_BRANCH_BY_REPOSITORY">
<map>
<entry key="$PROJECT_DIR$" value="sync" />
</map>
</option>
<option name="RECENT_GIT_ROOT_PATH" value="$PROJECT_DIR$" />
</component>
<component name="ProblemsViewState">
<option name="selectedTabId" value="CurrentFile" />
</component>
<component name="ProjectColorInfo">{
&quot;associatedIndex&quot;: 3
}</component>
<component name="ProjectId" id="2fYAAba0AnH9jx9D0JkB8Xbuv0r" />
<component name="ProjectViewState">
<option name="hideEmptyMiddlePackages" value="true" />
<option name="showLibraryContents" value="true" />
</component>
<component name="PropertiesComponent"><![CDATA[{
"keyToString": {
"ASKED_ADD_EXTERNAL_FILES": "true",
"ASKED_MARK_IGNORED_FILES_AS_EXCLUDED": "true",
"ASKED_SHARE_PROJECT_CONFIGURATION_FILES": "true",
"Python.__init__.executor": "Run",
"Python.__main__.executor": "Debug",
"Python.iotdb.executor": "Debug",
"Python.main.executor": "Run",
"RunOnceActivity.ShowReadmeOnStart": "true",
"SHARE_PROJECT_CONFIGURATION_FILES": "true",
"git-widget-placeholder": "cli-dev",
"last_opened_file_path": "/home/slnopriv/projects/2024-bsc-sebastian-lenzlinger/code/iottb/logger.py",
"node.js.detected.package.eslint": "true",
"node.js.detected.package.tslint": "true",
"node.js.selected.package.eslint": "(autodetect)",
"node.js.selected.package.tslint": "(autodetect)",
"nodejs_package_manager_path": "npm",
"settings.editor.selected.configurable": "com.jetbrains.python.configuration.PyActiveSdkModuleConfigurable",
"vue.rearranger.settings.migration": "true"
}
}]]></component>
<component name="RecentsManager">
<key name="MoveFile.RECENT_KEYS">
<recent name="$PROJECT_DIR$/archive" />
<recent name="$PROJECT_DIR$" />
<recent name="$PROJECT_DIR$/code/misc/archive" />
<recent name="$PROJECT_DIR$/code/misc" />
<recent name="$PROJECT_DIR$/code/kydcap/utils" />
</key>
</component>
<component name="RunManager" selected="Python.__main__">
<configuration name="__init__" type="PythonConfigurationType" factoryName="Python" temporary="true" nameIsGenerated="true">
<module name="2024-bsc-sebastian-lenzlinger" />
<option name="ENV_FILES" value="" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/code/kydcap" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/code/kydcap/__init__.py" />
<option name="PARAMETERS" value="" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
<configuration name="__main__" type="PythonConfigurationType" factoryName="Python" temporary="true" nameIsGenerated="true">
<module name="2024-bsc-sebastian-lenzlinger" />
<option name="ENV_FILES" value="" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/code/iottb" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/code/iottb/__main__.py" />
<option name="PARAMETERS" value="add --root /tmp/test --guided" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
<list>
<item itemvalue="Python.__main__" />
<item itemvalue="Python.__init__" />
</list>
<recent_temporary>
<list>
<item itemvalue="Python.__main__" />
<item itemvalue="Python.__init__" />
</list>
</recent_temporary>
</component>
<component name="SharedIndexes">
<attachedChunks>
<set>
<option value="bundled-js-predefined-1d06a55b98c1-74d2a5396914-JavaScript-PY-241.14494.241" />
<option value="bundled-python-sdk-0509580d9d50-28c9f5db9ffe-com.jetbrains.pycharm.pro.sharedIndexes.bundled-PY-241.14494.241" />
</set>
</attachedChunks>
</component>
<component name="SpellCheckerSettings" RuntimeDictionaries="0" Folders="0" CustomDictionaries="0" DefaultDictionary="application-level" UseSingleDictionary="true" transferred="true" />
<component name="TaskManager">
<task active="true" id="Default" summary="Default task">
<changelist id="7a3ac8e1-7fbf-4aa7-9cf9-a51d7ade8503" name="Changes" comment="" />
<created>1713967494544</created>
<option name="number" value="Default" />
<option name="presentableId" value="Default" />
<updated>1713967494544</updated>
<workItem from="1713967495566" duration="6927000" />
<workItem from="1714554228183" duration="34000" />
<workItem from="1714554269789" duration="56478000" />
<workItem from="1714616237168" duration="6135000" />
<workItem from="1714850899817" duration="2659000" />
<workItem from="1714917763516" duration="9796000" />
<workItem from="1715078660090" duration="25014000" />
</task>
<task id="LOCAL-00001" summary="Add code for capture testbed. This is a huge commit. End of day sync...">
<option name="closed" value="true" />
<created>1714615532115</created>
<option name="number" value="00001" />
<option name="presentableId" value="LOCAL-00001" />
<option name="project" value="LOCAL" />
<updated>1714615532115</updated>
</task>
<task id="LOCAL-00002" summary="Add some notes.">
<option name="closed" value="true" />
<created>1714615608142</created>
<option name="number" value="00002" />
<option name="presentableId" value="LOCAL-00002" />
<option name="project" value="LOCAL" />
<updated>1714615608142</updated>
</task>
<task id="LOCAL-00003" summary="Update gitignore">
<option name="closed" value="true" />
<created>1714616343905</created>
<option name="number" value="00003" />
<option name="presentableId" value="LOCAL-00003" />
<option name="project" value="LOCAL" />
<updated>1714616343905</updated>
</task>
<task id="LOCAL-00004" summary="Add test module.">
<option name="closed" value="true" />
<created>1714617162903</created>
<option name="number" value="00004" />
<option name="presentableId" value="LOCAL-00004" />
<option name="project" value="LOCAL" />
<updated>1714617162903</updated>
</task>
<task id="LOCAL-00005" summary="Update gitignore again.">
<option name="closed" value="true" />
<created>1714617231842</created>
<option name="number" value="00005" />
<option name="presentableId" value="LOCAL-00005" />
<option name="project" value="LOCAL" />
<updated>1714617231842</updated>
</task>
<task id="LOCAL-00006" summary="Start tracking development config files.">
<option name="closed" value="true" />
<created>1714617266799</created>
<option name="number" value="00006" />
<option name="presentableId" value="LOCAL-00006" />
<option name="project" value="LOCAL" />
<updated>1714617266799</updated>
</task>
<task id="LOCAL-00007" summary="SYNC">
<option name="closed" value="true" />
<created>1714823516954</created>
<option name="number" value="00007" />
<option name="presentableId" value="LOCAL-00007" />
<option name="project" value="LOCAL" />
<updated>1714823516954</updated>
</task>
<task id="LOCAL-00008" summary="Refactor various names.">
<option name="closed" value="true" />
<created>1714919098392</created>
<option name="number" value="00008" />
<option name="presentableId" value="LOCAL-00008" />
<option name="project" value="LOCAL" />
<updated>1714919098392</updated>
</task>
<task id="LOCAL-00009" summary="Refactor subcommands, config etc.">
<option name="closed" value="true" />
<created>1714924463148</created>
<option name="number" value="00009" />
<option name="presentableId" value="LOCAL-00009" />
<option name="project" value="LOCAL" />
<updated>1714924463148</updated>
</task>
<task id="LOCAL-00010" summary="UNTESTED REFACTORING:&#10;Move more functionality into Metadata Model classes to ensure data is available and better passable between functions.">
<option name="closed" value="true" />
<created>1715101138312</created>
<option name="number" value="00010" />
<option name="presentableId" value="LOCAL-00010" />
<option name="project" value="LOCAL" />
<updated>1715101138312</updated>
</task>
<option name="localTasksCounter" value="11" />
<servers />
</component>
<component name="TypeScriptGeneratedFilesManager">
<option name="version" value="3" />
</component>
<component name="Vcs.Log.Tabs.Properties">
<option name="RECENT_FILTERS">
<map>
<entry key="Branch">
<value>
<list>
<RecentGroup>
<option name="FILTER_VALUES">
<option value="HEAD" />
</option>
</RecentGroup>
<RecentGroup>
<option name="FILTER_VALUES">
<option value="devel" />
</option>
</RecentGroup>
</list>
</value>
</entry>
</map>
</option>
<option name="TAB_STATES">
<map>
<entry key="MAIN">
<value>
<State>
<option name="FILTERS">
<map>
<entry key="branch">
<value>
<list>
<option value="HEAD" />
</list>
</value>
</entry>
</map>
</option>
</State>
</value>
</entry>
</map>
</option>
</component>
<component name="VcsManagerConfiguration">
<MESSAGE value="Add code for capture testbed. This is a huge commit. End of day sync..." />
<MESSAGE value="Add some notes." />
<MESSAGE value="Update gitignore" />
<MESSAGE value="Add test module." />
<MESSAGE value="Update gitignore again." />
<MESSAGE value="Start tracking development config files." />
<MESSAGE value="SYNC" />
<MESSAGE value="Refactor various names." />
<MESSAGE value="Refactor subcommands, config etc." />
<MESSAGE value="UNTESTED REFACTORING:&#10;Move more functionality into Metadata Model classes to ensure data is available and better passable between functions." />
<option name="LAST_COMMIT_MESSAGE" value="UNTESTED REFACTORING:&#10;Move more functionality into Metadata Model classes to ensure data is available and better passable between functions." />
</component>
<component name="com.intellij.coverage.CoverageDataManagerImpl">
<SUITE FILE_PATH="coverage/2024_bsc_sebastian_lenzlinger$__main__.coverage" NAME="__main__ Coverage Results" MODIFIED="1715103831289" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="false" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/code/iottb" />
<SUITE FILE_PATH="coverage/2024_bsc_sebastian_lenzlinger$iotdb.coverage" NAME="iotdb Coverage Results" MODIFIED="1715103593519" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="false" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/code/iottb" />
<SUITE FILE_PATH="coverage/2024_bsc_sebastian_lenzlinger$__init__.coverage" NAME="__init__ Coverage Results" MODIFIED="1714619300966" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="false" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/code/kydcap" />
<SUITE FILE_PATH="coverage/2024_bsc_sebastian_lenzlinger$main.coverage" NAME="__main__ Coverage Results" MODIFIED="1714619560177" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="false" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/code/kydcap" />
</component>
</project>

View File

@ -1,4 +1,4 @@
# Your Project Name # IOTTB
Hello! This is the README file that accompanies the Gitlab repository for your Bachelor or Master thesis. You'll need to update this README as you work on your thesis to reflect relevant information about your thesis. Hello! This is the README file that accompanies the Gitlab repository for your Bachelor or Master thesis. You'll need to update this README as you work on your thesis to reflect relevant information about your thesis.
@ -6,29 +6,26 @@ Hello! This is the README file that accompanies the Gitlab repository for your B
## Organization of the repository ## Organization of the repository
- **code** folder: holds source code - **code** folder: holds source code
- **data** folder: holds (input) data required for the project. If your input data files are larger than 100MB, create a sample data file smaller than 100MB and commit the sample instead of the full data file. Include a note explaining how the full data can be retrieved. - **data** folder: Holds no relevant data for this thesis. Files in here where used for debugging and testing.
- **results** folder: holds results files generated as part of the project - **thesis** folder: contains the latex sources + PDF of the final thesis.
- **thesis** folder: contains the latex sources + PDF of the final thesis. You can use the [basilea-latex template](https://github.com/ivangiangreco/basilea-latex) as a starting point. - **presentation** folder: contains PDF and sources of the presentation.
- **presentation** folder: contains the sources of the presentation (e.g., latex or PPT) - **literature** used can be found in the **thesis** folder .bib or in the **presentation** folders .bib file.
- **literature** folder: contains any research paper that the student needs to read or finds interesting - **notes** folder: Various notes and the beginnings of a wiki.
- **notes** folder: holds minutes of meetings - `iottb` is the python testbed as a single executable (including python interpreter) which should be able to run on Linux machines.
## Useful resources
- [Efficient Reading of Papers in Science and Technology](https://www.cs.columbia.edu/~hgs/netbib/efficientReading.pdf)
- [Heilmeier's catechism](https://en.wikipedia.org/wiki/George_H._Heilmeier#Heilmeier%27s_Catechism)
## Description ## Description
Let people know what your project can do specifically. Provide context and add a link to any reference visitors might be unfamiliar with. A list of Features or a Background subsection can also be added here. If there are alternatives to your project, this is a good place to list differentiating factors. In this thesis I design a automation testbed for IoT devices.
The main result is the software `iottb` which automates some aspects of experimenting with IoT devices.
## Visuals Currently, it implements a database guided by the FAIR principles of open data as well as wraps tcpdump such that metadata is stored.
Depending on what you are making, it can be a good idea to include screenshots or even a video (you'll frequently see GIFs rather than actual videos). Tools like ttygif can help, but check out Asciinema for a more sophisticated method.
## Installation
Within a particular ecosystem, there may be a common way of installing things, such as using Yarn, NuGet, or Homebrew. However, consider the possibility that whoever is reading your README is a novice and would like more guidance. Listing specific steps helps remove ambiguity and gets people to using your project as quickly as possible. If it only runs in a specific context like a particular programming language version or operating system or has dependencies that have to be installed manually, also add a Requirements subsection.
## Usage ## Usage
Use examples liberally, and show the expected output if you can. It's helpful to have inline the smallest example of usage that you can demonstrate, while providing links to more sophisticated examples if they are too long to reasonably include in the README. For more info see `code/iottb-project/README.md`.
As well as examples in the thesis writeup at `thesis/BScThesisUnibas_main-5.pdf`. <br>
In general:
```bash
iottb --help # Most general overview
iottb <subcommand> --help
```
## License ## License
To allow further development and use during public events of the implemented system through the University of Basel, the system is expected to be well documented and provided to the university under a license that allows such reuse, e.g., the [BSD 3-clause license](https://opensource.org/license/bsd-3-clause/). The student agrees that all code produced during the project may be released open-source in the context of the PET group's projects. The code is licensed under a BSD 3-clause license, a copy of which is provided in the file `code/iottb-project/LICENSE`.

View File

@ -9,12 +9,12 @@ def set_device_ip_address(ip_addr: str, file_path: Path):
assert file_path.is_file() assert file_path.is_file()
with file_path.open('r') as f: with file_path.open('r') as f:
data = json.load(f) data = json.load(f)
current_ip = data["device_ip_address"] current_ip = data['device_ip_address']
if current_ip is not None: if current_ip is not None:
print(f"Device IP Address is set to {current_ip}") print(f'Device IP Address is set to {current_ip}')
response = input(f"Do you want to change the recorded IP address to {ip_addr}? [Y/N] ") response = input(f'Do you want to change the recorded IP address to {ip_addr}? [Y/N] ')
if response.upper() == "N": if response.upper() == 'N':
print("Aborting change to device IP address") print('Aborting change to device IP address')
return ReturnCodes.ABORTED return ReturnCodes.ABORTED
with file_path.open('w') as f: with file_path.open('w') as f:
json.dump(data, f) json.dump(data, f)
@ -26,12 +26,12 @@ def set_device_mac_address(mac_addr: str, file_path: Path):
assert file_path.is_file() assert file_path.is_file()
with file_path.open('r') as f: with file_path.open('r') as f:
data = json.load(f) data = json.load(f)
current_mac = data["device_mac_address"] current_mac = data['device_mac_address']
if current_mac is not None: if current_mac is not None:
print(f"Device MAC Address is set to {current_mac}") print(f'Device MAC Address is set to {current_mac}')
response = input(f"Do you want to change the recorded MAC address to {mac_addr}? [Y/N] ") response = input(f'Do you want to change the recorded MAC address to {mac_addr}? [Y/N] ')
if response.upper() == "N": if response.upper() == 'N':
print("Aborting change to device MAC address") print('Aborting change to device MAC address')
return ReturnCodes.ABORTED return ReturnCodes.ABORTED
with file_path.open('w') as f: with file_path.open('w') as f:
json.dump(data, f) json.dump(data, f)

View File

@ -2,31 +2,58 @@ def setup_sniff_tcpdump_parser(parser_sniff):
# arguments which will be passed to tcpdump # arguments which will be passed to tcpdump
parser_sniff_tcpdump = parser_sniff.add_argument_group('tcpdump arguments') parser_sniff_tcpdump = parser_sniff.add_argument_group('tcpdump arguments')
# TODO: tcpdump_parser.add_argument('-c', '--count', re) # TODO: tcpdump_parser.add_argument('-c', '--count', re)
parser_sniff_tcpdump.add_argument("-a", "--ip-address=", help="IP address of the device to sniff", dest="device_ip") parser_sniff_tcpdump.add_argument('-a', '--ip-address=', help='IP address of the device to sniff', dest='device_ip')
parser_sniff_tcpdump.add_argument("-i", "--interface=", help="Interface of the capture device.", dest="capture_interface",default="") parser_sniff_tcpdump.add_argument('-i', '--interface=', help='Interface of the capture device.', dest='capture_interface',default='')
parser_sniff_tcpdump.add_argument("-I", "--monitor-mode", help="Put interface into monitor mode", parser_sniff_tcpdump.add_argument('-I', '--monitor-mode', help='Put interface into monitor mode',
action="store_true") action='store_true')
parser_sniff_tcpdump.add_argument("-n", help="Deactivate name resolution. Option is set by default.", parser_sniff_tcpdump.add_argument('-n', help='Deactivate name resolution. Option is set by default.',
action="store_true") action='store_true')
parser_sniff_tcpdump.add_argument("-#", "--number", parser_sniff_tcpdump.add_argument('-#', '--number',
help="Print packet number at beginning of line. Set by default.", help='Print packet number at beginning of line. Set by default.',
action="store_true") action='store_true')
parser_sniff_tcpdump.add_argument("-e", help="Print link layer headers. Option is set by default.", parser_sniff_tcpdump.add_argument('-e', help='Print link layer headers. Option is set by default.',
action="store_true") action='store_true')
parser_sniff_tcpdump.add_argument("-t", action="count", default=0, parser_sniff_tcpdump.add_argument('-t', action='count', default=0,
help="Please see tcpdump manual for details. Unused by default.") help='Please see tcpdump manual for details. Unused by default.')
def setup_sniff_parser(subparsers): def setup_sniff_parser(subparsers):
# create parser for "sniff" command # create parser for 'sniff' command
parser_sniff = subparsers.add_parser("sniff", help="Start tcpdump capture.") parser_sniff = subparsers.add_parser('sniff', help='Start tcpdump capture.')
setup_sniff_tcpdump_parser(parser_sniff) setup_sniff_tcpdump_parser(parser_sniff)
setup_pcap_filter_parser(parser_sniff) setup_pcap_filter_parser(parser_sniff)
cap_size_group = parser_sniff.add_mutually_exclusive_group(required=True) cap_size_group = parser_sniff.add_mutually_exclusive_group(required=True)
cap_size_group.add_argument("-c", "--count", type=int, help="Number of packets to capture.", default=0) cap_size_group.add_argument('-c', '--count', type=int, help='Number of packets to capture.', default=0)
cap_size_group.add_argument("--mins", type=int, help="Time in minutes to capture.", default=60) cap_size_group.add_argument('--mins', type=int, help='Time in minutes to capture.', default=60)
def setup_pcap_filter_parser(parser_sniff): def setup_pcap_filter_parser(parser_sniff):
parser_pcap_filter = parser_sniff.add_argument_parser("pcap-filter expression") parser_pcap_filter = parser_sniff.add_argument_parser('pcap-filter expression')
pass pass
def check_iottb_env():
# This makes the option '--root-dir' obsolescent # TODO How to streamline this?\
try:
iottb_home = environ['IOTTB_HOME'] # TODO WARN implicit declaration of env var name!
except KeyError:
logger.error(f"Environment variable 'IOTTB_HOME' is not set."
f"Setting environment variable 'IOTTB_HOME' to '~/{IOTTB_HOME_ABS}'")
environ['IOTTB_HOME'] = IOTTB_HOME_ABS
finally:
if not Path(IOTTB_HOME_ABS).exists():
print(f'"{IOTTB_HOME_ABS}" does not exist.')
response = input('Do you want to create it now? [y/N]')
logger.debug(f'response: {response}')
if response.lower() != 'y':
logger.debug(f'Not setting "IOTTB_HOME"')
print('TODO')
print("Aborting execution...")
return ReturnCodes.ABORTED
else:
print(f'Setting environment variable IOTTB_HOME""')
Path(IOTTB_HOME_ABS).mkdir(parents=True,
exist_ok=False) # Should always work since in 'not exist' code path
return ReturnCodes.SUCCESS
logger.info(f'"{IOTTB_HOME_ABS}" exists.')
# TODO: Check that it is a valid iottb dir or can we say it is valid by definition if?
return ReturnCodes.SUCCESS

107
archive/iottb/__main__.py Normal file
View File

@ -0,0 +1,107 @@
#!/usr/bin/env python3
import argparse
from os import environ
from pathlib import Path
import logging
from archive.iottb.subcommands.add_device import setup_init_device_root_parser
# from iottb.subcommands.capture import setup_capture_parser
from iottb.subcommands.sniff import setup_sniff_parser
from iottb.utils.tcpdump_utils import list_interfaces
from iottb.logger import setup_logging
logger = logging.getLogger('iottbLogger.__main__')
logger.setLevel(logging.DEBUG)
######################
# Argparse setup
######################
def setup_argparse():
# create top level parser
root_parser = argparse.ArgumentParser(prog='iottb')
# shared options
root_parser.add_argument('--verbose', '-v', action='count', default=0)
root_parser.add_argument('--script-mode', action='store_true', help='Run in script mode (non-interactive)')
# Group of args w.r.t iottb.db creation
group = root_parser.add_argument_group('database options')
group.add_argument('--db-home', default=Path.home() / 'IoTtb.db')
group.add_argument('--config-home', default=Path.home() / '.config' / 'iottb.conf', type=Path, )
group.add_argument('--user', default=Path.home().stem, type=Path, )
# configure subcommands
subparsers = root_parser.add_subparsers(title='subcommands', required=True, dest='command')
# setup_capture_parser(subparsers)
setup_init_device_root_parser(subparsers)
setup_sniff_parser(subparsers)
# Utility to list interfaces directly with iottb instead of relying on external tooling
interfaces_parser = subparsers.add_parser('list-interfaces', aliases=['li', 'if'],
help='List available network interfaces.')
interfaces_parser.set_defaults(func=list_interfaces)
return root_parser
###
# Where put ?!
###
class IoTdb:
def __init__(self, db_home=Path.home() / 'IoTtb.db', iottb_config=Path.home() / '.conf' / 'iottb.conf',
user=Path.home().stem):
self.db_home = db_home
self.config_home = iottb_config
self.default_filters_home = db_home / 'default_filters'
self.user = user
def create_db(self, mode=0o777, parents=False, exist_ok=False):
logger.info(f'Creating db at {self.db_home}')
try:
self.db_home.mkdir(mode=mode, parents=parents, exist_ok=exist_ok)
except FileExistsError:
logger.error(f'Database path already at {self.db_home} exists and is not a directory')
finally:
logger.debug(f'Leaving finally clause in create_db')
def create_device_tree(self, mode=0o777, parents=False, exist_ok=False):
logger.info(f'Creating device tree at {self.db_home / 'devices'}')
#TODO
def parse_db_config(self):
pass
def parse_iottb_config(self):
pass
def get_known_devices(self):
pass
def iottb_db_exists(db_home=Path.home() / 'IoTtb.db'):
res = db_home.is_dir()
def main():
logger.debug(f'Pre setup_argparse()')
parser = setup_argparse()
logger.debug('Post setup_argparse().')
args = parser.parse_args()
logger.debug(f'Args parsed: {args}')
if args.command:
try:
args.func(args)
except KeyboardInterrupt:
print('Received keyboard interrupt. Exiting...')
exit(1)
except Exception as e:
logger.debug(f'Error in main: {e}')
print(f'Error: {e}')
# create_capture_directory(args.device_name)
if __name__ == '__main__':
setup_logging()
logger.debug("Debug level is working")
logger.info("Info level is working")
logger.warning("Warning level is working")
main()

View File

@ -0,0 +1,41 @@
from datetime import datetime
from enum import Flag, unique, global_enum
from pathlib import Path
'''
Defining IOTTB_HOME_ABS here implies that it be immutable.
It is used here so that one could configure it.
But after its used in __man__ this cannot be relied upon.
'''
IOTTB_HOME_ABS = Path().home() / 'IOTTB.db'
# TODO maybe wrap this into class to make it easier to pass along to different objects
# But will need more refactoring
DEVICE_METADATA_FILE = 'device_metadata.json'
CAPTURE_METADATA_FILE = 'capture_metadata.json'
TODAY_DATE_STRING = datetime.now().strftime('%d%b%Y').lower() # TODO convert to function in utils or so
CAPTURE_FOLDER_BASENAME = 'capture_###'
AFFIRMATIVE_USER_RESPONSE = {'yes', 'y', 'true', 'Y', 'Yes', 'YES'}
NEGATIVE_USER_RESPONSE = {'no', 'n', 'N', 'No'}
YES_DEFAULT = AFFIRMATIVE_USER_RESPONSE.union({'', ' '})
NO_DEFAULT = NEGATIVE_USER_RESPONSE.union({'', ' '})
@unique
@global_enum
class ReturnCodes(Flag):
SUCCESS = 0
ABORTED = 1
FAILURE = 2
UNKNOWN = 3
FILE_NOT_FOUND = 4
FILE_ALREADY_EXISTS = 5
INVALID_ARGUMENT = 6
INVALID_ARGUMENT_VALUE = 7
def iottb_home_abs():
return None

35
archive/iottb/logger.py Normal file
View File

@ -0,0 +1,35 @@
import logging
import sys
import os
from logging.handlers import RotatingFileHandler
def setup_logging():
# Ensure the logs directory exists
log_directory = 'logs'
if not os.path.exists(log_directory):
os.makedirs(log_directory)
# Create handlers
file_handler = RotatingFileHandler(os.path.join(log_directory, 'iottb.log'), maxBytes=1048576, backupCount=5)
console_handler = logging.StreamHandler(sys.stdout)
# Create formatters and add it to handlers
file_fmt = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
console_fmt = logging.Formatter(
'%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s')
file_handler.setFormatter(file_fmt)
console_handler.setFormatter(console_fmt)
# Get the root logger and add handlers
root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG)
root_logger.addHandler(file_handler)
root_logger.addHandler(console_handler)
# Prevent propagation to the root logger to avoid duplicate logs
root_logger.propagate = False
setup_logging()

View File

@ -0,0 +1,106 @@
import json
import uuid
from datetime import datetime
from pathlib import Path
from typing import Optional
from iottb.definitions import ReturnCodes, CAPTURE_METADATA_FILE
from iottb.models.device_metadata_model import DeviceMetadata
import logging
logger = logging.getLogger('iottbLogger.capture_metadata_model')
logger.setLevel(logging.DEBUG)
class CaptureMetadata:
# Required Fields
device_metadata: DeviceMetadata
device_id: str
capture_dir: Path
capture_file: str
# Statistics
start_time: str
stop_time: str
# tcpdump
packet_count: Optional[int]
pcap_filter: str = ''
tcpdump_command: str = ''
interface: str = ''
# Optional Fields
device_ip_address: str = 'No IP Address set'
device_mac_address: Optional[str] = None
app: Optional[str] = None
app_version: Optional[str] = None
firmware_version: Optional[str] = None
def __init__(self, device_metadata: DeviceMetadata, capture_dir: Path):
logger.info(f'Creating CaptureMetadata model from DeviceMetadata: {device_metadata}')
self.device_metadata = device_metadata
self.capture_id = str(uuid.uuid4())
self.capture_date = datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower()
self.capture_dir = capture_dir
assert capture_dir.is_dir(), f'Capture directory {capture_dir} does not exist'
def build_capture_file_name(self):
logger.info(f'Building capture file name')
if self.app is None:
logger.debug(f'No app specified')
prefix = "iphone-14" #self.device_metadata.device_short_name
else:
logger.debug(f'App specified: {self.app}')
assert str(self.app).strip() not in {'', ' '}, f'app is not a valid name: {self.app}'
prefix = self.app.lower().replace(' ', '_')
# assert self.capture_dir is not None, f'{self.capture_dir} does not exist'
filename = f'{prefix}_{str(self.capture_id)}.pcap'
logger.debug(f'Capture file name: {filename}')
self.capture_file = filename
def save_capture_metadata_to_json(self, file_path: Path = Path(CAPTURE_METADATA_FILE)):
assert self.capture_dir.is_dir(), f'capture_dir is not a directory: {self.capture_dir}'
if file_path.is_file():
print(f'File {file_path} already exists, update instead.')
return ReturnCodes.FILE_ALREADY_EXISTS
metadata = self.to_json(indent=2)
with file_path.open('w') as file:
json.dump(metadata, file)
return ReturnCodes.SUCCESS
def to_json(self, indent=2):
# TODO: Where to validate data?
logger.info(f'Converting CaptureMetadata to JSON')
data = {}
# List of fields from CaptureData class, if fields[key]==True, then it is a required field
fields = {
'capture_id': True, #
'device_id': True,
'capture_dir': True,
'capture_file': False,
'capture_date': False,
'start_time': True,
'stop_time': True,
'packet_count': False,
'pcap_filter': False,
'tcpdump_command': False,
'interface': False,
'device_ip_address': False,
'device_mac_address': False,
'app': False,
'app_version': False,
'firmware_version': False
}
for field, is_mandatory in fields.items():
value = getattr(self, field, None)
if value not in [None, ''] or is_mandatory:
if value in [None, ''] and is_mandatory:
raise ValueError(f'Field {field} is required and cannot be empty.')
data[field] = str(value) if not isinstance(value, str) else value
logger.debug(f'Capture metadata: {data}')
return json.dumps(data, indent=indent)

View File

@ -0,0 +1,114 @@
import json
import uuid
from datetime import datetime
from pathlib import Path
from typing import Optional, List
# iottb modules
from iottb.definitions import ReturnCodes, DEVICE_METADATA_FILE
import logging
logger = logging.getLogger('iottbLogger.device_metadata_model')
logger.setLevel(logging.DEBUG)
# 3rd party libs
IMMUTABLE_FIELDS = {'device_name', 'device_short_name', 'device_id', 'date_created'}
class DeviceMetadata:
# Required fields
device_name: str
device_short_name: str
device_id: str
date_created: str
device_root_path: Path
# Optional Fields
aliases: Optional[List[str]] = None
device_type: Optional[str] = None
device_serial_number: Optional[str] = None
device_firmware_version: Optional[str] = None
date_updated: Optional[str] = None
capture_files: Optional[List[str]] = []
def __init__(self, device_name: str, device_root_path: Path):
self.device_name = device_name
self.device_short_name = device_name.lower().replace(' ', '_')
self.device_id = str(uuid.uuid4())
self.date_created = datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower()
self.device_root_path = device_root_path
if not self.device_root_path or not self.device_root_path.is_dir():
logger.error(f'Invalid device root path: {device_root_path}')
raise ValueError(f'Invalid device root path: {device_root_path}')
logger.debug(f'Device name: {device_name}')
logger.debug(f'Device short_name: {self.device_short_name}')
logger.debug(f'Device root dir: {device_root_path}')
logger.info(f'Initialized DeviceMetadata model: {device_name}')
@classmethod
def load_from_json(cls, device_file_path: Path):
logger.info(f'Loading DeviceMetadata from JSON file: {device_file_path}')
assert device_file_path.is_file(), f'{device_file_path} is not a file'
assert device_file_path.name == DEVICE_METADATA_FILE, f'{device_file_path} is not a {DEVICE_METADATA_FILE}'
device_meta_filename = device_file_path
with device_meta_filename.open('r') as file:
metadata_json = json.load(file)
metadata_model_obj = cls.from_json(metadata_json)
return metadata_model_obj
def save_to_json(self, file_path: Path):
logger.info(f'Saving DeviceMetadata to JSON file: {file_path}')
if file_path.is_file():
print(f'File {file_path} already exists, update instead.')
return ReturnCodes.FILE_ALREADY_EXISTS
metadata = self.to_json(indent=2)
with file_path.open('w') as file:
json.dump(metadata, file)
return ReturnCodes.SUCCESS
@classmethod
def from_json(cls, metadata_json):
if isinstance(metadata_json, dict):
return DeviceMetadata(**metadata_json)
def to_json(self, indent=2):
# TODO: atm almost exact copy as in CaptureMetadata
data = {}
fields = {
'device_name': True,
'device_short_name': True,
'device_id': True,
'date_created': True,
'device_root_path': True,
'aliases': False,
'device_type': False,
'device_serial_number': False,
'device_firmware_version': False,
'date_updated': False,
'capture_files': False,
}
for field, is_mandatory in fields.items():
value = getattr(self, field, None)
if value not in [None, ''] or is_mandatory:
if value in [None, ''] and is_mandatory:
logger.debug(f'Mandatory field {field}: {value}')
raise ValueError(f'Field {field} is required and cannot be empty.')
data[field] = str(value) if not isinstance(value, str) else value
logger.debug(f'Device metadata: {data}')
return json.dumps(data, indent=indent)
def dir_contains_device_metadata(dir_path: Path):
if not dir_path.is_dir():
return False
else:
meta_file_path = dir_path / DEVICE_METADATA_FILE
print(f'Device metadata file path {str(meta_file_path)}')
if not meta_file_path.is_file():
return False
else:
return True

View File

@ -0,0 +1,77 @@
import logging
import os
import pathlib
from iottb import definitions
from iottb.definitions import DEVICE_METADATA_FILE, ReturnCodes
from iottb.models.device_metadata_model import DeviceMetadata
# logger.setLevel(logging.INFO) # Since module currently passes all tests
logger = logging.getLogger('iottbLogger.add_device')
logger.setLevel(logging.INFO)
def setup_init_device_root_parser(subparsers):
#assert os.environ['IOTTB_HOME'] is not None, "IOTTB_HOME environment variable is not set"
parser = subparsers.add_parser('add-device', aliases=['add-device-root', 'add'],
help='Initialize a folder for a device.')
parser.add_argument('--root_dir', type=pathlib.Path,
default=definitions.IOTTB_HOME_ABS) # TODO: Refactor code to not use this or handle iottb here
group = parser.add_mutually_exclusive_group()
group.add_argument('--guided', action='store_true', help='Guided setup', default=False)
group.add_argument('--name', action='store', type=str, help='name of device')
parser.set_defaults(func=handle_add)
def handle_add(args):
# TODO: This whole function should be refactored into using the fact that IOTTB_HOME is set, and the dir exists
logger.info(f'Add device handler called with args {args}')
if args.guided:
logger.debug('begin guided setup')
metadata = guided_setup(args.root_dir) # TODO refactor to use IOTTB_HOME
logger.debug('guided setup complete')
else:
logger.debug('Setup through passed args: setup')
if not args.name:
logger.error('No device name specified with unguided setup.')
return ReturnCodes.ERROR
metadata = DeviceMetadata(args.name, args.root_dir)
file_path = args.root_dir / DEVICE_METADATA_FILE # TODO IOTTB_HOME REFACTOR
if file_path.exists():
print('Directory already contains a metadata file. Aborting.')
return ReturnCodes.ABORTED
serialized_metadata = metadata.to_json()
response = input(f'Confirm device metadata: {serialized_metadata} [y/N]')
logger.debug(f'response: {response}')
if response not in definitions.AFFIRMATIVE_USER_RESPONSE:
print('Adding device aborted by user.')
return ReturnCodes.ABORTED
logger.debug(f'Device metadata file {file_path}')
if metadata.save_to_json(file_path) == ReturnCodes.FILE_ALREADY_EXISTS:
logger.error('File exists after checking, which should not happen.')
return ReturnCodes.ABORTED
print('Device metadata successfully created.')
return ReturnCodes.SUCCESS
def configure_metadata():
pass
def guided_setup(device_root) -> DeviceMetadata:
logger.info('Guided setup')
response = 'N'
device_name = ''
while response.upper() == 'N':
device_name = input('Please enter name of device: ')
response = input(f'Confirm device name: {device_name} [y/N] ')
if device_name == '' or device_name is None:
print('Name cannot be empty')
logger.warning('Name cannot be empty')
logger.debug(f'Response is {response}')
logger.debug(f'Device name is {device_name}')
return DeviceMetadata(device_name, device_root)

View File

@ -2,39 +2,43 @@ import subprocess
from pathlib import Path from pathlib import Path
from iottb.definitions import * from iottb.definitions import *
import logging
from iottb.models.capture_metadata_model import CaptureMetadata from iottb.models.capture_metadata_model import CaptureMetadata
from iottb.models.device_metadata_model import DeviceMetadata, dir_contains_device_metadata from iottb.models.device_metadata_model import DeviceMetadata, dir_contains_device_metadata
from iottb.utils.capture_utils import get_capture_src_folder, make_capture_src_folder from iottb.utils.capture_utils import get_capture_src_folder, make_capture_src_folder
from iottb.utils.tcpdump_utils import check_installed
logger = logging.getLogger('iottbLogger.capture')
logger.setLevel(logging.DEBUG)
def setup_capture_parser(subparsers): def setup_capture_parser(subparsers):
parser = subparsers.add_parser('sniff', help='Sniff packets with tcpdump') parser = subparsers.add_parser('sniff', help='Sniff packets with tcpdump')
# metadata args # metadata args
parser.add_argument("-a", "--ip-address", help="IP address of the device to sniff", dest="device_ip") parser.add_argument('-a', '--ip-address', help='IP address of the device to sniff', dest='device_ip')
# tcpdump args # tcpdump args
parser.add_argument("device_root", help="Root folder for device to sniff", parser.add_argument('device_root', help='Root folder for device to sniff',
type=Path, default=Path.cwd()) type=Path, default=Path.cwd())
parser.add_argument("-s", "--safe", help="Ensure correct device root folder before sniffing", action="store_true") parser.add_argument('-s', '--safe', help='Ensure correct device root folder before sniffing', action='store_true')
parser.add_argument("--app", help="Application name to sniff", dest="app_name", default=None) parser.add_argument('--app', help='Application name to sniff', dest='app_name', default=None)
parser_sniff_tcpdump = parser.add_argument_group('tcpdump arguments') parser_sniff_tcpdump = parser.add_argument_group('tcpdump arguments')
parser_sniff_tcpdump.add_argument("-i", "--interface", help="Interface to capture on.", dest="capture_interface", parser_sniff_tcpdump.add_argument('-i', '--interface', help='Interface to capture on.', dest='capture_interface',
required=True) required=True)
parser_sniff_tcpdump.add_argument("-I", "--monitor-mode", help="Put interface into monitor mode", parser_sniff_tcpdump.add_argument('-I', '--monitor-mode', help='Put interface into monitor mode',
action="store_true") action='store_true')
parser_sniff_tcpdump.add_argument("-n", help="Deactivate name resolution. True by default.", parser_sniff_tcpdump.add_argument('-n', help='Deactivate name resolution. True by default.',
action="store_true", dest="no_name_resolution") action='store_true', dest='no_name_resolution')
parser_sniff_tcpdump.add_argument("-#", "--number", parser_sniff_tcpdump.add_argument('-#', '--number',
help="Print packet number at beginning of line. True by default.", help='Print packet number at beginning of line. True by default.',
action="store_true") action='store_true')
parser_sniff_tcpdump.add_argument("-e", help="Print link layer headers. True by default.", parser_sniff_tcpdump.add_argument('-e', help='Print link layer headers. True by default.',
action="store_true", dest="print_link_layer") action='store_true', dest='print_link_layer')
parser_sniff_tcpdump.add_argument("-t", action="count", default=0, parser_sniff_tcpdump.add_argument('-t', action='count', default=0,
help="Please see tcpdump manual for details. Unused by default.") help='Please see tcpdump manual for details. Unused by default.')
cap_size_group = parser.add_mutually_exclusive_group(required=False) cap_size_group = parser.add_mutually_exclusive_group(required=False)
cap_size_group.add_argument("-c", "--count", type=int, help="Number of packets to capture.", default=1000) cap_size_group.add_argument('-c', '--count', type=int, help='Number of packets to capture.', default=10)
cap_size_group.add_argument("--mins", type=int, help="Time in minutes to capture.", default=1) cap_size_group.add_argument('--mins', type=int, help='Time in minutes to capture.', default=1)
parser.set_defaults(func=handle_capture) parser.set_defaults(func=handle_capture)
@ -45,25 +49,25 @@ def cwd_is_device_root_dir() -> bool:
def start_guided_device_root_dir_setup(): def start_guided_device_root_dir_setup():
assert False, "Not implemented" assert False, 'Not implemented'
def handle_metadata(): def handle_metadata():
assert not cwd_is_device_root_dir() assert not cwd_is_device_root_dir()
print(f"Unable to find {DEVICE_METADATA_FILE} in current working directory") print(f'Unable to find {DEVICE_METADATA_FILE} in current working directory')
print("You need to setup a device root directory before using this command") print('You need to setup a device root directory before using this command')
response = input("Would you like to be guided through the setup? [y/n]") response = input('Would you like to be guided through the setup? [y/n]')
if response.lower() == "y": if response.lower() == 'y':
start_guided_device_root_dir_setup() start_guided_device_root_dir_setup()
else: else:
print("'iottb init-device-root --help' for more information.") print('\'iottb init-device-root --help\' for more information.')
exit(ReturnCodes.ABORTED) exit(ReturnCodes.ABORTED)
# device_id = handle_capture_metadata() # device_id = handle_capture_metadata()
return ReturnCodes.SUCCESS return ReturnCodes.SUCCESS
def get_device_metadata_from_file(device_metadata_filename: Path) -> str: def get_device_metadata_from_file(device_metadata_filename: Path) -> str:
assert device_metadata_filename.is_file(), f"Device metadata file '{device_metadata_filename} does not exist" assert device_metadata_filename.is_file(), f'Device metadata file f"{device_metadata_filename}" does not exist'
device_metadata = DeviceMetadata.load_from_json(device_metadata_filename) device_metadata = DeviceMetadata.load_from_json(device_metadata_filename)
return device_metadata return device_metadata
@ -73,37 +77,43 @@ def run_tcpdump(cmd):
try: try:
p = subprocess.run(cmd, capture_output=True, text=True, check=True) p = subprocess.run(cmd, capture_output=True, text=True, check=True)
if p.returncode != 0: if p.returncode != 0:
print(f"Error running tcpdump {p.stderr}") print(f'Error running tcpdump {p.stderr}')
# TODO add logging
else: else:
print(f"tcpdump run successfully\n: {p.stdout}") print(f'tcpdump run successfully\n: {p.stdout}')
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass
def handle_capture(args): def handle_capture(args):
assert args.device_root is not None, f"Device root directory is required" if not check_installed():
assert dir_contains_device_metadata(args.device_root), f"Device metadata file '{args.device_root}' does not exist" print('Please install tcpdump first')
exit(ReturnCodes.ABORTED)
assert args.device_root is not None, f'Device root directory is required'
assert dir_contains_device_metadata(args.device_root), f'Device metadata file \'{args.device_root}\' does not exist'
# get device metadata # get device metadata
logger.info(f'Device root directory: {args.device_root}')
if args.safe and not dir_contains_device_metadata(args.device_root): if args.safe and not dir_contains_device_metadata(args.device_root):
print(f"Supplied folder contains no device metadata. " print(f'Supplied folder contains no device metadata. '
f"Please setup a device root directory before using this command") f'Please setup a device root directory before using this command')
exit(ReturnCodes.ABORTED) exit(ReturnCodes.ABORTED)
elif dir_contains_device_metadata(args.device_root): elif dir_contains_device_metadata(args.device_root):
device_metadata_filename = args.device_root / DEVICE_METADATA_FILE device_metadata_filename = args.device_root / DEVICE_METADATA_FILE
device_data = DeviceMetadata.load_from_json(device_metadata_filename) device_data = DeviceMetadata.load_from_json(device_metadata_filename)
else: else:
name = input("Please enter a device name: ") name = input('Please enter a device name: ')
args.device_root.mkdir(parents=True, exist_ok=True) args.device_root.mkdir(parents=True, exist_ok=True)
device_data = DeviceMetadata(name, args.device_root) device_data = DeviceMetadata(name, args.device_root)
# start constructing environment for capture # start constructing environment for capture
capture_dir = get_capture_src_folder(args.device_root) capture_dir = get_capture_src_folder(args.device_root)
make_capture_src_folder(capture_dir) make_capture_src_folder(capture_dir)
capture_metadata = CaptureMetadata(device_data, capture_dir) capture_metadata = CaptureMetadata(device_data, capture_dir)
capture_metadata.set_interface(args.capture_interface) capture_metadata.interface = args.capture_interface
cmd = ['sudo', 'tcpdump', '-i', args.capture_interface] cmd = ['sudo', 'tcpdump', '-i', args.capture_interface]
cmd = build_tcpdump_args(args, cmd, capture_metadata) cmd = build_tcpdump_args(args, cmd, capture_metadata)
capture_metadata.set_tcpdump_command(cmd) capture_metadata.tcpdump_command = cmd
print('Executing: ' + ' '.join(cmd)) print('Executing: ' + ' '.join(cmd))
@ -112,16 +122,16 @@ def handle_capture(args):
start_time = datetime.now().strftime('%H:%M:%S') start_time = datetime.now().strftime('%H:%M:%S')
run_tcpdump(cmd) run_tcpdump(cmd)
stop_time = datetime.now().strftime('%H:%M:%S') stop_time = datetime.now().strftime('%H:%M:%S')
capture_metadata.set_start_time(start_time) capture_metadata.start_time = start_time
capture_metadata.set_stop_time(stop_time) capture_metadata.stop_time = stop_time
except KeyboardInterrupt: except KeyboardInterrupt:
print("Received keyboard interrupt.") print('Received keyboard interrupt.')
exit(ReturnCodes.ABORTED) exit(ReturnCodes.ABORTED)
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
print(f"Failed to capture packet: {e}") print(f'Failed to capture packet: {e}')
exit(ReturnCodes.FAILURE) exit(ReturnCodes.FAILURE)
except Exception as e: except Exception as e:
print(f"Failed to capture packet: {e}") print(f'Failed to capture packet: {e}')
exit(ReturnCodes.FAILURE) exit(ReturnCodes.FAILURE)
return ReturnCodes.SUCCESS return ReturnCodes.SUCCESS
@ -141,28 +151,27 @@ def build_tcpdump_args(args, cmd, capture_metadata: CaptureMetadata):
cmd.append('-c') cmd.append('-c')
cmd.append(str(args.count)) cmd.append(str(args.count))
elif args.mins: elif args.mins:
assert False, "Unimplemented option" assert False, 'Unimplemented option'
if args.app_name is not None: if args.app_name is not None:
capture_metadata.set_app_name(args.app_name) capture_metadata.app = args.app_name
capture_metadata.build_capture_file_name() capture_metadata.build_capture_file_name()
cmd.append('-w') cmd.append('-w')
cmd.append(capture_metadata.get_capfile_name()) cmd.append(str(capture_metadata.capture_dir) + "/" + capture_metadata.capture_file)
if args.safe: if args.safe:
cmd.append(f'host {args.device_ip}') # if not specified, filter 'any' implied by tcpdump cmd.append(f'host {args.device_ip}') # if not specified, filter 'any' implied by tcpdump
capture_metadata.set_device_ip_address(args.device_ip) capture_metadata.device_id = args.device_ip
return cmd return cmd
# def capture_file_cmd(args, cmd, capture_dir, capture_metadata: CaptureMetadata): # def capture_file_cmd(args, cmd, capture_dir, capture_metadata: CaptureMetadata):
# capture_file_prefix = capture_metadata.get_device_metadata().get_device_short_name() # capture_file_prefix = capture_metadata.get_device_metadata().get_device_short_name()
# if args.app_name is not None: # if args.app_name is not None:
# capture_file_prefix = args.app_name # capture_file_prefix = args.app_name
# capture_metadata.set_app(args.app_name) # capture_metadata.set_app(args.app_name)
# capfile_name = capture_file_prefix + "_" + str(capture_metadata.get_capture_id()) + ".pcap" # capfile_name = capture_file_prefix + '_' + str(capture_metadata.get_capture_id()) + '.pcap'
# capture_metadata.set_capture_file(capfile_name) # capture_metadata.set_capture_file(capfile_name)
# capfile_abs_path = capture_dir / capfile_name # capfile_abs_path = capture_dir / capfile_name
# capture_metadata.set_capture_file(capfile_name) # capture_metadata.set_capture_file(capfile_name)

View File

@ -0,0 +1,63 @@
import subprocess
import logging
logger = logging.getLogger('iottbLogger.capture')
logger.setLevel(logging.DEBUG)
class Sniffer:
def __init__(self):
pass
def setup_sniff_parser(subparsers):
parser = subparsers.add_parser('sniff', help='Sniff packets with tcpdump')
# metadata args
parser.add_argument('-a', '--addr', help='IP or MAC address of IoT device')
# tcpdump args
parser.add_argument('--app', help='Application name to sniff', default=None)
parser_sniff_tcpdump = parser.add_argument_group('tcpdump arguments')
parser_sniff_tcpdump.add_argument('-i', '--interface', help='Interface to capture on.', dest='capture_interface',
required=True)
parser_sniff_tcpdump.add_argument('-I', '--monitor-mode', help='Put interface into monitor mode',
action='store_true')
parser_sniff_tcpdump.add_argument('-n', help='Deactivate name resolution. True by default.',
action='store_true', dest='no_name_resolution')
parser_sniff_tcpdump.add_argument('-#', '--number',
help='Print packet number at beginning of line. True by default.',
action='store_true')
parser_sniff_tcpdump.add_argument('-e', help='Print link layer headers. True by default.',
action='store_true', dest='print_link_layer')
parser_sniff_tcpdump.add_argument('-t', action='count', default=0,
help='Please see tcpdump manual for details. Unused by default.')
cap_size_group = parser.add_mutually_exclusive_group(required=False)
cap_size_group.add_argument('-c', '--count', type=int, help='Number of packets to capture.', default=10)
cap_size_group.add_argument('--mins', type=int, help='Time in minutes to capture.', default=1)
parser.set_defaults(func=sniff)
def parse_addr(addr):
#TODO Implement
pass
def sniff(args):
if args.addr is None:
print('You must supply either a MAC or IP(v4) address to use this tool!')
logger.info("Exiting on account of missing MAC/IP.")
exit(1)
else:
(type, value) = parse_addr(args.addr)
#TODO Get this party started
def sniff_tcpdump(args, filter):
pass
def sniff_mitmproxy(args, filter):
pass
def sniff_raw(cmd,args):
pass

View File

@ -16,29 +16,29 @@ def get_capture_date_folder(device_root: Path):
try: try:
today_folder.mkdir() today_folder.mkdir()
except FileExistsError: except FileExistsError:
print(f"Folder {today_folder} already exists") print(f'Folder {today_folder} already exists')
return today_folder return today_folder
raise FileNotFoundError(f"Given path {device_root} is not a device root directory") raise FileNotFoundError(f'Given path {device_root} is not a device root directory')
def get_capture_src_folder(device_folder: Path): def get_capture_src_folder(device_folder: Path):
assert device_folder.is_dir(), f"Given path {device_folder} is not a folder" assert device_folder.is_dir(), f'Given path {device_folder} is not a folder'
today_iso = get_iso_date() today_iso = get_iso_date()
max_sequence_number = 1 max_sequence_number = 1
for d in device_folder.iterdir(): for d in device_folder.iterdir():
if d.is_dir() and d.name.startswith(f'{today_iso}_capture_'): if d.is_dir() and d.name.startswith(f'{today_iso}_capture_'):
name = d.name name = d.name
num = int(name.split("_")[2]) num = int(name.split('_')[2])
max_sequence_number = max(max_sequence_number, num) max_sequence_number = max(max_sequence_number, num)
next_sequence_number = max_sequence_number + 1 next_sequence_number = max_sequence_number + 1
return device_folder.joinpath(f"{today_iso}_capture_{next_sequence_number:03}") return device_folder.joinpath(f'{today_iso}_capture_{next_sequence_number:03}')
def make_capture_src_folder(capture_src_folder: Path): def make_capture_src_folder(capture_src_folder: Path):
try: try:
capture_src_folder.mkdir() capture_src_folder.mkdir()
except FileExistsError: except FileExistsError:
print(f"Folder {capture_src_folder} already exists") print(f'Folder {capture_src_folder} already exists')
finally: finally:
return capture_src_folder return capture_src_folder

View File

@ -12,18 +12,18 @@ def check_installed() -> bool:
def ensure_installed(): def ensure_installed():
"""Ensure that tcpdump is installed, raise an error if not.""" """Ensure that tcpdump is installed, raise an error if not."""
if not check_installed(): if not check_installed():
raise RuntimeError("tcpdump is not installed. Please install it to continue.") raise RuntimeError('tcpdump is not installed. Please install it to continue.')
def list_interfaces() -> str: def list_interfaces(args) -> str:
"""List available network interfaces using tcpdump.""" """List available network interfaces using tcpdump."""
ensure_installed() ensure_installed()
try: try:
result = subprocess.run(['tcpdump', '--list-interfaces'], capture_output=True, text=True, check=True) result = subprocess.run(['tcpdump', '--list-interfaces'], capture_output=True, text=True, check=True)
return result.stdout print(result.stdout)
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
print(f"Failed to list interfaces: {e}") print(f'Failed to list interfaces: {e}')
return "" return ''
def is_valid_ipv4(ip: str) -> bool: def is_valid_ipv4(ip: str) -> bool:

View File

@ -13,6 +13,6 @@ def subfolder_exists(parent: Path, child: str):
def generate_unique_string_with_prefix(prefix: str): def generate_unique_string_with_prefix(prefix: str):
return prefix + "_" + str(uuid.uuid4()) return prefix + '_' + str(uuid.uuid4())

View File

@ -15,5 +15,5 @@ class Metadata:
def create_metadata(filename, unique_id, device_details): def create_metadata(filename, unique_id, device_details):
date_string = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S") date_string = datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
meta_filename = f"meta_{date_string}_{unique_id}.json" meta_filename = f'meta_{date_string}_{unique_id}.json'

View File

@ -8,33 +8,33 @@ from iottb.definitions import DEVICE_METADATA_FILE
def write_device_metadata_to_file(metadata: DeviceMetadata, device_path: Path): def write_device_metadata_to_file(metadata: DeviceMetadata, device_path: Path):
"""Write the device metadata to a JSON file in the specified directory.""" '''Write the device metadata to a JSON file in the specified directory.'''
meta_file_path = device_path / "meta.json" meta_file_path = device_path / 'meta.json'
meta_file_path.write_text(metadata.json(indent=2)) meta_file_path.write_text(metadata.json(indent=2))
def confirm_device_metadata(metadata: DeviceMetadata) -> bool: def confirm_device_metadata(metadata: DeviceMetadata) -> bool:
"""Display device metadata for user confirmation.""" '''Display device metadata for user confirmation.'''
print(metadata.json(indent=2)) print(metadata.json(indent=2))
return input("Confirm device metadata? (y/n): ").strip().lower() == 'y' return input('Confirm device metadata? (y/n): ').strip().lower() == 'y'
def get_device_metadata_from_user() -> DeviceMetadata: def get_device_metadata_from_user() -> DeviceMetadata:
"""Prompt the user to enter device details and return a populated DeviceMetadata object.""" '''Prompt the user to enter device details and return a populated DeviceMetadata object.'''
device_name = input("Device name: ") device_name = input('Device name: ')
device_short_name = device_name.lower().replace(" ", "-") device_short_name = device_name.lower().replace(' ', '-')
return DeviceMetadata(device_name=device_name, device_short_name=device_short_name) return DeviceMetadata(device_name=device_name, device_short_name=device_short_name)
def initialize_device_root_dir(device_name: str) -> Path: def initialize_device_root_dir(device_name: str) -> Path:
"""Create and return the path for the device directory.""" '''Create and return the path for the device directory.'''
device_path = Path.cwd() / device_name device_path = Path.cwd() / device_name
device_path.mkdir(exist_ok=True) device_path.mkdir(exist_ok=True)
return device_path return device_path
def write_metadata(metadata: BaseModel, device_name: str): def write_metadata(metadata: BaseModel, device_name: str):
"""Write device metadata to a JSON file.""" '''Write device metadata to a JSON file.'''
meta_path = Path.cwd() / device_name / DEVICE_METADATA_FILE meta_path = Path.cwd() / device_name / DEVICE_METADATA_FILE
meta_path.parent.mkdir(parents=True, exist_ok=True) meta_path.parent.mkdir(parents=True, exist_ok=True)
with meta_path.open('w') as f: with meta_path.open('w') as f:
@ -42,19 +42,19 @@ def write_metadata(metadata: BaseModel, device_name: str):
def get_device_metadata(file_path: Path) -> DeviceMetadata | None: def get_device_metadata(file_path: Path) -> DeviceMetadata | None:
"""Fetch device metadata from a JSON file.""" '''Fetch device metadata from a JSON file.'''
if dev_metadata_exists(file_path): if dev_metadata_exists(file_path):
with file_path.open('r') as f: with file_path.open('r') as f:
device_metadata_json = json.load(f) device_metadata_json = json.load(f)
try: try:
device_metadata = DeviceMetadata.model_validate_json(device_metadata_json) device_metadata = DeviceMetadata.from_json(device_metadata_json)
return device_metadata return device_metadata
except ValueError as e: except ValueError as e:
print(f"Validation error for device metadata: {e}") print(f'Validation error for device metadata: {e}')
else: else:
# TODO Decide what to do (e.g. search for file etc) # TODO Decide what to do (e.g. search for file etc)
print(f"No device metadata at {file_path}") print(f'No device metadata at {file_path}')
return None return None

16
archive/pyproject.toml Normal file
View File

@ -0,0 +1,16 @@
[build-system]
requires = ["setuptools>=42", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = 'iottb'
version = '0.1.0'
authors = [{name = "Sebastian Lenzlinger", email = "sebastian.lenzlinger@unibas.ch"}]
description = "Automation Tool for Capturing Network packets of IoT devices."
requires-python = ">=3.8"
[tool.setuptools]
packages = ["iottb"]
[project.scripts]
iottb = "iottb.__main__:main"

View File

@ -0,0 +1,47 @@
import sys
import unittest
from io import StringIO
from unittest.mock import patch, MagicMock
from pathlib import Path
from iottb.definitions import DEVICE_METADATA_FILE
import shutil
from iottb.__main__ import main
class TestDeviceMetadataFileCreation(unittest.TestCase):
def setUp(self):
self.test_dir = Path('/tmp/iottbtest/test_add_device')
self.test_dir.mkdir(parents=True, exist_ok=True)
# self.captured_output = StringIO()
# sys.stdout = self.captured_output
def tearDown(self):
# shutil.rmtree(str(self.test_dir))
for item in self.test_dir.iterdir():
if item.is_dir():
item.rmdir()
else:
item.unlink()
self.test_dir.rmdir()
# sys.stdout = sys.__stdout__
@patch('builtins.input', side_effect=['iPhone 14', 'y', 'y'])
def test_guided_device_setup(self, mock_input):
sys.argv = ['__main__.py', 'add', '--root_dir', str(self.test_dir), '--guided']
main()
expected_file = self.test_dir / DEVICE_METADATA_FILE
self.assertTrue(expected_file.exists()), f'Expected file not created: {expected_file}'
@patch('builtins.input', side_effect=['y']) # need mock_input else wont work
def test_device_setup(self, mock_input):
sys.argv = ['__main__.py', 'add', '--root_dir', str(self.test_dir), '--name', 'iPhone 14']
main()
expected_file = self.test_dir / DEVICE_METADATA_FILE
self.assertTrue(expected_file.exists()), f'Expected file not created: {expected_file}'
def test_add_when_file_exists(self):
# TODO
pass
if __name__ == '__main__':
unittest.main()

View File

@ -0,0 +1,2 @@

View File

@ -0,0 +1,28 @@
BSD 3-Clause License
Copyright (c) 2024, Sebastian Lenzlinger
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
3. Neither the name of the copyright holder nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@ -0,0 +1,82 @@
# Iottb
## Installation
There are a few different ways to install `iottb`.
In Linux, to install to a users local bin directory using poetry or pip:
- Move into the project root `cd path/to/iottb-project`, so that you are in the directory which contains the `pyproject.toml` file.
```bash
poetry install --editable
# or with pip
pip install -e .
```
Currently, this is the recommended method.
Alternatively install with pip into any activated environment:
```bash
pip install -r requirements.txt
```
It is possible to make a single executable for you machine which you can just put in your path using pyinstaller.
1. Install pyinstaller
```bash
pip install pyinstaller
```
2. Make the executable
```bash
pyinstaller --onefile --name iottb --distpath ~/opt iottb/main.py
```
to be able to run it as `iottb` if `~/opt' is a directory on your PATH.
A executable which should be able to run on linux is included in the repo.
## Basic Invocation
```bash
Usage: iottb [OPTIONS] COMMAND [ARGS]...
Options:
-v, --verbosity Set verbosity [default: 0; 0<=x<=3]
-d, --debug Enable debug mode
--cfg-file PATH Path to iottb config file [default:
/home/seb/.config/iottb/iottb.cfg]
--help Show this message and exit.
--dry-run BOOLEAN currently NOT USED! [default: True]
Commands:
add-device Add a device to a database
init-db
sniff Sniff packets with tcpdump
Debugging Commands:
show-all Show everything: configuration, databases, and...
show-cfg Show the current configuration context
```
## Usage Examples
### Initializing a database
Before devices can be added and packet captures performed, there must be a database.
Initialze a database with default values at the default location:
```bash
iottb init-db
```
### Adding a device
Typically, captures are performed for devices. To add a device (to the current default database)
```bash
iottb add-device 'Echo Dot 2'
```
if the devices is named 'Echo Dot 2'. This will get the cannonical name 'echo-dot'. This name should be used when performing
captures with `iottb`.
### Performing captures/sniffing traffic
```bash
iottb sniff -a <ipv4-addr or mac-addr> 'echo-dot'
```
to sniff traffic on the previously added device 'Echo Dot 2' which received the canonical name 'echo-dot'.
You can get the subcommand specif helptext by adding the `--help` option.
## Configuration
### Env Vars
- IOTTB_CONF_HOME
By setting this variable you control where the basic iottb application
configuration should be looked for
## License
This project is licensed under a BSD 3-clause License, a copy of which is provided in the file `code/iottb-project/LICENSE`.

View File

@ -0,0 +1,110 @@
Usage: iottb [OPTIONS] COMMAND [ARGS]...
Options:
-v, --verbosity Set verbosity [default: 0; 0<=x<=3]
-d, --debug Enable debug mode
--dry-run [default: True]
--cfg-file PATH Path to iottb config file [default:
/home/seb/.config/iottb/iottb.cfg]
--help Show this message and exit.
Commands:
add-device Add a device to a database
init-db
rm-cfg Removes the cfg file from the filesystem.
rm-dbs Removes ALL(!) databases from the filesystem if...
set-key-in-table-to Edit config or metadata files.
show-all Show everything: configuration, databases, and...
show-cfg Show the current configuration context
sniff Sniff packets with tcpdump
Usage: iottb init-db [OPTIONS]
Options:
-d, --dest PATH Location to put (new) iottb database
-n, --name TEXT Name of new database. [default: iottb.db]
--update-default / --no-update-default
If new db should be set as the new default
[default: update-default]
--help Show this message and exit.
Usage: iottb add-device [OPTIONS]
Add a device to a database
Options:
--dev, --device-name TEXT The name of the device to be added. If this
string contains spaces or other special
characters normalization is
performed to derive a canonical name [required]
--db, --database DIRECTORY Database in which to add this device. If not
specified use default from config. [env var:
IOTTB_DB]
--guided Add device interactively [env var:
IOTTB_GUIDED_ADD]
--help Show this message and exit.
Usage: iottb sniff [OPTIONS] [TCPDUMP-ARGS] [DEVICE]
Sniff packets with tcpdump
Options:
Testbed sources:
--db, --database TEXT Database of device. Only needed if not current
default. [env var: IOTTB_DB]
--app TEXT Companion app being used during capture
Runtime behaviour:
--unsafe Disable checks for otherwise required options.
[env var: IOTTB_UNSAFE]
--guided [env var: IOTTB_GUIDED]
--pre TEXT Script to be executed before main command is
started.
--post TEXT Script to be executed upon completion of main
command.
Tcpdump options:
-i, --interface TEXT Network interface to capture on.If not specified
tcpdump tries to find and appropriate one. [env
var: IOTTB_CAPTURE_INTERFACE]
-a, --address TEXT IP or MAC address to filter packets by. [env var:
IOTTB_CAPTURE_ADDRESS]
-I, --monitor-mode Put interface into monitor mode.
--ff TEXT tcpdump filter as string or file path. [env var:
IOTTB_CAPTURE_FILTER]
-#, --print-pacno Print packet number at beginning of line. True by
default. [default: True]
-e, --print-ll Print link layer headers. True by default.
-c, --count INTEGER Number of packets to capture. [default: 1000]
--help Show this message and exit.
Utility Commands mostly for development
Usage: iottb rm-cfg [OPTIONS]
Removes the cfg file from the filesystem.
This is mostly a utility during development. Once non-standard database
locations are implemented, deleting this would lead to iottb not being able
to find them anymore.
Options:
--yes Confirm the action without prompting.
--help Show this message and exit.
Usage: iottb rm-dbs [OPTIONS]
Removes ALL(!) databases from the filesystem if they're empty.
Development utility currently unfit for use.
Options:
--yes Confirm the action without prompting.
--help Show this message and exit.
Usage: iottb show-cfg [OPTIONS]
Show the current configuration context
Options:
--cfg-file PATH Path to the config file [default:
/home/seb/.config/iottb/iottb.cfg]
-pp Pretty Print
--help Show this message and exit.
Usage: iottb show-all [OPTIONS]
Show everything: configuration, databases, and device metadata
Options:
--help Show this message and exit.

View File

@ -0,0 +1,38 @@
Usage: iottb [OPTIONS] COMMAND [ARGS]...
Options:
-v, --verbosity Set verbosity [default: 0; 0<=x<=3]
-d, --debug Enable debug mode
--dry-run [default: True]
--cfg-file PATH Path to iottb config file [default:
/home/seb/.config/iottb/iottb.cfg]
--help Show this message and exit.
Commands:
add-device Add a device to a database
init-db
rm-cfg Removes the cfg file from the filesystem.
rm-dbs Removes ALL(!) databases from the filesystem if...
set-key-in-table-to Edit config or metadata files.
show-all Show everything: configuration, databases, and...
show-cfg Show the current configuration context
sniff Sniff packets with tcpdump
Usage: iottb [OPTIONS] COMMAND [ARGS]...
Options:
-v, --verbosity Set verbosity [default: 0; 0<=x<=3]
-d, --debug Enable debug mode
--dry-run [default: True]
--cfg-file PATH Path to iottb config file [default:
/home/seb/.config/iottb/iottb.cfg]
--help Show this message and exit.
Commands:
add-device Add a device to a database
init-db
rm-cfg Removes the cfg file from the filesystem.
rm-dbs Removes ALL(!) databases from the filesystem if...
set-key-in-table-to Edit config or metadata files.
show-all Show everything: configuration, databases, and...
show-cfg Show the current configuration context
sniff Sniff packets with tcpdump

View File

@ -0,0 +1,142 @@
# Main Command: `iottb`
Usage: `iottb [OPTIONS] COMMAND [ARGS]...`
Options:
-v, --verbosity Set verbosity [0<=x<=3] \n
-d, --debug Enable debug mode
--dry-run
--cfg-file PATH Path to iottb config file
--help Show this message and exit.
Commands:
add-device Add a device to a database
init-db
rm-cfg Removes the cfg file from the filesystem.
rm-dbs Removes ALL(!) databases from the filesystem if...
set-key-in-table-to Edit config or metadata files.
show-all Show everything: configuration, databases, and...
show-cfg Show the current configuration context
sniff Sniff packets with tcpdump
Command: init-db
Usage: [OPTIONS]
Options:
-d, --dest PATH Location to put (new) iottb database
-n, --name TEXT Name of new database.
--update-default / --no-update-default
If new db should be set as the new default
--help Show this message and exit.
Command: rm-cfg
Usage: [OPTIONS]
Removes the cfg file from the filesystem.
This is mostly a utility during development. Once non-standard database
locations are implemented, deleting this would lead to iottb not being able
to find them anymore.
Options:
--yes Confirm the action without prompting.
--help Show this message and exit.
Command: set-key-in-table-to
Usage: [OPTIONS]
Edit config or metadata files. TODO: Implement
Options:
--file TEXT
--table TEXT
--key TEXT
--value TEXT
--help Show this message and exit.
Command: rm-dbs
Usage: [OPTIONS]
Removes ALL(!) databases from the filesystem if they're empty.
Development utility currently unfit for use.
Options:
--yes Confirm the action without prompting.
--help Show this message and exit.
Command: add-device
Usage: [OPTIONS]
Add a device to a database
Options:
--dev, --device-name TEXT The name of the device to be added. If this
string contains spaces or other special
characters normalization is
performed to derive a canonical name [required]
--db, --database DIRECTORY Database in which to add this device. If not
specified use default from config. [env var:
IOTTB_DB]
--guided Add device interactively [env var:
IOTTB_GUIDED_ADD]
--help Show this message and exit.
Command: show-cfg
Usage: [OPTIONS]
Show the current configuration context
Options:
--cfg-file PATH Path to the config file
-pp Pretty Print
--help Show this message and exit.
Command: sniff
Usage: [OPTIONS] [TCPDUMP-ARGS] [DEVICE]
Sniff packets with tcpdump
Options:
Testbed sources:
--db, --database TEXT Database of device. Only needed if not current
default. [env var: IOTTB_DB]
--app TEXT Companion app being used during capture
Runtime behaviour:
--unsafe Disable checks for otherwise required options.
[env var: IOTTB_UNSAFE]
--guided [env var: IOTTB_GUIDED]
--pre PATH Script to be executed before main commandis
started.
Tcpdump options:
-i, --interface TEXT Network interface to capture on.If not specified
tcpdump tries to find and appropriate one. [env
var: IOTTB_CAPTURE_INTERFACE]
-a, --address TEXT IP or MAC address to filter packets by. [env var:
IOTTB_CAPTURE_ADDRESS]
-I, --monitor-mode Put interface into monitor mode.
--ff TEXT tcpdump filter as string or file path. [env var:
IOTTB_CAPTURE_FILTER]
-#, --print-pacno Print packet number at beginning of line. True by
default.
-e, --print-ll Print link layer headers. True by default.
-c, --count INTEGER Number of packets to capture.
--help Show this message and exit.
Command: show-all
Usage: [OPTIONS]
Show everything: configuration, databases, and device metadata
Options:
--help Show this message and exit.

View File

@ -0,0 +1,16 @@
from pathlib import Path
from iottb import definitions
import logging
from iottb.utils.user_interaction import tb_echo
import click
click.echo = tb_echo # This is very hacky
logging.basicConfig(level=definitions.LOGLEVEL)
log_dir = definitions.LOGDIR
# Ensure logs dir exists before new handlers are registered in main.py
if not log_dir.is_dir():
log_dir.mkdir()
DOCS_FOLDER = Path.cwd() / 'docs'

View File

@ -0,0 +1,199 @@
import json
import sys
import click
from pathlib import Path
import logging
import re
from iottb import definitions
from iottb.models.device_metadata import DeviceMetadata
from iottb.models.iottb_config import IottbConfig
from iottb.definitions import CFG_FILE_PATH, TB_ECHO_STYLES
logger = logging.getLogger(__name__)
def prompt_for_device_details():
device_details = {}
aliases = []
while True:
click.echo("\nEnter the details for the new device:")
click.echo("1. Device Name")
click.echo("2. Description")
click.echo("3. Model")
click.echo("4. Manufacturer")
click.echo("5. Current Firmware Version")
click.echo("6. Device Type")
click.echo("7. Supported Interfaces")
click.echo("8. Companion Applications")
click.echo("9. Add Alias")
click.echo("10. Finish and Save")
choice = click.prompt("Choose an option", type=int)
if choice == 1:
device_details['device_name'] = click.prompt("Enter the device name")
elif choice == 2:
device_details['description'] = click.prompt("Enter the description")
elif choice == 3:
device_details['model'] = click.prompt("Enter the model")
elif choice == 4:
device_details['manufacturer'] = click.prompt("Enter the manufacturer")
elif choice == 5:
device_details['firmware_version'] = click.prompt("Enter the current firmware version")
elif choice == 6:
device_details['device_type'] = click.prompt("Enter the device type")
elif choice == 7:
device_details['supported_interfaces'] = click.prompt("Enter the supported interfaces")
elif choice == 8:
device_details['companion_applications'] = click.prompt("Enter the companion applications")
elif choice == 9:
alias = click.prompt("Enter an alias")
aliases.append(alias)
elif choice == 10:
break
else:
click.echo("Invalid choice. Please try again.")
device_details['aliases'] = aliases
return device_details
def confirm_and_add_device(device_details, db_path):
click.echo("\nDevice metadata:")
for key, value in device_details.items():
click.echo(f"{key.replace('_', ' ').title()}: {value}")
confirm = click.confirm("Do you want to add this device with above metadata?")
if confirm:
device_name = device_details.get('device_name')
if not device_name:
click.echo("Device name is required. Exiting...")
return
device_metadata = DeviceMetadata(**device_details)
device_dir = db_path / device_metadata.canonical_name
if device_dir.exists():
click.echo(f"Device {device_name} already exists in the database.")
click.echo("Exiting...")
return
try:
device_dir.mkdir(parents=True, exist_ok=True)
metadata_path = device_dir / definitions.DEVICE_METADATA_FILE_NAME
device_metadata.save_metadata_to_file(metadata_path)
click.echo(f"Successfully added device {device_name} to database.")
except OSError as e:
click.echo(f"Error trying to create device directory: {e}")
click.echo("Exiting...")
else:
click.echo("Operation cancelled. Exiting...")
def add_device_guided(cfg, db):
logger.info('Adding device interactively')
# logger.debug(f'Parameters: {params}. value: {value}')
databases = cfg.db_path_dict
if not databases:
click.echo('No databases found in config file.')
return
click.echo('Available Databases:')
last = 0
for i, db_name in enumerate(databases.keys(), start=1):
click.echo(f'[{i}] {db_name}')
last = i if last < i else last
db_choice = click.prompt('Select the database to add the new device to (1 - {last}, 0 to quit)',
type=int, default=1)
if 1 <= db_choice <= last:
selected_db = list(databases.keys())[db_choice - 1]
click.confirm(f'Use {selected_db}?', abort=True)
db_path = Path(databases[selected_db]) / selected_db
logger.debug(f'DB Path {str(db_path)}')
device_details = prompt_for_device_details()
confirm_and_add_device(device_details, db_path)
elif db_choice == 0:
click.echo(f'Quitting...')
else:
click.echo(f'{db_choice} is not a valid choice. Please rerun command and select a valid database.')
@click.command('add-device', help='Add a device to a database')
@click.argument('device', type=str, default="")
@click.option('--db', '--database', type=str,
envvar='IOTTB_DB', show_envvar=True, default="",
help='Database in which to add this device. If not specified use default from config.')
@click.option('--guided', is_flag=True,
help='Add device interactively')
def add_device(device, db, guided):
"""Add a new device to a database
Device name must be supplied unless in an interactive setup.
Database is taken from config by default.
If this device name contains spaces or other special characters normalization is performed to derive a canonical name.
"""
logger.info('add-device invoked')
# Step 1: Load Config
# Dependency: Config file must exist
config = IottbConfig(Path(CFG_FILE_PATH))
logger.debug(f'Config loaded: {config}')
# If guided flag set, continue with guided add and leave
if guided:
click.echo('Guided option set. Continuing with guided add.')
add_device_guided(config, device, db)
logger.info('Finished guided device add.')
return
# Step 2: Load database
# dependency: Database folder must exist
if db != "":
database = db
path = config.db_path_dict[database]
logger.debug(f'Resolved (path, db) {path}, {database}')
else:
path = config.default_db_location
database = config.default_database
logger.debug(f'Default (path, db) {path}, {database}')
click.secho(f'Using database {database}')
full_db_path = Path(path) / database
if not full_db_path.is_dir():
logger.warning(f'No database at {database}')
click.echo(f'No database found at {full_db_path}', lvl='w')
click.echo(
f'You need to initialize the testbed before before you add devices!')
click.echo(
f'To initialize the testbed in the default location run "iottb init-db"')
click.echo('Exiting...')
sys.exit()
# Ensure a device name was passed as argument
if device == "":
click.echo("Device name cannot be an empty string. Exiting...", lvl='w')
return
# Step 3: Check if device already exists in database
# dependency: DeviceMetadata object
device_metadata = DeviceMetadata(device_name=device)
device_dir = full_db_path / device_metadata.canonical_name
# Check if device is already registered
if device_dir.exists():
logger.warning(f'Device directory {device_dir} already exists.')
click.echo(f'Device {device} already exists in the database.')
click.echo('Exiting...')
sys.exit()
try:
device_dir.mkdir()
except OSError as e:
logger.error(f'Error trying to create device {e}')
click.echo('Exiting...')
sys.exit()
# Step 4: Save metadata into device_dir
metadata_path = device_dir / definitions.DEVICE_METADATA_FILE_NAME
with metadata_path.open('w') as metadata_file:
json.dump(device_metadata.__dict__, metadata_file, indent=4)
click.echo(f'Successfully added device {device} to database')
logger.debug(f'Added device {device} to database {database}. Full path of metadata {metadata_path}')
logger.info(f'Metadata for {device} {device_metadata.print_attributes()}')

View File

@ -0,0 +1,130 @@
from pathlib import Path
import logging
import click
from iottb import tb_echo
from iottb.definitions import DB_NAME, CFG_FILE_PATH
from iottb.models.iottb_config import IottbConfig
logger = logging.getLogger(__name__)
@click.group('util')
def tb():
pass
@click.command()
@click.option('--file', default=DB_NAME)
@click.option('--table', type=str, default='DefaultDatabase')
@click.option('--key')
@click.option('--value')
@click.pass_context
def set_key_in_table_to(ctx, file, table, key, value):
"""Edit config or metadata files. TODO: Implement"""
click.echo(f'set_key_in_table_to invoked')
logger.warning("Unimplemented subcommand invoked.")
@click.command()
@click.confirmation_option(prompt="Are you certain that you want to delete the cfg file?")
def rm_cfg():
""" Removes the cfg file from the filesystem.
This is mostly a utility during development. Once non-standard database locations are implemented,
deleting this would lead to iottb not being able to find them anymore.
"""
Path(CFG_FILE_PATH).unlink()
click.echo(f'Iottb configuration removed at {CFG_FILE_PATH}')
@click.command()
@click.confirmation_option(prompt="Are you certain that you want to delete the databases file?")
def rm_dbs(dbs):
""" Removes ALL(!) databases from the filesystem if they're empty.
Development utility currently unfit for use.
"""
config = IottbConfig()
paths = config.get_know_database_paths()
logger.debug(f'Known db paths: {str(paths)}')
for dbs in paths:
try:
Path(dbs).rmdir()
click.echo(f'{dbs} deleted')
except Exception as e:
logger.debug(f'Failed unlinking db {dbs} with error {e}')
logger.info(f'All databases deleted')
@click.command('show-cfg', help='Show the current configuration context')
@click.option('--cfg-file', type=click.Path(), default=CFG_FILE_PATH, help='Path to the config file')
@click.option('-pp', is_flag=True, default=False, help='Pretty Print')
@click.pass_context
def show_cfg(ctx, cfg_file, pp):
logger.debug(f'Pretty print option set to {pp}')
if pp:
try:
config = IottbConfig(Path(cfg_file))
click.echo("Configuration Context:")
click.echo(f"Default Database: {config.default_database}")
click.echo(f"Default Database Path: {config.default_db_location}")
click.echo("Database Locations:")
for db_name, db_path in config.db_path_dict.items():
click.echo(f" - {db_name}: {db_path}")
except Exception as e:
logger.error(f"Error loading configuration: {e}")
click.echo(f"Failed to load configuration from {cfg_file}")
else:
path = Path(cfg_file)
if path.is_file():
with path.open('r') as file:
content = file.read()
click.echo(content)
else:
click.echo(f"Configuration file not found at {cfg_file}")
@click.command('show-all', help='Show everything: configuration, databases, and device metadata')
@click.pass_context
def show_everything(ctx):
"""Show everything that can be recursively found based on config except file contents."""
config = ctx.obj['CONFIG']
click.echo("Configuration Context:")
click.echo(f"Default Database: {config.default_database}")
click.echo(f"Default Database Path: {config.default_db_location}")
click.echo("Database Locations:")
everything_dict = {}
for db_name, db_path in config.db_path_dict.items():
click.echo(f" - {db_name}: {db_path}")
for db_name, db_path in config.db_path_dict.items():
full_db_path = Path(db_path) / db_name
if full_db_path.is_dir():
click.echo(f"\nContents of {full_db_path}:")
flag = True
for item in full_db_path.iterdir():
flag = False
if item.is_file():
click.echo(f" - {item.name}")
try:
with item.open('r', encoding='utf-8') as file:
content = file.read()
click.echo(f" Content:\n{content}")
except UnicodeDecodeError:
click.echo(" Content is not readable as text")
elif item.is_dir():
click.echo(f" - {item.name}/")
for subitem in item.iterdir():
if subitem.is_file():
click.echo(f" - {subitem.name}")
elif subitem.is_dir():
click.echo(f" - {subitem.name}/")
if flag:
tb_echo(f'\t EMPTY')
else:
click.echo(f"{full_db_path} is not a directory")

View File

@ -0,0 +1,347 @@
import json
import logging
import os
import re
import subprocess
import sys
import uuid
from datetime import datetime
from pathlib import Path
from time import time
import click
from click_option_group import optgroup
from iottb.utils.string_processing import make_canonical_name
# Setup logger
logger = logging.getLogger('iottb.sniff')
def is_ip_address(address):
ip_pattern = re.compile(r"^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$")
return ip_pattern.match(address) is not None
def is_mac_address(address):
mac_pattern = re.compile(r"^([0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2}$")
return mac_pattern.match(address) is not None
def load_config(cfg_file):
"""Loads configuration from the given file path."""
with open(cfg_file, 'r') as config_file:
return json.load(config_file)
def validate_sniff(ctx, param, value):
logger.info('Validating sniff...')
if ctx.params.get('unsafe') and not value:
return None
if not ctx.params.get('unsafe') and not value:
raise click.BadParameter('Address is required unless --unsafe is set.')
if not is_ip_address(value) and not is_mac_address(value):
raise click.BadParameter('Address must be a valid IP address or MAC address.')
return value
def run_pre(pre):
subprocess.run(pre, shell=True)
logger.debug(f'finnished {pre}')
def run_post(post):
subprocess.run(post, shell=True)
logger.debug(f'finnished {post}')
@click.command('sniff', help='Sniff packets with tcpdump')
@optgroup.group('Testbed sources')
@optgroup.option('--db', '--database', type=str, envvar='IOTTB_DB', show_envvar=True,
help='Database of device. Only needed if not current default.')
@optgroup.option('--app', type=str, help='Companion app being used during capture', required=False)
@optgroup.group('Runtime behaviour')
@optgroup.option('--unsafe', is_flag=True, default=False, envvar='IOTTB_UNSAFE', is_eager=True,
help='Disable checks for otherwise required options.\n', show_envvar=True)
@optgroup.option('--guided', is_flag=True, default=False, envvar='IOTTB_GUIDED', show_envvar=True)
@optgroup.option('--pre', help='Script to be executed before main command is started.')
@optgroup.option('--post', help='Script to be executed upon completion of main command.')
@optgroup.group('Tcpdump options')
@optgroup.option('-i', '--interface',
help='Network interface to capture on.' +
'If not specified tcpdump tries to find and appropriate one.\n', show_envvar=True,
envvar='IOTTB_CAPTURE_INTERFACE')
@optgroup.option('-a', '--address', callback=validate_sniff,
help='IP or MAC address to filter packets by.\n', show_envvar=True,
envvar='IOTTB_CAPTURE_ADDRESS')
@optgroup.option('-I', '--monitor-mode', help='Put interface into monitor mode.\n', is_flag=True)
@optgroup.option('--ff', type=str, envvar='IOTTB_CAPTURE_FILTER', show_envvar=True,
help='tcpdump filter as string or file path.')
@optgroup.option('-#', '--print-pacno', is_flag=True, default=True,
help='Print packet number at beginning of line. True by default.\n')
@optgroup.option('-e', '--print-ll', is_flag=True, default=False,
help='Print link layer headers. True by default.')
@optgroup.option('-c', '--count', type=int, help='Number of packets to capture.', default=1000)
# @optgroup.option('--mins', type=int, help='Time in minutes to capture.', default=1)
@click.argument('tcpdump-args', nargs=-1, required=False, metavar='[TCPDUMP-ARGS]')
@click.argument('device', required=False)
@click.pass_context
def sniff(ctx, device, interface, print_pacno, ff, count, monitor_mode, print_ll, address, db, unsafe, guided,
app, tcpdump_args, pre, post, **params):
""" Sniff packets from a device """
logger.info('sniff command invoked')
# Step 0: run pre script:
if pre:
click.echo(f'Running pre command {pre}')
run_pre(pre)
# Step1: Load Config
config = ctx.obj['CONFIG']
logger.debug(f'Config loaded: {config}')
# Step2: determine relevant database
database = db if db else config.default_database
path = config.db_path_dict[database]
full_db_path = Path(path) / database
logger.debug(f'Full db path is {str(full_db_path)}')
# 2.2: Check if it exists
if not full_db_path.is_dir():
logger.error('DB unexpectedly missing')
click.echo('DB unexpectedly missing')
return
canonical_name, aliases = make_canonical_name(device)
click.echo(f'Using canonical device name {canonical_name}')
device_path = full_db_path / canonical_name
# Step 3: now the device
if not device_path.exists():
if not unsafe:
logger.error(f'Device path {device_path} does not exist')
click.echo(f'Device path {device_path} does not exist')
return
else:
device_path.mkdir(parents=True, exist_ok=True)
logger.info(f'Device path {device_path} created')
click.echo(f'Found device at path {device_path}')
# Step 4: Generate filter
generic_filter = None
cap_filter = None
if ff:
logger.debug(f'ff: {ff}')
if Path(ff).is_file():
logger.info('Given filter option is a file')
with open(ff, 'r') as f:
cap_filter = f.read().strip()
else:
logger.info('Given filter option is an expression')
cap_filter = ff
else:
if address is not None:
if is_ip_address(address):
generic_filter = 'net'
cap_filter = f'{generic_filter} {address}'
elif is_mac_address(address):
generic_filter = 'ether net'
cap_filter = f'{generic_filter} {address}'
elif not unsafe:
logger.error('Invalid address format')
click.echo('Invalid address format')
return
logger.info(f'Generic filter {generic_filter}')
click.echo(f'Using filter {cap_filter}')
# Step 5: prep capture directory
capture_date = datetime.now().strftime('%Y-%m-%d')
capture_base_dir = device_path / f'sniffs/{capture_date}'
capture_base_dir.mkdir(parents=True, exist_ok=True)
logger.debug(f'Previous captures {capture_base_dir.glob('cap*')}')
capture_count = sum(1 for _ in capture_base_dir.glob('cap*'))
logger.debug(f'Capture count is {capture_count}')
capture_dir = f'cap{capture_count:04d}-{datetime.now().strftime('%H%M')}'
logger.debug(f'capture_dir: {capture_dir}')
# Full path
capture_dir_full_path = capture_base_dir / capture_dir
capture_dir_full_path.mkdir(parents=True, exist_ok=True)
click.echo(f'Files will be placed in {str(capture_dir_full_path)}')
logger.debug(f'successfully created capture directory')
# Step 6: Prepare capture file names
# Generate UUID for filenames
capture_uuid = str(uuid.uuid4())
click.echo(f'Capture has id {capture_uuid}')
pcap_file = f"{canonical_name}_{capture_uuid}.pcap"
pcap_file_full_path = capture_dir_full_path / pcap_file
stdout_log_file = f'stdout_{capture_uuid}.log'
stderr_log_file = f'stderr_{capture_uuid}.log'
logger.debug(f'Full pcap file path is {pcap_file_full_path}')
logger.info(f'pcap file name is {pcap_file}')
logger.info(f'stdout log file is {stdout_log_file}')
logger.info(f'stderr log file is {stderr_log_file}')
# Step 7: Build tcpdump command
logger.debug(f'pgid {os.getpgrp()}')
logger.debug(f'ppid {os.getppid()}')
logger.debug(f'(real, effective, saved) user id: {os.getresuid()}')
logger.debug(f'(real, effective, saved) group id: {os.getresgid()}')
cmd = ['sudo', 'tcpdump']
# 7.1 process flags
flags = []
if print_pacno:
flags.append('-#')
if print_ll:
flags.append('-e')
if monitor_mode:
flags.append('-I')
flags.append('-n') # TODO: Integrate, in case name resolution is wanted!
cmd.extend(flags)
flags_string = " ".join(flags)
logger.debug(f'Flags: {flags_string}')
# debug interlude
verbosity = ctx.obj['VERBOSITY']
if verbosity > 0:
verbosity_flag = '-'
for i in range(0, verbosity):
verbosity_flag = verbosity_flag + 'v'
logger.debug(f'verbosity string to pass to tcpdump: {verbosity_flag}')
cmd.append(verbosity_flag)
# 7.2 generic (i.e. reusable) kw args
generic_kw_args = []
if count:
generic_kw_args.extend(['-c', str(count)])
# if mins:
# generic_kw_args.extend(['-G', str(mins * 60)]) TODO: this currently loads to errors with sudo
cmd.extend(generic_kw_args)
generic_kw_args_string = " ".join(generic_kw_args)
logger.debug(f'KW args: {generic_kw_args_string}')
# 7.3 special kw args (not a priori reusable)
non_generic_kw_args = []
if interface:
non_generic_kw_args.extend(['-i', interface])
non_generic_kw_args.extend(['-w', str(pcap_file_full_path)])
cmd.extend(non_generic_kw_args)
non_generic_kw_args_string = " ".join(non_generic_kw_args)
logger.debug(f'Non transferable (special) kw args: {non_generic_kw_args_string}')
# 7.4 add filter expression
if cap_filter:
logger.debug(f'cap_filter (not generic): {cap_filter}')
cmd.append(cap_filter)
full_cmd_string = " ".join(cmd)
logger.info(f'tcpdump command: {"".join(full_cmd_string)}')
click.echo('Capture setup complete!')
# Step 8: Execute tcpdump command
start_time = datetime.now().strftime("%H:%M:%S")
start = time()
try:
if guided:
click.confirm(f'Execute following command: {full_cmd_string}')
stdout_log_file_abs_path = capture_dir_full_path / stdout_log_file
stderr_log_file_abs_path = capture_dir_full_path / stderr_log_file
stdout_log_file_abs_path.touch(mode=0o777)
stderr_log_file_abs_path.touch(mode=0o777)
with open(stdout_log_file_abs_path, 'w') as out, open(stderr_log_file_abs_path, 'w') as err:
logger.debug(f'\nstdout: {out}.\nstderr: {err}.\n')
tcp_complete = subprocess.run(cmd, check=True, capture_output=True, text=True)
out.write(tcp_complete.stdout)
err.write(tcp_complete.stderr)
# click.echo(f'Mock sniff execution')
click.echo(f"Capture complete. Saved to {pcap_file}")
except subprocess.CalledProcessError as e:
logger.error(f'Failed to capture packets: {e}')
click.echo(f'Failed to capture packets: {e}')
click.echo(f'Check {stderr_log_file} for more info.')
if ctx.obj['DEBUG']:
msg = [f'STDERR log {stderr_log_file} contents:\n']
with open(capture_dir_full_path / stderr_log_file) as log:
for line in log:
msg.append(line)
click.echo("\t".join(msg), lvl='e')
# print('DEBUG ACTIVE')
if guided:
click.prompt('Create metadata anyway?')
else:
click.echo('Aborting capture...')
sys.exit()
end_time = datetime.now().strftime("%H:%M:%S")
end = time()
delta = end - start
click.echo(f'tcpdump took {delta:.2f} seconds.')
# Step 9: Register metadata
metadata = {
'device': canonical_name,
'device_id': device,
'capture_id': capture_uuid,
'capture_date_iso': datetime.now().isoformat(),
'invoked_command': " ".join(map(str, cmd)),
'capture_duration': delta,
'generic_parameters': {
'flags': flags_string,
'kwargs': generic_kw_args_string,
'filter': generic_filter
},
'non_generic_parameters': {
'kwargs': non_generic_kw_args_string,
'filter': cap_filter
},
'features': {
'interface': interface,
'address': address
},
'resources': {
'pcap_file': str(pcap_file),
'stdout_log': str(stdout_log_file),
'stderr_log': str(stderr_log_file),
'pre': str(pre),
'post': str(post)
},
'environment': {
'capture_dir': capture_dir,
'database': database,
'capture_base_dir': str(capture_base_dir),
'capture_dir_abs_path': str(capture_dir_full_path)
}
}
click.echo('Ensuring correct ownership of created files.')
username = os.getlogin()
gid = os.getgid()
# Else there are issues when running with sudo:
try:
subprocess.run(f'sudo chown -R {username}:{username} {device_path}', shell=True)
except OSError as e:
click.echo(f'Some error {e}')
click.echo(f'Saving metadata.')
metadata_abs_path = capture_dir_full_path / 'capture_metadata.json'
with open(metadata_abs_path, 'w') as f:
json.dump(metadata, f, indent=4)
click.echo(f'END SNIFF SUBCOMMAND')
if post:
click.echo(f'Running post script {post}')
run_post(post)

View File

@ -0,0 +1,70 @@
import click
from pathlib import Path
import logging
from logging.handlers import RotatingFileHandler
import sys
from iottb.models.iottb_config import IottbConfig
from iottb.definitions import DB_NAME, CFG_FILE_PATH
logger = logging.getLogger(__name__)
@click.command()
@click.option('-d', '--dest', type=click.Path(exists=True, file_okay=False, dir_okay=True),
help='Location to put (new) iottb database')
@click.option('-n', '--name', default=DB_NAME, type=str,
help='Name of new database.')
@click.option('--update-default/--no-update-default', default=True,
help='If new db should be set as the new default')
@click.pass_context
def init_db(ctx, dest, name, update_default):
logger.info('init-db invoked')
config = ctx.obj['CONFIG']
logger.debug(f'str(config)')
# Use the default path from config if dest is not provided
known_dbs = config.get_known_databases()
logger.debug(f'Known databases: {known_dbs}')
if name in known_dbs:
dest = config.get_database_location(name)
if Path(dest).joinpath(name).is_dir():
click.echo(f'A database {name} already exists.')
logger.debug(f'DB {name} exists in {dest}')
click.echo(f'Exiting...')
sys.exit()
logger.debug(f'DB name {name} registered but does not exist.')
if not dest:
logger.info('No dest set, choosing default destination.')
dest = Path(config.default_db_location)
db_path = Path(dest).joinpath(name)
logger.debug(f'Full path for db {str(db_path)}')
# Create the directory if it doesn't exist
db_path.mkdir(parents=True, exist_ok=True)
logger.info(f"mkdir {db_path} successful")
click.echo(f'Created {db_path}')
# Update configuration
config.set_database_location(name, str(dest))
if update_default:
config.set_default_database(name, str(dest))
config.save_config()
logger.info(f"Updated configuration with database {name} at {db_path}")
# @click.group('config')
# @click.pass_context
# def cfg(ctx):
# pass
#
# @click.command('set', help='Set the location of a database.')
# @click.argument('database', help='Name of database')
# @click.argument('location', help='Where the database is located (i.e. its parent directory)')
# @click.pass_context
# def set(ctx, key, value):
# click.echo(f'Setting {key} to {value} in config')
# config = ctx.obj['CONFIG']
# logger.warning('No checks performed!')
# config.set_database_location(key, value)
# config.save_config()

View File

@ -0,0 +1,48 @@
import logging
from pathlib import Path
import click
APP_NAME = 'iottb'
DB_NAME = 'iottb.db'
CFG_FILE_PATH = str(Path(click.get_app_dir(APP_NAME)).joinpath('iottb.cfg'))
CONSOLE_LOG_FORMATS = {
0: '%(levelname)s - %(message)s',
1: '%(levelname)s - %(module)s - %(message)s',
2: '%(levelname)s - %(module)s - %(funcName)s - %(lineno)d - %(message)s'
}
LOGFILE_LOG_FORMAT = {
0: '%(levelname)s - %(asctime)s - %(module)s - %(message)s',
1: '%(levelname)s - %(asctime)s - %(module)s - %(funcName)s - %(message)s',
2: '%(levelname)s - %(asctime)s - %(module)s - %(funcName)s - %(lineno)d - %(message)s'
}
MAX_VERBOSITY = len(CONSOLE_LOG_FORMATS) - 1
assert len(LOGFILE_LOG_FORMAT) == len(CONSOLE_LOG_FORMATS), 'Log formats must be same size'
LOGLEVEL = logging.DEBUG
LOGDIR = Path.cwd() / 'logs'
# Characters to just replace
REPLACEMENT_SET_CANONICAL_DEVICE_NAMES = {' ', '_', ',', '!', '@', '#', '$', '%', '^', '&', '*', '(', ')', '+', '=',
'{', '}', '[', ']',
'|',
'\\', ':', ';', '"', "'", '<', '>', '?', '/', '`', '~'}
# Characters to possibly error on
ERROR_SET_CANONICAL_DEVICE_NAMES = {',', '!', '@', '#', '$', '%', '^', '&', '*', '(', ')', '+', '=', '{', '}', '[', ']',
'|',
'\\', ':', ';', '"', "'", '<', '>', '?', '/', '`', '~'}
DEVICE_METADATA_FILE_NAME = 'device_metadata.json'
TB_ECHO_STYLES = {
'w': {'fg': 'yellow', 'bold': True},
'i': {'fg': 'blue', 'italic': True},
's': {'fg': 'green', 'bold': True},
'e': {'fg': 'red', 'bold': True},
'header': {'fg': 'bright_cyan', 'bold': True, 'italic': True}
}
NAME_OF_CAPTURE_DIR = 'sniffs'

View File

@ -0,0 +1,77 @@
import sys
import click
from pathlib import Path
import logging
from iottb.commands.sniff import sniff
from iottb.commands.developer import set_key_in_table_to, rm_cfg, rm_dbs, show_cfg, show_everything
##################################################
# Import package modules
#################################################
from iottb.utils.logger_config import setup_logging
from iottb import definitions
from iottb.models.iottb_config import IottbConfig
from iottb.commands.testbed import init_db
from iottb.commands.add_device import add_device
############################################################################
# Module shortcuts for global definitions
###########################################################################
APP_NAME = definitions.APP_NAME
DB_NAME = definitions.DB_NAME
CFG_FILE_PATH = definitions.CFG_FILE_PATH
# These are (possibly) redundant when defined in definitions.py
# keeping them here until refactored and tested
MAX_VERBOSITY = definitions.MAX_VERBOSITY
# Logger stuff
loglevel = definitions.LOGLEVEL
logger = logging.getLogger(__name__)
@click.group(context_settings=dict(auto_envvar_prefix='IOTTB', show_default=True))
@click.option('-v', '--verbosity', count=True, type=click.IntRange(0, 3), default=0, is_eager=True,
help='Set verbosity')
@click.option('-d', '--debug', is_flag=True, default=False, is_eager=True,
help='Enable debug mode')
@click.option('--dry-run', is_flag=False, default=True, is_eager=True, help='NOT USED!')
@click.option('--cfg-file', type=click.Path(),
default=Path(click.get_app_dir(APP_NAME)).joinpath('iottb.cfg'),
envvar='IOTTB_CONF_HOME', help='Path to iottb config file')
@click.pass_context
def cli(ctx, verbosity, debug, dry_run, cfg_file):
# Setup logging based on the loaded configuration and other options
setup_logging(verbosity, debug)
ctx.ensure_object(dict) # Make sure context is ready for use
logger.info("Starting execution.")
ctx.obj['CONFIG'] = IottbConfig(cfg_file) # Load configuration directly
ctx.meta['FULL_PATH_CONFIG_FILE'] = str(cfg_file)
ctx.meta['DRY_RUN'] = dry_run
logger.debug(f'Verbosity: {verbosity}')
ctx.obj['VERBOSITY'] = verbosity
logger.debug(f'Debug: {debug}')
ctx.obj['DEBUG'] = debug
##################################################################################
# Add all subcommands to group here
#################################################################################
# TODO: Is there a way to do this without pylint freaking out?
# noinspection PyTypeChecker
cli.add_command(init_db)
cli.add_command(rm_cfg)
cli.add_command(set_key_in_table_to)
cli.add_command(rm_dbs)
# noinspection PyTypeChecker
cli.add_command(add_device)
cli.add_command(show_cfg)
cli.add_command(sniff)
cli.add_command(show_everything)
if __name__ == '__main__':
cli()
for log in Path.cwd().iterdir():
log.chmod(0o777)

View File

@ -0,0 +1,6 @@
class Database:
def __init__(self, name, path):
self.name = name
self.path = path
self.device_list = [] # List of the canonical names of devices registered in this database

View File

@ -0,0 +1,50 @@
import json
import logging
import uuid
from datetime import datetime
import logging
import click
from iottb.utils.string_processing import make_canonical_name
logger = logging.getLogger(__name__)
class DeviceMetadata:
def __init__(self, device_name, description="", model="", manufacturer="", firmware_version="", device_type="",
supported_interfaces="", companion_applications="", save_to_file=None, aliases=None):
self.device_id = str(uuid.uuid4())
self.device_name = device_name
cn, default_aliases = make_canonical_name(device_name)
logger.debug(f'cn, default aliases = {cn}, {str(default_aliases)}')
self.aliases = default_aliases if aliases is None else default_aliases + aliases
self.canonical_name = cn
self.date_added = datetime.now().isoformat()
self.description = description
self.model = model
self.manufacturer = manufacturer
self.current_firmware_version = firmware_version
self.device_type = device_type
self.supported_interfaces = supported_interfaces
self.companion_applications = companion_applications
self.last_metadata_update = datetime.now().isoformat()
if save_to_file is not None:
click.echo('TODO: Implement saving config to file after creation!')
def add_alias(self, alias: str = ""):
if alias == "":
return
self.aliases.append(alias)
def get_canonical_name(self):
return self.canonical_name
def print_attributes(self):
print(f'Printing attribute value pairs in {__name__}')
for attr, value in self.__dict__.items():
print(f'{attr}: {value}')
def save_metadata_to_file(self, metadata_path):
with open(metadata_path, 'w') as metadata_file:
json.dump(self.__dict__, metadata_file, indent=4)
click.echo(f'Metadata saved to {metadata_path}')

View File

@ -0,0 +1,124 @@
import json
from pathlib import Path
from iottb import definitions
import logging
logger = logging.getLogger(__name__)
DB_NAME = definitions.DB_NAME
class IottbConfig:
""" Class to handle testbed configuration.
TODO: Add instead of overwrite Database locations when initializing if a location with valid db
exists.
"""
@staticmethod
def warn():
logger.warning(f'DatabaseLocations are DatabaseLocationMap in the class {__name__}')
def __init__(self, cfg_file=definitions.CFG_FILE_PATH):
logger.info('Initializing Config object')
IottbConfig.warn()
self.cfg_file = Path(cfg_file)
self.default_database = None
self.default_db_location = None
self.db_path_dict = dict()
self.load_config()
def create_default_config(self):
"""Create default iottb config file."""
logger.info(f'Creating default config file at {self.cfg_file}')
self.default_database = DB_NAME
self.default_db_location = str(Path.home())
self.db_path_dict = {
DB_NAME: self.default_db_location
}
defaults = {
'DefaultDatabase': self.default_database,
'DefaultDatabasePath': self.default_db_location,
'DatabaseLocations': self.db_path_dict
}
try:
self.cfg_file.parent.mkdir(parents=True, exist_ok=True)
with self.cfg_file.open('w') as config_file:
json.dump(defaults, config_file, indent=4)
except IOError as e:
logger.error(f"Failed to create default configuration file at {self.cfg_file}: {e}")
raise RuntimeError(f"Failed to create configuration file: {e}") from e
def load_config(self):
"""Loads or creates default configuration from given file path."""
logger.info('Loading configuration file')
if not self.cfg_file.is_file():
logger.info('Config file does not exist.')
self.create_default_config()
else:
logger.info('Config file exists, opening.')
with self.cfg_file.open('r') as config_file:
data = json.load(config_file)
self.default_database = data.get('DefaultDatabase')
self.default_db_location = data.get('DefaultDatabasePath')
self.db_path_dict = data.get('DatabaseLocations', {})
def save_config(self):
"""Save the current configuration to the config file."""
data = {
'DefaultDatabase': self.default_database,
'DefaultDatabasePath': self.default_db_location,
'DatabaseLocations': self.db_path_dict
}
try:
with self.cfg_file.open('w') as config_file:
json.dump(data, config_file, indent=4)
except IOError as e:
logger.error(f"Failed to save configuration file at {self.cfg_file}: {e}")
raise RuntimeError(f"Failed to save configuration file: {e}") from e
def set_default_database(self, name, path):
"""Set the default database and its path."""
self.default_database = name
self.default_db_location = path
self.db_path_dict[name] = path
def get_default_database_location(self):
return self.default_db_location
def get_default_database(self):
return self.default_database
def get_database_location(self, name):
"""Get the location of a specific database."""
return self.db_path_dict.get(name)
def set_database_location(self, name, path):
"""Set the location for a database."""
logger.debug(f'Type of "path" parameter {type(path)}')
logger.debug(f'String value of "path" parameter {str(path)}')
logger.debug(f'Type of "name" parameter {type(name)}')
logger.debug(f'String value of "name" parameter {str(name)}')
path = Path(path)
name = Path(name)
logger.debug(f'path:name = {path}:{name}')
if path.name == name:
path = path.parent
self.db_path_dict[str(name)] = str(path)
def get_known_databases(self):
"""Get the set of known databases"""
logger.info(f'Getting known databases.')
return self.db_path_dict.keys()
def get_know_database_paths(self):
"""Get the paths of all known databases"""
logger.info(f'Getting known database paths.')
return self.db_path_dict.values()
def get_full_default_path(self):
return Path(self.default_db_location) / self.default_database

View File

@ -0,0 +1,39 @@
import json
import logging
import uuid
from datetime import datetime
from pathlib import Path
logger = logging.getLogger('iottb.sniff') # Log with sniff subcommand
class CaptureMetadata:
def __init__(self, device_id, capture_dir, interface, address, capture_file, tcpdump_command, tcpdump_stdout, tcpdump_stderr, packet_filter, alias):
self.base_data = {
'device_id': device_id,
'capture_id': str(uuid.uuid4()),
'capture_date': datetime.now().isoformat(),
'capture_dir': str(capture_dir),
'capture_file': capture_file,
'start_time': "",
'stop_time': "",
'alias': alias
}
self.features = {
'interface': interface,
'device_ip_address': address if address else "No IP Address set",
'tcpdump_stdout': str(tcpdump_stdout),
'tcpdump_stderr': str(tcpdump_stderr),
'packet_filter': packet_filter
}
self.command = tcpdump_command
def save_to_file(self):
metadata = {
'base_data': self.base_data,
'features': self.features,
'command': self.command
}
metadata_file_path = Path(self.base_data['capture_dir']) / 'metadata.json'
with open(metadata_file_path, 'w') as f:
json.dump(metadata, f, indent=4)
logger.info(f'Metadata saved to {metadata_file_path}')

View File

@ -0,0 +1,74 @@
from pathlib import Path
import click
from io import StringIO
import sys
from iottb import DOCS_FOLDER
# Import your CLI app here
from iottb.main import cli
"""Script to generate the help text and write to file.
Definitely needs better formatting.
Script is also not very flexible.
"""
def get_help_text(command):
"""Get the help text for a given command."""
help_text = StringIO()
with click.Context(command) as ctx:
# chatgpt says this helps: was right
sys_stdout = sys.stdout
sys.stdout = help_text
try:
click.echo(command.get_help(ctx))
finally:
sys.stdout = sys_stdout
return help_text.getvalue()
def write_help_to_file(cli, filename):
"""Write help messages of all commands and subcommands to a file."""
with open(filename, 'w+') as f:
# main
f.write(f"Main Command: iottb\n")
f.write(get_help_text(cli))
f.write("\n\n")
# go through subcommands
for cmd_name, cmd in cli.commands.items():
f.write(f"Command: {cmd_name}\n")
f.write(get_help_text(cmd))
f.write("\n\n")
# subcommands of subcommands
if isinstance(cmd, click.Group):
for sub_cmd_name, sub_cmd in cmd.commands.items():
f.write(f"Subcommand: {cmd_name} {sub_cmd_name}\n")
f.write(get_help_text(sub_cmd))
f.write("\n\n")
def manual():
comands = [
'init-db',
'add-device',
'sniff'
]
dev_commands = [
'show-all',
'rm-dbs',
'show-cfg',
'show-all'
]
if __name__ == "__main__":
from iottb import DOCS_FOLDER
print('Must be in project root for this to work properly!')
print(f'CWD is {str(Path.cwd())}')
DOCS_FOLDER.mkdir(exist_ok=True)
write_help_to_file(cli, str(DOCS_FOLDER / "help_messages.md"))
print(f'Wrote help_messages.md to {str(DOCS_FOLDER / "help_messages.md")}')

View File

@ -0,0 +1,4 @@
#/bin/sh
echo 'Running iottb as sudo'
sudo $(which python) iottb $@
echo 'Finished executing iottb with sudo'

View File

@ -0,0 +1,41 @@
import logging
import sys
from logging.handlers import RotatingFileHandler
from iottb import definitions
from iottb.definitions import MAX_VERBOSITY, CONSOLE_LOG_FORMATS, APP_NAME, LOGFILE_LOG_FORMAT
loglevel = definitions.LOGLEVEL
def setup_logging(verbosity, debug=loglevel):
""" Setup root logger for iottb """
log_level = loglevel
handlers = []
date_format = '%Y-%m-%d %H:%M:%S'
if verbosity > 0:
log_level = logging.WARNING
if verbosity > MAX_VERBOSITY:
verbosity = MAX_VERBOSITY
log_level = logging.INFO
assert verbosity <= MAX_VERBOSITY, f'Verbosity must be <= {MAX_VERBOSITY}'
console_handler = logging.StreamHandler(sys.stdout)
print(str(sys.stdout))
console_handler.setFormatter(logging.Formatter(CONSOLE_LOG_FORMATS[verbosity], datefmt=date_format))
console_handler.setLevel(logging.DEBUG) # can keep at debug since it depends on global level?
handlers.append(console_handler)
if debug:
log_level = logging.DEBUG
# Logfile logs INFO+, no debugs though
file_handler = RotatingFileHandler(f'{str(definitions.LOGDIR / APP_NAME)}.log', maxBytes=10240, backupCount=5)
file_handler.setFormatter(logging.Formatter(LOGFILE_LOG_FORMAT[verbosity], datefmt=date_format))
file_handler.setLevel(logging.INFO)
# finnish root logger setup
handlers.append(file_handler)
# Force this config to be applied to root logger
logging.basicConfig(level=log_level, handlers=handlers, force=True)

View File

@ -0,0 +1,40 @@
import re
from iottb import definitions
import logging
logger = logging.getLogger(__name__)
def normalize_string(s, chars_to_replace=None, replacement=None, allow_unicode=False):
pass
def make_canonical_name(name):
"""
Normalize the device name to a canonical form:
- Replace the first two occurrences of spaces and transform characters with dashes.
- Remove any remaining spaces and non-ASCII characters.
- Convert to lowercase.
"""
aliases = [name]
logger.info(f'Normalizing name {name}')
# We first normalize
chars_to_replace = definitions.REPLACEMENT_SET_CANONICAL_DEVICE_NAMES
pattern = re.compile('|'.join(re.escape(char) for char in chars_to_replace))
norm_name = pattern.sub('-', name)
norm_name = re.sub(r'[^\x00-\x7F]+', '', norm_name) # removes non ascii chars
aliases.append(norm_name)
# Lower case
norm_name = norm_name.lower()
aliases.append(norm_name)
# canonical name is only first two parts of resulting string
parts = norm_name.split('-')
canonical_name = canonical_name = '-'.join(parts[:2])
aliases.append(canonical_name)
aliases = list(set(aliases))
logger.debug(f'Canonical name: {canonical_name}')
logger.debug(f'Aliases: {aliases}')
return canonical_name, aliases

View File

@ -0,0 +1,42 @@
# iottb/utils/user_interaction.py
import click
from iottb.definitions import TB_ECHO_STYLES
import sys
import os
def tb_echo2(msg: str, lvl='i', log=True):
style = TB_ECHO_STYLES.get(lvl, {})
click.secho(f'[IOTTB]', **style)
click.secho(f'[IOTTB] \t {msg}', **style)
last_prefix = None
def tb_echo(msg: str, lvl='i', log=True):
global last_prefix
prefix = f'Testbed [{lvl.upper()}]\n'
if last_prefix != prefix:
click.secho(prefix, nl=False, **TB_ECHO_STYLES['header'])
last_prefix = prefix
click.secho(f' {msg}', **TB_ECHO_STYLES[lvl])
def main():
tb_echo('Info message', 'i')
tb_echo('Warning message', 'w')
tb_echo('Error message', 'e')
tb_echo('Success message', 's')
if __name__ == '__main__':
# arrrgggg hacky
current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.abspath(os.path.join(current_dir, '../../'))
sys.path.insert(0, project_root)
main()

107
code/iottb-project/poetry.lock generated Normal file
View File

@ -0,0 +1,107 @@
# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand.
[[package]]
name = "click"
version = "8.1.7"
description = "Composable command line interface toolkit"
optional = false
python-versions = ">=3.7"
files = [
{file = "click-8.1.7-py3-none-any.whl", hash = "sha256:ae74fb96c20a0277a1d615f1e4d73c8414f5a98db8b799a7931d1582f3390c28"},
{file = "click-8.1.7.tar.gz", hash = "sha256:ca9853ad459e787e2192211578cc907e7594e294c7ccc834310722b41b9ca6de"},
]
[package.dependencies]
colorama = {version = "*", markers = "platform_system == \"Windows\""}
[[package]]
name = "click-option-group"
version = "0.5.6"
description = "Option groups missing in Click"
optional = false
python-versions = ">=3.6,<4"
files = [
{file = "click-option-group-0.5.6.tar.gz", hash = "sha256:97d06703873518cc5038509443742b25069a3c7562d1ea72ff08bfadde1ce777"},
{file = "click_option_group-0.5.6-py3-none-any.whl", hash = "sha256:38a26d963ee3ad93332ddf782f9259c5bdfe405e73408d943ef5e7d0c3767ec7"},
]
[package.dependencies]
Click = ">=7.0,<9"
[package.extras]
docs = ["Pallets-Sphinx-Themes", "m2r2", "sphinx"]
tests = ["pytest"]
tests-cov = ["coverage", "coveralls", "pytest", "pytest-cov"]
[[package]]
name = "colorama"
version = "0.4.6"
description = "Cross-platform colored terminal text."
optional = false
python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,>=2.7"
files = [
{file = "colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6"},
{file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"},
]
[[package]]
name = "iniconfig"
version = "2.0.0"
description = "brain-dead simple config-ini parsing"
optional = false
python-versions = ">=3.7"
files = [
{file = "iniconfig-2.0.0-py3-none-any.whl", hash = "sha256:b6a85871a79d2e3b22d2d1b94ac2824226a63c6b741c88f7ae975f18b6778374"},
{file = "iniconfig-2.0.0.tar.gz", hash = "sha256:2d91e135bf72d31a410b17c16da610a82cb55f6b0477d1a902134b24a455b8b3"},
]
[[package]]
name = "packaging"
version = "24.1"
description = "Core utilities for Python packages"
optional = false
python-versions = ">=3.8"
files = [
{file = "packaging-24.1-py3-none-any.whl", hash = "sha256:5b8f2217dbdbd2f7f384c41c628544e6d52f2d0f53c6d0c3ea61aa5d1d7ff124"},
{file = "packaging-24.1.tar.gz", hash = "sha256:026ed72c8ed3fcce5bf8950572258698927fd1dbda10a5e981cdf0ac37f4f002"},
]
[[package]]
name = "pluggy"
version = "1.5.0"
description = "plugin and hook calling mechanisms for python"
optional = false
python-versions = ">=3.8"
files = [
{file = "pluggy-1.5.0-py3-none-any.whl", hash = "sha256:44e1ad92c8ca002de6377e165f3e0f1be63266ab4d554740532335b9d75ea669"},
{file = "pluggy-1.5.0.tar.gz", hash = "sha256:2cffa88e94fdc978c4c574f15f9e59b7f4201d439195c3715ca9e2486f1d0cf1"},
]
[package.extras]
dev = ["pre-commit", "tox"]
testing = ["pytest", "pytest-benchmark"]
[[package]]
name = "pytest"
version = "8.2.2"
description = "pytest: simple powerful testing with Python"
optional = false
python-versions = ">=3.8"
files = [
{file = "pytest-8.2.2-py3-none-any.whl", hash = "sha256:c434598117762e2bd304e526244f67bf66bbd7b5d6cf22138be51ff661980343"},
{file = "pytest-8.2.2.tar.gz", hash = "sha256:de4bb8104e201939ccdc688b27a89a7be2079b22e2bd2b07f806b6ba71117977"},
]
[package.dependencies]
colorama = {version = "*", markers = "sys_platform == \"win32\""}
iniconfig = "*"
packaging = "*"
pluggy = ">=1.5,<2.0"
[package.extras]
dev = ["argcomplete", "attrs (>=19.2)", "hypothesis (>=3.56)", "mock", "pygments (>=2.7.2)", "requests", "setuptools", "xmlschema"]
[metadata]
lock-version = "2.0"
python-versions = "^3.12"
content-hash = "05aa11a74b8a6411a4413684f1a4cb0e5bcd271e16b4de9ae5205d52232c91a3"

View File

@ -0,0 +1,23 @@
[tool.poetry]
name = "iottb"
version = "0.1.0"
description = "IoT Testbed"
authors = ["Sebastian Lenzlinger <sebastian.lenzlinger@unibas.ch>"]
readme = "README.md"
license = "LICENSE"
[tool.poetry.dependencies]
python = "^3.12"
click = "^8.1"
# scapy = "^2.5"
click-option-group = "^0.5.6"
[tool.poetry.scripts]
iottb = "iottb.main:cli"
[tool.poetry.group.test.dependencies]
pytest = "^8.2.2"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

View File

@ -0,0 +1,9 @@
click-option-group==0.5.6 ; python_version >= "3.12" and python_version < "4" \
--hash=sha256:38a26d963ee3ad93332ddf782f9259c5bdfe405e73408d943ef5e7d0c3767ec7 \
--hash=sha256:97d06703873518cc5038509443742b25069a3c7562d1ea72ff08bfadde1ce777
click==8.1.7 ; python_version >= "3.12" and python_version < "4.0" \
--hash=sha256:ae74fb96c20a0277a1d615f1e4d73c8414f5a98db8b799a7931d1582f3390c28 \
--hash=sha256:ca9853ad459e787e2192211578cc907e7594e294c7ccc834310722b41b9ca6de
colorama==0.4.6 ; python_version >= "3.12" and python_version < "4.0" and platform_system == "Windows" \
--hash=sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44 \
--hash=sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6

View File

@ -0,0 +1,23 @@
from iottb.utils.string_processing import make_canonical_name
import pytest
class TestMakeCanonicalName:
def test_normalizes_name_with_spaces_to_dashes(self):
name = "Device Name With Spaces"
expected_canonical_name = "device-name"
canonical_name, aliases = make_canonical_name(name)
assert canonical_name == expected_canonical_name
assert "device-name-with-spaces" in aliases
assert "device-name" in aliases
assert "Device Name With Spaces" in aliases
def test_name_with_no_spaces_or_special_characters(self):
name = "DeviceName123"
expected_canonical_name = "devicename123"
canonical_name, aliases = make_canonical_name(name)
assert canonical_name == expected_canonical_name
assert "DeviceName123" in aliases
assert "devicename123" in aliases

View File

@ -1,38 +0,0 @@
#!/usr/bin/env python3
import argparse
from iottb.subcommands.capture import setup_capture_parser
from iottb.subcommands.add_device import setup_init_device_root_parser
######################
# Argparse setup
######################
def setup_argparse():
# create top level parser
root_parser = argparse.ArgumentParser(prog="iottb")
subparsers = root_parser.add_subparsers(title="subcommands", required=True, dest="command")
setup_capture_parser(subparsers)
setup_init_device_root_parser(subparsers)
return root_parser
def main():
parser = setup_argparse()
args = parser.parse_args()
print(args)
if args.command:
try:
args.func(args)
except KeyboardInterrupt:
print("Received keyboard interrupt. Exiting...")
exit(1)
except Exception as e:
print(f"Error: {e}")
# create_capture_directory(args.device_name)
if __name__ == "__main__":
main()

View File

@ -1,26 +0,0 @@
from datetime import datetime
from enum import Flag, unique, global_enum
DEVICE_METADATA_FILE = "device_metadata.json"
CAPTURE_METADATA_FILE = "capture_metadata.json"
TODAY_DATE_STRING = datetime.now().strftime("%d%b%Y").lower() # TODO convert to function in utils or so
CAPTURE_FOLDER_BASENAME = "capture_###"
AFFIRMATIVE_USER_RESPONSE = {"yes", "y", "true", "Y", "Yes", "YES"}
NEGATIVE_USER_RESPONSE = {"no", "n", "N", "No"}
YES_DEFAULT = AFFIRMATIVE_USER_RESPONSE.union({"", " "})
NO_DEFAULT = NEGATIVE_USER_RESPONSE.union({"", " "})
@unique
@global_enum
class ReturnCodes(Flag):
SUCCESS = 0
ABORTED = 1
FAILURE = 2
UNKNOWN = 3
FILE_NOT_FOUND = 4
FILE_ALREADY_EXISTS = 5
INVALID_ARGUMENT = 6
INVALID_ARGUMENT_VALUE = 7

View File

@ -1,28 +0,0 @@
import logging
import sys
from logging.handlers import RotatingFileHandler
def setup_logging():
logger_obj = logging.getLogger('iottbLogger')
logger_obj.setLevel(logging.INFO)
file_handler = RotatingFileHandler('iottb.log')
console_handler = logging.StreamHandler(sys.stdout)
file_handler.setLevel(logging.DEBUG)
console_handler.setLevel(logging.INFO)
file_fmt = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
console_fmt = logging.Formatter('%(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(file_fmt)
console_handler.setFormatter(console_fmt)
logger_obj.addHandler(file_handler)
logger_obj.addHandler(console_handler)
return logger_obj
logger = setup_logging()

View File

@ -1,157 +0,0 @@
import json
import uuid
from datetime import datetime
from pathlib import Path
from typing import Optional, Any
from uuid import UUID
from pydantic import BaseModel, Field
from iottb.definitions import ReturnCodes, CAPTURE_METADATA_FILE
from iottb.models.device_metadata_model import DeviceMetadata
class CaptureMetadata(BaseModel):
# Required Fields
device_metadata: DeviceMetadata = Field(exclude=True)
capture_id: uuid.UUID = Field(default_factory=lambda: str(uuid.uuid4()))
capture_dir: Path
capture_file: str
capture_date: str = Field(default_factory=lambda: datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower())
# Statistics
start_time: str
stop_time: str
# tcpdump
packet_count: Optional[int]
pcap_filter: str = ""
tcpdump_command: str = ""
interface: str = ""
# Optional Fields
device_ip_address: Optional[str] = "No IP Address set"
device_mac_address: Optional[str] = None
app: Optional[str] = None
app_version: Optional[str] = None
firmware_version: Optional[str] = None
def __init__(self, device_metadata: DeviceMetadata, capture_dir: Path, /, **data: Any):
super().__init__(**data) # Pycharms orders
self.device_metadata = device_metadata
self.capture_dir = capture_dir
assert capture_dir.is_dir()
# Getters
def get_device_id(self) -> str:
return self.device_id
def get_start_time(self) -> str:
return self.start_time
def get_stop_time(self) -> str:
return self.stop_time
def get_packet_count(self) -> int:
return self.packet_count
def get_pcap_filter(self) -> str:
return self.pcap_filter
def get_device_ip_address(self) -> str:
return self.device_ip_address
def get_device_mac_address(self) -> str:
return self.device_mac_address
def get_app(self) -> str:
return self.app
def get_app_version(self) -> str:
return self.app_version
def get_firmware_version(self) -> str:
return self.firmware_version
def get_capture_id(self) -> UUID:
return self.capture_id
def get_capture_date(self) -> str:
return self.capture_date
def get_capfile_name(self):
return self.capture_file
def get_device_metadata(self) -> DeviceMetadata:
return self.device_metadata
def get_interface(self):
return self.interface
# Setters
def set_capture_dir(self, capture_dir: Path):
self.capture_dir = capture_dir
def set_capture_file(self, capture_file: str):
self.capture_file = capture_file
def set_capture_date(self, capture_date: str):
self.capture_date = capture_date
def set_start_time(self, start_time: str):
self.start_time = start_time
def set_stop_time(self, stop_time: str):
self.stop_time = stop_time
def set_packet_count(self, packet_count: int):
self.packet_count = packet_count
def set_pcap_filter(self, pcap_filter: str):
self.pcap_filter = pcap_filter
def set_device_ip_address(self, device_ip_address: str):
self.device_ip_address = device_ip_address
def set_device_mac_address(self, device_mac_address: str):
self.device_mac_address = device_mac_address
def set_app(self, app: str):
self.app = app
def set_app_version(self, app_version: str):
self.app_version = app_version
def set_firmware_version(self, firmware_version: str):
self.firmware_version = firmware_version
self.device_metadata.set_device_firmware_version(firmware_version)
def set_interface(self, interface: str):
self.interface = interface
def set_tcpdump_command(self, tcpdump_command: str):
self.tcpdump_command = tcpdump_command
# Other
def build_capture_file_name(self):
prefix = ""
if self.app is None:
prefix = self.device_metadata.get_device_short_name()
else:
assert str(self.app).strip() not in {"", " "}, f"app is not a valid name: {self.app}"
prefix = self.get_app()
# assert self.capture_dir is not None, f"{self.capture_dir} does not exist"
filename = f"{prefix}_{str(self.capture_id)}.pcap"
self.set_capture_file(filename)
def save_capture_metadata_to_json(self, file_path: Path = Path(CAPTURE_METADATA_FILE)):
assert self.capture_dir.is_dir(), f"capture_dir is not a directory: {self.capture_dir}"
if file_path.is_file():
print(f"File {file_path} already exists, update instead.")
return ReturnCodes.FILE_ALREADY_EXISTS
metadata = self.model_dump_json(indent=2, exclude_unset=True, exclude_none=True)
with file_path.open('w') as file:
json.dump(metadata, file)
return ReturnCodes.SUCCESS

View File

@ -1,128 +0,0 @@
import json
import uuid
from datetime import datetime
from pathlib import Path
from typing import Optional, List, Any
# iottb modules
from iottb.definitions import ReturnCodes, DEVICE_METADATA_FILE
# 3rd party libs
from pydantic import BaseModel, Field
IMMUTABLE_FIELDS = {"device_name", "device_short_name", "device_id", "date_created"}
class DeviceMetadata(BaseModel):
# Required fields
device_name: str
device_short_name: str
device_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
date_created: str = Field(default_factory=lambda: datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower())
device_root_path: Path
# Optional Fields
aliases: List[str] = Field(default_factory=lambda: [])
device_type: Optional[str] = None
device_serial_number: Optional[str] = None
device_firmware_version: Optional[str] = None
date_updated: Optional[str] = None
capture_files: Optional[List[str]] = []
def __init__(self, device_name: str, device_root_dir: Path, /, **data: Any):
super().__init__(**data)
self.device_name = device_name
self.device_short_name = device_name.lower().replace(" ", "_")
# assert dir_contains_device_metadata(device_root_dir), \
# f"Directory {device_root_dir} is missing a {DEVICE_METADATA_FILE} file"
self.device_root_dir = device_root_dir
def get_device_id(self) -> str:
return self.device_id
def get_device_name(self) -> str:
return self.device_name
def get_device_short_name(self) -> str:
return self.device_short_name
def get_device_type(self) -> str:
return self.device_type
def get_device_serial_number(self) -> str:
return self.device_serial_number
def get_device_firmware_version(self) -> str:
return self.device_firmware_version
def get_date_updated(self) -> str:
return self.date_updated
def get_capture_files(self) -> List[str]:
return self.capture_files
def get_aliases(self) -> List[str]:
return self.aliases
def set_device_type(self, device_type: str) -> None:
self.device_type = device_type
self.date_updated = datetime.now().strftime('%d-%m-%YT%H:%M:%S')
def set_device_serial_number(self, device_serial_number: str) -> None:
self.device_serial_number = device_serial_number
self.date_updated = datetime.now().strftime('%d-%m-%YT%H:%M:%S')
def set_device_firmware_version(self, device_firmware_version: str) -> None:
self.device_firmware_version = device_firmware_version
self.date_updated = datetime.now().strftime('%d-%m-%YT%H:%M:%S')
def set_device_name(self, device_name: str) -> None:
self.device_name = device_name
self.device_short_name = device_name.lower().replace(" ", "_")
self.date_updated = datetime.now().strftime('%d-%m-%YT%H:%M:%S')
@classmethod
def load_from_json(cls, device_file_path: Path):
assert device_file_path.is_file(), f"{device_file_path} is not a file"
assert device_file_path.name == DEVICE_METADATA_FILE, f"{device_file_path} is not a {DEVICE_METADATA_FILE}"
device_meta_filename = device_file_path
with device_meta_filename.open('r') as file:
metadata_json = json.load(file)
metadata_model_obj = cls.model_validate_json(metadata_json)
return metadata_model_obj
def save_to_json(self, file_path: Path):
if file_path.is_file():
print(f"File {file_path} already exists, update instead.")
return ReturnCodes.FILE_ALREADY_EXISTS
metadata = self.model_dump_json(indent=2)
with file_path.open('w') as file:
json.dump(metadata, file)
return ReturnCodes.SUCCESS
@classmethod
def update_metadata_in_json(cls, file_path: Path, **kwargs):
# TODO Maybe not needed at all.
assert file_path.is_file()
for field in IMMUTABLE_FIELDS:
if field in kwargs:
print(f"Field {field} is immutable")
return ReturnCodes.IMMUTABLE
metadata = cls.load_from_json(file_path)
for field, value in kwargs.items():
if field in metadata.model_fields_set:
setattr(metadata, field, value)
metadata.date_updated = datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower()
pass
def dir_contains_device_metadata(dir_path: Path):
if not dir_path.is_dir():
return False
else:
meta_file_path = dir_path / DEVICE_METADATA_FILE
print(f"Device metadata file path {str(meta_file_path)}")
if not meta_file_path.is_file():
return False
else:
return True

View File

@ -1,59 +0,0 @@
import pathlib
from iottb import definitions
from iottb.definitions import DEVICE_METADATA_FILE, ReturnCodes
from iottb.models.device_metadata_model import DeviceMetadata
from iottb.utils.device_metadata_utils import *
def setup_init_device_root_parser(subparsers):
parser = subparsers.add_parser("add-device", aliases=["add-device-root", "add"])
parser.add_argument("--root_dir", type=pathlib.Path, default=pathlib.Path.cwd())
group = parser.add_mutually_exclusive_group()
group.add_argument("--guided", action="store_true", help="Guided setup", default=False)
group.add_argument("--name", action="store", type=str, help="name of device")
parser.set_defaults(func=handle_add)
def handle_add(args):
print("Entered add-device-root")
if args.guided:
metadata = guided_setup(args.root_dir)
else:
device_name = args.name
args.root_dir.mkdir(parents=True, exist_ok=True)
args.root_dir.chdir()
metadata = DeviceMetadata(device_name, args.root_dir)
file_path = args.root_dir / DEVICE_METADATA_FILE
response = input(f"Confirm device metadata: {metadata.model_dump()} [y/N]")
if response.lower() not in definitions.AFFIRMATIVE_USER_RESPONSE.add(""):
configure_metadata()
assert False, "TODO implement dynamic setup"
assert metadata.model_dump() != ""
if metadata.save_to_json(file_path) == ReturnCodes.FILE_ALREADY_EXISTS:
print("Directory already contains a device metadata file. Aborting operation.")
return ReturnCodes.ABORTED
assert Path(file_path).exists(), f"{file_path} does not exist"
return ReturnCodes.SUCCESS
def configure_metadata():
pass
def guided_setup(device_root) -> DeviceMetadata:
response = "N"
device_name = ""
while response.upper() == "N":
device_name = input("Please enter name of device: ")
if device_name == "" or device_name is None:
print("Name cannot be empty")
response = input(f"Confirm device name: {device_name} [y/N] ")
assert response.lower() in definitions.AFFIRMATIVE_USER_RESPONSE.add(""), f"{response.upper()} not supported"
assert device_name != ""
assert device_name is not None
return DeviceMetadata(device_name, device_root)

View File

@ -1,6 +0,0 @@
import json
from pathlib import Path
from unittest.mock import mock_open, patch
import pytest
from iottb.utils.capture_metadata_utils import set_device_ip_address

BIN
iottb Executable file

Binary file not shown.

View File

View File

Some files were not shown because too many files have changed in this diff Show More