Refactor userspace detector.

This commit is contained in:
Sebastian Lenzlinger 2023-06-07 00:51:07 +02:00
parent dba2deb1e5
commit e942206b12
5 changed files with 378 additions and 57 deletions

1
.gitignore vendored
View File

@ -1,4 +1,5 @@
__pycache__/
*.pdf *.pdf
/docs/research/ /docs/research/
/build/ /build/

View File

@ -4,43 +4,41 @@ import os
CONFIG_FILE = 'config.json' CONFIG_FILE = 'config.json'
def load_config(): def load_config():
"""
Load the configuration from the JSON file or create a new one if it doesn't exist
Returns:
dict: The configuration data
"""
config = {} config = {}
# Check if the configuration file exists
if os.path.exists(CONFIG_FILE): if os.path.exists(CONFIG_FILE):
try: try:
with open(CONFIG_FILE, 'r') as file: with open(CONFIG_FILE, 'r') as file:
config = json.load(file) config = json.load(file)
except (IOError, json.JSONDecodeError) as e: except:
print(f"Error loading configuration: {e}") print("[-] Error: Failed to load config file")
else:
config = {
'white_listed_programs': [],
'auto_kill_programs': [],
'kbd_names': ['kbd']
}
save_config(config)
return config return config
def save_config(config): def save_config(config):
"""
Save the configuration to the JSON file
Args:
config (dict): The configuration data
"""
try: try:
with open(CONFIG_FILE, 'w') as file: with open(CONFIG_FILE, 'w') as file:
json.dump(config, file, indent=4) json.dump(config, file, indent=4)
except IOError as e: except:
print(f"Error saving configuration: {e}") print("[-] Error: Failed to save config file")
# Load the configuration
config_data = load_config()
# Access and modify the settings
whitelist = config_data.get('whitelist', [])
autokill_list = config_data.get('autokill_list', [])
other_setting = config_data.get('other_setting')
# Add a process to the whitelist
whitelist.append(9999)
# Remove a process from the autokill list
if 1234 in autokill_list:
autokill_list.remove(1234)
# Modify the other_setting value
config_data['other_setting'] = 'new_value'
# Save the modified configuration back to the JSON file
save_config(config_data)

240
src/keylogger_detector.py Executable file
View File

