Prepare main branch for rebase

This commit is contained in:
Sebastian Lenzlinger 2024-05-07 21:19:04 +02:00
parent 822b49ed8b
commit de88c05349
16 changed files with 2 additions and 500 deletions

View File

@ -1,40 +0,0 @@
#!/usr/bin/env python3
import argparse
from kydcap.subcommands.sniff import setup_sniff_parser
from kydcap.subcommands.initialize_device_root_dir import setup_init_root_dir_parser
CAP_DIR_PREFIX = ...
######################
# Argparse setup
######################
def setup_argparse():
# create top level parser
root_parser = argparse.ArgumentParser(prog="kydcap")
subparsers = root_parser.add_subparsers(title="subcommands", required=True, dest="command")
setup_sniff_parser(subparsers)
setup_init_root_dir_parser(subparsers)
return root_parser
def main():
parser = setup_argparse()
args = parser.parse_args()
print(args)
if args.command:
try:
args.func(args)
except KeyboardInterrupt:
print("Received keyboard interrupt. Exiting...")
exit(1)
except Exception as e:
print(f"Error: {e}")
# create_capture_directory(args.device_name)
if __name__ == "__main__":
main()

View File

@ -1,20 +0,0 @@
from datetime import datetime
from enum import Flag, unique, global_enum
DEVICE_METADATA_FILE = "device-metadata.json"
CAPTURE_METADATA_FILE = "capture-metadata.json"
TODAY_DATE_STRING = datetime.now().strftime("%d%b%Y").lower()
@unique
@global_enum
class ReturnCodes(Flag):
SUCCESS = 0
ABORTED = 1
FAILURE = 2
UNKNOWN = 3
FILE_NOT_FOUND = 4
FILE_ALREADY_EXISTS = 5
INVALID_ARGUMENT = 6
INVALID_ARGUMENT_VALUE = 7

View File

@ -1,47 +0,0 @@
import json
import uuid
from datetime import datetime
from pathlib import Path
from typing import Optional, Any
from pydantic import BaseModel, Field
from kydcap.config import ReturnCodes
class KydcapCaptureMetadata(BaseModel):
# Required Fields
device_id: str
capture_id: uuid.UUID = Field(default_factory=lambda: str(uuid.uuid4()))
capture_date: str = Field(default_factory=lambda: datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower())
# Statistics
start_time: str
stop_time: str
packet_count: Optional[int]
# Optional Fields
device_ip_address: Optional[str] = None
device_mac_address: Optional[str] = None
app: Optional[str] = None
app_version: Optional[str] = None
firmware_version: Optional[str] = None
def __init__(self, device_id: str, start_time: str, stop_time: str, /, **data: Any):
super().__init__(**data) # Pycharms orders
assert isinstance(device_id, str)
assert isinstance(start_time, str)
assert isinstance(stop_time, str)
self.device_id = device_id
self.start_time = start_time
self.stop_time = stop_time
def save_to_json(self, file_path: Path):
if file_path.is_file():
print(f"File {file_path} already exists, update instead.")
return ReturnCodes.FILE_ALREADY_EXISTS
metadata = self.model_dump_json(indent=2)
with file_path.open('w') as file:
json.dump(metadata, file)
return ReturnCodes.SUCCESS

View File

