WHAT
This commit is contained in:
38
archive/capture_metadata_utils.py
Normal file
38
archive/capture_metadata_utils.py
Normal file
@@ -0,0 +1,38 @@
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
from iottb.definitions import ReturnCodes
|
||||
|
||||
|
||||
def set_device_ip_address(ip_addr: str, file_path: Path):
|
||||
assert ip_addr is not None
|
||||
assert file_path.is_file()
|
||||
with file_path.open('r') as f:
|
||||
data = json.load(f)
|
||||
current_ip = data['device_ip_address']
|
||||
if current_ip is not None:
|
||||
print(f'Device IP Address is set to {current_ip}')
|
||||
response = input(f'Do you want to change the recorded IP address to {ip_addr}? [Y/N] ')
|
||||
if response.upper() == 'N':
|
||||
print('Aborting change to device IP address')
|
||||
return ReturnCodes.ABORTED
|
||||
with file_path.open('w') as f:
|
||||
json.dump(data, f)
|
||||
return ReturnCodes.SUCCESS
|
||||
|
||||
|
||||
def set_device_mac_address(mac_addr: str, file_path: Path):
|
||||
assert mac_addr is not None
|
||||
assert file_path.is_file()
|
||||
with file_path.open('r') as f:
|
||||
data = json.load(f)
|
||||
current_mac = data['device_mac_address']
|
||||
if current_mac is not None:
|
||||
print(f'Device MAC Address is set to {current_mac}')
|
||||
response = input(f'Do you want to change the recorded MAC address to {mac_addr}? [Y/N] ')
|
||||
if response.upper() == 'N':
|
||||
print('Aborting change to device MAC address')
|
||||
return ReturnCodes.ABORTED
|
||||
with file_path.open('w') as f:
|
||||
json.dump(data, f)
|
||||
return ReturnCodes.SUCCESS
|
||||
51
archive/device_metadata_utils.py
Normal file
51
archive/device_metadata_utils.py
Normal file
@@ -0,0 +1,51 @@
|
||||
import json
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
|
||||
from iottb.definitions import ReturnCodes
|
||||
|
||||
|
||||
def update_firmware_version(version: str, file_path: Path):
|
||||
assert file_path.is_file()
|
||||
with file_path.open('r') as file:
|
||||
metadata = json.load(file)
|
||||
metadata['device_firmware_version'] = version
|
||||
metadata['date_updated'] = datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower()
|
||||
with file_path.open('w') as file:
|
||||
json.dump(metadata, file)
|
||||
return ReturnCodes.SUCCESS
|
||||
|
||||
|
||||
def add_capture_file_reference(capture_file_reference: str, file_path: Path):
|
||||
assert file_path.is_file()
|
||||
with file_path.open('r') as file:
|
||||
metadata = json.load(file)
|
||||
metadata['capture_files'] = capture_file_reference
|
||||
metadata['date_updated'] = datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower()
|
||||
with file_path.open('w') as file:
|
||||
json.dump(metadata, file)
|
||||
return ReturnCodes.SUCCESS
|
||||
|
||||
|
||||
def update_device_serial_number(device_id: str, file_path: Path):
|
||||
assert file_path.is_file()
|
||||
with file_path.open('r') as file:
|
||||
metadata = json.load(file)
|
||||
metadata['device_id'] = device_id
|
||||
metadata['date_updated'] = datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower()
|
||||
with file_path.open('w') as file:
|
||||
json.dump(metadata, file)
|
||||
return ReturnCodes.SUCCESS
|
||||
|
||||
|
||||
def update_device_type(device_type: str, file_path: Path):
|
||||
assert file_path.is_file()
|
||||
with file_path.open('r') as file:
|
||||
metadata = json.load(file)
|
||||
metadata['device_type'] = device_type
|
||||
metadata['date_updated'] = datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower()
|
||||
with file_path.open('w') as file:
|
||||
json.dump(metadata, file)
|
||||
return ReturnCodes.SUCCESS
|
||||
|
||||
|
||||
32
archive/functions_dump.py
Normal file
32
archive/functions_dump.py
Normal file
@@ -0,0 +1,32 @@
|
||||
def setup_sniff_tcpdump_parser(parser_sniff):
|
||||
# arguments which will be passed to tcpdump
|
||||
parser_sniff_tcpdump = parser_sniff.add_argument_group('tcpdump arguments')
|
||||
# TODO: tcpdump_parser.add_argument('-c', '--count', re)
|
||||
parser_sniff_tcpdump.add_argument('-a', '--ip-address=', help='IP address of the device to sniff', dest='device_ip')
|
||||
parser_sniff_tcpdump.add_argument('-i', '--interface=', help='Interface of the capture device.', dest='capture_interface',default='')
|
||||
parser_sniff_tcpdump.add_argument('-I', '--monitor-mode', help='Put interface into monitor mode',
|
||||
action='store_true')
|
||||
parser_sniff_tcpdump.add_argument('-n', help='Deactivate name resolution. Option is set by default.',
|
||||
action='store_true')
|
||||
parser_sniff_tcpdump.add_argument('-#', '--number',
|
||||
help='Print packet number at beginning of line. Set by default.',
|
||||
action='store_true')
|
||||
parser_sniff_tcpdump.add_argument('-e', help='Print link layer headers. Option is set by default.',
|
||||
action='store_true')
|
||||
parser_sniff_tcpdump.add_argument('-t', action='count', default=0,
|
||||
help='Please see tcpdump manual for details. Unused by default.')
|
||||
|
||||
|
||||
def setup_sniff_parser(subparsers):
|
||||
# create parser for 'sniff' command
|
||||
parser_sniff = subparsers.add_parser('sniff', help='Start tcpdump capture.')
|
||||
setup_sniff_tcpdump_parser(parser_sniff)
|
||||
setup_pcap_filter_parser(parser_sniff)
|
||||
cap_size_group = parser_sniff.add_mutually_exclusive_group(required=True)
|
||||
cap_size_group.add_argument('-c', '--count', type=int, help='Number of packets to capture.', default=0)
|
||||
cap_size_group.add_argument('--mins', type=int, help='Time in minutes to capture.', default=60)
|
||||
|
||||
|
||||
def setup_pcap_filter_parser(parser_sniff):
|
||||
parser_pcap_filter = parser_sniff.add_argument_parser('pcap-filter expression')
|
||||
pass
|
||||
19
archive/metadata.py
Normal file
19
archive/metadata.py
Normal file
@@ -0,0 +1,19 @@
|
||||
import json
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
class Metadata:
|
||||
def __init__(self, name):
|
||||
self.device = name
|
||||
self.timestamp = datetime.now().timestamp()
|
||||
self.capture_id = uuid.uuid4().hex
|
||||
self.capture_mode = ... # TODO: eg. promiscuous/monitor/other
|
||||
self.host_ip = ...
|
||||
self.host_mac = ...
|
||||
self.protocol = ...
|
||||
|
||||
|
||||
def create_metadata(filename, unique_id, device_details):
|
||||
date_string = datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
|
||||
meta_filename = f'meta_{date_string}_{unique_id}.json'
|
||||
69
archive/metadata_utils.py
Normal file
69
archive/metadata_utils.py
Normal file
@@ -0,0 +1,69 @@
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from iottb.models.device_metadata_model import DeviceMetadata
|
||||
from iottb.definitions import DEVICE_METADATA_FILE
|
||||
|
||||
|
||||
def write_device_metadata_to_file(metadata: DeviceMetadata, device_path: Path):
|
||||
'''Write the device metadata to a JSON file in the specified directory.'''
|
||||
meta_file_path = device_path / 'meta.json'
|
||||
meta_file_path.write_text(metadata.json(indent=2))
|
||||
|
||||
|
||||
def confirm_device_metadata(metadata: DeviceMetadata) -> bool:
|
||||
'''Display device metadata for user confirmation.'''
|
||||
print(metadata.json(indent=2))
|
||||
return input('Confirm device metadata? (y/n): ').strip().lower() == 'y'
|
||||
|
||||
|
||||
def get_device_metadata_from_user() -> DeviceMetadata:
|
||||
'''Prompt the user to enter device details and return a populated DeviceMetadata object.'''
|
||||
device_name = input('Device name: ')
|
||||
device_short_name = device_name.lower().replace(' ', '-')
|
||||
return DeviceMetadata(device_name=device_name, device_short_name=device_short_name)
|
||||
|
||||
|
||||
def initialize_device_root_dir(device_name: str) -> Path:
|
||||
'''Create and return the path for the device directory.'''
|
||||
device_path = Path.cwd() / device_name
|
||||
device_path.mkdir(exist_ok=True)
|
||||
return device_path
|
||||
|
||||
|
||||
def write_metadata(metadata: BaseModel, device_name: str):
|
||||
'''Write device metadata to a JSON file.'''
|
||||
meta_path = Path.cwd() / device_name / DEVICE_METADATA_FILE
|
||||
meta_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with meta_path.open('w') as f:
|
||||
json.dump(metadata.dict(), f, indent=4)
|
||||
|
||||
|
||||
def get_device_metadata(file_path: Path) -> DeviceMetadata | None:
|
||||
'''Fetch device metadata from a JSON file.'''
|
||||
|
||||
if dev_metadata_exists(file_path):
|
||||
with file_path.open('r') as f:
|
||||
device_metadata_json = json.load(f)
|
||||
try:
|
||||
device_metadata = DeviceMetadata.from_json(device_metadata_json)
|
||||
return device_metadata
|
||||
except ValueError as e:
|
||||
print(f'Validation error for device metadata: {e}')
|
||||
else:
|
||||
# TODO Decide what to do (e.g. search for file etc)
|
||||
print(f'No device metadata at {file_path}')
|
||||
return None
|
||||
|
||||
|
||||
def search_device_metadata(filename=DEVICE_METADATA_FILE, start_dir=Path.cwd(), max_parents=3) -> Path:
|
||||
pass # TODO
|
||||
|
||||
|
||||
def dev_metadata_exists(file_path: Path) -> bool:
|
||||
if file_path.is_file():
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
Reference in New Issue
Block a user