Refactor and add Logger
This commit is contained in:
0
code/iottb/utils/__init__.py
Normal file
0
code/iottb/utils/__init__.py
Normal file
38
code/iottb/utils/capture_metadata_utils.py
Normal file
38
code/iottb/utils/capture_metadata_utils.py
Normal file
@@ -0,0 +1,38 @@
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
from iottb.definitions import ReturnCodes
|
||||
|
||||
|
||||
def set_device_ip_address(ip_addr: str, file_path: Path):
|
||||
assert ip_addr is not None
|
||||
assert file_path.is_file()
|
||||
with file_path.open('r') as f:
|
||||
data = json.load(f)
|
||||
current_ip = data["device_ip_address"]
|
||||
if current_ip is not None:
|
||||
print(f"Device IP Address is set to {current_ip}")
|
||||
response = input(f"Do you want to change the recorded IP address to {ip_addr}? [Y/N] ")
|
||||
if response.upper() == "N":
|
||||
print("Aborting change to device IP address")
|
||||
return ReturnCodes.ABORTED
|
||||
with file_path.open('w') as f:
|
||||
json.dump(data, f)
|
||||
return ReturnCodes.SUCCESS
|
||||
|
||||
|
||||
def set_device_mac_address(mac_addr: str, file_path: Path):
|
||||
assert mac_addr is not None
|
||||
assert file_path.is_file()
|
||||
with file_path.open('r') as f:
|
||||
data = json.load(f)
|
||||
current_mac = data["device_mac_address"]
|
||||
if current_mac is not None:
|
||||
print(f"Device MAC Address is set to {current_mac}")
|
||||
response = input(f"Do you want to change the recorded MAC address to {mac_addr}? [Y/N] ")
|
||||
if response.upper() == "N":
|
||||
print("Aborting change to device MAC address")
|
||||
return ReturnCodes.ABORTED
|
||||
with file_path.open('w') as f:
|
||||
json.dump(data, f)
|
||||
return ReturnCodes.SUCCESS
|
||||
44
code/iottb/utils/capture_utils.py
Normal file
44
code/iottb/utils/capture_utils.py
Normal file
@@ -0,0 +1,44 @@
|
||||
import uuid
|
||||
from pathlib import Path
|
||||
from iottb.models.device_metadata_model import dir_contains_device_metadata
|
||||
from iottb.utils.utils import get_iso_date
|
||||
|
||||
|
||||
def get_capture_uuid():
|
||||
return str(uuid.uuid4())
|
||||
|
||||
|
||||
def get_capture_date_folder(device_root: Path):
|
||||
today_iso = get_iso_date()
|
||||
today_folder = device_root / today_iso
|
||||
if dir_contains_device_metadata(device_root):
|
||||
if not today_folder.is_dir():
|
||||
try:
|
||||
today_folder.mkdir()
|
||||
except FileExistsError:
|
||||
print(f"Folder {today_folder} already exists")
|
||||
return today_folder
|
||||
raise FileNotFoundError(f"Given path {device_root} is not a device root directory")
|
||||
|
||||
|
||||
def get_capture_src_folder(device_folder: Path):
|
||||
assert device_folder.is_dir(), f"Given path {device_folder} is not a folder"
|
||||
today_iso = get_iso_date()
|
||||
max_sequence_number = 1
|
||||
for d in device_folder.iterdir():
|
||||
if d.is_dir() and d.name.startswith(f'{today_iso}_capture_'):
|
||||
name = d.name
|
||||
num = int(name.split("_")[2])
|
||||
max_sequence_number = max(max_sequence_number, num)
|
||||
|
||||
next_sequence_number = max_sequence_number + 1
|
||||
return device_folder.joinpath(f"{today_iso}_capture_{next_sequence_number:03}")
|
||||
|
||||
|
||||
def make_capture_src_folder(capture_src_folder: Path):
|
||||
try:
|
||||
capture_src_folder.mkdir()
|
||||
except FileExistsError:
|
||||
print(f"Folder {capture_src_folder} already exists")
|
||||
finally:
|
||||
return capture_src_folder
|
||||
51
code/iottb/utils/device_metadata_utils.py
Normal file
51
code/iottb/utils/device_metadata_utils.py
Normal file
@@ -0,0 +1,51 @@
|
||||
import json
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
|
||||
from iottb.definitions import ReturnCodes
|
||||
|
||||
|
||||
def update_firmware_version(version: str, file_path: Path):
|
||||
assert file_path.is_file()
|
||||
with file_path.open('r') as file:
|
||||
metadata = json.load(file)
|
||||
metadata['device_firmware_version'] = version
|
||||
metadata['date_updated'] = datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower()
|
||||
with file_path.open('w') as file:
|
||||
json.dump(metadata, file)
|
||||
return ReturnCodes.SUCCESS
|
||||
|
||||
|
||||
def add_capture_file_reference(capture_file_reference: str, file_path: Path):
|
||||
assert file_path.is_file()
|
||||
with file_path.open('r') as file:
|
||||
metadata = json.load(file)
|
||||
metadata['capture_files'] = capture_file_reference
|
||||
metadata['date_updated'] = datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower()
|
||||
with file_path.open('w') as file:
|
||||
json.dump(metadata, file)
|
||||
return ReturnCodes.SUCCESS
|
||||
|
||||
|
||||
def update_device_serial_number(device_id: str, file_path: Path):
|
||||
assert file_path.is_file()
|
||||
with file_path.open('r') as file:
|
||||
metadata = json.load(file)
|
||||
metadata['device_id'] = device_id
|
||||
metadata['date_updated'] = datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower()
|
||||
with file_path.open('w') as file:
|
||||
json.dump(metadata, file)
|
||||
return ReturnCodes.SUCCESS
|
||||
|
||||
|
||||
def update_device_type(device_type: str, file_path: Path):
|
||||
assert file_path.is_file()
|
||||
with file_path.open('r') as file:
|
||||
metadata = json.load(file)
|
||||
metadata['device_type'] = device_type
|
||||
metadata['date_updated'] = datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower()
|
||||
with file_path.open('w') as file:
|
||||
json.dump(metadata, file)
|
||||
return ReturnCodes.SUCCESS
|
||||
|
||||
|
||||
41
code/iottb/utils/tcpdump_utils.py
Normal file
41
code/iottb/utils/tcpdump_utils.py
Normal file
@@ -0,0 +1,41 @@
|
||||
import ipaddress
|
||||
import shutil
|
||||
import subprocess
|
||||
from typing import Optional
|
||||
|
||||
|
||||
def check_installed() -> bool:
|
||||
"""Check if tcpdump is installed and available on the system path."""
|
||||
return shutil.which('tcpdump') is not None
|
||||
|
||||
|
||||
def ensure_installed():
|
||||
"""Ensure that tcpdump is installed, raise an error if not."""
|
||||
if not check_installed():
|
||||
raise RuntimeError("tcpdump is not installed. Please install it to continue.")
|
||||
|
||||
|
||||
def list_interfaces() -> str:
|
||||
"""List available network interfaces using tcpdump."""
|
||||
ensure_installed()
|
||||
try:
|
||||
result = subprocess.run(['tcpdump', '--list-interfaces'], capture_output=True, text=True, check=True)
|
||||
return result.stdout
|
||||
except subprocess.CalledProcessError as e:
|
||||
print(f"Failed to list interfaces: {e}")
|
||||
return ""
|
||||
|
||||
|
||||
def is_valid_ipv4(ip: str) -> bool:
|
||||
try:
|
||||
ipaddress.IPv4Address(ip)
|
||||
return True
|
||||
except ValueError:
|
||||
return False
|
||||
|
||||
def str_to_ipv4(ip: str) -> (bool, Optional[ipaddress]):
|
||||
try:
|
||||
address = ipaddress.IPv4Address(ip)
|
||||
return address == ipaddress.IPv4Address(ip), address
|
||||
except ipaddress.AddressValueError:
|
||||
return False, None
|
||||
18
code/iottb/utils/utils.py
Normal file
18
code/iottb/utils/utils.py
Normal file
@@ -0,0 +1,18 @@
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
from iottb.definitions import TODAY_DATE_STRING, DEVICE_METADATA_FILE, CAPTURE_METADATA_FILE
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def get_iso_date():
|
||||
return datetime.now().strftime('%Y-%m-%d')
|
||||
|
||||
|
||||
def subfolder_exists(parent: Path, child: str):
|
||||
return parent.joinpath(child).exists()
|
||||
|
||||
|
||||
def generate_unique_string_with_prefix(prefix: str):
|
||||
return prefix + "_" + str(uuid.uuid4())
|
||||
|
||||
|
||||
Reference in New Issue
Block a user