Files
SRS_evac_sim/archive/SRS_modeling_2025-11-09.py
Varyngoth 676659e5b9 First
2026-01-28 13:31:49 -04:00

287 lines
10 KiB
Python

# -*- coding: utf-8 -*-
"""
Created on Sat Nov 8 22:59:23 2025
@author: ethan
"""
#! /usr/bin/env python3
# SPDX-License-Identifier: LGPL-3.0-or-later
import numpy as np
import pandas as pd
import pathlib
import jupedsim as jps
from shapely import Polygon
from shapely.plotting import plot_polygon
from collections import deque
def _grade_statistics():
    """Build the per-grade statistics table that drives platoon sizing and pace.

    Returns:
        A ``pandas.DataFrame`` with one row per grade level (default integer
        index, Kindergarten first) and the columns ``"Grade Level"``,
        ``"Pop Mean"``, ``"Pop Std Dev"``, ``"Speed Mean"`` and
        ``"Speed Std Dev"``.
    """
    # Per grade: (population mean, population std dev,
    #             walking-speed mean, walking-speed std dev).
    # NOTE(review): units are not stated here; speeds are later passed to
    # JuPedSim as ``desired_speed`` -- presumably m/s, confirm with the data.
    grades = {
        "Kindergarden": (31.43, 5.65, 1.21, 0.24),
        "Grade 1": (32.57, 6.27, 1.35, 0.26),
        "Grade 2": (34.43, 6.80, 1.42, 0.28),
        "Grade 3": (35.43, 5.19, 1.48, 0.23),
        "Grade 4": (34.86, 6.77, 1.58, 0.26),
        "Grade 5": (36.71, 7.09, 1.59, 0.24),
        "Grade 6": (37.71, 6.99, 1.65, 0.24),
        "Grade 7": (40.43, 6.02, 1.61, 0.25),
        "Grade 8": (40.43, 5.50, 1.66, 0.24),
        "Grade 9": (44.14, 4.85, 1.60, 0.24),
        "Grade 10": (46.29, 6.29, 1.57, 0.23),
        "Grade 11": (48.29, 3.30, 1.51, 0.22),
        "Grade 12": (43.71, 6.02, 1.54, 0.23),
    }
    stat_columns = ("Pop Mean", "Pop Std Dev", "Speed Mean", "Speed Std Dev")
    table = {"Grade Level": list(grades)}
    for column_index, column_name in enumerate(stat_columns):
        table[column_name] = [
            stats[column_index] for stats in grades.values()
        ]
    return pd.DataFrame(table)


