Add code for capture testbed. This is a huge commit. End of day sync...
This commit is contained in:
parent
038f6e308b
commit
48141f8c9c
2
.gitignore
vendored
2
.gitignore
vendored
@ -1 +1,3 @@
|
|||||||
.obsidian
|
.obsidian
|
||||||
|
venv
|
||||||
|
|
||||||
|
|||||||
32
archive/functions_dump.py
Normal file
32
archive/functions_dump.py
Normal file
@ -0,0 +1,32 @@
|
|||||||
|
def setup_sniff_tcpdump_parser(parser_sniff):
|
||||||
|
# arguments which will be passed to tcpdump
|
||||||
|
parser_sniff_tcpdump = parser_sniff.add_argument_group('tcpdump arguments')
|
||||||
|
# TODO: tcpdump_parser.add_argument('-c', '--count', re)
|
||||||
|
parser_sniff_tcpdump.add_argument("-a", "--ip-address=", help="IP address of the device to sniff", dest="device_ip")
|
||||||
|
parser_sniff_tcpdump.add_argument("-i", "--interface=", help="Interface of the capture device.", dest="capture_interface",default="")
|
||||||
|
parser_sniff_tcpdump.add_argument("-I", "--monitor-mode", help="Put interface into monitor mode",
|
||||||
|
action="store_true")
|
||||||
|
parser_sniff_tcpdump.add_argument("-n", help="Deactivate name resolution. Option is set by default.",
|
||||||
|
action="store_true")
|
||||||
|
parser_sniff_tcpdump.add_argument("-#", "--number",
|
||||||
|
help="Print packet number at beginning of line. Set by default.",
|
||||||
|
action="store_true")
|
||||||
|
parser_sniff_tcpdump.add_argument("-e", help="Print link layer headers. Option is set by default.",
|
||||||
|
action="store_true")
|
||||||
|
parser_sniff_tcpdump.add_argument("-t", action="count", default=0,
|
||||||
|
help="Please see tcpdump manual for details. Unused by default.")
|
||||||
|
|
||||||
|
|
||||||
|
def setup_sniff_parser(subparsers):
|
||||||
|
# create parser for "sniff" command
|
||||||
|
parser_sniff = subparsers.add_parser("sniff", help="Start tcpdump capture.")
|
||||||
|
setup_sniff_tcpdump_parser(parser_sniff)
|
||||||
|
setup_pcap_filter_parser(parser_sniff)
|
||||||
|
cap_size_group = parser_sniff.add_mutually_exclusive_group(required=True)
|
||||||
|
cap_size_group.add_argument("-c", "--count", type=int, help="Number of packets to capture.", default=0)
|
||||||
|
cap_size_group.add_argument("--mins", type=int, help="Time in minutes to capture.", default=60)
|
||||||
|
|
||||||
|
|
||||||
|
def setup_pcap_filter_parser(parser_sniff):
|
||||||
|
parser_pcap_filter = parser_sniff.add_argument_parser("pcap-filter expression")
|
||||||
|
pass
|
||||||
19
archive/metadata.py
Normal file
19
archive/metadata.py
Normal file
@ -0,0 +1,19 @@
|
|||||||
|
import json
|
||||||
|
import uuid
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
|
||||||
|
class Metadata:
|
||||||
|
def __init__(self, name):
|
||||||
|
self.device = name
|
||||||
|
self.timestamp = datetime.now().timestamp()
|
||||||
|
self.capture_id = uuid.uuid4().hex
|
||||||
|
self.capture_mode = ... # TODO: eg. promiscuous/monitor/other
|
||||||
|
self.host_ip = ...
|
||||||
|
self.host_mac = ...
|
||||||
|
self.protocol = ...
|
||||||
|
|
||||||
|
|
||||||
|
def create_metadata(filename, unique_id, device_details):
|
||||||
|
date_string = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
|
||||||
|
meta_filename = f"meta_{date_string}_{unique_id}.json"
|
||||||
69
archive/metadata_utils.py
Normal file
69
archive/metadata_utils.py
Normal file
@ -0,0 +1,69 @@
|
|||||||
|
import json
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from pydantic import BaseModel
|
||||||
|
|
||||||
|
from kydcap.models.device_metadata import DeviceMetadata
|
||||||
|
from kydcap.config import DEVICE_METADATA_FILE
|
||||||
|
|
||||||
|
|
||||||
|
def write_device_metadata_to_file(metadata: DeviceMetadata, device_path: Path):
|
||||||
|
"""Write the device metadata to a JSON file in the specified directory."""
|
||||||
|
meta_file_path = device_path / "meta.json"
|
||||||
|
meta_file_path.write_text(metadata.json(indent=2))
|
||||||
|
|
||||||
|
|
||||||
|
def confirm_device_metadata(metadata: DeviceMetadata) -> bool:
|
||||||
|
"""Display device metadata for user confirmation."""
|
||||||
|
print(metadata.json(indent=2))
|
||||||
|
return input("Confirm device metadata? (y/n): ").strip().lower() == 'y'
|
||||||
|
|
||||||
|
|
||||||
|
def get_device_metadata_from_user() -> DeviceMetadata:
|
||||||
|
"""Prompt the user to enter device details and return a populated DeviceMetadata object."""
|
||||||
|
device_name = input("Device name: ")
|
||||||
|
device_short_name = device_name.lower().replace(" ", "-")
|
||||||
|
return DeviceMetadata(device_name=device_name, device_short_name=device_short_name)
|
||||||
|
|
||||||
|
|
||||||
|
def initialize_device_root_dir(device_name: str) -> Path:
|
||||||
|
"""Create and return the path for the device directory."""
|
||||||
|
device_path = Path.cwd() / device_name
|
||||||
|
device_path.mkdir(exist_ok=True)
|
||||||
|
return device_path
|
||||||
|
|
||||||
|
|
||||||
|
def write_metadata(metadata: BaseModel, device_name: str):
|
||||||
|
"""Write device metadata to a JSON file."""
|
||||||
|
meta_path = Path.cwd() / device_name / DEVICE_METADATA_FILE
|
||||||
|
meta_path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
with meta_path.open('w') as f:
|
||||||
|
json.dump(metadata.dict(), f, indent=4)
|
||||||
|
|
||||||
|
|
||||||
|
def get_device_metadata(file_path: Path) -> DeviceMetadata | None:
|
||||||
|
"""Fetch device metadata from a JSON file."""
|
||||||
|
|
||||||
|
if dev_metadata_exists(file_path):
|
||||||
|
with file_path.open('r') as f:
|
||||||
|
device_metadata_json = json.load(f)
|
||||||
|
try:
|
||||||
|
device_metadata = DeviceMetadata.model_validate_json(device_metadata_json)
|
||||||
|
return device_metadata
|
||||||
|
except ValueError as e:
|
||||||
|
print(f"Validation error for device metadata: {e}")
|
||||||
|
else:
|
||||||
|
# TODO Decide what to do (e.g. search for file etc)
|
||||||
|
print(f"No device metadata at {file_path}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def search_device_metadata(filename=DEVICE_METADATA_FILE, start_dir=Path.cwd(), max_parents=3) -> Path:
|
||||||
|
pass # TODO
|
||||||
|
|
||||||
|
|
||||||
|
def dev_metadata_exists(file_path: Path) -> bool:
|
||||||
|
if file_path.is_file():
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
return False
|
||||||
0
code/kydcap/__init__.py
Normal file
0
code/kydcap/__init__.py
Normal file
19
code/kydcap/config.py
Normal file
19
code/kydcap/config.py
Normal file
@ -0,0 +1,19 @@
|
|||||||
|
from datetime import datetime
|
||||||
|
from enum import Flag, unique, global_enum
|
||||||
|
|
||||||
|
DEVICE_METADATA_FILE = "device-metadata.json"
|
||||||
|
CAPTURE_METADATA_FILE = "capture-metadata.json"
|
||||||
|
TODAY_DATE_STRING = datetime.now().strftime("%d%b%Y").lower()
|
||||||
|
|
||||||
|
|
||||||
|
@unique
|
||||||
|
@global_enum
|
||||||
|
class ReturnCodes(Flag):
|
||||||
|
SUCCESS = 0
|
||||||
|
ABORTED = 1
|
||||||
|
FAILURE = 2
|
||||||
|
UNKNOWN = 3
|
||||||
|
FILE_NOT_FOUND = 4
|
||||||
|
FILE_ALREADY_EXISTS = 5
|
||||||
|
INVALID_ARGUMENT = 6
|
||||||
|
INVALID_ARGUMENT_VALUE = 7
|
||||||
26
code/kydcap/main.py
Normal file
26
code/kydcap/main.py
Normal file
@ -0,0 +1,26 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
import argparse
|
||||||
|
|
||||||
|
from subcommands.sniff import setup_sniff_parser
|
||||||
|
|
||||||
|
CAP_DIR_PREFIX = ...
|
||||||
|
|
||||||
|
|
||||||
|
######################
|
||||||
|
# Argparse setup
|
||||||
|
######################
|
||||||
|
def setup_argparse():
|
||||||
|
# create top level parser
|
||||||
|
root_parser = argparse.ArgumentParser(prog="kydcap")
|
||||||
|
subparsers = root_parser.add_subparsers(title="subcommands", required=True, dest="command")
|
||||||
|
|
||||||
|
setup_sniff_parser(subparsers)
|
||||||
|
|
||||||
|
return root_parser
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
parser = setup_argparse()
|
||||||
|
args = parser.parse_args()
|
||||||
|
# if args.command
|
||||||
|
# create_capture_directory(args.device_name)
|
||||||
0
code/kydcap/models/__init__.py
Normal file
0
code/kydcap/models/__init__.py
Normal file
47
code/kydcap/models/capture_metadata.py
Normal file
47
code/kydcap/models/capture_metadata.py
Normal file
@ -0,0 +1,47 @@
|
|||||||
|
import json
|
||||||
|
import uuid
|
||||||
|
from datetime import datetime
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Optional, Any
|
||||||
|
|
||||||
|
from pydantic import BaseModel, Field
|
||||||
|
|
||||||
|
from kydcap.config import ReturnCodes
|
||||||
|
|
||||||
|
|
||||||
|
class KydcapCaptureMetadata(BaseModel):
|
||||||
|
# Required Fields
|
||||||
|
device_id: str
|
||||||
|
capture_id: uuid.UUID = Field(default_factory=lambda: str(uuid.uuid4()))
|
||||||
|
capture_date: str = Field(default_factory=lambda: datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower())
|
||||||
|
|
||||||
|
# Statistics
|
||||||
|
start_time: str
|
||||||
|
stop_time: str
|
||||||
|
packet_count: Optional[int]
|
||||||
|
|
||||||
|
# Optional Fields
|
||||||
|
device_ip_address: Optional[str] = None
|
||||||
|
device_mac_address: Optional[str] = None
|
||||||
|
|
||||||
|
app: Optional[str] = None
|
||||||
|
app_version: Optional[str] = None
|
||||||
|
firmware_version: Optional[str] = None
|
||||||
|
|
||||||
|
def __init__(self, device_id: str, start_time: str, stop_time: str, /, **data: Any):
|
||||||
|
super().__init__(**data) # Pycharms orders
|
||||||
|
assert isinstance(device_id, str)
|
||||||
|
assert isinstance(start_time, str)
|
||||||
|
assert isinstance(stop_time, str)
|
||||||
|
self.device_id = device_id
|
||||||
|
self.start_time = start_time
|
||||||
|
self.stop_time = stop_time
|
||||||
|
|
||||||
|
def save_to_json(self, file_path: Path):
|
||||||
|
if file_path.is_file():
|
||||||
|
print(f"File {file_path} already exists, update instead.")
|
||||||
|
return ReturnCodes.FILE_ALREADY_EXISTS
|
||||||
|
metadata = self.model_dump_json(indent=2)
|
||||||
|
with file_path.open('w') as file:
|
||||||
|
json.dump(metadata, file)
|
||||||
|
return ReturnCodes.SUCCESS
|
||||||
66
code/kydcap/models/device_metadata.py
Normal file
66
code/kydcap/models/device_metadata.py
Normal file
@ -0,0 +1,66 @@
|
|||||||
|
import json
|
||||||
|
import uuid
|
||||||
|
from datetime import datetime
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Optional, List, Any
|
||||||
|
|
||||||
|
# kydcap modules
|
||||||
|
from kydcap.config import ReturnCodes
|
||||||
|
|
||||||
|
# 3rd party libs
|
||||||
|
from pydantic import BaseModel, Field
|
||||||
|
|
||||||
|
IMMUTABLE_FIELDS = {"device_name", "device_short_name", "device_id", "date_created"}
|
||||||
|
|
||||||
|
|
||||||
|
class DeviceMetadata(BaseModel):
|
||||||
|
# Required fields
|
||||||
|
device_name: str
|
||||||
|
device_short_name: str
|
||||||
|
device_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
|
||||||
|
date_created: str = Field(default_factory=lambda: datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower())
|
||||||
|
|
||||||
|
# Optional Fields
|
||||||
|
device_type: Optional[str] = None
|
||||||
|
device_serial_number: Optional[str] = None
|
||||||
|
device_firmware_version: Optional[str] = None
|
||||||
|
date_updated: Optional[str] = None
|
||||||
|
|
||||||
|
capture_files: Optional[List[str]] = []
|
||||||
|
|
||||||
|
def __init__(self, device_name: str, /, **data: Any):
|
||||||
|
super().__init__(**data)
|
||||||
|
self.device_name = device_name
|
||||||
|
self.device_short_name = device_name.lower().replace(" ", "_")
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def load_from_json(cls, file_path: Path):
|
||||||
|
assert file_path.is_file()
|
||||||
|
with file_path.open('r') as file:
|
||||||
|
metadata_json = json.load(file)
|
||||||
|
metadata_model_obj = cls.model_validate_json(metadata_json)
|
||||||
|
return metadata_model_obj
|
||||||
|
|
||||||
|
def save_to_json(self, file_path: Path):
|
||||||
|
if file_path.is_file():
|
||||||
|
print(f"File {file_path} already exists, update instead.")
|
||||||
|
return ReturnCodes.FILE_ALREADY_EXISTS
|
||||||
|
metadata = self.model_dump_json(indent=2)
|
||||||
|
with file_path.open('w') as file:
|
||||||
|
json.dump(metadata, file)
|
||||||
|
return ReturnCodes.SUCCESS
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def update_metadata_in_json(cls, file_path: Path, **kwargs):
|
||||||
|
# TODO Maybe not needed at all.
|
||||||
|
assert file_path.is_file()
|
||||||
|
for field in IMMUTABLE_FIELDS:
|
||||||
|
if field in kwargs:
|
||||||
|
print(f"Field {field} is immutable")
|
||||||
|
return ReturnCodes.IMMUTABLE
|
||||||
|
metadata = cls.load_from_json(file_path)
|
||||||
|
for field, value in kwargs.items():
|
||||||
|
if field in metadata.model_fields_set:
|
||||||
|
setattr(metadata, field, value)
|
||||||
|
metadata.date_updated = datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower()
|
||||||
|
pass
|
||||||
39
code/kydcap/subcommands/initialize_device_root_dir.py
Normal file
39
code/kydcap/subcommands/initialize_device_root_dir.py
Normal file
@ -0,0 +1,39 @@
|
|||||||
|
import pathlib
|
||||||
|
|
||||||
|
from kydcap.config import DEVICE_METADATA_FILE
|
||||||
|
from kydcap.models.device_metadata import DeviceMetadata
|
||||||
|
|
||||||
|
|
||||||
|
def setup_init_root_dir_parser(subparsers):
|
||||||
|
parser = subparsers.add_parser("init-device-root", aliases=["idr"])
|
||||||
|
parser.add_argument("root_dir", type=pathlib.Path, default=pathlib.Path.cwd())
|
||||||
|
group = parser.add_mutually_exclusive_group()
|
||||||
|
group.add_argument("--dynamic", action="store_false", help="enable guided setup")
|
||||||
|
group.add_argument("-n", "--name", action="store", required=True, type=str, help="name of device")
|
||||||
|
parser.set_defaults(func=handle_idr)
|
||||||
|
|
||||||
|
|
||||||
|
def handle_idr(args):
|
||||||
|
root_dir = args.root_dir
|
||||||
|
device_name = None
|
||||||
|
if args.dynamic:
|
||||||
|
response = "N"
|
||||||
|
while response == "N":
|
||||||
|
name = input("Please enter name of device: ")
|
||||||
|
# TODO extended config for other fields like apps, firmware etc.
|
||||||
|
response = input(f"Confirm device name: {name} [y/N]")
|
||||||
|
device_name = name
|
||||||
|
else:
|
||||||
|
device_name = args.name
|
||||||
|
root_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
root_dir.chdir()
|
||||||
|
dev_metadata_model = DeviceMetadata(device_name)
|
||||||
|
file_path = root_dir / device_name / DEVICE_METADATA_FILE
|
||||||
|
assert not file_path.exists(), f"{file_path} already exists"
|
||||||
|
if args.dynamic:
|
||||||
|
response = input(f"Confirm device metadata: {dev_metadata_model.model_dump()} [y/N]")
|
||||||
|
if response.lower() != "y":
|
||||||
|
assert False, "TODO implement dynamic setup"
|
||||||
|
code = dev_metadata_model.save_to_json(file_path)
|
||||||
|
print(f"Device metadata saved to {file_path}")
|
||||||
|
return code
|
||||||
91
code/kydcap/subcommands/sniff.py
Normal file
91
code/kydcap/subcommands/sniff.py
Normal file
@ -0,0 +1,91 @@
|
|||||||
|
import subprocess
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from kydcap.config import *
|
||||||
|
from kydcap.models.device_metadata import DeviceMetadata
|
||||||
|
|
||||||
|
|
||||||
|
def setup_sniff_parser(subparsers):
|
||||||
|
parser = subparsers.add_parser('sniff', help='Sniff packets with tcpdump')
|
||||||
|
# metadata args
|
||||||
|
parser.add_argument("-a", "--ip-address=", help="IP address of the device to sniff", dest="device_ip")
|
||||||
|
# tcpdump args
|
||||||
|
parser_sniff_tcpdump = parser.add_argument_group('tcpdump arguments')
|
||||||
|
parser_sniff_tcpdump.add_argument("-i", "--interface=", help="Interface to capture on.", dest="capture_interface",
|
||||||
|
default="any")
|
||||||
|
parser_sniff_tcpdump.add_argument("-I", "--monitor-mode", help="Put interface into monitor mode",
|
||||||
|
action="store_true")
|
||||||
|
parser_sniff_tcpdump.add_argument("-n", help="Deactivate name resolution. Option is set by default.",
|
||||||
|
action="store_true", dest="no_name_resolution")
|
||||||
|
parser_sniff_tcpdump.add_argument("-#", "--number",
|
||||||
|
help="Print packet number at beginning of line. Set by default.",
|
||||||
|
action="store_true")
|
||||||
|
parser_sniff_tcpdump.add_argument("-e", help="Print link layer headers. Option is set by default.",
|
||||||
|
action="store_true", dest="print_link_layer")
|
||||||
|
parser_sniff_tcpdump.add_argument("-t", action="count", default=0,
|
||||||
|
help="Please see tcpdump manual for details. Unused by default.")
|
||||||
|
# parser_sniff_tcpdump.add_argument("--filter",type=str,default="ip help=f"pcap filter expression. \
|
||||||
|
# Defaults is '{default}'")
|
||||||
|
# shared args
|
||||||
|
cap_size_group = parser.add_mutually_exclusive_group(required=False)
|
||||||
|
cap_size_group.add_argument("-c", "--count", type=int, help="Number of packets to capture.", default=0)
|
||||||
|
cap_size_group.add_argument("--mins", type=int, help="Time in minutes to capture.", default=60)
|
||||||
|
parser.set_defaults(func=handle_sniff)
|
||||||
|
# return parser
|
||||||
|
# parser.add_default(func=handle_sniff(args=sniff_args))
|
||||||
|
|
||||||
|
|
||||||
|
def cwd_is_device_root_dir() -> bool:
|
||||||
|
device_metadata_file = Path.cwd() / DEVICE_METADATA_FILE
|
||||||
|
return device_metadata_file.exists()
|
||||||
|
|
||||||
|
|
||||||
|
def start_guided_device_root_dir_setup():
|
||||||
|
assert False, "Not implemented"
|
||||||
|
|
||||||
|
|
||||||
|
def handle_metadata():
|
||||||
|
assert not cwd_is_device_root_dir()
|
||||||
|
print(f"Unable to find {DEVICE_METADATA_FILE} in current working directory")
|
||||||
|
print("You need to setup a device root directory before using this command")
|
||||||
|
response = input("Would you like to be guided through the setup? [y/n]")
|
||||||
|
if response.lower() == "y":
|
||||||
|
start_guided_device_root_dir_setup()
|
||||||
|
else:
|
||||||
|
print("'kydcap init-device-root --help' for more information.")
|
||||||
|
exit(ReturnCodes.ABORTED)
|
||||||
|
# device_id = handle_capture_metadata()
|
||||||
|
return ReturnCodes.SUCCESS
|
||||||
|
|
||||||
|
|
||||||
|
def handle_capture_metadata():
|
||||||
|
device_metadata_json = Path.cwd() / DEVICE_METADATA_FILE
|
||||||
|
device_metadata = DeviceMetadata.load_from_json(device_metadata_json)
|
||||||
|
device_id = device_metadata.device_id
|
||||||
|
return device_id
|
||||||
|
|
||||||
|
|
||||||
|
def handle_sniff(args):
|
||||||
|
if not cwd_is_device_root_dir():
|
||||||
|
handle_metadata()
|
||||||
|
else:
|
||||||
|
cmd = ['sudo tcpdump', '-i', args.capture_interface]
|
||||||
|
if args.monitor_mode:
|
||||||
|
cmd.append('-I')
|
||||||
|
if args.no_name_resolution:
|
||||||
|
cmd.append('-n')
|
||||||
|
if args.number:
|
||||||
|
cmd.append('-#')
|
||||||
|
if args.print_link_layer:
|
||||||
|
cmd.append('-e')
|
||||||
|
if args.count:
|
||||||
|
cmd.append('-c')
|
||||||
|
cmd.append(str(args.count))
|
||||||
|
elif args.mins:
|
||||||
|
pass
|
||||||
|
print('Executing: ' + ' '.join(cmd))
|
||||||
|
# TODO maybe dump this into file -> put into device metadata
|
||||||
|
start_time = datetime.now().strftime('%H:%M:%S')
|
||||||
|
subprocess.run(cmd)
|
||||||
|
stop_time = datetime.now().strftime('%H:%M:%S')
|
||||||
|
return ReturnCodes.SUCCESS
|
||||||
0
code/kydcap/utils/__init__.py
Normal file
0
code/kydcap/utils/__init__.py
Normal file
40
code/kydcap/utils/capture_metadata_utils.py
Normal file
40
code/kydcap/utils/capture_metadata_utils.py
Normal file
@ -0,0 +1,40 @@
|
|||||||
|
import json
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from kydcap.config import ReturnCodes
|
||||||
|
|
||||||
|
|
||||||
|
def set_device_ip_address(ip_addr: str, file_path: Path):
|
||||||
|
assert ip_addr is not None
|
||||||
|
assert file_path.is_file()
|
||||||
|
with file_path.open('r') as f:
|
||||||
|
data = json.load(f)
|
||||||
|
current_ip = data["device_ip_address"]
|
||||||
|
if current_ip is not None:
|
||||||
|
print(f"Device IP Address is set to {current_ip}")
|
||||||
|
response = input(f"Do you want to change the recorded IP address to {ip_addr}? [Y/N] ")
|
||||||
|
if response.upper() == "N":
|
||||||
|
print("Aborting change to device IP address")
|
||||||
|
return ReturnCodes.ABORTED
|
||||||
|
with file_path.open('w') as f:
|
||||||
|
json.dump(data, f)
|
||||||
|
return ReturnCodes.SUCCESS
|
||||||
|
|
||||||
|
|
||||||
|
def set_device_mac_address(mac_addr: str, file_path: Path):
|
||||||
|
assert mac_addr is not None
|
||||||
|
assert file_path.is_file()
|
||||||
|
with file_path.open('r') as f:
|
||||||
|
data = json.load(f)
|
||||||
|
current_mac = data["device_mac_address"]
|
||||||
|
if current_mac is not None:
|
||||||
|
print(f"Device MAC Address is set to {current_mac}")
|
||||||
|
response = input(f"Do you want to change the recorded MAC address to {mac_addr}? [Y/N] ")
|
||||||
|
if response.upper() == "N":
|
||||||
|
print("Aborting change to device MAC address")
|
||||||
|
return ReturnCodes.ABORTED
|
||||||
|
with file_path.open('w') as f:
|
||||||
|
json.dump(data, f)
|
||||||
|
return ReturnCodes.SUCCESS
|
||||||
|
|
||||||
|
# TODO finnish for other fields in capture metadata
|
||||||
49
code/kydcap/utils/device_metadata_utils.py
Normal file
49
code/kydcap/utils/device_metadata_utils.py
Normal file
@ -0,0 +1,49 @@
|
|||||||
|
import json
|
||||||
|
from datetime import datetime
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from kydcap.config import ReturnCodes
|
||||||
|
|
||||||
|
|
||||||
|
def update_firmware_version(version: str, file_path: Path):
|
||||||
|
assert file_path.is_file()
|
||||||
|
with file_path.open('r') as file:
|
||||||
|
metadata = json.load(file)
|
||||||
|
metadata['device_firmware_version'] = version
|
||||||
|
metadata['date_updated'] = datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower()
|
||||||
|
with file_path.open('w') as file:
|
||||||
|
json.dump(metadata, file)
|
||||||
|
return ReturnCodes.SUCCESS
|
||||||
|
|
||||||
|
|
||||||
|
def add_capture_file_reference(capture_file_reference: str, file_path: Path):
|
||||||
|
assert file_path.is_file()
|
||||||
|
with file_path.open('r') as file:
|
||||||
|
metadata = json.load(file)
|
||||||
|
metadata['capture_files'] = capture_file_reference
|
||||||
|
metadata['date_updated'] = datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower()
|
||||||
|
with file_path.open('w') as file:
|
||||||
|
json.dump(metadata, file)
|
||||||
|
return ReturnCodes.SUCCESS
|
||||||
|
|
||||||
|
|
||||||
|
def update_device_serial_number(device_id: str, file_path: Path):
|
||||||
|
assert file_path.is_file()
|
||||||
|
with file_path.open('r') as file:
|
||||||
|
metadata = json.load(file)
|
||||||
|
metadata['device_id'] = device_id
|
||||||
|
metadata['date_updated'] = datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower()
|
||||||
|
with file_path.open('w') as file:
|
||||||
|
json.dump(metadata, file)
|
||||||
|
return ReturnCodes.SUCCESS
|
||||||
|
|
||||||
|
|
||||||
|
def update_device_type(device_type: str, file_path: Path):
|
||||||
|
assert file_path.is_file()
|
||||||
|
with file_path.open('r') as file:
|
||||||
|
metadata = json.load(file)
|
||||||
|
metadata['device_type'] = device_type
|
||||||
|
metadata['date_updated'] = datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower()
|
||||||
|
with file_path.open('w') as file:
|
||||||
|
json.dump(metadata, file)
|
||||||
|
return ReturnCodes.SUCCESS
|
||||||
28
code/kydcap/utils/tcpdump_utils.py
Normal file
28
code/kydcap/utils/tcpdump_utils.py
Normal file
@ -0,0 +1,28 @@
|
|||||||
|
import shutil
|
||||||
|
import subprocess
|
||||||
|
|
||||||
|
|
||||||
|
def check_installed() -> bool:
|
||||||
|
"""Check if tcpdump is installed and available on the system path."""
|
||||||
|
return shutil.which('tcpdump') is not None
|
||||||
|
|
||||||
|
|
||||||
|
def ensure_installed():
|
||||||
|
"""Ensure that tcpdump is installed, raise an error if not."""
|
||||||
|
if not check_installed():
|
||||||
|
raise RuntimeError("tcpdump is not installed. Please install it to continue.")
|
||||||
|
|
||||||
|
|
||||||
|
def list_interfaces() -> str:
|
||||||
|
"""List available network interfaces using tcpdump."""
|
||||||
|
ensure_installed()
|
||||||
|
try:
|
||||||
|
result = subprocess.run(['tcpdump', '--list-interfaces'], capture_output=True, text=True, check=True)
|
||||||
|
return result.stdout
|
||||||
|
except subprocess.CalledProcessError as e:
|
||||||
|
print(f"Failed to list interfaces: {e}")
|
||||||
|
return ""
|
||||||
|
|
||||||
|
|
||||||
|
def start_tcpdump():
|
||||||
|
return None
|
||||||
@ -38,7 +38,7 @@ IoT.db2/
|
|||||||
└── ....
|
└── ....
|
||||||
IoT.db3/
|
IoT.db3/
|
||||||
├── Measurements/
|
├── Measurements/
|
||||||
│ ├── m1/
|
│ ├── m1/ (Specification of device in this substructure)
|
||||||
│ │ ├── follows from above
|
│ │ ├── follows from above
|
||||||
│ │ └── ...
|
│ │ └── ...
|
||||||
│ ├── m2
|
│ ├── m2
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user