UNFINISHED: Script to create CSVs of the integrated and cleaned datasets.

Sebastian Lenzlinger 2023-12-03 16:34:40 +01:00
parent 1ef7bbe39b
commit c33ca87aaf
4 changed files with 252 additions and 48 deletions

src/integrate.py

@@ -1,14 +1,19 @@
import data_utils as du
from datetime import datetime as dt
import os
import requests
import pandas as pd
from datetime import datetime
import time
from shapely.geometry import Point
import logging
logging.basicConfig(level=logging.DEBUG, filename='integrate.log',
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('integrate.py')
stream_handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
stream_handler.setFormatter(formatter)
logger.addHandler(stream_handler)
foot_bike_urls_file = '../docs/foot_bike_zaehlung_urls.txt'
miv_file_urls = '../docs/verkehrszaehlung_moto_urls.txt'
@@ -24,9 +29,70 @@ integrated_dir = 'datasets/integrated/'
weekday_names = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
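# The dtype mappings below are meant to mirror the column types declared in src/setup_tables.sql,
# so the exported CSVs line up with the database tables.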
fb_data_types = {
'ID': 'int',
'NORD': 'int',
'OST': 'int',
'DATE': 'str',
'HRS': 'int',
'VELO_IN': 'int',
'VELO_OUT': 'int',
'FUSS_IN': 'int',
'FUSS_OUT': 'int',
'Weekday_en': 'str'
}
miv_data_types = {
'MSID': 'str',
'ZSID': 'str',
'Achse': 'str',
'NKoord': 'int',
'EKoord': 'int',
'Richtung': 'str',
'AnzFahrzeuge': 'int',
'AnzFahrzeugeStatus': 'str',
'Datum': 'str',
'Hrs': 'int',
'Weekday_en': 'str'
}
acc_data_types = {
'AccidentUID': 'str',
'AccidentYear': 'int',
'AccidentMonth': 'int',
'AccidentWeekDay_en': 'str',
'AccidentHour': 'int',
'NKoord': 'int',
'EKoord': 'int',
'AccidentType_en': 'str',
'AccidentType': 'str',
'AccidentSeverityCategory': 'str',
'AccidentInvolvingPedestrian': 'bool',
'AccidentInvolvingBicycle': 'bool',
'AccidentInvolvingMotorcycle': 'bool',
'RoadType': 'str',
'RoadType_en': 'str',
'Geometry': 'str' # TODO: Figure out what dtype this needs to be for postgres
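# Note: setup_tables.sql declares this column as PostGIS geometry(Point); keeping the value as
# WKT text in the CSV and letting Postgres parse it on COPY is one option (assumption, not verified here).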
}
def ensure_dirs_exist(data_dir, integrated_dir):
"""
This should be called before anything else to make sure that the relevant directories exist.
:param data_dir: directory where the datasets are stored
:param integrated_dir: directory where the integrated data will be stored
:return:
"""
logger.debug(f'data_dir: {data_dir}\n integrated_dir: {integrated_dir}')
logger.info("Ensuring needed directories exist.")
os.makedirs(data_dir, exist_ok=True)
logger.debug("data_dir created.")
os.makedirs(integrated_dir, exist_ok=True)
logger.debug("integrated_dir created")
def process_foot_bike_data(files_present=True):
fb_df_unified = du.create_unified_df(foot_bike_urls_file, foot_bike_file_u_string, data_dir, files_present=files_present)
fb_df_unified[['DATE', "TIME"]] = fb_df_unified['DATUM'].str.split('T', expand=True)
fb_df_unified[['HRS', 'MINS']] = fb_df_unified['TIME'].str.split(':', expand=True)
## We may still need FK_ZAEHLER after all
@@ -44,28 +110,32 @@ def process_foot_bike_data():
fb_df_grouped['Weekday_en'] = days.map(lambda x: weekday_names[x])
cleaned_fb_df = fb_df_grouped
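# Use a 1-based surrogate key; it becomes the primary key of the FootBikeCount table in setup_tables.sql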
cleaned_fb_df['ID'] = cleaned_fb_df.index + 1
cleaned_fb_df = cleaned_fb_df[['ID', 'NORD', 'OST', 'DATE', 'HRS', 'VELO_IN', 'VELO_OUT', 'FUSS_IN',
'FUSS_OUT', 'Weekday_en']]
# Ensure datatype of df and sql table match
cleaned_fb_df = cleaned_fb_df.astype(fb_data_types)
return cleaned_fb_df
def process_miv_data(files_present=True):
miv_df_unified = du.create_unified_df(miv_file_urls, motor_file_u_string, data_dir, files_present=files_present)
miv_df_unified[['Datum', "Time"]] = miv_df_unified['MessungDatZeit'].str.split('T', expand=True)
miv_df_unified[['Hrs', 'Mins', 'Sec']] = miv_df_unified['Time'].str.split(':', expand=True)
miv_cols_to_keep = ['MSID','ZSID','Achse', 'EKoord', 'NKoord', 'Richtung', 'AnzFahrzeuge', 'AnzFahrzeugeStatus',
'Datum', 'Hrs']
miv_df_cols_dropped = miv_df_unified[miv_cols_to_keep]
dt_obj = pd.to_datetime(miv_df_cols_dropped['Datum'])
days = dt_obj.dt.weekday
miv_df_cols_dropped['Weekday_en'] = days.map(lambda x: weekday_names[x])
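# Missing AnzFahrzeuge values are filled with 0 so the column can be cast to int (see miv_data_types)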
miv_df_cols_dropped['AnzFahrzeuge'] = miv_df_cols_dropped['AnzFahrzeuge'].fillna(0).astype(int)
cleaned_miv_df = miv_df_cols_dropped[['MSID', 'ZSID', 'Achse', 'NKoord', 'EKoord', 'Richtung', 'AnzFahrzeuge',
'AnzFahrzeugeStatus', 'Datum', 'Hrs', 'Weekday_en']]
cleaned_miv_df = cleaned_miv_df.astype(miv_data_types)
return cleaned_miv_df
@@ -73,19 +143,82 @@ def process_accident_data(file_present: bool = True):
if not file_present:
du.process_urls(data_dir, accident_file_url)
acc_df_unified = du.load_dataframes_from_geojson_files(data_dir, accident_file_u_string)
acc_cols_to_keep = ['AccidentUID', 'AccidentYear', 'AccidentMonth', 'AccidentWeekDay_en', 'AccidentHour',
'AccidentLocation_CHLV95_N', 'AccidentLocation_CHLV95_E', 'AccidentType_en', 'AccidentType',
'AccidentSeverityCategory', 'AccidentInvolvingPedestrian', 'AccidentInvolvingBicycle',
'AccidentInvolvingMotorcycle', 'RoadType', 'RoadType_en',
'Geometry']
cleaned_acc_df = acc_df_unified[acc_cols_to_keep]
cleaned_acc_df.rename(columns={
'AccidentLocation_CHLV95_E': 'EKoord',
'AccidentLocation_CHLV95_N': 'NKoord',
}, inplace=True)
cleaned_acc_df = cleaned_acc_df.astype(acc_data_types)
return cleaned_acc_df
def process_all_data_sources(fb_present=True, miv_present=True, accident_present=True):
"""
Process all data sources and turn them into CSV files. After this function is called, there
should be CSV files of the cleaned and integrated data sources in the integrated directory.
:param fb_present: bool, whether the foot/bike count files are already present in the local file system
:param miv_present: bool, whether the motorized traffic count files are already present in the local file system
:param accident_present: bool, whether the accident files are already present in the local file system
:return:
"""
ensure_dirs_exist(data_dir, integrated_dir)
logger.info("Started processing all data sources.")
start_time = time.time()
logger.info("Start processing pedestrian and bicycle data (FootBikeCount)")
fb_count_df = process_foot_bike_data(fb_present)
logger.debug(f'FB Head:{fb_count_df.head()}\n FB dtypes: {fb_count_df.dtypes}')
fb_file_path = os.path.join(integrated_dir, 'FootBikeCount.csv')
logger.debug(f'FB Cleaned File Path: {fb_file_path}')
fb_count_df.to_csv(fb_file_path, index=False)
logger.info("FB integrated csv created.")
logger.info(f'Time taken for FootBikeCount: {time.time() - start_time}')
start_time2 = time.time()
logger.info("Start processing motorized vehicle data (MivCount)")
miv_count_df = process_miv_data(miv_present)
logger.debug(f'MIV Head:{miv_count_df.head()}\n MIV dtypes: {miv_count_df.dtypes}')
miv_file_path = os.path.join(integrated_dir, 'MivCount.csv')
logger.debug(f'MIV Cleaned File Path: {miv_file_path}')
miv_count_df.to_csv(miv_file_path, index=False)
logger.info("MIV integrated csv created.")
logger.info(f'Time taken for MivCount: {time.time() - start_time2}')
def fb_to_integrated(files_present=True):
start_time = time.time()
logger.info("Start processing pedestrian and bicycle data (FootBikeCount)")
fb_count_df = process_foot_bike_data(files_present)
logger.debug(f'FB Head:{fb_count_df.head()}\n FB dtypes: {fb_count_df.dtypes}')
fb_file_path = os.path.join(integrated_dir, 'FootBikeCount.csv')
logger.debug(f'FB Cleaned File Path: {fb_file_path}')
fb_count_df.to_csv(fb_file_path, index=False)
logger.info("FB integrated csv created.")
end_time = time.time()
logger.info(f'Time taken for FootBikeCount: {end_time-start_time}')
def miv_to_integrated_csv(miv_present=True):
start_time2 = time.time()
logger.info("Start processing motorized vehicle data (MivCount)")
miv_count_df = process_miv_data(miv_present)
logger.debug(f'MIV Head:{miv_count_df.head()}\n MIV dtypes: {miv_count_df.dtypes}')
miv_file_path = os.path.join(integrated_dir, 'MivCount.csv')
logger.debug(f'MIV Cleaned File Path: {miv_file_path}')
miv_count_df.to_csv(miv_file_path, index=False)
logger.info("MIV integrated csv created.")
end_time = time.time()
logger.info(f'Time taken for MivCount: {end_time-start_time2}')
if __name__ == '__main__':
#process_all_data_sources(True, True, True)
miv_to_integrated_csv()

src/setup_tables.sql (new file, 71 lines)

@@ -0,0 +1,71 @@
CREATE EXTENSION IF NOT EXISTS postgis;
DROP TABLE IF EXISTS FootBikeCount;
CREATE TABLE FootBikeCount (
ID INTEGER ,
NORD INTEGER ,
OST INT ,
DATE VARCHAR(10) ,
HRS INTEGER ,
VELO_IN INTEGER ,
VELO_OUT INTEGER ,
FUSS_IN INTEGER ,
FUSS_OUT INTEGER ,
Weekday_en VARCHAR(10) ,
PRIMARY KEY(ID) ,
CHECK (Weekday_en IN ('Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday')),
CHECK (Hrs BETWEEN 0 AND 23)
);
DROP TABLE IF EXISTS MivCount;
CREATE TABLE MivCount (
MSID VARCHAR(256) ,
ZSID VARCHAR(256) ,
Achse VARCHAR(256) ,
NKoord INTEGER ,
EKoord INTEGER ,
Richtung VARCHAR(10) ,
AnzFahrzeuge INTEGER ,
AnzFahrzeugeStatus VARCHAR(20) ,
Datum VARCHAR(10) ,
Hrs Integer ,
Weekday_en VARCHAR(10),
PRIMARY KEY (MSID),
CHECK (Weekday_en IN ('Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday')),
CHECK (Hrs BETWEEN 0 AND 23)
);
DROP TABLE IF EXISTS Accidents;
CREATE TABLE Accidents (
AccidentUID VARCHAR(32) ,
AccidentYear INTEGER ,
AccidentMonth INTEGER,
AccidentWeekDay_en VARCHAR(10) ,
AccidentHour INTEGER ,
NKoord INTEGER ,
EKoord INTEGER ,
AccidentType_en VARCHAR(256) ,
AccidentType VARCHAR(4) ,
AccidentSeverityCategory VARCHAR(4) ,
AccidentInvolvingPedestrian BOOLEAN ,
AccidentInvolvingBicycle BOOLEAN ,
AccidentInvolvingMotorcycle BOOLEAN ,
RoadType VARCHAR(5) ,
RoadType_en VARCHAR(256) ,
Geometry geometry(Point) ,
PRIMARY KEY (AccidentUID) ,
CHECK ( AccidentHour BETWEEN 0 AND 23) ,
CHECK (AccidentWeekDay_en IN ('Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday'))
);
COPY FootBikeCount FROM '/Users/seb/Projects/repos/group-1/src/datasets/integrated/FootBikeCount.csv'
DELIMITER ','
CSV HEADER;
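-- A minimal sketch of the corresponding load for MivCount, assuming miv_to_integrated_csv()
-- in integrate.py has written MivCount.csv next to FootBikeCount.csv; the absolute path below
-- mirrors the COPY statement above and is machine-specific.
COPY MivCount FROM '/Users/seb/Projects/repos/group-1/src/datasets/integrated/MivCount.csv'
DELIMITER ','
CSV HEADER;
-- An analogous COPY for Accidents can follow once integrate.py also exports an accident CSV.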

View File

@@ -227,7 +227,7 @@
},
{
"cell_type": "code",
"execution_count": 10,
"outputs": [
{
"name": "stdout",
@@ -245,43 +245,43 @@
"16699188 2682704.50\n",
"16699189 2682704.50\n",
"Name: EKoord, Length: 16699190, dtype: float64\n",
"Acc unique: <bound method Series.unique of 0 rt433\n",
"1 rt433\n",
"2 rt439\n",
"3 rt433\n",
"4 rt433\n",
" ... \n",
"55821 rt432\n",
"55822 rt433\n",
"55823 rt433\n",
"55824 rt433\n",
"55825 rt432\n",
"Name: RoadType, Length: 55826, dtype: object>\n",
"FB unique: 0 2012-01-01\n",
"1 2012-01-01\n",
"2 2012-01-01\n",
"3 2012-01-01\n",
"4 2012-01-01\n",
" ... \n",
"3011488 2019-07-13\n",
"3011489 2019-07-13\n",
"3011490 2019-07-13\n",
"3011491 2019-07-13\n",
"3011492 2019-07-13\n",
"Name: DATE, Length: 3011493, dtype: object\n"
]
}
],
"source": [
"print(\"MIV unqiue:\", miv_df['EKoord'])\n",
"print(\"Acc unique:\", acc_df['RoadType'].unique)\n",
"print(\"FB unique: \", fb_data['DATE'])\n"
],
"metadata": {
"collapsed": false,
"ExecuteTime": {
"end_time": "2023-12-03T15:03:13.580284Z",
"start_time": "2023-12-03T15:03:13.574959Z"
}
},
"id": "f6d752ea17eda341"