Merge remote-tracking branch 'origin/dev-explore' into dev-explore

# Conflicts:
#	src/integrate.py
#	src/setup_tables.sql
Sebastian Lenzlinger 2023-12-03 17:37:07 +01:00
commit 39b28a4b65
4 changed files with 78 additions and 35 deletions

src/data_utils.py

@@ -8,6 +8,10 @@ import logging
 logging.basicConfig(level=logging.DEBUG, filename='data_utils.log', format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
 logger = logging.getLogger('data_utils.py')
+stream_handler = logging.StreamHandler()
+formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+stream_handler.setFormatter(formatter)
+logger.addHandler(stream_handler)
 def download_csv(url, local_filename):
@@ -54,24 +58,27 @@ def load_dataframe_from_csv(filepath):
 def load_dataframes_from_csv_files(data_dir, u_string):
     dataframes = []
-    with tpe(max_workers=5) as executor:
-        for filename in os.listdir(data_dir):
-            if (u_string in filename) and filename.endswith('.csv'):
-                filepath = os.path.join(data_dir, filename)
-                future = executor.submit(load_dataframe_from_csv, filepath)
-                dataframes.append(future)
-    dataframes = [future.result() for future in dataframes if future.result() is not None]
-    return dataframes
+    # with tpe(max_workers=5) as executor:
+    #     for filename in os.listdir(data_dir):
+    #         if (u_string in filename) and filename.endswith('.csv'):
+    #             filepath = os.path.join(data_dir, filename)
+    #             df = pd.read_csv(filepath, low_memory=False)
+    #             dataframes.append(df)
+    #             future = executor.submit(load_dataframe_from_csv, filepath)
+    #             dataframes.append(future)
+    #
+    # dataframes = [future.result() for future in dataframes if future.result() is not None]
+    #
+    # return dataframes
+    for filename in os.listdir(data_dir):
+        if (u_string in filename) and filename.endswith('.csv'):
+            filepath = os.path.join(data_dir, filename)
+            df = pd.read_csv(filepath, low_memory=False)
+            logger.debug(f'Duplicate Rows for {filename}: {df[df.duplicated()].shape[0]}')
+            df = df.drop_duplicates()
+            logger.debug(f'Duplicate Rows after DROPPING for {filename}: {df[df.duplicated()].shape[0]}')
+            dataframes.append(df.drop_duplicates())
+    return dataframes
 def load_dataframes_from_geojson_files(data_dir, u_string):
     print('u_string', u_string)
@@ -89,6 +96,7 @@ def load_dataframes_from_geojson_files(data_dir, u_string):
 def combine_dataframes(dataframes):
     if dataframes:
         combined_dataframe = pd.concat(dataframes, ignore_index=True)
+        logger.debug(f'Duplicate Rows after combining: {combined_dataframe[combined_dataframe.duplicated()]}')
         return combined_dataframe
     else:
         print("No dataframes to combine")

src/fill_db.py (new file, 38 lines)

@@ -0,0 +1,38 @@
+import os
+import pandas as pd
+import psycopg2
+from psycopg2 import sql
+
+integrated_dir = 'datasets/integrated/'
+
+# Set up info needed to connect to db
+db_info = {
+    'host': '127.0.0.1',
+    'database': 'zh-traffic',
+    'port': '54322',
+    'user': 'db23-db',
+    'password': 'db23-project-role-PW@0',
+    'sslmode': 'disable'
+}
+
+csv_table_maps = [
+    {'file': os.path.join(integrated_dir, 'FootBikeCount.csv'), 'table': 'FootBikeCount'},
+    {'file': os.path.join(integrated_dir, 'MivCount.csv'), 'table': 'MivCount'}
+]
+
+db_connection = psycopg2.connect(**db_info)
+
+def csv_to_existing_table(csv_file_path, table_name):
+    df = pd.read_csv(csv_file_path)
+    curs = db_connection.cursor()
+    df.to_sql(table_name, db_connection, if_exists='append', index=False)
+    db_connection.commit()
+    curs.close()
+
+for i in csv_table_maps:
+    csv_to_existing_table(i['file'], i['table'])
+
+db_connection.close()
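One caveat on csv_to_existing_table: pandas' DataFrame.to_sql expects an SQLAlchemy connectable and will reject a raw psycopg2 connection, and the cursor it opens is never used. A COPY-based sketch that stays on plain psycopg2, assuming the CSV column order matches the table definition (db_info and csv_table_maps as defined above):

import psycopg2

def csv_to_existing_table(conn, csv_file_path, table_name):
    # Stream the CSV straight into the table; the header row is skipped.
    with conn.cursor() as curs, open(csv_file_path, 'r') as f:
        curs.copy_expert(
            f'COPY {table_name} FROM STDIN WITH (FORMAT csv, HEADER true)', f
        )
    conn.commit()

db_connection = psycopg2.connect(**db_info)
for m in csv_table_maps:
    csv_to_existing_table(db_connection, m['file'], m['table'])
db_connection.close()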

src/integrate.py

@@ -43,6 +43,7 @@ fb_data_types = {
 }
 miv_data_types = {
+    'ID': 'int',
     'MSID': 'str',
     'ZSID': 'str',
     'Achse': 'str',
@@ -54,7 +55,6 @@ miv_data_types = {
     'Datum': 'str',
     'Hrs': 'int',
     'Weekday_en': 'str',
-    'MessungDatZeit': 'str'
 }
 acc_data_types = {
@@ -125,17 +125,19 @@ def process_miv_data(files_present=True):
     miv_df_unified[['Hrs', 'Mins', 'Sec']] = miv_df_unified['Time'].str.split(':', expand=True)
     miv_cols_to_keep = ['MSID', 'ZSID', 'Achse', 'NKoord', 'EKoord', 'Richtung', 'AnzFahrzeuge', 'AnzFahrzeugeStatus',
-                        'Datum', 'Hrs', 'MessungDatZeit']
+                        'Datum', 'Hrs']
     miv_df_cols_dropped = miv_df_unified[miv_cols_to_keep]
     dt_obj = pd.to_datetime(miv_df_cols_dropped['Datum'])
     days = dt_obj.dt.weekday
-    miv_df_cols_dropped['Weekday_en'] = days.map(lambda x: weekday_names[x])
-    miv_df_cols_dropped['AnzFahrzeuge'] = miv_df_cols_dropped['AnzFahrzeuge'].fillna(0).astype(int)
-    miv_df_cols_dropped['ZSID'] = miv_df_cols_dropped['ZSID'].fillna('Missing').astype(str)
-    cleaned_miv_df = miv_df_cols_dropped[['MSID', 'ZSID', 'Achse', 'NKoord', 'EKoord', 'Richtung', 'AnzFahrzeuge',
-                                          'AnzFahrzeugeStatus', 'Datum', 'Hrs', 'Weekday_en', 'MessungDatZeit']]
+    miv_df_cols_dropped.loc[:, 'Weekday_en'] = days.map(lambda x: weekday_names[x])
+    miv_df_cols_dropped.loc[:, 'AnzFahrzeuge'] = miv_df_cols_dropped['AnzFahrzeuge'].fillna(0).astype(int)
+    miv_df_cols_dropped.loc[:, 'ZSID'] = miv_df_cols_dropped['ZSID'].fillna('Missing').astype(str)
+    miv_df_cols_dropped['ID'] = (miv_df_cols_dropped.index + 1).copy()
+    cleaned_miv_df = miv_df_cols_dropped[['ID', 'MSID', 'ZSID', 'Achse', 'NKoord', 'EKoord', 'Richtung', 'AnzFahrzeuge',
+                                          'AnzFahrzeugeStatus', 'Datum', 'Hrs', 'Weekday_en']]
     cleaned_miv_df = cleaned_miv_df.astype(miv_data_types)
     cleaned_miv_df = cleaned_miv_df.drop_duplicates()
@@ -158,7 +160,6 @@ def process_accident_data(file_present: bool = True):
     }, inplace=True)
     cleaned_acc_df = cleaned_acc_df.astype(acc_data_types)
     return cleaned_acc_df
@@ -223,10 +224,5 @@ def miv_to_integrated_csv(miv_present=True):
 if __name__ == '__main__':
-    # process_all_data_sources(True, True, True)
-    miv_to_integrated_csv()
-    path = os.path.join(integrated_dir, 'MivCount.csv')
-    df = pd.read_csv(path)
-    df = df[['MSID', 'MessungDatZeit']]
-    duplicate_rows = df[df.duplicated()]
-    print(duplicate_rows.shape[0])
+    process_all_data_sources(True, True, True)
+    # miv_to_integrated_csv()
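Note on the switch to .loc[...] assignments above: miv_df_cols_dropped is a column slice of miv_df_unified, so plain item assignment can trigger pandas' SettingWithCopyWarning and may write to a copy rather than the slice. The fully unambiguous pattern takes an explicit copy first, roughly like this (hypothetical data):

import pandas as pd

df = pd.DataFrame({'a': [1, 2], 'b': [None, 4.0]})
sliced = df[['a', 'b']].copy()   # explicit copy: later writes are unambiguous
sliced.loc[:, 'b'] = sliced['b'].fillna(0).astype(int)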

src/setup_tables.sql

@@ -24,8 +24,9 @@ CREATE TABLE FootBikeCount (
 DROP TABLE IF EXISTS MivCount;
 CREATE TABLE MivCount (
-    MSID VARCHAR(256),
-    ZSID VARCHAR(256) NULL,
+    ID INTEGER,
+    MSID VARCHAR(10),
+    ZSID VARCHAR(10),
     Achse VARCHAR(256),
     NKoord INTEGER,
     EKoord INTEGER,
@@ -35,8 +36,8 @@ CREATE TABLE MivCount (
     Datum VARCHAR(10),
     Hrs INTEGER,
     Weekday_en VARCHAR(10),
-    MessungDatZeit VARCHAR(100),
-    PRIMARY KEY (MSID, Achse, Richtung, AnzFahrzeuge, Datum, Hrs),
+    PRIMARY KEY (ID),
     CHECK (Weekday_en IN ('Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday')),
     CHECK (Hrs BETWEEN 0 AND 23)
 );
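A quick smoke test for the new MivCount schema and its CHECK constraints could look like the sketch below. The values are made up, the column list is inferred from the integration code above, and db_info is assumed to be the dict from src/fill_db.py:

import psycopg2

# One hypothetical row: Hrs within 0-23 and a valid Weekday_en,
# so both CHECK constraints are satisfied.
row = (1, 'Z001M001', 'Z001', 'Seestrasse', 1247000, 2683000,
       'auswaerts', 42, 'Gemessen', '2023-01-01', 13, 'Sunday')

conn = psycopg2.connect(**db_info)
with conn, conn.cursor() as curs:  # connection context commits on success
    curs.execute(
        'INSERT INTO MivCount (ID, MSID, ZSID, Achse, NKoord, EKoord,'
        ' Richtung, AnzFahrzeuge, AnzFahrzeugeStatus, Datum, Hrs, Weekday_en)'
        ' VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)',
        row,
    )
conn.close()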