@ -0,0 +1,240 @@
#!/usr/bin/env python3
import sys
from config import CONFIG_FILE, load_config, save_config
from utils import (
check_root,
check_packages,
get_keyboard_device_files,
get_real_path,
get_pids_using_file,
get_process_name,
kill_processes
)
# Global variables
auto_kill_option = False
verbose_option = False
safe_option = False
# Functions
def print_help():
print('Usage: python3 keylogger_detector.py [OPTIONS]')
print('Options:')
print(' -h, --help\t\t\tPrint this help message')
print(' -v, --verbose\t\t\tVerbose mode. Will cause additional information to be printed during execution')
print(' -a, --auto-kill\t\tAutomatically kill blacklisted processes')
print(' -s, --safe\t\t\tSafe mode. Asked to confirm before killing a process')
def set_input_options():
"""
Set input options based on command line arguments
Invalid arguments are ignored
Raises:
SystemExit: If -h or --help is passed as an argument, the help message is printed and the program exits
"""
global auto_kill_option, verbose_option, safe_option
if len(sys.argv) > 1:
for arg in sys.argv[1:]:
if arg == '-h' or arg == '--help':
print_help()
sys.exit(0)
elif arg == '-v' or arg == '--verbose':
verbose_option = True
elif arg == '-a' or arg == '--auto-kill':
auto_kill_option = True
elif arg == '-s' or arg == '--safe':
safe_option = True
def confirm_kill_procces(process_name, times=0):
"""
Prompt the user to confirm to kill a process.
Should be used only in safe mode.
Args:
process_name (str): Name of the process to kill
times (int) : Number of times prompt has been displayed but neither y nor n where given. Defaults to 0.
Use to limit promt attempts.
Returns:
bool: True if user confirms the kill, False otherwise.
Raises:
SystemExit: If the user has given invalid input more than 5 times, the program exits.
"""
if times > 5:
print('Too many invalid inputs. Exiting.')
sys.exit(1)
if times > 0:
print('Invalid input. Please enter y or n.')
print(f'Do you want to kill {process_name}? (y/n)')
answer = input()
if answer == 'y':
return True
elif answer == 'n':
return False
else:
return confirm_kill_procces(process_name, times+1)
def detect_keyloggers():
"""
Detect (userland) keylogger processes based on which processes have a keyboard file open (/dev/input/event*)
The main function of the program.
Will attempt to detect keyloggers based on the config file, command line arguments and user input.
Here the control flow and logic of the program are defined.
"""
############################
# 1. Setup and initialization
############################
global auto_kill_option, verbose_option, safe_option
global CONFIG_FILE
set_input_options()
if verbose_option:
print('[Verbose] Input options set')
check_root()
if verbose_option:
print('[Verbose] Root access checked')
check_packages()
if verbose_option:
print('[Verbose] Packages checked')
config = load_config()
if verbose_option:
print('[Verbose] Config file loaded')
white_listed_programs = config['white_listed_programs']
auto_kill_programs = config['auto_kill_programs']
kbd_names = config['kbd_names']
if verbose_option:
print('[Verbose] Config file parsed')
############################
# 2. Get device files mapped to keyboard
############################
keyboard_device_files = get_keyboard_device_files(kbd_names)
if verbose_option:
print('[Verbose] Keyboard device files found:', keyboard_device_files)
############################
# 3. Get pids using keyboard device files
############################
pids = []
for device_file in keyboard_device_files:
pids.append(get_pids_using_file(device_file))
pids = sorted(list(set(pids)))
if verbose_option:
print('[Verbose] Process IDs using keyboard device files:', pids)
############################
# 4. Get process names using pids
############################
process_names = []
name_pid_dict = {}
for pid in pids:
name = get_process_name(pid)
process_names.append(name)
name_pid_dict[name].add(pid)
process_names = sorted(list(set(process_names)))
if verbose_option:
print('[Verbose] Process names using keyboard device files:', process_names)
############################
# 5.If auto_kill option is set, kill auto-killable processes
############################
if auto_kill_option:
for name in process_names:
if name in auto_kill_programs:
if verbose_option:
print('[Verbose] Auto-killable process found:', name)
if safe_option:
if confirm_kill_procces(name):
kill_process(name_pid_dict[name])
else:
kill_process(name_pid_dict[name])
############################
# 6. Identify suspicious processes, i.e. those not whitelisted
############################
suspicious_processes = []
for name in process_names:
if name not in white_listed_programs:
suspicious_processes.append(name)
if verbose_option:
print('[Verbose] Suspicious processes found:', suspicious_processes)
print('[Verbose] Suspicious processes not killed:', [name for name in suspicious_processes if name not in auto_kill_programs])
print('[Verbose] Suspicious processes killed:', [name for name in suspicious_processes if name in auto_kill_programs])
############################
# 6.1 If no suspicious processes are found, exit
############################
if len(suspicious_processes) == 0:
print("[+] No suspicious processes found")
sys.exit(0)
############################
# 7. Prompt user to chose which processes (not covered by auto kill if set) to kill
############################
print('[-]The following suspicious processes were found:')
for name in suspicious_processes:
print(f'\t{name}')
print('Please enter the names of the processes to kill, separated by a space.')
print('To not kill any just hit enter.')
if safe_option:
print('[Info] You are in safe mode. In safe mode you will be asked to confirm each kill.')
else:
print('[Info] Please be aware that killing an important process may cause your system to crash.')
to_kill = input().split()
if len(to_kill) == 0:
print('[+] No processes killed.')
sys.exit(0)
if verbose_option:
print('[Verbose] Processes to kill:', to_kill)
if safe_option:
for name in to_kill:
for pid in name_pid_dict[name]:
if confirm_kill_procces(name):
kill_process(id)
if verbose_option:
print('[Verbose] Process killed:', name)
else:
for name in to_kill:
for pid in name_pid_dict[name]:
kill_process(id)
if verbose_option:
print('[Verbose] Process killed:', name)
to_kill = list(set(to_kill))
auto_kill_programs = list(set(auto_kill_programs)).append(to_kill)
config['auto_kill_programs'] = auto_kill_programs
white_listed_programs = list(set(white_listed_programs))
config['white_listed_programs'] = white_listed_programs
kbd_names = list(set(kbd_names))
config['kbd_names'] = kbd_names
save_config(config, CONFIG_FILE)
if verbose_option:
print('[Verbose] Config file saved')
if __name__ == '__main__':
detect_keyloggers()

