diff --git a/src/data_utils.py b/src/data_utils.py
index 72b88c5..1dc7109 100644
--- a/src/data_utils.py
+++ b/src/data_utils.py
@@ -8,6 +8,10 @@ import logging
 logging.basicConfig(level=logging.DEBUG, filename='data_utils.log',
                     format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
 logger = logging.getLogger('data_utils.py')
+stream_handler = logging.StreamHandler()
+formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+stream_handler.setFormatter(formatter)
+logger.addHandler(stream_handler)
 
 
 def download_csv(url, local_filename):
@@ -54,24 +58,27 @@ def load_dataframe_from_csv(filepath):
 
 
 def load_dataframes_from_csv_files(data_dir, u_string):
     dataframes = []
-    with tpe(max_workers=5) as executor:
-        for filename in os.listdir(data_dir):
-            if (u_string in filename) and filename.endswith('.csv'):
-                filepath = os.path.join(data_dir, filename)
-                future = executor.submit(load_dataframe_from_csv, filepath)
-                dataframes.append(future)
-
-    dataframes = [future.result() for future in dataframes if future.result() is not None]
-
-    return dataframes
-
-    # for filename in os.listdir(data_dir):
-    #     if (u_string in filename) and filename.endswith('.csv'):
-    #         filepath = os.path.join(data_dir, filename)
-    #         df = pd.read_csv(filepath, low_memory=False)
-    #         dataframes.append(df)
+    # with tpe(max_workers=5) as executor:
+    #     for filename in os.listdir(data_dir):
+    #         if (u_string in filename) and filename.endswith('.csv'):
+    #             filepath = os.path.join(data_dir, filename)
+    #             future = executor.submit(load_dataframe_from_csv, filepath)
+    #             dataframes.append(future)
+    #
+    # dataframes = [future.result() for future in dataframes if future.result() is not None]
+    #
+    # return dataframes
+    for filename in os.listdir(data_dir):
+        if (u_string in filename) and filename.endswith('.csv'):
+            filepath = os.path.join(data_dir, filename)
+            df = pd.read_csv(filepath, low_memory=False)
+            logger.debug(f'Duplicate Rows for {filename}: {df[df.duplicated()].shape[0]}')
+            df = df.drop_duplicates()
+            logger.debug(f'Duplicate Rows after DROPPING for {filename}: {df[df.duplicated()].shape[0]}')
+            dataframes.append(df)
+    return dataframes
 
 
 def load_dataframes_from_geojson_files(data_dir, u_string):
     print('u_string', u_string)
@@ -89,6 +96,7 @@ def load_dataframes_from_geojson_files(data_dir, u_string):
 def combine_dataframes(dataframes):
     if dataframes:
         combined_dataframe = pd.concat(dataframes, ignore_index=True)
+        logger.debug(f'Duplicate Rows after combining: {combined_dataframe[combined_dataframe.duplicated()].shape[0]}')
         return combined_dataframe
     else:
         print("No dataframes to combine")
diff --git a/src/fill_db.py b/src/fill_db.py
new file mode 100644
index 0000000..fc626e6
--- /dev/null
+++ b/src/fill_db.py
@@ -0,0 +1,45 @@
+import io
+import os
+
+import pandas as pd
+import psycopg2
+from psycopg2 import sql
+
+integrated_dir = 'datasets/integrated/'
+
+# Set up info needed to connect to db
+db_info = {
+    'host': '127.0.0.1',
+    'database': 'zh-traffic',
+    'port': '54322',
+    'user': 'db23-db',
+    'password': 'db23-project-role-PW@0',
+    'sslmode': 'disable'
+}
+
+csv_table_maps = [
+    {'file': os.path.join(integrated_dir, 'FootBikeCount.csv'), 'table': 'FootBikeCount'},
+    {'file': os.path.join(integrated_dir, 'MivCount.csv'), 'table': 'MivCount'}
+]
+
+db_connection = psycopg2.connect(**db_info)
+
+
+def csv_to_existing_table(csv_file_path, table_name):
+    df = pd.read_csv(csv_file_path)
+    curs = db_connection.cursor()
+    # to_sql needs an SQLAlchemy connectable, so stream the rows through COPY;
+    # the name is lower-cased because setup_tables.sql creates tables unquoted.
+    buffer = io.StringIO()
+    df.to_csv(buffer, index=False, header=False)
+    buffer.seek(0)
+    copy_stmt = sql.SQL('COPY {} FROM STDIN WITH (FORMAT csv)').format(sql.Identifier(table_name.lower()))
+    curs.copy_expert(copy_stmt, buffer)
+    db_connection.commit()
+    curs.close()
+
+
+for i in csv_table_maps:
+    csv_to_existing_table(i['file'], i['table'])
+
+db_connection.close()
diff --git a/src/integrate.py b/src/integrate.py
index ce03042..45cc14d 100644
--- a/src/integrate.py
+++ b/src/integrate.py
@@ -43,6 +43,7 @@ fb_data_types = {
 }
 
 miv_data_types = {
+    'ID': 'int',
     'MSID': 'str',
     'ZSID': 'str',
     'Achse': 'str',
@@ -54,7 +55,6 @@ miv_data_types = {
     'Datum': 'str',
     'Hrs': 'int',
     'Weekday_en': 'str',
-    'MessungDatZeit': 'str'
 }
 
 acc_data_types = {
@@ -125,17 +125,19 @@ def process_miv_data(files_present=True):
     miv_df_unified[['Hrs', 'Mins', 'Sec']] = miv_df_unified['Time'].str.split(':', expand=True)
 
     miv_cols_to_keep = ['MSID','ZSID','Achse', 'NKoord', 'EKoord', 'Richtung', 'AnzFahrzeuge', 'AnzFahrzeugeStatus',
-                        'Datum', 'Hrs', 'MessungDatZeit']
+                        'Datum', 'Hrs']
     miv_df_cols_dropped = miv_df_unified[miv_cols_to_keep]
     dt_obj = pd.to_datetime(miv_df_cols_dropped['Datum'])
     days = dt_obj.dt.weekday
-    miv_df_cols_dropped['Weekday_en'] = days.map(lambda x: weekday_names[x])
-    miv_df_cols_dropped['AnzFahrzeuge'] = miv_df_cols_dropped['AnzFahrzeuge'].fillna(0).astype(int)
-    miv_df_cols_dropped['ZSID'] = miv_df_cols_dropped['ZSID'].fillna('Missing').astype(str)
+    miv_df_cols_dropped.loc[:, 'Weekday_en'] = days.map(lambda x: weekday_names[x])
 
-    cleaned_miv_df = miv_df_cols_dropped[['MSID', 'ZSID', 'Achse', 'NKoord', 'EKoord', 'Richtung', 'AnzFahrzeuge',
-                                          'AnzFahrzeugeStatus', 'Datum', 'Hrs', 'Weekday_en', 'MessungDatZeit']]
+    miv_df_cols_dropped.loc[:, 'AnzFahrzeuge'] = miv_df_cols_dropped['AnzFahrzeuge'].fillna(0).astype(int)
+    miv_df_cols_dropped.loc[:, 'ZSID'] = miv_df_cols_dropped['ZSID'].fillna('Missing').astype(str)
+    miv_df_cols_dropped.loc[:, 'ID'] = miv_df_cols_dropped.index + 1
+
+    cleaned_miv_df = miv_df_cols_dropped[['ID', 'MSID', 'ZSID', 'Achse', 'NKoord', 'EKoord', 'Richtung', 'AnzFahrzeuge',
+                                          'AnzFahrzeugeStatus', 'Datum', 'Hrs', 'Weekday_en']]
     cleaned_miv_df = cleaned_miv_df.astype(miv_data_types)
     cleaned_miv_df = cleaned_miv_df.drop_duplicates()
@@ -158,7 +160,6 @@ def process_accident_data(file_present: bool = True):
     }, inplace=True)
 
     cleaned_acc_df = cleaned_acc_df.astype(acc_data_types)
-
     return cleaned_acc_df
 
 
@@ -223,10 +224,5 @@ def miv_to_integrated_csv(miv_present=True):
 
 
 if __name__ == '__main__':
-    #process_all_data_sources(True, True, True)
-    miv_to_integrated_csv()
-    path = os.path.join(integrated_dir, 'MivCount.csv')
-    df = pd.read_csv(path)
-    df = df[['MSID', 'MessungDatZeit']]
-    duplicate_rows = df[df.duplicated()]
-    print(duplicate_rows.shape[0])
+    process_all_data_sources(True, True, True)
+    # miv_to_integrated_csv()
diff --git a/src/setup_tables.sql b/src/setup_tables.sql
index 9879b94..8b4360c 100644
--- a/src/setup_tables.sql
+++ b/src/setup_tables.sql
@@ -24,8 +24,9 @@ CREATE TABLE FootBikeCount (
 
 DROP TABLE IF EXISTS MivCount;
 CREATE TABLE MivCount (
-    MSID            VARCHAR(256) ,
-    ZSID            VARCHAR(256) NULL,
+    ID              INTEGER ,
+    MSID            VARCHAR(10) ,
+    ZSID            VARCHAR(10) ,
     Achse           VARCHAR(256) ,
     NKoord          INTEGER ,
     EKoord          INTEGER ,
@@ -35,8 +36,8 @@ CREATE TABLE MivCount (
     Datum           VARCHAR(10) ,
     Hrs             Integer ,
     Weekday_en      VARCHAR(10),
-    MessungDatZeit  VARCHAR(100),
-    PRIMARY KEY (MSID, Achse,Richtung, AnzFahrzeuge, Datum, Hrs),
+
+    PRIMARY KEY (ID),
     CHECK (Weekday_en IN ('Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday')),
     CHECK (Hrs BETWEEN 0 AND 23)
 );
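
A note on the data_utils.py logging change: `logging.basicConfig(filename=...)` installs a file handler on the root logger, and the named logger propagates to it, so after this patch every record lands both in data_utils.log and, via the new StreamHandler, on the console. Hardcoding the name `'data_utils.py'` works, but the conventional spelling uses `__name__`; a minimal sketch of the same setup:

```python
import logging

# Root logger writes to the file (as in the patch); the module logger gets a
# console handler on top and still propagates records to the root handler.
logging.basicConfig(level=logging.DEBUG, filename='data_utils.log',
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

logger = logging.getLogger(__name__)  # e.g. 'data_utils' rather than 'data_utils.py'
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
logger.addHandler(stream_handler)
```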
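
The patch retires the `ThreadPoolExecutor` path in `load_dataframes_from_csv_files` in favour of a sequential loop with per-file deduplication. If the threaded variant is ever revived, `executor.map` is a cleaner shape than collecting futures by hand (the commented-out version also called `future.result()` twice per future). A sketch under that assumption, reusing the module's `load_dataframe_from_csv` and its `tpe` alias; the function name here is hypothetical:

```python
import os
from concurrent.futures import ThreadPoolExecutor as tpe

def load_csvs_threaded(data_dir, u_string):
    # Hypothetical revival of the commented-out threaded path: executor.map
    # preserves input order and raises any worker exception on iteration.
    filepaths = [os.path.join(name_dir, name) for name_dir in [data_dir]
                 for name in os.listdir(data_dir)
                 if u_string in name and name.endswith('.csv')]
    with tpe(max_workers=5) as executor:
        results = list(executor.map(load_dataframe_from_csv, filepaths))
    # Keep the per-file dedup from the sequential version
    return [df.drop_duplicates() for df in results if df is not None]
```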
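
In fill_db.py, the original `df.to_sql(table_name, db_connection, ...)` would not have worked: pandas' `to_sql` requires an SQLAlchemy connectable (or a sqlite3 connection), not a raw psycopg2 connection, and `index_label=False` only affects the index column's name rather than suppressing it (that is `index=False`). The version above therefore streams the rows through Postgres `COPY`. If SQLAlchemy is available, a `to_sql`-based variant is also possible; a sketch assuming SQLAlchemy 1.4+ and reusing `db_info` from the script (the password contains an `@`, which `URL.create` escapes safely):

```python
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.engine import URL

url = URL.create(
    'postgresql+psycopg2',
    username=db_info['user'],
    password=db_info['password'],      # contains '@'; URL.create escapes it
    host=db_info['host'],
    port=int(db_info['port']),
    database=db_info['database'],
    query={'sslmode': db_info['sslmode']},
)
engine = create_engine(url)

def csv_to_existing_table(csv_file_path, table_name):
    df = pd.read_csv(csv_file_path)
    # index=False suppresses the frame index; lower-casing matches the
    # unquoted (case-folded) table names created by setup_tables.sql.
    df.to_sql(table_name.lower(), engine, if_exists='append', index=False)
```

`COPY` is generally the faster bulk-load path; `to_sql` is more convenient when pandas should handle column typing.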
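
The `.loc[:, ...]` rewrites in `process_miv_data` fix the chained-assignment pattern (the original `miv_df_cols_dropped[:, 'ZSID'] = ...` fails outright, since plain `[]` indexing does not accept a slice-plus-label pair), but `miv_df_cols_dropped` is still a slice of `miv_df_unified`, so pandas may still emit SettingWithCopyWarning. The usual remedy is an explicit copy at slice time; a one-line sketch of where that would go:

```python
# An owned copy makes the subsequent .loc assignments warning-free
miv_df_cols_dropped = miv_df_unified[miv_cols_to_keep].copy()
```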
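
One ordering caveat with the new surrogate key: `ID` is assigned before `cleaned_miv_df.drop_duplicates()`, and because every row receives a distinct ID, rows that were duplicates up to that point no longer compare equal, so that final dedup becomes a no-op. Deduplicating first and deriving the key afterwards avoids this; a sketch (the helper name is hypothetical):

```python
import pandas as pd

def dedup_then_key(df: pd.DataFrame, key: str = 'ID') -> pd.DataFrame:
    # Drop duplicates first, then derive the surrogate key from the fresh
    # index, so rows identical apart from their would-be ID are still removed.
    out = df.drop_duplicates().reset_index(drop=True)
    out[key] = out.index + 1
    return out
```

Alternatively, declaring `ID` as `GENERATED ALWAYS AS IDENTITY` in setup_tables.sql would move key generation into Postgres entirely and drop it from the pandas pipeline.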