@ -1,66 +0,0 @@
import json
import uuid
from datetime import datetime
from pathlib import Path
from typing import Optional, List, Any
# kydcap modules
from kydcap.config import ReturnCodes
# 3rd party libs
from pydantic import BaseModel, Field
IMMUTABLE_FIELDS = {"device_name", "device_short_name", "device_id", "date_created"}
class DeviceMetadata(BaseModel):
# Required fields
device_name: str
device_short_name: str
device_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
date_created: str = Field(default_factory=lambda: datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower())
# Optional Fields
device_type: Optional[str] = None
device_serial_number: Optional[str] = None
device_firmware_version: Optional[str] = None
date_updated: Optional[str] = None
capture_files: Optional[List[str]] = []
def __init__(self, device_name: str, /, **data: Any):
super().__init__(**data)
self.device_name = device_name
self.device_short_name = device_name.lower().replace(" ", "_")
@classmethod
def load_from_json(cls, file_path: Path):
assert file_path.is_file()
with file_path.open('r') as file:
metadata_json = json.load(file)
metadata_model_obj = cls.model_validate_json(metadata_json)
return metadata_model_obj
def save_to_json(self, file_path: Path):
if file_path.is_file():
print(f"File {file_path} already exists, update instead.")
return ReturnCodes.FILE_ALREADY_EXISTS
metadata = self.model_dump_json(indent=2)
with file_path.open('w') as file:
json.dump(metadata, file)
return ReturnCodes.SUCCESS
@classmethod
def update_metadata_in_json(cls, file_path: Path, **kwargs):
# TODO Maybe not needed at all.
assert file_path.is_file()
for field in IMMUTABLE_FIELDS:
if field in kwargs:
print(f"Field {field} is immutable")
return ReturnCodes.IMMUTABLE
metadata = cls.load_from_json(file_path)
for field, value in kwargs.items():
if field in metadata.model_fields_set:
setattr(metadata, field, value)
metadata.date_updated = datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower()
pass

View File

@ -1,55 +0,0 @@
#!/usr/bin/env bash
# Note, this is not my original work. Source: https://linuxtldr.com/changing-interface-mode/
function list_nic_info () {
ip addr show
}
function enable_monm_iw () {
interface=$1
sudo ip link set "$interface" down
sudo iw "$interface" set monitor control
sudo ip link set "$interface" up
}
function disable_monm_iw () {
interface=$1
sudo ip link set "$interface" down
sudo iw "$interface" set type managed
sudo ip link set "$interface" up
}
function enable_monm_iwconfig () {
interface=$1
sudo ifconfig "$interface" down
sudo iwconfig "$interface" mode monitor
sudo ifconfig "$interface" up
}
function disable_monm_iwconfig () {
interface=$1
sudo ifconfig "$interface" down
sudo iwconfig "$interface" mode managed
sudo ifconfig "$interface" up
}
function enable_monm_acng () {
interface=$1
sudo airmon-ng check
sudo airmon-ng check kill
sudo airmon-ng start "$interface"
}
function disable_monm_acng () {
interface="${1}mon"
sudo airmon-ng stop "$interface"
sudo systemctl restart NetworkManager
}
if declare -f "$1" > /dev/null
then
"$@"
else
echo "Unknown function '$1'" >&2
exit 1
fi

View File

@ -1,40 +0,0 @@
import pathlib
from kydcap.config import DEVICE_METADATA_FILE
from kydcap.models.device_metadata_model import DeviceMetadata
def setup_init_root_dir_parser(subparsers):
parser = subparsers.add_parser("init-device-root", aliases=["idr"])
parser.add_argument("--root_dir", type=pathlib.Path, default=pathlib.Path.cwd())
group = parser.add_mutually_exclusive_group()
group.add_argument("--dynamic", action="store_true", help="enable guided setup", default=False)
group.add_argument("-n", "--name", action="store", type=str, help="name of device")
parser.set_defaults(func=handle_idr)
def handle_idr(args):
print("Entered kydcap initialize-device-root")
root_dir = args.root_dir
device_name = None
if args.dynamic:
response = "N"
while response == "N":
name = input("Please enter name of device: ")
# TODO extended config for other fields like apps, firmware etc.
response = input(f"Confirm device name: {name} [y/N]")
device_name = name
else:
device_name = args.name
root_dir.mkdir(parents=True, exist_ok=True)
root_dir.chdir()
dev_metadata_model = DeviceMetadata(device_name)
file_path = root_dir / device_name / DEVICE_METADATA_FILE
assert not file_path.exists(), f"{file_path} already exists"
if args.dynamic:
response = input(f"Confirm device metadata: {dev_metadata_model.model_dump()} [y/N]")
if response.lower() != "y":
assert False, "TODO implement dynamic setup"
code = dev_metadata_model.save_to_json(file_path)
print(f"Device metadata saved to {file_path}")
return code

