# -*- coding: utf-8 -*- """ Created on Sat Nov 8 22:59:23 2025 @author: ethan """ #! /usr/bin/env python3 # SPDX-License-Identifier: LGPL-3.0-or-later import numpy as np import pandas as pd import pathlib import jupedsim as jps from shapely import Polygon from shapely.plotting import plot_polygon from collections import deque def main(): grades = { # mean and standard deviation for # average population size # and average speed by grade levels "Kindergarden":np.array([31.43,5.65,1.21,0.24]), "Grade 1":np.array([32.57,6.27,1.35,0.26]), "Grade 2":np.array([34.43,6.80,1.42,0.28]), "Grade 3":np.array([35.43,5.19,1.48,0.23]), "Grade 4":np.array([34.86,6.77,1.58,0.26]), "Grade 5":np.array([36.71,7.09,1.59,0.24]), "Grade 6":np.array([37.71,6.99,1.65,0.24]), "Grade 7":np.array([40.43,6.02,1.61,0.25]), "Grade 8":np.array([40.43,5.50,1.66,0.24]), "Grade 9":np.array([44.14,4.85,1.60,0.24]), "Grade 10":np.array([46.29,6.29,1.57,0.23]), "Grade 11":np.array([48.29,3.30,1.51,0.22]), "Grade 12":np.array([43.71,6.02,1.54,0.23]) } df=pd.DataFrame({ "Grade Level":( list(grades.keys())), "Pop Mean":[ grades[j][0] for j in grades], "Pop Std Dev":[ grades[j][1] for j in grades], "Speed Mean":[ grades[j][2] for j in grades], "Speed Std Dev":[ grades[j][3] for j in grades]}) rng = np.random.default_rng(seed=42) A_crosswalk=Polygon([ (-1,1.499),(-1,3.327), (0,3.327),(11.214, 3.327), (11.214,1.499),(0,1.499)]) B_queue=Polygon([ (11.214, 0),(22.163, 0), (22.163, 4.826),(11.214, 4.826)]) C_road_adj_path=Polygon([ (21.787,4.826),(24.214,4.826), (24.214,40.431),(29.179,40.431), (29.179,42.511),(24.214,42.511), (21.787,42.511)]) D_path_k_3=Polygon([ (26.45,42.511),(26.45,52.84), (26.45,53.84),(29.179,53.84), (29.179,52.84),(29.179,42.511)]) E_path_4_6=Polygon([ (29.179,42.511),(54.351,42.511), (60.406,48.842),(60.406,51.22), (60.406,52.22),(63.49,52.22), (63.49,51.22),(63.49,47.866), (56.381,40.431),(29.179,40.431)]) F_path_7_12=Polygon([ (22.163, 0),(39.227, 5.516), (39.631, 4.267),(39.939,3.315), (45.099,4.983),(44.792,5.935), (43.169,10.954),(24.214,4.826), (22.163,4.826)]) G_extended_queue=Polygon([ (11.214,0),(12.924,0), (12.924,-4.569),(11.214,-4.569)]) H_angled_path=Polygon([ (21.787,13.192),(21.787,10.527), (17,4.826),(14.767,4.826)]) enter_k_3=Polygon([ (26.45,52.84),(29.179,52.84), (29.179,53.84),(26.45,53.84)]) enter_4_6=Polygon([ (60.406,51.22),(60.406,52.22), (63.49,52.22),(63.49,51.22)]) enter_7_12=Polygon([ (39.631, 4.267),(39.939,3.315), (45.099,4.983),(44.792,5.935)]) exit_polygon=Polygon([ (0,1.499),(0,3.327), (-1,3.327),(-1,1.499)]) plot_polygon( A_crosswalk,color="black",add_points=False) plot_polygon( B_queue,color="black",add_points=False) plot_polygon( C_road_adj_path, color="blue",add_points=False) plot_polygon( D_path_k_3, color="blue",add_points=False) plot_polygon( E_path_4_6, color="blue",add_points=False) plot_polygon( F_path_7_12, color="blue",add_points=False) plot_polygon( G_extended_queue, color="black",add_points=False) plot_polygon( H_angled_path, color="black",add_points=False) plot_polygon( enter_k_3, color="darkgreen",add_points=False) plot_polygon( enter_4_6, color="darkgreen",add_points=False) plot_polygon( enter_7_12, color="darkgreen",add_points=False) plot_polygon( exit_polygon, color="orangered",add_points=False) geometry = (A_crosswalk.union( B_queue).union( C_road_adj_path).union( D_path_k_3).union( E_path_4_6).union( F_path_7_12).union( G_extended_queue).union( H_angled_path)) trajectory_file = "SRS_evac.sqlite" simulation = jps.Simulation( model=jps.AnticipationVelocityModel(), geometry=geometry, trajectory_writer=jps.SqliteTrajectoryWriter( output_file=pathlib.Path(trajectory_file))) exit_id = simulation.add_exit_stage(exit_polygon) journey = jps.JourneyDescription([exit_id]) journey_id = simulation.add_journey(journey) grade_pop = {} door = {} door_polygon = { "K-3":enter_k_3, "4-6":enter_4_6, "7-12":enter_7_12, } platoon_agents = {} for i, grade in enumerate(df["Grade Level"]): grade_sample=rng.normal( loc=df["Pop Mean"][i], scale=df["Pop Std Dev"][i],size=1) grade_pop[grade] = int(np.ceil(grade_sample[0])) x = grade_pop[grade] if i < 4: door[grade] = "K-3" elif i <7: door[grade] = "4-6" else: door[grade] = "7-12" platoon_a_size = int(x/2) platoon_b_size = x - platoon_a_size platoon_a_id = (2*(i+1))-1 platoon_b_id = (2*(i+1)) platoon_agents[platoon_a_id] ={ "Grade Level": grade, "Platoon Size": platoon_a_size, "Entry Door":door_polygon[door[grade]] } platoon_agents[platoon_b_id] ={ "Grade Level": grade, "Platoon Size": platoon_b_size, "Entry Door":door_polygon[door[grade]] } spawned_total=0 #============================================================ agent_set = [] for platoon_id, platoon_data in platoon_agents.items(): spawn_time=float( rng.uniform(5,15)+rng.uniform(0,120)) spawn_time=min(spawn_time,120) remaining=int(platoon_data["Platoon Size"]) attempts = 0 max_attempts = 50 time_delay = 1.0 batch_size=max(1,min(10,remaining)) while remaining>0 and attempts0: attempts+=1 spawn_time+=time_delay attempts=0 batch_size=max(1,min(10,remaining)) except Exception as e: print( f"Error placing platoon {platoon_id}: {e}") print("Reducing batch and retrying") attempts+=1 batch_size=max(1,batch_size//2) spawn_time+=time_delay spawned_total+=placed_count #========================================= pending=sorted(agent_set,key=lambda a:a["Spawn Time"]) pending=deque(pending) max_iterations=20000 max_agents_per_step=50 spawn_tolerance=1e-8 while ((simulation.agent_count()>0 or len(pending)>0) and simulation.iteration_count()= max_retry: print(f"\n\nMax Retries:{max_retry}") break adj_pos=(pos[0]+rng.uniform(-0.1,0.1), pos[1]+rng.uniform(-0.1,0.1)) agent_params.position =adj_pos simulation.iterate() iter_count = simulation.iteration_count() print(f"\nTime Step: {iter_count}") print(f"Total Agents: {spawned_total}") print(f"Platoon: {platoon_agents.items()}") print("\nSimulation Completed!") print(f"Total Time Steps: {simulation.iteration_count()}") print(f"Elapsed Time: {simulation.elapsed_time()}") print(f"{trajectory_file = }") if __name__ == "__main__": main()