def _site_polygons():
    """Define the walkable areas, entry doors and exit region of the site.

    Returns:
        A tuple ``(walkable, doors, exit_polygon)``:

        * ``walkable`` -- list of ``(polygon, plot_color)`` pairs, in the
          order they are plotted and merged into the simulation geometry.
        * ``doors`` -- dict mapping a door label (``"K-3"``, ``"4-6"``,
          ``"7-12"``) to the polygon agents are spawned in.
        * ``exit_polygon`` -- the region used as the exit stage.
    """
    a_crosswalk = Polygon([
        (-1, 1.499), (-1, 3.327),
        (0, 3.327), (11.214, 3.327),
        (11.214, 1.499), (0, 1.499)])
    b_queue = Polygon([
        (11.214, 0), (22.163, 0),
        (22.163, 4.826), (11.214, 4.826)])
    c_road_adj_path = Polygon([
        (21.787, 4.826), (24.214, 4.826),
        (24.214, 40.431), (29.179, 40.431),
        (29.179, 42.511), (24.214, 42.511),
        (21.787, 42.511)])
    d_path_k_3 = Polygon([
        (26.45, 42.511), (26.45, 52.84),
        (26.45, 53.84), (29.179, 53.84),
        (29.179, 52.84), (29.179, 42.511)])
    e_path_4_6 = Polygon([
        (29.179, 42.511), (54.351, 42.511),
        (60.406, 48.842), (60.406, 51.22),
        (60.406, 52.22), (63.49, 52.22),
        (63.49, 51.22), (63.49, 47.866),
        (56.381, 40.431), (29.179, 40.431)])
    f_path_7_12 = Polygon([
        (22.163, 0), (39.227, 5.516),
        (39.631, 4.267), (39.939, 3.315),
        (45.099, 4.983), (44.792, 5.935),
        (43.169, 10.954), (24.214, 4.826),
        (22.163, 4.826)])
    g_extended_queue = Polygon([
        (11.214, 0), (12.924, 0),
        (12.924, -4.569), (11.214, -4.569)])
    h_angled_path = Polygon([
        (21.787, 13.192), (21.787, 10.527),
        (17, 4.826), (14.767, 4.826)])

    walkable = [
        (a_crosswalk, "black"),
        (b_queue, "black"),
        (c_road_adj_path, "blue"),
        (d_path_k_3, "blue"),
        (e_path_4_6, "blue"),
        (f_path_7_12, "blue"),
        (g_extended_queue, "black"),
        (h_angled_path, "black"),
    ]
    # Each door polygon reuses vertices of the path it belongs to, so the
    # spawn regions lie inside the merged walkable geometry.
    doors = {
        "K-3": Polygon([
            (26.45, 52.84), (29.179, 52.84),
            (29.179, 53.84), (26.45, 53.84)]),
        "4-6": Polygon([
            (60.406, 51.22), (60.406, 52.22),
            (63.49, 52.22), (63.49, 51.22)]),
        "7-12": Polygon([
            (39.631, 4.267), (39.939, 3.315),
            (45.099, 4.983), (44.792, 5.935)]),
    }
    # The exit is the x in [-1, 0] strip at the far end of the crosswalk.
    exit_polygon = Polygon([
        (0, 1.499), (0, 3.327),
        (-1, 3.327), (-1, 1.499)])
    return walkable, doors, exit_polygon


def _plan_platoons(grade_table, door_polygon, rng):
    """Sample each grade's population and split it into two platoons.

    Args:
        grade_table: DataFrame produced by ``_grade_statistics``.
        door_polygon: dict mapping door label to spawn polygon.
        rng: ``numpy.random.Generator`` used for the population draws.

    Returns:
        A tuple ``(platoon_agents, door)`` where ``platoon_agents`` maps a
        platoon id (1-based; grade ``i`` owns ids ``2*i+1`` and ``2*i+2``) to
        a dict with keys ``"Grade Level"``, ``"Platoon Size"`` and
        ``"Entry Door"``, and ``door`` maps a grade level to its door label.
    """
    platoon_agents = {}
    door = {}
    rows = zip(
        grade_table["Grade Level"],
        grade_table["Pop Mean"],
        grade_table["Pop Std Dev"])
    for i, (grade, pop_mean, pop_std) in enumerate(rows):
        sample = rng.normal(loc=pop_mean, scale=pop_std, size=1)[0]
        # A normal draw is unbounded below; never plan a negative head count.
        population = max(0, int(np.ceil(sample)))

        # Rows 0-3 are Kindergarten..Grade 3, rows 4-6 are Grades 4..6.
        if i < 4:
            door[grade] = "K-3"
        elif i < 7:
            door[grade] = "4-6"
        else:
            door[grade] = "7-12"

        size_a = population // 2
        size_b = population - size_a
        for platoon_id, size in ((2 * i + 1, size_a), (2 * i + 2, size_b)):
            platoon_agents[platoon_id] = {
                "Grade Level": grade,
                "Platoon Size": size,
                "Entry Door": door_polygon[door[grade]],
            }
    return platoon_agents, door