View File

@ -1,104 +0,0 @@
import subprocess
from pathlib import Path
from kydcap.config import *
from kydcap.models.device_metadata_model import DeviceMetadata
def setup_sniff_parser(subparsers):
parser = subparsers.add_parser('sniff', help='Sniff packets with tcpdump')
# metadata args
parser.add_argument("-a", "--ip-address=", help="IP address of the device to sniff", dest="device_ip")
# tcpdump args
parser_sniff_tcpdump = parser.add_argument_group('tcpdump arguments')
parser_sniff_tcpdump.add_argument("-i", "--interface=", help="Interface to capture on.", dest="capture_interface",
default="any")
parser_sniff_tcpdump.add_argument("-I", "--monitor-mode", help="Put interface into monitor mode",
action="store_true")
parser_sniff_tcpdump.add_argument("-n", help="Deactivate name resolution. Option is set by default.",
action="store_true", dest="no_name_resolution")
parser_sniff_tcpdump.add_argument("-#", "--number",
help="Print packet number at beginning of line. Set by default.",
action="store_true")
parser_sniff_tcpdump.add_argument("-e", help="Print link layer headers. Option is set by default.",
action="store_true", dest="print_link_layer")
parser_sniff_tcpdump.add_argument("-t", action="count", default=0,
help="Please see tcpdump manual for details. Unused by default.")
# parser_sniff_tcpdump.add_argument("--filter",type=str,default="ip help=f"pcap filter expression. \
# Defaults is '{default}'")
# shared args
cap_size_group = parser.add_mutually_exclusive_group(required=False)
cap_size_group.add_argument("-c", "--count", type=int, help="Number of packets to capture.", default=0)
cap_size_group.add_argument("--mins", type=int, help="Time in minutes to capture.", default=60)
parser.set_defaults(func=handle_sniff)
# return parser
# parser.add_default(func=handle_sniff(args=sniff_args))
def cwd_is_device_root_dir() -> bool:
device_metadata_file = Path.cwd() / DEVICE_METADATA_FILE
return device_metadata_file.exists()
def start_guided_device_root_dir_setup():
assert False, "Not implemented"
def handle_metadata():
assert not cwd_is_device_root_dir()
print(f"Unable to find {DEVICE_METADATA_FILE} in current working directory")
print("You need to setup a device root directory before using this command")
response = input("Would you like to be guided through the setup? [y/n]")
if response.lower() == "y":
start_guided_device_root_dir_setup()
else:
print("'kydcap init-device-root --help' for more information.")
exit(ReturnCodes.ABORTED)
# device_id = handle_capture_metadata()
return ReturnCodes.SUCCESS
def handle_capture_metadata():
device_metadata_json = Path.cwd() / DEVICE_METADATA_FILE
device_metadata = DeviceMetadata.load_from_json(device_metadata_json)
device_id = device_metadata.device_id
return device_id
def handle_sniff(args):
if not cwd_is_device_root_dir():
handle_metadata()
else:
cmd = ['sudo', 'tcpdump', '-i', args.capture_interface]
if args.monitor_mode:
cmd.append('-I')
if args.no_name_resolution:
cmd.append('-n')
if args.number:
cmd.append('-#')
if args.print_link_layer:
cmd.append('-e')
if args.count:
cmd.append('-c')
cmd.append(str(args.count))
elif args.mins:
pass
print('Complete command:' + ' '.join(cmd))
# TODO maybe dump this into file -> put into device metadata
# TODO generate pcap filename
# TODO construct capture metadata file
try:
start_time = datetime.now().strftime('%H:%M:%S')
subprocess.run(cmd)
stop_time = datetime.now().strftime('%H:%M:%S')
except KeyboardInterrupt:
print("Received keyboard interrupt.")
exit(ReturnCodes.ABORTED)
except subprocess.CalledProcessError as e:
print(f"Failed to capture packet: {e}")
exit(ReturnCodes.FAILURE)
except Exception as e:
print(f"Failed to capture packet: {e}")
exit(ReturnCodes.FAILURE)
return ReturnCodes.SUCCESS

