Intermediate Commit

This commit is contained in:
Sebastian Lenzlinger 2023-12-03 20:46:54 +01:00
parent 45b4f7b998
commit fcfb3f028b
4 changed files with 76 additions and 33 deletions

View File

@@ -8,11 +8,11 @@ integrated_dir = 'datasets/integrated/'
 # Set up info needed to connect to db
 db_info = {
-    'host': '127.0.0.1',
-    'database': 'zh-traffic',
-    'port': '54322',
-    'user': 'db23-db',
-    'password': 'db23-project-role-PW@0',
+    'host': 'localhost',
+    'database': 'test-db23',
+    'port': '5432',
+    'user': 'seb',
+    'password': '',
     'sslmode': 'disable'
 }
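One observation on this hunk, not part of the commit: the connection details are hard-coded, so they churn with every machine switch. A minimal sketch, using only the standard library, that reads the same fields from environment variables (the DB_* names are hypothetical, not from the repo):

import os

# Hypothetical variable names; the defaults mirror the values committed above.
db_info = {
    'host': os.environ.get('DB_HOST', 'localhost'),
    'database': os.environ.get('DB_NAME', 'test-db23'),
    'port': os.environ.get('DB_PORT', '5432'),
    'user': os.environ.get('DB_USER', 'seb'),
    'password': os.environ.get('DB_PASSWORD', ''),
    'sslmode': 'disable'
}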

src/fill_db_alchemy.py Normal file
View File

@@ -0,0 +1,35 @@
+import os
+import pandas as pd
+from sqlalchemy import create_engine
+
+integrated_dir = 'datasets/integrated/'
+
+# Set up info needed to connect to db
+db_info = {
+    'host': 'localhost',
+    'database': 'test-db23',
+    'port': '5432',
+    'user': 'seb',
+    'password': '',
+}
+
+csv_table_maps = [
+    {'file': os.path.join(integrated_dir, 'FootBikeCount.csv'), 'table': 'FootBikeCount'},
+    {'file': os.path.join(integrated_dir, 'MivCount.csv'), 'table': 'MivCount'}
+]
+
+# Create a SQLAlchemy engine
+engine = create_engine(
+    f"postgresql://{db_info['user']}:{db_info['password']}@{db_info['host']}:{db_info['port']}/{db_info['database']}",
+    echo=True  # Set echo to True to display SQL queries (optional)
+)
+
+
+def csv_to_existing_table(csv_file_path, table_name):
+    df = pd.read_csv(csv_file_path)
+    df.to_sql(table_name, engine, if_exists='append', index=False)
+
+
+for i in csv_table_maps:
+    csv_to_existing_table(i['file'], i['table'])
+
+# Close the SQLAlchemy engine
+engine.dispose()
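A caveat on this new file: PostgreSQL folds unquoted identifiers to lowercase, while SQLAlchemy quotes mixed-case names such as FootBikeCount. If the tables were created unquoted in the DDL (as in the SQL file below), to_sql(..., if_exists='append') will not find footbikecount and will silently create a separate, quoted "FootBikeCount" table. A quick sanity check, sketched with SQLAlchemy's inspector:

from sqlalchemy import inspect

# Lists table names as PostgreSQL stores them; unquoted DDL names come back lowercase.
print(inspect(engine).get_table_names())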

View File

@@ -1,9 +1,10 @@
 import data_utils as du
 import os
 import pandas as pd
-from datetime import datetime
+import geopandas as gpd
 import time
 from shapely.geometry import Point
+import re
 import logging
@@ -73,7 +74,7 @@ acc_data_types = {
     'AccidentInvolvingMotorcycle': 'bool',
     'RoadType': 'str',
     'RoadType_en': 'str',
-    'Geometry': 'str'  # TODO: Figure out what dtype this needs to be for postgres
+    'geometry': 'str'  # TODO: Figure out what dtype this needs to be for postgres
 }
@@ -93,7 +94,8 @@ def ensure_dirs_exist(data_dir, integrated_dir):
 def process_foot_bike_data(files_present=True):
-    fb_df_unified = du.create_unified_df(foot_bike_urls_file, foot_bike_file_u_string, data_dir, files_present=files_present)
+    fb_df_unified = du.create_unified_df(foot_bike_urls_file, foot_bike_file_u_string, data_dir,
+                                         files_present=files_present)
     fb_df_unified[['DATE', "TIME"]] = fb_df_unified['DATUM'].str.split('T', expand=True)
     fb_df_unified[['HRS', 'MINS']] = fb_df_unified['TIME'].str.split(':', expand=True)
     ## We may need FK_ZAEHLER after all
@@ -152,7 +154,7 @@ def process_accident_data(file_present: bool = True):
                        'AccidentLocation_CHLV95_N', 'AccidentLocation_CHLV95_E', 'AccidentType_en', 'AccidentType',
                        'AccidentSeverityCategory', 'AccidentInvolvingPedestrian', 'AccidentInvolvingBicycle',
                        'AccidentInvolvingMotorcycle', 'RoadType', 'RoadType_en',
-                       'Geometry']
+                       'geometry']
     cleaned_acc_df = acc_df_unified[acc_cols_to_keep]
     cleaned_acc_df.rename(columns={
         'AccidentLocation_CHLV95_E': 'EKoord',
@@ -168,32 +170,17 @@ def process_all_data_sources(fb_present=True, miv_present=True, accident_present
     Process all data sources and turn them into csv files. After this function is called there
     should be csv files of the cleaned and integrated data sources
-    :param fb_present: bool, are the files present in local file system
-    :param miv_present: bool, are the files present in local file system
-    :param accident_present: bool, are the files present in local file system
+    :param fb_present: bool, whether the files are present in the local file system
+    :param miv_present: bool, whether the files are present in the local file system
+    :param accident_present: bool, whether the files are present in the local file system
     :return:
     """
     ensure_dirs_exist(data_dir, integrated_dir)
     logger.info("Started processing all data sources.")
-    start_time = time.time()
-    logger.info("Start processing pedestrian and bicycle data (FootBikeCount)")
-    fb_count_df = process_foot_bike_data(fb_present)
-    logger.debug(f'FB Head:{fb_count_df.head()}\n FB dtypes: {fb_count_df.dtypes}')
-    fb_file_path = os.path.join(integrated_dir, 'FootBikeCount.csv')
-    logger.debug(f'FB Cleaned File Path: {fb_file_path}')
-    fb_count_df.to_csv(fb_file_path, index=False)
-    logger.info("FB integrated csv created.")
-    logger.info(f'Time taken for FootBikeCount: {start_time-time.time()}')
-    start_time2 = time.time()
-    logger.info("Start processing motorized vehicle data (MivCount)")
-    miv_count_df = process_miv_data(miv_present)
-    logger.debug(f'MIV Head:{miv_count_df.head()}\n MIV dtypes: {miv_count_df.dtypes}')
-    miv_file_path = os.path.join(integrated_dir, 'MivCount.csv')
-    logger.debug(f'MIV Cleaned File Path: {miv_file_path}')
-    miv_count_df.to_csv(miv_file_path, index=False)
-    logger.info("MIV integrated csv created.")
-    logger.info(f'Time taken for MivCount: {start_time2-time.time()}')
+    fb_to_integrated(fb_present)
+    miv_to_integrated_csv(miv_present)


 def fb_to_integrated(files_present=True):
@@ -223,6 +210,25 @@ def miv_to_integrated_csv(miv_present=True):
     logger.info(f'Time taken for MivCount: {end_time-start_time2}')


+def acc_to_cleaned_geojson(acc_present=True):
+    start_time3 = time.time()
+    logger.info("Start processing accident data (Accidents)")
+    acc_df = process_accident_data(acc_present)
+    logger.debug(f'ACC Head: {acc_df.head()}\n Acc dtypes: {acc_df.dtypes}')
+    acc_file_path = os.path.join(integrated_dir, 'Accidents.geojson')
+    logger.debug(f'Acc Cleaned file path: {acc_file_path}')
+    # Pull the numeric coordinate tokens out of the geometry string
+    acc_df['geometry'] = acc_df['geometry'].apply(lambda row: re.findall(r"[-+]?\d*\.\d+|\d+", row))
+    # Create a Point object using the extracted coordinates
+    acc_df['geometry'] = acc_df['geometry'].apply(
+        lambda coords: Point(float(coords[0]), float(coords[1]), float(coords[2])))
+    acc_gdf = gpd.GeoDataFrame(acc_df, geometry='geometry')
+    acc_gdf.to_file(acc_file_path, driver='GeoJSON')
+    logger.info("ACC integrated GeoJSON created.")
+    end_time = time.time()
+    logger.info(f'Time taken for Accidents: {end_time - start_time3}')
+
+
 if __name__ == '__main__':
-    process_all_data_sources(True, True, True)
+    # process_all_data_sources(True, True, True)
     # miv_to_integrated_csv()
+    acc_to_cleaned_geojson()
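A side note on the new acc_to_cleaned_geojson(): the regex extracts numeric tokens from the geometry string and assumes exactly three of them in E/N/Z order. If the column actually holds WKT, e.g. 'POINT Z (2684605.0 1245194.0 0.0)' (an assumed format, not confirmed by this diff), shapely can parse it directly, which is less brittle than regex extraction:

from shapely import wkt

# Parse WKT straight into geometry objects instead of regex-extracting coordinates.
acc_df['geometry'] = acc_df['geometry'].apply(wkt.loads)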

View File

@@ -61,7 +61,7 @@ CREATE TABLE Accidents (
     AccidentInvolvingMotorcycle BOOLEAN ,
     RoadType VARCHAR(5) ,
     RoadType_en VARCHAR(256) ,
-    Geometry geometry(Point) ,
+    Geometry geometry(Point, 4326) ,
     PRIMARY KEY (AccidentUID) ,
     CHECK ( AccidentHour BETWEEN 0 AND 23) ,
@@ -74,4 +74,6 @@ COPY FootBikeCount FROM '/Users/seb/Projects/repos/group-1/src/datasets/integrat
 COPY MivCount FROM '/Users/seb/Projects/repos/group-1/src/datasets/integrated/MivCount.csv'
     DELIMITER ','
     CSV HEADER;
+
+COPY Accidents FROM '/Users/seb/Projects/repos/group-1/src/datasets/integrated/Accidents.geojson' WITH (FORMAT 'geojson');
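The final statement will not run as written: PostgreSQL's COPY accepts only the text, csv, and binary formats, so there is no FORMAT 'geojson'. Also, if the points are built from the CHLV95 columns, SRID 2056 (Swiss LV95) rather than 4326 would be the matching column declaration; 4326 fits only if the source geometry is already WGS84 lon/lat. A minimal sketch of one working load path, assuming PostGIS plus the geopandas/GeoAlchemy2 stack and the connection details from this commit:

import geopandas as gpd
from sqlalchemy import create_engine

engine = create_engine('postgresql://seb:@localhost:5432/test-db23')
acc_gdf = gpd.read_file('datasets/integrated/Accidents.geojson')
# to_postgis writes the geometry column as real PostGIS geometry;
# 'accidents' matches what unquoted DDL names fold to in PostgreSQL.
acc_gdf.to_postgis('accidents', engine, if_exists='append')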