From b79ee792b69e9eba73d74c6196dbf75accfb2423 Mon Sep 17 00:00:00 2001
From: Sebastian Lenzlinger <74497638+sebaschi@users.noreply.github.com>
Date: Sun, 3 Dec 2023 21:40:29 +0100
Subject: [PATCH] Preliminary: Completed Integration Scripts.

Add scripts that load data into a database of choice. Some config is
still manual.
---
 src/data_utils.py             |  2 +-
 src/fill_db.py                | 80 +++++++++++++++++++++++++----------
 src/integrate.py              | 11 +++--
 src/load_accidents_into_db.sh | 15 +++++++
 src/load_csvs_into_db.sql     |  7 +++
 src/prepare_for_db.py         |  3 --
 src/setup_tables.sql          | 21 +++------
 7 files changed, 96 insertions(+), 43 deletions(-)
 create mode 100644 src/load_accidents_into_db.sh
 create mode 100644 src/load_csvs_into_db.sql
 delete mode 100644 src/prepare_for_db.py

diff --git a/src/data_utils.py b/src/data_utils.py
index 1dc7109..fa51384 100644
--- a/src/data_utils.py
+++ b/src/data_utils.py
@@ -6,7 +6,7 @@ import geopandas as gpd
 from concurrent.futures import ThreadPoolExecutor as tpe
 import logging
 
-logging.basicConfig(level=logging.DEBUG, filename='data_utils.log', format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+logging.basicConfig(level=logging.DEBUG, filename='logs/data_utils.log', format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
 logger = logging.getLogger('data_utils.py')
 stream_handler = logging.StreamHandler()
 formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
diff --git a/src/fill_db.py b/src/fill_db.py
index ec2b333..50f6d4c 100644
--- a/src/fill_db.py
+++ b/src/fill_db.py
@@ -1,38 +1,74 @@
-import os
-
-import pandas as pd
+import logging
 import psycopg2
-from psycopg2 import sql
+import subprocess
+
+logging.basicConfig(level=logging.DEBUG, filename='logs/fill_db.log',
+                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+logger = logging.getLogger('fill_db.py')
+stream_handler = logging.StreamHandler()
+formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+stream_handler.setFormatter(formatter)
+logger.addHandler(stream_handler)
 
 integrated_dir = 'datasets/integrated/'
+accident_geojson_file = 'datasets/integrated/Accidents.geojson'
+accident_loader_script = 'load_accidents_into_db.sh'
+accident_table_name = 'accidents'
 
-# Set up info needed to connect to db
 db_info = {
     'host': 'localhost',
     'database': 'test-db23',
     'port': '5432',
     'user': 'seb',
     'password': '',
-    'sslmode': 'disable'
 }
-
-csv_table_maps = [
-    {'file': os.path.join(integrated_dir, 'FootBikeCount.csv'), 'table': 'FootBikeCount'},
-    {'file': os.path.join(integrated_dir, 'MivCount.csv'), 'table': 'MivCount'}
-]
-
-db_connection = psycopg2.connect(**db_info)
+setup_tables_script = 'setup_tables.sql'
+load_csvs_into_db_script = 'load_csvs_into_db.sql'
 
-def csv_to_existing_table(csv_file_path, table_name):
-    df = pd.read_csv(csv_file_path)
-    curs = db_connection.cursor()
-    df.to_sql(table_name, db_connection, if_exists='append', index_label=False)
-    db_connection.commit()
-    curs.close()
+
+def run_sql(script, db_info):
+    db_connection = psycopg2.connect(**db_info)
+    db_cursor = db_connection.cursor()
+
+    with open(script, 'r') as sql_file:
+        sql_script = sql_file.read()
+
+    try:
+        db_cursor.execute(sql_script)
+        db_connection.commit()
+        logger.info(f'{script} executed successfully')
+    except Exception as e:
+        db_connection.rollback()
+        logger.exception(f'Error executing {script}: {e}')
+    finally:
+        db_cursor.close()
+        db_connection.close()
 
-for i in csv_table_maps:
-    csv_to_existing_table(i['file'], i['table'])
+
+def run_geojson_loader_script(script, *args):
+    try:
+        cmd = ['bash', script] + list(args)
+        res = subprocess.run(cmd, check=True, text=True, capture_output=True)
+        logger.info(f'{script} executed successfully. Output: {res.stdout}')
+    except subprocess.CalledProcessError as e:
+        logger.exception(f'Error executing {script}: {e}')
+        logger.info(f"Remember to set the correct permissions for the script: chmod +x {script}")
+
+
+if __name__ == '__main__':
+    run_sql(setup_tables_script, db_info)
+    logger.info("Finished setting up tables.")
+    run_sql(load_csvs_into_db_script, db_info)
+    logger.info("Finished loading csv into db.")
+    run_geojson_loader_script(accident_loader_script,
+                              accident_geojson_file,
+                              db_info['database'],
+                              db_info['user'],
+                              db_info['password'],
+                              db_info['host'],
+                              db_info['port'],
+                              accident_table_name)
+    logger.info('Finished loading geojson into db using bash script.')
-
-db_connection.close()
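The db_info block above is the manual configuration the commit message mentions: host, database, user, and password are hard-coded for one developer's machine. One way to make fill_db.py portable is to overlay libpq-style environment variables (PGHOST, PGDATABASE, ...) onto those defaults; the db_info_from_env helper below is an illustrative sketch, not part of the patch:

    import os

    def db_info_from_env(defaults):
        # Overlay the standard libpq environment variables onto the
        # hard-coded defaults; unset variables leave the default in place.
        env_map = {'host': 'PGHOST', 'database': 'PGDATABASE', 'port': 'PGPORT',
                   'user': 'PGUSER', 'password': 'PGPASSWORD'}
        return {key: os.environ.get(var, defaults[key])
                for key, var in env_map.items()}

Calling db_info = db_info_from_env(db_info) before the __main__ block would then let each developer point the loaders at their own database without editing the source.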
diff --git a/src/integrate.py b/src/integrate.py
index c025832..8c85bd3 100644
--- a/src/integrate.py
+++ b/src/integrate.py
@@ -8,7 +8,7 @@ import re
 import logging
 
-logging.basicConfig(level=logging.DEBUG, filename='integrate.log',
+logging.basicConfig(level=logging.DEBUG, filename='logs/integrate.log',
                     format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
 logger = logging.getLogger('integrate.py')
 stream_handler = logging.StreamHandler()
@@ -27,6 +27,7 @@ accident_file_u_string = 'RoadTrafficAccidentLocations.json'
 
 data_dir = 'datasets/'
 integrated_dir = 'datasets/integrated/'
+logs_dir = 'logs/'
 
 weekday_names = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
@@ -91,6 +92,8 @@ def ensure_dirs_exist(data_dir, integrated_dir):
         logger.debug("data_dir created.")
     os.makedirs(integrated_dir, exist_ok=True)
     logger.debug("integrated_dir created")
+    os.makedirs(logs_dir, exist_ok=True)
+    logger.debug("logs_dir created")
 
 
 def process_foot_bike_data(files_present=True):
@@ -181,6 +184,8 @@ def process_all_data_sources(fb_present=True, miv_present=True, accident_present
 
     miv_to_integrated_csv(miv_present)
 
+    acc_to_cleaned_geojson(accident_present)
+
 
 def fb_to_integrated(files_present=True):
@@ -229,6 +234,6 @@ def acc_to_cleaned_geojson(acc_present=True):
 
 
 if __name__ == '__main__':
-    # process_all_data_sources(True, True, True)
+    process_all_data_sources(True, True, True)
     # miv_to_integrated_csv()
-    acc_to_cleaned_geojson()
+    # acc_to_cleaned_geojson()
diff --git a/src/load_accidents_into_db.sh b/src/load_accidents_into_db.sh
new file mode 100644
index 0000000..9593d63
--- /dev/null
+++ b/src/load_accidents_into_db.sh
@@ -0,0 +1,15 @@
+#!/bin/bash
+
+# Positional parameters; see fill_db.py for the calling convention
+GEOJSON_FILE=$1
+DB_NAME=$2
+DB_USER=$3
+DB_PASSWORD=$4
+DB_HOST=$5
+DB_PORT=$6
+TARGET_TABLE=$7
+
+# Import the GeoJSON into PostGIS via ogr2ogr
+ogr2ogr -f "PostgreSQL" PG:"dbname='$DB_NAME' host='$DB_HOST' port='$DB_PORT' user='$DB_USER' password='$DB_PASSWORD'" "$GEOJSON_FILE" -nln "$TARGET_TABLE" -append
+
+echo "GeoJSON data has been imported into $TARGET_TABLE"
diff --git a/src/load_csvs_into_db.sql b/src/load_csvs_into_db.sql
new file mode 100644
index 0000000..77ceb25
--- /dev/null
+++ b/src/load_csvs_into_db.sql
@@ -0,0 +1,7 @@
+COPY FootBikeCount FROM '/Users/seb/Projects/repos/group-1/src/datasets/integrated/FootBikeCount.csv'
+    DELIMITER ','
+    CSV HEADER;
+
+COPY MivCount FROM '/Users/seb/Projects/repos/group-1/src/datasets/integrated/MivCount.csv'
+    DELIMITER ','
+    CSV HEADER;
\ No newline at end of file
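The COPY statements in load_csvs_into_db.sql execute server-side, which is why they need absolute paths readable by the Postgres server process and are pinned to one machine. A client-side alternative is psycopg2's copy_expert, which streams the CSV over the connection and accepts paths relative to the working directory; the copy_csv helper below is a hypothetical sketch, not part of the patch:

    def copy_csv(db_cursor, table, csv_path):
        # COPY ... FROM STDIN reads the file on the client side,
        # so no server-readable absolute path is required.
        with open(csv_path, 'r') as csv_file:
            db_cursor.copy_expert(
                f'COPY {table} FROM STDIN WITH (FORMAT csv, HEADER true)',
                csv_file)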
diff --git a/src/prepare_for_db.py b/src/prepare_for_db.py
deleted file mode 100644
index 499d1d6..0000000
--- a/src/prepare_for_db.py
+++ /dev/null
@@ -1,3 +0,0 @@
-import data_utils
-
-
diff --git a/src/setup_tables.sql b/src/setup_tables.sql
index 69f012a..3a9881e 100644
--- a/src/setup_tables.sql
+++ b/src/setup_tables.sql
@@ -2,6 +2,11 @@ CREATE EXTENSION IF NOT EXISTS postgis;
 
 DROP TABLE IF EXISTS FootBikeCount;
 
+DROP TABLE IF EXISTS Accidents;
+
+DROP TABLE IF EXISTS MivCount;
+
+
 CREATE TABLE FootBikeCount (
     ID INTEGER ,
     NORD INTEGER ,
@@ -21,7 +26,7 @@ CREATE TABLE FootBikeCount (
 );
 
 
-DROP TABLE IF EXISTS MivCount;
+
 
 CREATE TABLE MivCount (
     ID INTEGER ,
@@ -43,8 +48,6 @@ CREATE TABLE MivCount (
 );
 
 
-DROP TABLE IF EXISTS Accidents;
-
 CREATE TABLE Accidents (
     AccidentUID VARCHAR(256) ,
     AccidentYear INTEGER ,
@@ -66,14 +69,4 @@ CREATE TABLE Accidents (
     PRIMARY KEY (AccidentUID) ,
     CHECK ( AccidentHour BETWEEN 0 AND 23) ,
     CHECK (AccidentWeekDay_en IN ('Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday'))
-);
-
-COPY FootBikeCount FROM '/Users/seb/Projects/repos/group-1/src/datasets/integrated/FootBikeCount.csv'
-    DELIMITER ','
-    CSV HEADER;
-
-COPY MivCount FROM '/Users/seb/Projects/repos/group-1/src/datasets/integrated/MivCount.csv'
-    DELIMITER ','
-    CSV HEADER;
-
-COPY Accidents FROM '/Users/seb/Projects/repos/group-1/src/datasets/integrated/Accidents.geojson' WITH (FORMAT 'geojson');
+);
\ No newline at end of file
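Once fill_db.py has run, a quick row count over the three target tables verifies the load. This sketch assumes the db_info dict from fill_db.py; note that the unquoted table names in setup_tables.sql fold to lowercase in PostgreSQL:

    import psycopg2

    def print_row_counts(db_info,
                         tables=('footbikecount', 'mivcount', 'accidents')):
        # Basic sanity check: count the rows loaded into each table.
        with psycopg2.connect(**db_info) as db_connection:
            with db_connection.cursor() as db_cursor:
                for table in tables:
                    db_cursor.execute(f'SELECT COUNT(*) FROM {table}')
                    print(table, db_cursor.fetchone()[0])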