View File

@ -1,40 +0,0 @@
import json
from pathlib import Path
from kydcap.config import ReturnCodes
def set_device_ip_address(ip_addr: str, file_path: Path):
assert ip_addr is not None
assert file_path.is_file()
with file_path.open('r') as f:
data = json.load(f)
current_ip = data["device_ip_address"]
if current_ip is not None:
print(f"Device IP Address is set to {current_ip}")
response = input(f"Do you want to change the recorded IP address to {ip_addr}? [Y/N] ")
if response.upper() == "N":
print("Aborting change to device IP address")
return ReturnCodes.ABORTED
with file_path.open('w') as f:
json.dump(data, f)
return ReturnCodes.SUCCESS
def set_device_mac_address(mac_addr: str, file_path: Path):
assert mac_addr is not None
assert file_path.is_file()
with file_path.open('r') as f:
data = json.load(f)
current_mac = data["device_mac_address"]
if current_mac is not None:
print(f"Device MAC Address is set to {current_mac}")
response = input(f"Do you want to change the recorded MAC address to {mac_addr}? [Y/N] ")
if response.upper() == "N":
print("Aborting change to device MAC address")
return ReturnCodes.ABORTED
with file_path.open('w') as f:
json.dump(data, f)
return ReturnCodes.SUCCESS
# TODO finnish for other fields in capture metadata

View File

@ -1,49 +0,0 @@
import json
from datetime import datetime
from pathlib import Path
from kydcap.config import ReturnCodes
def update_firmware_version(version: str, file_path: Path):
assert file_path.is_file()
with file_path.open('r') as file:
metadata = json.load(file)
metadata['device_firmware_version'] = version
metadata['date_updated'] = datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower()
with file_path.open('w') as file:
json.dump(metadata, file)
return ReturnCodes.SUCCESS
def add_capture_file_reference(capture_file_reference: str, file_path: Path):
assert file_path.is_file()
with file_path.open('r') as file:
metadata = json.load(file)
metadata['capture_files'] = capture_file_reference
metadata['date_updated'] = datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower()
with file_path.open('w') as file:
json.dump(metadata, file)
return ReturnCodes.SUCCESS
def update_device_serial_number(device_id: str, file_path: Path):
assert file_path.is_file()
with file_path.open('r') as file:
metadata = json.load(file)
metadata['device_id'] = device_id
metadata['date_updated'] = datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower()
with file_path.open('w') as file:
json.dump(metadata, file)
return ReturnCodes.SUCCESS
def update_device_type(device_type: str, file_path: Path):
assert file_path.is_file()
with file_path.open('r') as file:
metadata = json.load(file)
metadata['device_type'] = device_type
metadata['date_updated'] = datetime.now().strftime('%d-%m-%YT%H:%M:%S').lower()
with file_path.open('w') as file:
json.dump(metadata, file)
return ReturnCodes.SUCCESS

View File

@ -1,29 +0,0 @@
import shutil
import subprocess
DEPENDENCIES =
def check_installed(tool) -> bool:
"""Check if tcpdump is installed and available on the system path."""
return shutil.which(f'{tool}') is not None
def ensure_installed(tool):
"""Ensure that tcpdump is installed, raise an error if not."""
if not check_installed(tool):
raise RuntimeError("tcpdump is not installed. Please install it to continue.")
def list_interfaces() -> str:
"""List available network interfaces using tcpdump."""
ensure_installed()
try:
result = subprocess.run(['tcpdump', '--list-interfaces'], capture_output=True, text=True, check=True)
return result.stdout
except subprocess.CalledProcessError as e:
print(f"Failed to list interfaces: {e}")
return ""
def start_tcpdump():
return None

View File

@ -1,10 +0,0 @@
import subprocess
def enable_monitor_mode(interface):
pass
def disable_monitor_mode(interface):
pass
def get_ap_channel(interface):
pass

View File

@ -0,0 +1,2 @@
def test_save_to_json():
assert False