FINAL TOUCH: Add functions to fetch data from an API link.
Refactor the bash script into a Python function to ensure portability. Add SQL queries to create the "Contemporaneous" db table.
parent b79ee792b6
commit e9b1d82517
@@ -1,3 +1,4 @@
+import json
 import os
 import pandas as pd
 import requests
@@ -115,6 +116,17 @@ def create_unified_df(urls_file, u_string, data_dir, files_present=False):
     return df_unified
 
 
+def load_file_from_api(api_link, target_name, integrated_dir):
+    response = requests.get(api_link)
+    final_location = os.path.join(integrated_dir, target_name)
+    if response.status_code == 200:
+        logger.info(f"Successful GET from {api_link}")
+        data = response.json()
+        with open(final_location, 'w') as file:
+            json.dump(data, file)
+        logger.info(f"{api_link} successfully downloaded and saved to {final_location}")
+    else:
+        logger.critical(f"Failed to get data. Status Code: {response.status_code}")
 def save_dataframe_to_csv(df, integrated_dir, filename):
     pass
 
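For orientation, a minimal sketch of calling the new download helper on its own. The endpoint is the signaled_speeds_json_api URL added later in this commit; the module name data_utils is an assumption (the caller below only shows the alias du):

    # Hedged sketch, not part of the commit: download one GeoJSON layer.
    import data_utils as du  # hypothetical module name; aliased as 'du' elsewhere in this commit

    api = ('https://www.ogd.stadt-zuerich.ch/wfs/geoportal/Signalisierte_Geschwindigkeiten'
           '?service=WFS&version=1.1.0&request=GetFeature&outputFormat=GeoJSON'
           '&typename=view_geoserver_tempo_ist')
    du.load_file_from_api(api, 'signaled_speeds.geojson', 'datasets/integrated/')
    # On a 200 response this writes datasets/integrated/signaled_speeds.geojson and logs the result.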
@@ -57,6 +57,24 @@ def run_geojson_loader_script(script, *args):
     logger.info(f"Remember to set the correct permissions for the script: chmod +x {script}")
 
 
+def geojson_loader(*args):
+    geojson_file, db_name, db_user, db_password, db_host, db_port, target_table = args
+    cmd = [
+        "ogr2ogr",
+        "-f", "PostgreSQL",
+        f"PG:dbname='{db_name}' host='{db_host}' port='{db_port}' user='{db_user}' password='{db_password}'",
+        geojson_file,
+        "-nln", target_table,
+        "-append"
+    ]
+    try:
+        # Run the ogr2ogr command
+        res = subprocess.run(cmd, check=True, text=True, capture_output=True)
+        logger.info(f"ogr2ogr command executed successfully. Output: {res.stdout}")
+    except subprocess.CalledProcessError as e:
+        logger.exception(f"Error executing ogr2ogr command: {e}")
+
+
 if __name__ == '__main__':
     run_sql(setup_tables_script, db_info)
     logger.info("Finished setting up tables.")
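geojson_loader is the commit message's bash-to-Python refactor: it assembles the same ogr2ogr invocation the shell script ran, but through subprocess.run, so it needs only Python and GDAL rather than a Unix shell. A hedged usage sketch; the credentials mirror the db_info dict from the script deleted below and are placeholders:

    # Hedged sketch, not part of the commit: push one GeoJSON layer into PostGIS.
    # Requires ogr2ogr on PATH (it ships with GDAL) and a reachable PostgreSQL/PostGIS db.
    geojson_loader(
        'datasets/integrated/signaled_speeds.geojson',  # geojson_file
        'test-db23', 'seb', '',                         # db_name, db_user, db_password
        'localhost', '5432',                            # db_host, db_port
        'signaled_speeds',                              # target_table
    )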
@@ -1,35 +0,0 @@
-import os
-import pandas as pd
-from sqlalchemy import create_engine
-
-integrated_dir = 'datasets/integrated/'
-
-# Set up info needed to connect to db
-db_info = {
-    'host': 'localhost',
-    'database': 'test-db23',
-    'port': '5432',
-    'user': 'seb',
-    'password': '',
-}
-
-csv_table_maps = [
-    {'file': os.path.join(integrated_dir, 'FootBikeCount.csv'), 'table': 'FootBikeCount'},
-    {'file': os.path.join(integrated_dir, 'MivCount.csv'), 'table': 'MivCount'}
-]
-
-# Create a SQLAlchemy engine
-engine = create_engine(
-    f"postgresql://{db_info['user']}:{db_info['password']}@{db_info['host']}:{db_info['port']}/{db_info['database']}",
-    echo=True  # Set echo to True to display SQL queries (optional)
-)
-
-def csv_to_existing_table(csv_file_path, table_name):
-    df = pd.read_csv(csv_file_path)
-    df.to_sql(table_name, engine, if_exists='append', index=False)
-
-for i in csv_table_maps:
-    csv_to_existing_table(i['file'], i['table'])
-
-# Close the SQLAlchemy engine
-engine.dispose()
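The deleted script above was a one-shot CSV loader with the engine and credentials fixed at import time. In the portability spirit of this commit, the same job can be repackaged as a parameterized helper; a hedged sketch (the name load_csvs_to_db is hypothetical):

    # Hedged sketch, not part of the commit: the deleted loader as a reusable function.
    import pandas as pd
    from sqlalchemy import create_engine

    def load_csvs_to_db(csv_table_maps, db_info):
        engine = create_engine(
            f"postgresql://{db_info['user']}:{db_info['password']}"
            f"@{db_info['host']}:{db_info['port']}/{db_info['database']}"
        )
        try:
            for m in csv_table_maps:
                # Append each CSV to its existing table, as the old script did.
                pd.read_csv(m['file']).to_sql(m['table'], engine, if_exists='append', index=False)
        finally:
            engine.dispose()  # release the connection pool even on failure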
@@ -29,6 +29,8 @@ data_dir = 'datasets/'
 integrated_dir = 'datasets/integrated/'
 logs_dir = 'logs/'
 
+signaled_speeds_json_api = 'https://www.ogd.stadt-zuerich.ch/wfs/geoportal/Signalisierte_Geschwindigkeiten?service=WFS&version=1.1.0&request=GetFeature&outputFormat=GeoJSON&typename=view_geoserver_tempo_ist'
+
 weekday_names = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
 
 fb_data_types = {
@@ -233,6 +235,10 @@ def acc_to_cleaned_geojson(acc_present=True):
     logger.info(f'Time taken for Accidents: {end_time - start_time3}')
 
 
+def load_tempo_geojson_from_api_to_local():
+    du.load_file_from_api(signaled_speeds_json_api, 'signaled_speeds.geojson', integrated_dir)
+
+
 if __name__ == '__main__':
     process_all_data_sources(True, True, True)
     # miv_to_integrated_csv()
src/queries.sql (new file, 42 lines)
@@ -0,0 +1,42 @@
+select p.id, a.accidentuid, m.id
+from footbikecount p, accidents a, mivcount m
+where p.weekday_en = a.accidentweekday_en AND a.accidentweekday_en = m.weekday_en
+  AND p.weekday_en = m.weekday_en AND p.hrs = a.accidenthour AND a.accidenthour = m.hrs
+  AND p.hrs = m.hrs AND (p.ost - m.ekoord between -100 AND 100) AND (p.nord - m.nkoord between -100 AND 100);
+
+DROP TABLE IF EXISTS Contemporaneous2;
+
+CREATE TABLE Contemporaneous2 (
+    p_id INTEGER,
+    accidentuid VARCHAR(256),
+    m_id INTEGER,
+    weekday_en VARCHAR(10),
+    hrs INTEGER,
+    distance DOUBLE PRECISION
+);
+
+
+CREATE TABLE Intermediate2 AS
+SELECT
+    p.id AS p_id,
+    a.accidentuid,
+    m.id AS m_id,
+    p.weekday_en,
+    p.hrs,
+    SQRT(POWER(p.ost - m.ekoord, 2) + POWER(p.nord - m.nkoord, 2)) AS distance
+FROM
+    footbikecount p,
+    accidents a,
+    mivcount m
+WHERE
+    p.weekday_en = a.accidentweekday_en
+    AND a.accidentweekday_en = m.weekday_en
+    AND p.weekday_en = m.weekday_en
+    AND p.hrs = a.accidenthour
+    AND a.accidenthour = m.hrs
+    AND p.hrs = m.hrs
+    AND (p.ost - m.ekoord BETWEEN -100 AND 100)
+    AND (p.nord - m.nkoord BETWEEN -100 AND 100);
+
+INSERT INTO Contemporaneous2 (p_id, accidentuid, m_id, weekday_en, hrs, distance)
+SELECT p_id, accidentuid, m_id, weekday_en, hrs, distance FROM Intermediate2;
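These queries pair pedestrian/bike counts, accidents, and motorized-traffic counts that are contemporaneous (same weekday and hour) and spatially close: both coordinate differences within ±100 (the east/north coordinates, presumably in the Swiss LV95 grid, so roughly a 200 m square), with the exact Euclidean distance kept in the distance column. A hedged sketch of executing the file and inspecting the result; the naive split on ';' is safe only because this file has no semicolons inside string literals:

    # Hedged sketch, not part of the commit: run src/queries.sql and peek at the result.
    # Assumes SQLAlchemy 2.x and that Intermediate2 does not already exist.
    import pandas as pd
    from sqlalchemy import create_engine, text

    engine = create_engine('postgresql://seb:@localhost:5432/test-db23')
    with engine.connect() as conn:
        with open('src/queries.sql') as f:
            for stmt in f.read().split(';'):
                if stmt.strip():
                    conn.execute(text(stmt))
        conn.commit()

    # Ten closest contemporaneous pairings.
    df = pd.read_sql('SELECT * FROM Contemporaneous2 ORDER BY distance LIMIT 10', engine)
    print(df)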