def _schedule_agents(platoon_agents, door, speed_stats, rng):
    """Pick a spawn position, speed and spawn time for every planned agent.

    Agents of a platoon are placed inside the platoon's entry door in batches
    of at most ten. When a batch cannot be placed, the batch size is halved
    and the spawn time is pushed back; after 50 consecutive failures the rest
    of the platoon is abandoned and a message is printed.

    Args:
        platoon_agents: platoon id -> platoon description dict.
        door: grade level -> door label.
        speed_stats: grade level -> ``(speed mean, speed std dev)``.
        rng: ``numpy.random.Generator`` used for spawn times and speeds.

    Returns:
        A list of agent dicts with keys ``"Grade Level"``, ``"Entry Point"``,
        ``"Platoon"``, ``"Position"``, ``"Speed"`` and ``"Spawn Time"``.
    """
    max_attempts = 50
    time_delay = 1.0
    offset = 0.1  # stagger between agents of the same batch
    agent_set = []

    for platoon_id, platoon_data in platoon_agents.items():
        grade = platoon_data["Grade Level"]
        # Look the speed distribution up by the platoon's own grade. Indexing
        # with a loop variable left over from the grade loop would give every
        # agent the last grade's speed distribution.
        speed_mean, speed_std = speed_stats[grade]

        spawn_time = float(rng.uniform(5, 15) + rng.uniform(0, 120))
        spawn_time = min(spawn_time, 120)
        remaining = int(platoon_data["Platoon Size"])
        attempts = 0
        batch_size = max(1, min(10, remaining))

        while remaining > 0 and attempts < max_attempts:
            n_try = min(batch_size, remaining)
            # Keep the try body to the one call that is expected to fail, so
            # programming errors in the bookkeeping below are not reported as
            # placement failures.
            try:
                positions = jps.distribute_by_number(
                    polygon=platoon_data["Entry Door"],
                    number_of_agents=n_try,
                    distance_to_agents=0.45,
                    distance_to_polygon=0.3,
                    max_iterations=1500)
            except Exception as e:
                print(
                    f"Error placing platoon {platoon_id}: {e}")
                print("Reducing batch and retrying")
                attempts += 1
                batch_size = max(1, batch_size // 2)
                spawn_time += time_delay
                continue

            placed_count = len(positions)
            if placed_count == 0:
                attempts += 1
                spawn_time += time_delay
                batch_size = max(1, batch_size // 2)
                continue

            for k, pos in enumerate(positions):
                speed = float(rng.normal(
                    loc=speed_mean, scale=speed_std, size=1)[0])
                agent_set.append({
                    "Grade Level": grade,
                    "Entry Point": door[grade],
                    "Platoon": platoon_id,
                    "Position": (float(pos[0]), float(pos[1])),
                    "Speed": speed,
                    "Spawn Time": float(spawn_time + (k * offset)),
                })

            remaining -= placed_count
            # A successful batch resets the consecutive-failure counter.
            attempts = 0
            if remaining > 0:
                # The next batch reuses the same door, so it enters later.
                spawn_time += time_delay
                batch_size = max(1, min(10, remaining))

        if remaining > 0:
            print(
                f"Platoon {platoon_id}: gave up with "
                f"{remaining} agents not placed")
    return agent_set


def _run_simulation(
        simulation, journey_id, exit_id, agent_set, platoon_agents, rng):
    """Inject scheduled agents as their spawn time arrives and step the model.

    The loop ends when no agents remain in the simulation and none are
    pending, or after 20000 iterations. At most 50 agents are added per time
    step. An agent that cannot be added is retried up to 25 times at a
    position jittered by up to 0.1 around its planned position, then dropped.

    Args:
        simulation: the ``jps.Simulation`` to drive.
        journey_id: journey id assigned to every agent.
        exit_id: exit stage id assigned to every agent.
        agent_set: agent dicts produced by ``_schedule_agents``.
        platoon_agents: platoon id -> platoon description (progress output).
        rng: ``numpy.random.Generator`` used for the position jitter.
    """
    pending = deque(sorted(agent_set, key=lambda a: a["Spawn Time"]))
    max_iterations = 20000
    max_agents_per_step = 50
    spawn_tolerance = 1e-8
    max_retry = 25
    # Counts only agents that were actually added to the simulation.
    spawned_total = 0
    # The platoon table does not change while the simulation runs, so format
    # it once instead of on every time step.
    platoon_report = f"Platoon: {platoon_agents.items()}"

    while ((simulation.agent_count() > 0 or len(pending) > 0)
           and simulation.iteration_count() < max_iterations):
        current_time = simulation.elapsed_time()
        agents_this_step = 0
        while (pending
               and pending[0]["Spawn Time"] <= current_time + spawn_tolerance
               and agents_this_step < max_agents_per_step):
            a = pending.popleft()
            pos = tuple(a["Position"])
            # Clamp sampled speeds to the range [0.2, 2.5].
            v0 = float(np.clip(float(a["Speed"]), 0.2, 2.5))
            agent_params = (
                jps.AnticipationVelocityModelAgentParameters(
                    journey_id=journey_id,
                    stage_id=exit_id,
                    position=pos,
                    radius=0.25,
                    desired_speed=v0,
                    anticipation_time=0.5,
                    reaction_time=0.3,
                    wall_buffer_distance=0.08))
            for retry in range(1, max_retry + 1):
                try:
                    simulation.add_agent(agent_params)
                except Exception as e:
                    print("Failed: add_agent")
                    print(f"For: pos={pos}, speed={v0}")
                    print(f"{e}: Rescheduling...")
                    if retry >= max_retry:
                        print(f"\n\nMax Retries:{max_retry}")
                        break
                    # Jitter around the planned position (not cumulatively).
                    agent_params.position = (
                        pos[0] + rng.uniform(-0.1, 0.1),
                        pos[1] + rng.uniform(-0.1, 0.1))
                else:
                    spawned_total += 1
                    agents_this_step += 1
                    break
        simulation.iterate()
        iter_count = simulation.iteration_count()
        print(f"\nTime Step: {iter_count}")
        print(f"Total Agents: {spawned_total}")
        print(platoon_report)


def main():
    """Run the SRS evacuation simulation and write trajectories to SQLite.

    Steps: build the grade statistics table, define and plot the site
    polygons, merge the walkable areas into one geometry, create a JuPedSim
    simulation with a single exit-stage journey, sample two platoons per
    grade, schedule every agent (position, speed, spawn time) and run the
    simulation. Trajectories are written to ``SRS_evac.sqlite`` in the
    current working directory. The random generator is seeded with 42.
    """
    grade_table = _grade_statistics()
    rng = np.random.default_rng(seed=42)

    walkable, door_polygon, exit_polygon = _site_polygons()
    for area, color in walkable:
        plot_polygon(area, color=color, add_points=False)
    for entrance in door_polygon.values():
        plot_polygon(entrance, color="darkgreen", add_points=False)
    plot_polygon(exit_polygon, color="orangered", add_points=False)

    # Merge the walkable areas pairwise, in definition order.
    geometry = walkable[0][0]
    for area, _color in walkable[1:]:
        geometry = geometry.union(area)

    trajectory_file = "SRS_evac.sqlite"
    simulation = jps.Simulation(
        model=jps.AnticipationVelocityModel(),
        geometry=geometry,
        trajectory_writer=jps.SqliteTrajectoryWriter(
            output_file=pathlib.Path(trajectory_file)))
    exit_id = simulation.add_exit_stage(exit_polygon)
    journey_id = simulation.add_journey(jps.JourneyDescription([exit_id]))

    platoon_agents, door = _plan_platoons(grade_table, door_polygon, rng)
    speed_stats = {
        grade: (speed_mean, speed_std)
        for grade, speed_mean, speed_std in zip(
            grade_table["Grade Level"],
            grade_table["Speed Mean"],
            grade_table["Speed Std Dev"])
    }
    agent_set = _schedule_agents(platoon_agents, door, speed_stats, rng)
    _run_simulation(
        simulation, journey_id, exit_id, agent_set, platoon_agents, rng)

    print("\nSimulation Completed!")
    print(f"Total Time Steps: {simulation.iteration_count()}")
    print(f"Elapsed Time: {simulation.elapsed_time()}")
    print(f"{trajectory_file = }")
# Run the simulation only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()