Preliminary: completed integration scripts. Adds scripts that load the integrated data into a database of choice. Some configuration is still manual.

Sebastian Lenzlinger 2023-12-03 21:40:29 +01:00
parent fcfb3f028b
commit b79ee792b6
7 changed files with 96 additions and 43 deletions

data_utils.py · View File

@@ -6,7 +6,7 @@ import geopandas as gpd
 from concurrent.futures import ThreadPoolExecutor as tpe
 import logging
-logging.basicConfig(level=logging.DEBUG, filename='data_utils.log', format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+logging.basicConfig(level=logging.DEBUG, filename='logs/data_utils.log', format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
 logger = logging.getLogger('data_utils.py')
 stream_handler = logging.StreamHandler()
 formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

fill_db.py · View File

@@ -1,38 +1,74 @@
-import os
-import pandas as pd
+import logging
 import psycopg2
-from psycopg2 import sql
+import subprocess
+
+logging.basicConfig(level=logging.DEBUG, filename='logs/fill_db.log',
+                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+logger = logging.getLogger('fill_db.py')
+stream_handler = logging.StreamHandler()
+formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+stream_handler.setFormatter(formatter)
+logger.addHandler(stream_handler)
 
 integrated_dir = 'datasets/integrated/'
+accident_geojson_file = 'datasets/integrated/Accidents.geojson'
+accident_loader_script = 'load_accidents_into_db.sh'
+accident_table_name = 'accidents'
 
+# Set up info needed to connect to db
 db_info = {
     'host': 'localhost',
     'database': 'test-db23',
     'port': '5432',
     'user': 'seb',
     'password': '',
+    'sslmode': 'disable'
 }
 
-csv_table_maps = [
-    {'file': os.path.join(integrated_dir, 'FootBikeCount.csv'), 'table': 'FootBikeCount'},
-    {'file': os.path.join(integrated_dir, 'MivCount.csv'), 'table': 'MivCount'}
-]
-
-db_connection = psycopg2.connect(**db_info)
-
-def csv_to_existing_table(csv_file_path, table_name):
-    df = pd.read_csv(csv_file_path)
-    curs = db_connection.cursor()
-    df.to_sql(table_name, db_connection, if_exists='append', index_label=False)
-    db_connection.commit()
-    curs.close()
-
-for i in csv_table_maps:
-    csv_to_existing_table(i['file'], i['table'])
-
-db_connection.close()
+setup_tables_script = 'setup_tables.sql'
+load_csvs_into_db_script = 'load_csvs_into_db.sql'
+
+
+def run_sql(script, db_info):
+    db_connection = psycopg2.connect(**db_info)
+    db_cursor = db_connection.cursor()
+    with open(script, 'r') as sql_file:
+        sql_script = sql_file.read()
+    try:
+        db_cursor.execute(sql_script)
+        db_connection.commit()
+        logger.info(f'{script} executed successfully')
+    except Exception as e:
+        db_connection.rollback()
+        logger.exception(f'Error executing {script}: {e}')
+    finally:
+        db_cursor.close()
+        db_connection.close()
+
+
+def run_geojson_loader_script(script, *args):
+    try:
+        cmd = ['bash', script] + list(args)
+        res = subprocess.run(cmd, check=True, text=True, capture_output=True)
+        logger.info(f'{script} executed successfully. Output: {res.stdout}')
+    except subprocess.CalledProcessError as e:
+        logger.exception(f'Error executing {script}: {e}')
+        logger.info(f"Remember to set the correct permissions for the script: chmod +x {script}")
+
+
+if __name__ == '__main__':
+    run_sql(setup_tables_script, db_info)
+    logger.info("Finished setting up tables.")
+    run_sql(load_csvs_into_db_script, db_info)
+    logger.info("Finished loading csv into db.")
+    run_geojson_loader_script(accident_loader_script,
+                              accident_geojson_file,
+                              db_info['database'],
+                              db_info['user'],
+                              db_info['password'],
+                              db_info['host'],
+                              db_info['port'],
+                              accident_table_name)
+    logger.info('Finished loading geojson into db using bash script.')
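Since the connection settings in db_info are still hardcoded (the manual configuration the commit message mentions), one option is to read them from the environment instead. A minimal sketch, not part of the commit; the environment variable names are illustrative:

    import os

    db_info = {
        'host': os.environ.get('DB_HOST', 'localhost'),
        'database': os.environ.get('DB_NAME', 'test-db23'),
        'port': os.environ.get('DB_PORT', '5432'),
        'user': os.environ.get('DB_USER', 'seb'),
        'password': os.environ.get('DB_PASSWORD', ''),
        'sslmode': 'disable'
    }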

integrate.py · View File

@@ -8,7 +8,7 @@ import re
 import logging
 
-logging.basicConfig(level=logging.DEBUG, filename='integrate.log',
+logging.basicConfig(level=logging.DEBUG, filename='logs/integrate.log',
                     format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
 logger = logging.getLogger('integrate.py')
 stream_handler = logging.StreamHandler()
@@ -27,6 +27,7 @@ accident_file_u_string = 'RoadTrafficAccidentLocations.json'
 data_dir = 'datasets/'
 integrated_dir = 'datasets/integrated/'
+logs_dir = 'logs/'
 
 weekday_names = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
@@ -91,6 +92,8 @@ def ensure_dirs_exist(data_dir, integrated_dir):
     logger.debug("data_dir created.")
     os.makedirs(integrated_dir, exist_ok=True)
     logger.debug("integrated_dir created")
+    os.makedirs(logs_dir, exist_ok=True)
+    logger.debug("logs_dir created")
 
 def process_foot_bike_data(files_present=True):
@@ -181,6 +184,8 @@ def process_all_data_sources(fb_present=True, miv_present=True, accident_present
     miv_to_integrated_csv(miv_present)
+    acc_to_cleaned_geojson(accident_present)
 
 def fb_to_integrated(files_present=True):
@@ -229,6 +234,6 @@ def acc_to_cleaned_geojson(acc_present=True):
 if __name__ == '__main__':
-    # process_all_data_sources(True, True, True)
+    process_all_data_sources(True, True, True)
     # miv_to_integrated_csv()
-    acc_to_cleaned_geojson()
+    # acc_to_cleaned_geojson()
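With the __main__ blocks switched, a full end-to-end run would presumably look like this from the src directory, assuming the raw datasets are already in place:

    python integrate.py   # cleans the sources into datasets/integrated/ and creates logs/
    python fill_db.py     # sets up the tables, then loads the CSVs and the GeoJSON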

load_accidents_into_db.sh · View File

@@ -0,0 +1,15 @@
+#!/bin/bash
+
+# Define parameters
+GEOJSON_FILE=$1
+DB_NAME=$2
+DB_USER=$3
+DB_PASSWORD=$4
+DB_HOST=$5
+DB_PORT=$6
+TARGET_TABLE=$7
+
+# Run ogr2ogr command
+ogr2ogr -f "PostgreSQL" PG:"dbname='$DB_NAME' host='$DB_HOST' port='$DB_PORT' user='$DB_USER' password='$DB_PASSWORD'" "$GEOJSON_FILE" -nln "$TARGET_TABLE" -append
+
+echo "GeoJSON data has been imported into $TARGET_TABLE"

load_csvs_into_db.sql · View File

@@ -0,0 +1,7 @@
+COPY FootBikeCount FROM '/Users/seb/Projects/repos/group-1/src/datasets/integrated/FootBikeCount.csv'
+DELIMITER ','
+CSV HEADER;
+
+COPY MivCount FROM '/Users/seb/Projects/repos/group-1/src/datasets/integrated/MivCount.csv'
+DELIMITER ','
+CSV HEADER;
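Note that COPY ... FROM executes on the database server, so it needs absolute paths readable by the server process; that is why the paths above are machine-specific and part of the manual configuration. When loading through psql from the client side instead, the \copy meta-command resolves paths relative to the client, e.g.:

    \copy FootBikeCount FROM 'datasets/integrated/FootBikeCount.csv' DELIMITER ',' CSV HEADER
    \copy MivCount FROM 'datasets/integrated/MivCount.csv' DELIMITER ',' CSV HEADER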

View File

@ -1,3 +0,0 @@
import data_utils

setup_tables.sql · View File

@@ -2,6 +2,11 @@ CREATE EXTENSION IF NOT EXISTS postgis;
 DROP TABLE IF EXISTS FootBikeCount;
+DROP TABLE IF EXISTS Accidents;
+DROP TABLE IF EXISTS MivCount;
 
 CREATE TABLE FootBikeCount (
     ID INTEGER ,
     NORD INTEGER ,
@@ -21,7 +26,7 @@ CREATE TABLE FootBikeCount (
 );
 
-DROP TABLE IF EXISTS MivCount;
 
 CREATE TABLE MivCount (
     ID INTEGER ,
@@ -43,8 +48,6 @@ CREATE TABLE MivCount (
 );
 
-DROP TABLE IF EXISTS Accidents;
-
 CREATE TABLE Accidents (
     AccidentUID VARCHAR(256) ,
     AccidentYear INTEGER ,
@@ -66,14 +69,4 @@ CREATE TABLE Accidents (
     PRIMARY KEY (AccidentUID) ,
     CHECK (AccidentHour BETWEEN 0 AND 23) ,
     CHECK (AccidentWeekDay_en IN ('Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday'))
 );
-
-COPY FootBikeCount FROM '/Users/seb/Projects/repos/group-1/src/datasets/integrated/FootBikeCount.csv'
-DELIMITER ','
-CSV HEADER;
-
-COPY MivCount FROM '/Users/seb/Projects/repos/group-1/src/datasets/integrated/MivCount.csv'
-DELIMITER ','
-CSV HEADER;
-
-COPY Accidents FROM '/Users/seb/Projects/repos/group-1/src/datasets/integrated/Accidents.geojson' WITH (FORMAT 'geojson');
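The removed COPY ... WITH (FORMAT 'geojson') statement was never valid PostgreSQL, since COPY only supports the text, csv, and binary formats; that is why the GeoJSON import now goes through ogr2ogr instead. A quick post-load sanity check might look like the following sketch; the geometry column name wkb_geometry is ogr2ogr's default for new PostgreSQL layers and an assumption here:

    SELECT COUNT(*) FROM FootBikeCount;
    SELECT COUNT(*) FROM MivCount;
    -- count rows and non-null geometries written by ogr2ogr
    SELECT COUNT(*), COUNT(wkb_geometry) FROM accidents;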