Why am I so slow???????????? Just sync commit. Slowly but surely getting allong with this refactoring.
This commit is contained in:
parent
7d9095f113
commit
bf33dfe3a8
@ -30,3 +30,30 @@ def setup_sniff_parser(subparsers):
|
|||||||
def setup_pcap_filter_parser(parser_sniff):
|
def setup_pcap_filter_parser(parser_sniff):
|
||||||
parser_pcap_filter = parser_sniff.add_argument_parser('pcap-filter expression')
|
parser_pcap_filter = parser_sniff.add_argument_parser('pcap-filter expression')
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
def check_iottb_env():
|
||||||
|
# This makes the option '--root-dir' obsolescent # TODO How to streamline this?\
|
||||||
|
try:
|
||||||
|
iottb_home = environ['IOTTB_HOME'] # TODO WARN implicit declaration of env var name!
|
||||||
|
except KeyError:
|
||||||
|
logger.error(f"Environment variable 'IOTTB_HOME' is not set."
|
||||||
|
f"Setting environment variable 'IOTTB_HOME' to '~/{IOTTB_HOME_ABS}'")
|
||||||
|
environ['IOTTB_HOME'] = IOTTB_HOME_ABS
|
||||||
|
finally:
|
||||||
|
if not Path(IOTTB_HOME_ABS).exists():
|
||||||
|
print(f'"{IOTTB_HOME_ABS}" does not exist.')
|
||||||
|
response = input('Do you want to create it now? [y/N]')
|
||||||
|
logger.debug(f'response: {response}')
|
||||||
|
if response.lower() != 'y':
|
||||||
|
logger.debug(f'Not setting "IOTTB_HOME"')
|
||||||
|
print('TODO')
|
||||||
|
print("Aborting execution...")
|
||||||
|
return ReturnCodes.ABORTED
|
||||||
|
else:
|
||||||
|
print(f'Setting environment variable IOTTB_HOME""')
|
||||||
|
Path(IOTTB_HOME_ABS).mkdir(parents=True,
|
||||||
|
exist_ok=False) # Should always work since in 'not exist' code path
|
||||||
|
return ReturnCodes.SUCCESS
|
||||||
|
logger.info(f'"{IOTTB_HOME_ABS}" exists.')
|
||||||
|
# TODO: Check that it is a valid iottb dir or can we say it is valid by definition if?
|
||||||
|
return ReturnCodes.SUCCESS
|
||||||
|
|||||||
@ -4,27 +4,36 @@ from os import environ
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
import logging
|
import logging
|
||||||
from iottb.subcommands.add_device import setup_init_device_root_parser
|
from iottb.subcommands.add_device import setup_init_device_root_parser
|
||||||
from iottb.subcommands.capture import setup_capture_parser
|
# from iottb.subcommands.capture import setup_capture_parser
|
||||||
|
from iottb.subcommands.sniff import setup_sniff_parser
|
||||||
from iottb.utils.tcpdump_utils import list_interfaces
|
from iottb.utils.tcpdump_utils import list_interfaces
|
||||||
from iottb.definitions import IOTTB_HOME_ABS, ReturnCodes
|
from iottb.definitions import IOTTB_HOME_ABS, ReturnCodes
|
||||||
from iottb.logger import setup_logging
|
from iottb.logger import setup_logging
|
||||||
|
|
||||||
logger = logging.getLogger('iottbLogger.__main__')
|
logger = logging.getLogger('iottbLogger.__main__')
|
||||||
logger.setLevel(logging.DEBUG)
|
logger.setLevel(logging.DEBUG)
|
||||||
|
|
||||||
|
|
||||||
######################
|
######################
|
||||||
# Argparse setup
|
# Argparse setup
|
||||||
######################
|
######################
|
||||||
def setup_argparse():
|
def setup_argparse():
|
||||||
# create top level parser
|
# create top level parser
|
||||||
root_parser = argparse.ArgumentParser(prog='iottb')
|
root_parser = argparse.ArgumentParser(prog='iottb')
|
||||||
subparsers = root_parser.add_subparsers(title='subcommands', required=True, dest='command')
|
|
||||||
|
|
||||||
# shared options
|
# shared options
|
||||||
root_parser.add_argument('--verbose', '-v', action='count', default=0)
|
root_parser.add_argument('--verbose', '-v', action='count', default=0)
|
||||||
# configure subcommands
|
|
||||||
setup_capture_parser(subparsers)
|
|
||||||
setup_init_device_root_parser(subparsers)
|
|
||||||
|
|
||||||
|
# Group of args w.r.t iottb.db creation
|
||||||
|
group = root_parser.add_argument_group('database options')
|
||||||
|
group.add_argument('--db-home', default=Path.home() / 'IoTtb.db')
|
||||||
|
group.add_argument('--config-home', default=Path.home() / '.config' / 'iottb.conf', type=Path, )
|
||||||
|
group.add_argument('--user', default=Path.home().stem, type=Path, )
|
||||||
|
|
||||||
|
# configure subcommands
|
||||||
|
subparsers = root_parser.add_subparsers(title='subcommands', required=True, dest='command')
|
||||||
|
# setup_capture_parser(subparsers)
|
||||||
|
setup_init_device_root_parser(subparsers)
|
||||||
|
setup_sniff_parser(subparsers)
|
||||||
# Utility to list interfaces directly with iottb instead of relying on external tooling
|
# Utility to list interfaces directly with iottb instead of relying on external tooling
|
||||||
|
|
||||||
interfaces_parser = subparsers.add_parser('list-interfaces', aliases=['li', 'if'],
|
interfaces_parser = subparsers.add_parser('list-interfaces', aliases=['li', 'if'],
|
||||||
@ -34,38 +43,44 @@ def setup_argparse():
|
|||||||
return root_parser
|
return root_parser
|
||||||
|
|
||||||
|
|
||||||
def check_iottb_env():
|
###
|
||||||
# This makes the option '--root-dir' obsolescent # TODO How to streamline this?\
|
# Where put ?!
|
||||||
|
###
|
||||||
|
class IoTdb:
|
||||||
|
def __init__(self, db_home=Path.home() / 'IoTtb.db', iottb_config=Path.home() / '.conf' / 'iottb.conf',
|
||||||
|
user=Path.home().stem):
|
||||||
|
self.db_home = db_home
|
||||||
|
self.config_home = iottb_config
|
||||||
|
self.default_filters_home = db_home / 'default_filters'
|
||||||
|
self.user = user
|
||||||
|
|
||||||
|
def create_db(self, mode=0o777, parents=False, exist_ok=False):
|
||||||
|
logger.info(f'Creating db at {self.db_home}')
|
||||||
try:
|
try:
|
||||||
iottb_home = environ['IOTTB_HOME'] # TODO WARN implicit declaration of env var name!
|
self.db_home.mkdir(mode=mode, parents=parents, exist_ok=exist_ok)
|
||||||
except KeyError:
|
except FileExistsError:
|
||||||
logger.error(f"Environment variable 'IOTTB_HOME' is not set."
|
logger.error(f'Database path already at {self.db_home} exists and is not a directory')
|
||||||
f"Setting environment variable 'IOTTB_HOME' to '~/{IOTTB_HOME_ABS}'")
|
|
||||||
environ['IOTTB_HOME'] = IOTTB_HOME_ABS
|
|
||||||
finally:
|
finally:
|
||||||
if not Path(IOTTB_HOME_ABS).exists():
|
logger.debug(f'Leaving finally clause in create_db')
|
||||||
print(f'"{IOTTB_HOME_ABS}" does not exist.')
|
|
||||||
response = input('Do you want to create it now? [y/N]')
|
def create_device_tree(self, mode=0o777, parents=False, exist_ok=False):
|
||||||
logger.debug(f'response: {response}')
|
logger.info(f'Creating device tree at {self.db_home / 'devices'}')
|
||||||
if response.lower() != 'y':
|
|
||||||
logger.debug(f'Not setting "IOTTB_HOME"')
|
def parse_db_config(self):
|
||||||
print('TODO')
|
pass
|
||||||
print("Aborting execution...")
|
|
||||||
return ReturnCodes.ABORTED
|
def parse_iottb_config(self):
|
||||||
else:
|
pass
|
||||||
print(f'Setting environment variable IOTTB_HOME""')
|
|
||||||
Path(IOTTB_HOME_ABS).mkdir(parents=True,
|
def get_known_devices(self):
|
||||||
exist_ok=False) # Should always work since in 'not exist' code path
|
pass
|
||||||
return ReturnCodes.SUCCESS
|
|
||||||
logger.info(f'"{IOTTB_HOME_ABS}" exists.')
|
|
||||||
# TODO: Check that it is a valid iottb dir or can we say it is valid by definition if?
|
def iottb_db_exists(db_home=Path.home() / 'IoTtb.db'):
|
||||||
return ReturnCodes.SUCCESS
|
res = db_home.is_dir()
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
if check_iottb_env() != ReturnCodes.SUCCESS:
|
|
||||||
exit(ReturnCodes.ABORTED)
|
|
||||||
|
|
||||||
logger.debug(f'Pre setup_argparse()')
|
logger.debug(f'Pre setup_argparse()')
|
||||||
parser = setup_argparse()
|
parser = setup_argparse()
|
||||||
logger.debug('Post setup_argparse().')
|
logger.debug('Post setup_argparse().')
|
||||||
|
|||||||
@ -1,10 +0,0 @@
|
|||||||
import subprocess
|
|
||||||
import logging
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
logging.basicConfig(level=logging.DEBUG)
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
@ -1,4 +1,57 @@
|
|||||||
import subprocess
|
import subprocess
|
||||||
|
import logging
|
||||||
|
|
||||||
|
|
||||||
|
logger = logging.getLogger('iottbLogger.capture')
|
||||||
|
logger.setLevel(logging.DEBUG)
|
||||||
|
class Sniffer:
|
||||||
|
def __init__(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def setup_sniff_parser(subparsers):
|
||||||
|
parser = subparsers.add_parser('sniff', help='Sniff packets with tcpdump')
|
||||||
|
# metadata args
|
||||||
|
parser.add_argument('-a', '--addr', help='IP or MAC address of IoT device')
|
||||||
|
# tcpdump args
|
||||||
|
parser.add_argument('--app', help='Application name to sniff', default=None)
|
||||||
|
|
||||||
|
parser_sniff_tcpdump = parser.add_argument_group('tcpdump arguments')
|
||||||
|
|
||||||
|
parser_sniff_tcpdump.add_argument('-i', '--interface', help='Interface to capture on.', dest='capture_interface',
|
||||||
|
required=True)
|
||||||
|
parser_sniff_tcpdump.add_argument('-I', '--monitor-mode', help='Put interface into monitor mode',
|
||||||
|
action='store_true')
|
||||||
|
parser_sniff_tcpdump.add_argument('-n', help='Deactivate name resolution. True by default.',
|
||||||
|
action='store_true', dest='no_name_resolution')
|
||||||
|
parser_sniff_tcpdump.add_argument('-#', '--number',
|
||||||
|
help='Print packet number at beginning of line. True by default.',
|
||||||
|
action='store_true')
|
||||||
|
parser_sniff_tcpdump.add_argument('-e', help='Print link layer headers. True by default.',
|
||||||
|
action='store_true', dest='print_link_layer')
|
||||||
|
parser_sniff_tcpdump.add_argument('-t', action='count', default=0,
|
||||||
|
help='Please see tcpdump manual for details. Unused by default.')
|
||||||
|
|
||||||
|
cap_size_group = parser.add_mutually_exclusive_group(required=False)
|
||||||
|
cap_size_group.add_argument('-c', '--count', type=int, help='Number of packets to capture.', default=10)
|
||||||
|
cap_size_group.add_argument('--mins', type=int, help='Time in minutes to capture.', default=1)
|
||||||
|
|
||||||
|
parser.set_defaults(func=sniff)
|
||||||
|
|
||||||
|
|
||||||
|
def parse_addr(addr):
|
||||||
|
#TODO Implement
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def sniff(args):
|
||||||
|
if args.addr is None:
|
||||||
|
print('You must supply either a MAC or IP(v4) address to use this tool!')
|
||||||
|
logger.info("Exiting on account of missing MAC/IP.")
|
||||||
|
exit(1)
|
||||||
|
else:
|
||||||
|
(type, value) = parse_addr(args.addr)
|
||||||
|
#TODO Get this party started
|
||||||
|
|
||||||
def sniff_tcpdump(args, filter):
|
def sniff_tcpdump(args, filter):
|
||||||
pass
|
pass
|
||||||
|
|||||||
7
notes/scrible.py
Normal file
7
notes/scrible.py
Normal file
@ -0,0 +1,7 @@
|
|||||||
|
class Config:
|
||||||
|
db_dir = Path.home()
|
||||||
|
app_config_dir = Path.home /.Config
|
||||||
|
db_name = 'IoTtb.db'
|
||||||
|
app_config_name = 'iottb.conf'
|
||||||
|
|
||||||
|
|
||||||
Loading…
x
Reference in New Issue
Block a user