From c33ca87aafa7a6f97d5d7dec7fa988b6cde8a0b8 Mon Sep 17 00:00:00 2001
From: Sebastian Lenzlinger <74497638+sebaschi@users.noreply.github.com>
Date: Sun, 3 Dec 2023 16:34:40 +0100
Subject: [PATCH] UNFINISHED: Script to create csv of integrated and clean
 datasets.

---
 src/integrate.py                           | 171 ++++++++++++++++++---
 src/{preparations.py => prepare_for_db.py} |   0
 src/setup_tables.sql                       |  71 +++++++++
 src/testArea.ipynb                         |  58 +++----
 4 files changed, 252 insertions(+), 48 deletions(-)
 rename src/{preparations.py => prepare_for_db.py} (100%)
 create mode 100644 src/setup_tables.sql

diff --git a/src/integrate.py b/src/integrate.py
index 2df95c1..9a7c03b 100644
--- a/src/integrate.py
+++ b/src/integrate.py
@@ -1,14 +1,19 @@
 import data_utils as du
-from datetime import datetime as dt
 import os
-import requests
 import pandas as pd
+from datetime import datetime
+import time
+from shapely.geometry import Point
 import logging
 
 logging.basicConfig(level=logging.DEBUG, filename='integrate.log',
                     format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
 logger = logging.getLogger('integrate.py')
+stream_handler = logging.StreamHandler()
+formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+stream_handler.setFormatter(formatter)
+logger.addHandler(stream_handler)
 
 foot_bike_urls_file = '../docs/foot_bike_zaehlung_urls.txt'
 miv_file_urls = '../docs/verkehrszaehlung_moto_urls.txt'
@@ -24,9 +29,70 @@ integrated_dir = 'datasets/integrated/'
 
 weekday_names = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
 
+fb_data_types = {
+    'ID': 'int',
+    'NORD': 'int',
+    'OST': 'int',
+    'DATE': 'str',
+    'HRS': 'int',
+    'VELO_IN': 'int',
+    'VELO_OUT': 'int',
+    'FUSS_IN': 'int',
+    'FUSS_OUT': 'int',
+    'Weekday_en': 'str'
+}
 
-def process_foot_bike_data():
-    fb_df_unified = du.create_unified_df(foot_bike_urls_file, foot_bike_file_u_string, data_dir, files_present=True)
+miv_data_types = {
+    'MSID': 'str',
+    'ZSID': 'str',
+    'Achse': 'str',
+    'NKoord': 'int',
+    'EKoord': 'int',
+    'Richtung': 'str',
+    'AnzFahrzeuge': 'int',
+    'AnzFahrzeugeStatus': 'str',
+    'Datum': 'str',
+    'Hrs': 'int',
+    'Weekday_en': 'str'
+}
+
+acc_data_types = {
+    'AccidentUID': 'str',
+    'AccidentYear': 'int',
+    'AccidentMonth': 'int',
+    'AccidentWeekDay_en': 'str',
+    'AccidentHour': 'int',
+    'NKoord': 'int',
+    'EKoord': 'int',
+    'AccidentType_en': 'str',
+    'AccidentType': 'str',
+    'AccidentSeverityCategory': 'str',
+    'AccidentInvolvingPedestrian': 'bool',
+    'AccidentInvolvingBicycle': 'bool',
+    'AccidentInvolvingMotorcycle': 'bool',
+    'RoadType': 'str',
+    'RoadType_en': 'str',
+    'Geometry': 'str'  # TODO: Figure out what dtype this needs to be for postgres
+}
+
+
+def ensure_dirs_exist(data_dir, integrated_dir):
+    """
+    This should be called before anything else to make sure that the relevant directories exist.
+    :param data_dir: directory where the datasets are stored
+    :param integrated_dir: directory where the integrated data will be stored
+    :return:
+    """
+    logger.debug(f'data_dir: {data_dir}\n integrated_dir: {integrated_dir}')
+    logger.info("Ensuring needed directories exist.")
+    os.makedirs(data_dir, exist_ok=True)
+    logger.debug("data_dir created.")
+    os.makedirs(integrated_dir, exist_ok=True)
+    logger.debug("integrated_dir created")
+
+
+def process_foot_bike_data(files_present=True):
+    fb_df_unified = du.create_unified_df(foot_bike_urls_file, foot_bike_file_u_string, data_dir, files_present=files_present)
     fb_df_unified[['DATE', "TIME"]] = fb_df_unified['DATUM'].str.split('T', expand=True)
     fb_df_unified[['HRS', 'MINS']] = fb_df_unified['TIME'].str.split(':', expand=True)
     ## We may still need FK_ZAEHLER after all
@@ -44,28 +110,32 @@ def process_foot_bike_data():
     fb_df_grouped['Weekday_en'] = days.map(lambda x: weekday_names[x])
     cleaned_fb_df = fb_df_grouped
     cleaned_fb_df['ID'] = cleaned_fb_df.index + 1
+    cleaned_fb_df = cleaned_fb_df[['ID', 'NORD', 'OST', 'DATE', 'HRS', 'VELO_IN', 'VELO_OUT', 'FUSS_IN',
+                                   'FUSS_OUT', 'Weekday_en']]
+    # Ensure datatype of df and sql table match
+    cleaned_fb_df = cleaned_fb_df.astype(fb_data_types)
     return cleaned_fb_df
 
 
-def process_miv_data():
-    miv_df_unified = du.create_unified_df(miv_file_urls, motor_file_u_string, data_dir, files_present=True)
+def process_miv_data(files_present=True):
+    miv_df_unified = du.create_unified_df(miv_file_urls, motor_file_u_string, data_dir, files_present=files_present)
 
-    miv_df_unified[['Date', "Time"]] = miv_df_unified['MessungDatZeit'].str.split('T', expand=True)
+    miv_df_unified[['Datum', "Time"]] = miv_df_unified['MessungDatZeit'].str.split('T', expand=True)
     miv_df_unified[['Hrs', 'Mins', 'Sec']] = miv_df_unified['Time'].str.split(':', expand=True)
 
     miv_cols_to_keep = ['MSID','ZSID','Achse', 'EKoord', 'NKoord', 'Richtung', 'AnzFahrzeuge', 'AnzFahrzeugeStatus',
-                        'Date', 'Hrs']
+                        'Datum', 'Hrs']
     miv_df_cols_dropped = miv_df_unified[miv_cols_to_keep]
 
-    dt_obj = pd.to_datetime(miv_df_cols_dropped['Date'])
+    dt_obj = pd.to_datetime(miv_df_cols_dropped['Datum'])
     days = dt_obj.dt.weekday
     miv_df_cols_dropped['Weekday_en'] = days.map(lambda x: weekday_names[x])
+    miv_df_cols_dropped['AnzFahrzeuge'] = miv_df_cols_dropped['AnzFahrzeuge'].fillna(0).astype(int)
 
-    # Convert row type to int so they match other
-    miv_df_cols_dropped['EKoord'] = miv_df_cols_dropped['EKoord'].astype(int)
-    miv_df_cols_dropped['NKoord'] = miv_df_cols_dropped['NKoord'].astype(int)
+    cleaned_miv_df = miv_df_cols_dropped[['MSID', 'ZSID', 'Achse', 'NKoord', 'EKoord', 'Richtung', 'AnzFahrzeuge',
+                                          'AnzFahrzeugeStatus', 'Datum', 'Hrs', 'Weekday_en']]
 
-    cleaned_miv_df = miv_df_cols_dropped
+    cleaned_miv_df = cleaned_miv_df.astype(miv_data_types)
     return cleaned_miv_df
 
 
@@ -73,19 +143,82 @@ def process_accident_data(file_present: bool = True):
     if not file_present:
         du.process_urls(data_dir, accident_file_url)
     acc_df_unified = du.load_dataframes_from_geojson_files(data_dir, accident_file_u_string)
-    acc_cols_to_keep = ['AccidentUID', 'AccidentHour', 'AccidentYear', 'AccidentWeekDay_en', 'AccidentType',
+    acc_cols_to_keep = ['AccidentUID', 'AccidentYear', 'AccidentMonth', 'AccidentWeekDay_en', 'AccidentHour',
+                        'AccidentLocation_CHLV95_N', 'AccidentLocation_CHLV95_E', 'AccidentType_en', 'AccidentType',
                         'AccidentSeverityCategory', 'AccidentInvolvingPedestrian', 'AccidentInvolvingBicycle',
-                        'AccidentInvolvingMotorcycle', 'RoadType', 'RoadType_en', 'AccidentLocation_CHLV95_E',
-                        'AccidentLocation_CHLV95_N', 'AccidentMonth', 'geometry']
+                        'AccidentInvolvingMotorcycle', 'RoadType', 'RoadType_en',
+                        'Geometry']
     cleaned_acc_df = acc_df_unified[acc_cols_to_keep]
     cleaned_acc_df.rename(columns={
         'AccidentLocation_CHLV95_E': 'EKoord',
         'AccidentLocation_CHLV95_N': 'NKoord',
     }, inplace=True)
+
+    cleaned_acc_df = cleaned_acc_df.astype(acc_data_types)
+
     return cleaned_acc_df
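One way to close out the `# TODO` next to `'Geometry': 'str'` would be to keep the column as WKT text: a PostGIS `geometry(Point)` column parses WKT such as `POINT(2682704 1247201)` when the CSV is copied in. A minimal sketch, not part of the patch; the helper name `add_wkt_geometry` is illustrative and it assumes the renamed `EKoord`/`NKoord` columns hold the LV95 coordinates:

from shapely.geometry import Point


def add_wkt_geometry(cleaned_acc_df):
    # Hypothetical helper: build WKT strings from the LV95 coordinates so the
    # 'Geometry' column can stay a plain string in pandas and still be parsed
    # by PostGIS when the CSV is COPYed into a geometry(Point) column.
    cleaned_acc_df['Geometry'] = [
        Point(e, n).wkt
        for e, n in zip(cleaned_acc_df['EKoord'], cleaned_acc_df['NKoord'])
    ]
    return cleaned_acc_df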
 
 
+def process_all_data_sources(fb_present=True, miv_present=True, accident_present=True):
+    """
+    Process all data sources and turn them into csv files. After this function is called there
+    should be csv files of the cleaned and integrated data sources.
+
+    :param fb_present: bool, are the files present in local file system
+    :param miv_present: bool, are the files present in local file system
+    :param accident_present: bool, are the files present in local file system
+    :return:
+    """
+    ensure_dirs_exist(data_dir, integrated_dir)
+    logger.info("Started processing all data sources.")
+    start_time = time.time()
+    logger.info("Start processing pedestrian and bicycle data (FootBikeCount)")
+    fb_count_df = process_foot_bike_data(fb_present)
+    logger.debug(f'FB Head:{fb_count_df.head()}\n FB dtypes: {fb_count_df.dtypes}')
+    fb_file_path = os.path.join(integrated_dir, 'FootBikeCount.csv')
+    logger.debug(f'FB Cleaned File Path: {fb_file_path}')
+    fb_count_df.to_csv(fb_file_path, index=False)
+    logger.info("FB integrated csv created.")
+    logger.info(f'Time taken for FootBikeCount: {time.time()-start_time}')
+
+    start_time2 = time.time()
+    logger.info("Start processing motorized vehicle data (MivCount)")
+    miv_count_df = process_miv_data(miv_present)
+    logger.debug(f'MIV Head:{miv_count_df.head()}\n MIV dtypes: {miv_count_df.dtypes}')
+    miv_file_path = os.path.join(integrated_dir, 'MivCount.csv')
+    logger.debug(f'MIV Cleaned File Path: {miv_file_path}')
+    miv_count_df.to_csv(miv_file_path, index=False)
+    logger.info("MIV integrated csv created.")
+    logger.info(f'Time taken for MivCount: {time.time()-start_time2}')
+
+def fb_to_integrated(files_present=True):
+
+    start_time = time.time()
+    logger.info("Start processing pedestrian and bicycle data (FootBikeCount)")
+    fb_count_df = process_foot_bike_data(files_present)
+    logger.debug(f'FB Head:{fb_count_df.head()}\n FB dtypes: {fb_count_df.dtypes}')
+    fb_file_path = os.path.join(integrated_dir, 'FootBikeCount.csv')
+    logger.debug(f'FB Cleaned File Path: {fb_file_path}')
+    fb_count_df.to_csv(fb_file_path, index=False)
+    logger.info("FB integrated csv created.")
+    end_time = time.time()
+    logger.info(f'Time taken for FootBikeCount: {end_time-start_time}')
+
+
+def miv_to_integrated_csv(miv_present=True):
+
+    start_time2 = time.time()
+    logger.info("Start processing motorized vehicle data (MivCount)")
+    miv_count_df = process_miv_data(miv_present)
+    logger.debug(f'MIV Head:{miv_count_df.head()}\n MIV dtypes: {miv_count_df.dtypes}')
+    miv_file_path = os.path.join(integrated_dir, 'MivCount.csv')
+    logger.debug(f'MIV Cleaned File Path: {miv_file_path}')
+    miv_count_df.to_csv(miv_file_path, index=False)
+    logger.info("MIV integrated csv created.")
+    end_time = time.time()
+    logger.info(f'Time taken for MivCount: {end_time-start_time2}')
+
+
 if __name__ == '__main__':
-    acc_df = process_accident_data(True)
-    print(acc_df.dtypes)
-    print(acc_df.head(100))
+    #process_all_data_sources(True, True, True)
+    miv_to_integrated_csv()
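The subject line says UNFINISHED, and indeed `process_all_data_sources` stops after MivCount: the accident data is cleaned but never written out. The missing step could mirror `miv_to_integrated_csv`; a sketch under that assumption (the function name and the `Accidents.csv` filename are placeholders, not part of the patch):

def accidents_to_integrated_csv(files_present=True):
    # Hypothetical counterpart to fb_to_integrated()/miv_to_integrated_csv():
    # clean the accident data and write it next to the other integrated CSVs.
    start_time = time.time()
    logger.info("Start processing accident data (Accidents)")
    acc_df = process_accident_data(files_present)
    logger.debug(f'ACC Head:{acc_df.head()}\n ACC dtypes: {acc_df.dtypes}')
    acc_file_path = os.path.join(integrated_dir, 'Accidents.csv')  # assumed filename
    logger.debug(f'ACC Cleaned File Path: {acc_file_path}')
    acc_df.to_csv(acc_file_path, index=False)
    logger.info("Accidents integrated csv created.")
    end_time = time.time()
    logger.info(f'Time taken for Accidents: {end_time-start_time}')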
diff --git a/src/preparations.py b/src/prepare_for_db.py
similarity index 100%
rename from src/preparations.py
rename to src/prepare_for_db.py
diff --git a/src/setup_tables.sql b/src/setup_tables.sql
new file mode 100644
index 0000000..d510b00
--- /dev/null
+++ b/src/setup_tables.sql
@@ -0,0 +1,71 @@
+CREATE EXTENSION IF NOT EXISTS postgis;
+
+DROP TABLE IF EXISTS FootBikeCount;
+
+CREATE TABLE FootBikeCount (
+    ID INTEGER ,
+    NORD INTEGER ,
+    OST INTEGER ,
+    DATE VARCHAR(10) ,
+    HRS INTEGER ,
+    VELO_IN INTEGER ,
+    VELO_OUT INTEGER ,
+    FUSS_IN INTEGER ,
+    FUSS_OUT INTEGER ,
+    Weekday_en VARCHAR(10) ,
+
+    PRIMARY KEY(ID) ,
+    CHECK (Weekday_en IN ('Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday')),
+    CHECK (HRS BETWEEN 0 AND 23)
+
+
+);
+
+DROP TABLE IF EXISTS MivCount;
+
+CREATE TABLE MivCount (
+    MSID VARCHAR(256) ,
+    ZSID VARCHAR(256) ,
+    Achse VARCHAR(256) ,
+    NKoord INTEGER ,
+    EKoord INTEGER ,
+    Richtung VARCHAR(10) ,
+    AnzFahrzeuge INTEGER ,
+    AnzFahrzeugeStatus VARCHAR(20) ,
+    Datum VARCHAR(10) ,
+    Hrs INTEGER ,
+    Weekday_en VARCHAR(10),
+    PRIMARY KEY (MSID),
+    CHECK (Weekday_en IN ('Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday')),
+    CHECK (Hrs BETWEEN 0 AND 23)
+);
+
+
+DROP TABLE IF EXISTS Accidents;
+
+CREATE TABLE Accidents (
+    AccidentUID VARCHAR(32) ,
+    AccidentYear INTEGER ,
+    AccidentMonth INTEGER,
+    AccidentWeekDay_en VARCHAR(10) ,
+    AccidentHour INTEGER ,
+    NKoord INTEGER ,
+    EKoord INTEGER ,
+    AccidentType_en VARCHAR(256) ,
+    AccidentType VARCHAR(4) ,
+    AccidentSeverityCategory VARCHAR(4) ,
+    AccidentInvolvingPedestrian BOOLEAN ,
+    AccidentInvolvingBicycle BOOLEAN ,
+    AccidentInvolvingMotorcycle BOOLEAN ,
+    RoadType VARCHAR(5) ,
+    RoadType_en VARCHAR(256) ,
+    Geometry geometry(Point) ,
+
+    PRIMARY KEY (AccidentUID) ,
+    CHECK ( AccidentHour BETWEEN 0 AND 23) ,
+    CHECK (AccidentWeekDay_en IN ('Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday'))
+);
+
+COPY FootBikeCount FROM '/Users/seb/Projects/repos/group-1/src/datasets/integrated/FootBikeCount.csv'
+    DELIMITER ','
+    CSV HEADER;
\ No newline at end of file
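The `COPY ... FROM '/Users/seb/...'` statement above runs on the database server and only covers FootBikeCount. A client-side alternative could stream each integrated CSV over the connection instead; a sketch assuming `psycopg2` is available (it is not a dependency of this patch) and that a DSN is supplied by the caller:

import psycopg2


def load_csv(dsn, table, csv_path):
    # Stream a local CSV into an existing table; COPY FROM STDIN runs
    # client-side, so no absolute server-side path is needed.
    with psycopg2.connect(dsn) as conn, conn.cursor() as cur, open(csv_path) as f:
        cur.copy_expert(f"COPY {table} FROM STDIN WITH (FORMAT csv, HEADER true)", f)


# Example usage (dsn and paths are placeholders):
# load_csv('dbname=traffic user=postgres', 'FootBikeCount',
#          'datasets/integrated/FootBikeCount.csv')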
\n", + "3011488 2019-07-13\n", + "3011489 2019-07-13\n", + "3011490 2019-07-13\n", + "3011491 2019-07-13\n", + "3011492 2019-07-13\n", + "Name: DATE, Length: 3011493, dtype: object\n" ] } ], "source": [ "print(\"MIV unqiue:\", miv_df['EKoord'])\n", - "print(\"Acc unique:\", acc_df['EKoord'])\n", - "print(\"FB unique: \", fb_data['OST'])\n" + "print(\"Acc unique:\", acc_df['RoadType'].unique)\n", + "print(\"FB unique: \", fb_data['DATE'])\n" ], "metadata": { "collapsed": false, "ExecuteTime": { - "end_time": "2023-12-03T12:33:32.280058Z", - "start_time": "2023-12-03T12:33:32.275419Z" + "end_time": "2023-12-03T15:03:13.580284Z", + "start_time": "2023-12-03T15:03:13.574959Z" } }, "id": "f6d752ea17eda341"