92 lines
3.4 KiB
Python
92 lines
3.4 KiB
Python
import logging
|
|
from shapely.geometry import Point, LineString
|
|
from shapely import wkb
|
|
from shapely.geometry import shape
|
|
from db_connector import RemoteDB
|
|
import pandas as pd
|
|
from pyproj import Proj, transform
|
|
from geopy.distance import geodesic
|
|
|
|
speedLimits = ["T0", "T20", "T30", "T50","T60", "T80", "T100"]
|
|
|
|
|
|
def is_point_near_multilinestring(point, multilinestring, threshold_distance):
|
|
point_geometry = Point(point)
|
|
return point_geometry.distance(multilinestring) < threshold_distance
|
|
|
|
|
|
def calculate_accidents_per_sigspeed(db):
|
|
get_speeds_sql = """
|
|
SELECT wkb_geometry,
|
|
temporegime_technical
|
|
FROM signaled_speeds;
|
|
"""
|
|
|
|
result = db.execute_query(get_speeds_sql)
|
|
sig_speed_df = pd.DataFrame(result)
|
|
sig_speed_df.rename(columns={'wkb_geometry': 'geometry'}, inplace=True)
|
|
sig_speed_df['geometry'] = sig_speed_df['geometry'].apply(lambda x: wkb.loads(x, hex=True))
|
|
|
|
get_accidents = """
|
|
SELECT geometry
|
|
FROM accidents;
|
|
"""
|
|
result = db.execute_query(get_accidents)
|
|
accident_df = pd.DataFrame(result)
|
|
accident_df['geometry'] = accident_df['geometry'].apply(lambda x: wkb.loads(x, hex=True))
|
|
|
|
process_data(sig_speed_df, accident_df)
|
|
|
|
|
|
def process_data(sig_speed_df, accident_df):
|
|
result_df = pd.DataFrame(columns= ['TempoLim', 'Accidents_total', ])
|
|
for speed in speedLimits:
|
|
print("Checking for zone: " + speed)
|
|
filtered_df = sig_speed_df[sig_speed_df["temporegime_technical"].str.contains(speed, case=False, na=False)]
|
|
current_result = count_points_near_multilinestrings(accident_df, filtered_df, 0.00001)
|
|
result_df.loc[len(result_df)] = {'TempoLim': speed, 'Accidents_total': current_result}
|
|
print("FINAL RESULT")
|
|
print(result_df)
|
|
|
|
|
|
def count_points_near_multilinestrings(points_df, multilinestrings_df, threshold_distance):
|
|
result_counts = []
|
|
|
|
for idx, multilinestring_row in multilinestrings_df.iterrows():
|
|
multilinestring = multilinestring_row['geometry']
|
|
count_near = sum(points_df['geometry'].apply(
|
|
lambda point: is_point_near_multilinestring(point, multilinestring, threshold_distance)))
|
|
result_counts.append({'temporegime_technical': multilinestring_row['temporegime_technical'], 'CountNear': count_near})
|
|
result_df = pd.DataFrame(result_counts)
|
|
return result_df['CountNear'].sum()
|
|
|
|
def calculate_sigspeed_length(db):
|
|
for speed in speedLimits:
|
|
get_data_sql = f"""
|
|
SELECT wkb_geometry, temporegime_technical
|
|
FROM signaled_speeds
|
|
WHERE temporegime_technical = '{speed}';
|
|
"""
|
|
|
|
result = db.execute_query(get_data_sql)
|
|
result_df = pd.DataFrame(result)
|
|
result_df['wkb_geometry'] = result_df['wkb_geometry'].apply(lambda x: wkb.loads(x, hex=True))
|
|
sigspeed_length = result_df['wkb_geometry'].apply(lambda x:get_accumulated_distance(x)).sum()
|
|
sigspeed_length = str(round(sigspeed_length * 1000, 2)) + " km"
|
|
print("Length for " + speed + ": " + sigspeed_length)
|
|
|
|
def get_accumulated_distance(coords_str):
|
|
polyline_geometry = shape(coords_str)
|
|
return polyline_geometry.length
|
|
|
|
if __name__ == "__main__":
|
|
remote_db = RemoteDB()
|
|
|
|
try:
|
|
calculate_accidents_per_sigspeed(remote_db)
|
|
calculate_sigspeed_length(remote_db)
|
|
except Exception as e:
|
|
print(f"Exception {e} in calculations.py")
|
|
finally:
|
|
remote_db.close()
|