View File

@ -13,37 +13,6 @@ auto_kill_option = False
verbose_option = False verbose_option = False
safe_option = False safe_option = False
# Load Configurations
def load_config():
config = {}
# Check if file exists
if os.path.exists(CONFIG_FILE):
try:
with open(CONFIG_FILE, 'r') as file:
config = json.load(file)
except:
print("[-] Error: Failed to load config file")
else:
config = {
'white_listed_programs': [],
'auto_kill_programs': [],
'kbd_names': ['kbd']
}
save_config(config) # Save the default configuration
return config
# Save new configurations to json file
def save_config(config):
try:
with open(CONFIG_FILE, 'w') as file:
json.dump(config, file)
except IOError as e:
print(f"[-] Error! Failed to save config file: {e}")
# Check if the user is in sudo mode # Check if the user is in sudo mode
def check_sudo(): def check_sudo():
if os.geteuid() != 0: if os.geteuid() != 0:

113
src/utils.py Executable file
View File

@ -0,0 +1,113 @@
import os # for path operations, getuid, kill
import subprocess # for executing shell commands
import signal # for sending signals to processes
import sys # for exit
def check_root():
"""
Check if script is run as root(sudo).
Raises:
SystemExit: If not run as root.
"""
if os.getuid() != 0:
print("[-] Please run as root.")
sys.exit(1)
def check_packages():
"""
Check if all required packages are installed.
Raises:
SystemExit: If any packges is missing.
"""
packages = ['fuser', 'which']
missing_packages = []
for package in packages:
if subprocess.call(['which', package]) != 0:
missing_packages.append(package)
if len(missing_packages) > 0:
print("[-] Missing packages: {}".format(', '.join(missing_packages)))
sys.exit(1)
def get_keyboard_device_files(names):
"""
Get paths corresponding to keyboard device files by searching /dev/input/by-path.
Uses get_real_path() to resolve symlinks.
Args:
names (list): List of strings to use for searching. e.g.['kbd']
Returns:
str: Path to keyboard device file.
"""
keyboard_device_files = []
for root, dirs, files in os.walk('/dev/input/by-path'):
for file in files:
if any(name in files for name in names):
keyboard_device_files.append(get_real_path(os.path.join(root, file)))
return keyboard_device_files
def get_real_path(path):
"""
Resolve a path of a file.
Args:
path (str): Path to a file. Possibly a symlink.
Returns:
str: The resolved (real) path.
"""
if os.path.islink(path):
return os.path.realpath(path)
else:
return path
def get_pids_using_file(path):
"""
Get all process IDs using a file. (Essentially a wrapper for fuser.)
Args:
path (str): Path to a file. Usually /dev/input/eventX.
Returns:
list: List of process IDs.
Raises:
SystemExit: If fuser fails to run.
"""
try:
pids = subprocess.check_output(['fuser', path]).decode('utf-8').split()
except subprocess.CalledProcessError:
print("[-] Error: fuser failed to run on", path)
sys.exit(1)
return pids
def get_process_name(pid):
"""
Get the name of a process.
Args:
pid (int): Process ID.
Returns:
str: Name of the process.
"""
with open('/proc/{}/comm'.format(pid)) as f:
return f.read().strip()
def kill_processes(pids):
"""
Kill processes.
Args:
pids (list): List of process IDs.
"""
for pid in pids:
try:
os.kill(int(pid), signal.SIGKILL)
except ProcessLookupError:
print("[-] Process {} not found.".format(pid))