# Files
# SRS_evac_sim/archive/source/test/_2_testing.py
# Varyngoth 676659e5b9 First
# 2026-01-28 13:31:49 -04:00
#
# 387 lines
# 13 KiB
# Python

import sys
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

import jupedsim as jps
import matplotlib
import numpy as np
import shapely

matplotlib.use('QtAgg')  # select the Qt backend before pyplot is imported
import matplotlib.pyplot as plt
from PyQt6 import QtWidgets

import config

sys.path.insert(0, str(config.GEO_DIR))  # make the geometry module importable
from geo_current import geo_current
@dataclass
class AgentSetup:
    """Static description of one simulated pedestrian.

    Agents are created before a spawn position has been chosen for them, so
    ``spawn`` is optional and stays ``None`` until a position is assigned.
    """

    id: int  # sequential identifier given to the agent at creation
    grade: str  # key of the grade entry the agent belongs to
    door: int  # identifier of the doorway the agent starts from
    speed: float  # walking speed handed to the pedestrian model
    radius: float  # body radius handed to the pedestrian model
    # (x, y) spawn coordinates; None until a spawn position is sampled.
    spawn: Optional[Tuple[float, float]] = None
@dataclass
class SimSetup:
    """Configuration for one evacuation simulation run.

    The required geometry fields are declared before the defaulted tuning
    fields: a dataclass cannot be created when a field without a default
    follows one that has a default. Construct instances with keyword
    arguments.
    """

    # Project geometry types are written as forward references so the
    # annotations are not evaluated when the class is created.
    doorways: Dict[int, "shapely.Polygon"]  # door id -> door polygon
    grades: Dict[str, Dict]  # grade name -> parameter dict for that grade
    walkable_area: "shapely.Polygon"  # geometry handed to the simulation
    exit_area: "shapely.Polygon"  # polygon registered as the exit stage
    min_spacing: float = 0.6  # minimum distance between sampled spawn points
    total_sim_time: float = 200.0  # simulated time at which the loop stops
    door_capacity: int = 10  # maximum number of agents allowed per door area
class EvacSim:
    """Drive a JuPedSim evacuation run described by a ``SimSetup``.

    ``run`` performs the whole workflow: build the agent descriptions from
    the grade table, create the simulation with one journey per doorway,
    derive the spawn events, then iterate the simulation while spawning
    agents as their spawn time arrives and their door has room.
    """

    def __init__(self, setup: "SimSetup"):
        """Store the configuration and initialise empty simulation state."""
        self.setup = setup
        self.all_agents = []
        self.all_spawn_events = []
        self.spawn_events = []
        self.simulation = None
        self.exit_id = None
        # door id -> dict with the stage and journey ids created for that door
        self.doorway_system = {}
        # Seeded generator used for position sampling so runs are repeatable.
        self._rng = np.random.default_rng(seed=42)

    def run(self):
        """Execute the full workflow and return the simulation object."""
        self.all_agents = self.agent_params()
        self.setup_sim_env()
        self.spawn_events = self.get_spawn_events()
        self.all_spawn_events = self.spawn_events
        self.run_sim()
        return self.simulation

    def agent_params(self) -> List["AgentSetup"]:
        """Create one ``AgentSetup`` per person listed in the grade table.

        Side effect: draws a spawn time for every grade and stores it under
        that grade's ``"Spawn Time"`` key.
        """
        rng = np.random.default_rng(seed=42)
        all_agents = []
        agent_id = 1
        for grade, info in self.setup.grades.items():
            info["Spawn Time"] = float(rng.uniform(0.0, 115.0))
            door = int(info["Door"])
            for _ in range(int(info["Pop Current"])):
                # Draw a scalar: the agent parameters take a plain float,
                # not a one-element array.
                speed = float(rng.normal(
                    loc=info["Speed Mean"],
                    scale=info["Speed Std Dev"]))
                all_agents.append(AgentSetup(
                    id=agent_id,
                    grade=grade,
                    door=door,
                    speed=speed,
                    radius=info["Radius"],
                    spawn=None,  # assigned later by get_spawn_events
                ))
                agent_id += 1
        return all_agents

    def setup_sim_env(self):
        """Create the simulation, the exit stage and every doorway journey."""
        self.simulation = jps.Simulation(
            model=jps.CollisionFreeSpeedModel(),
            geometry=self.setup.walkable_area,
        )
        self.exit_id = self.simulation.add_exit_stage(self.setup.exit_area)
        for door_id, door_polygon in self.setup.doorways.items():
            self.add_doorway_system(door_id, door_polygon)

    def get_waiting_area(self, door_polygon: "shapely.Polygon") -> "shapely.Polygon":
        """Return the 2.0-wide band around ``door_polygon``, door excluded.

        When the difference splits into several pieces, the piece with the
        largest area is returned.
        """
        waiting_area = door_polygon.buffer(2.0, join_style=2).difference(door_polygon)
        if waiting_area.geom_type == 'MultiPolygon':
            waiting_area = max(waiting_area.geoms, key=lambda part: part.area)
        return waiting_area

    def add_doorway_system(self, door_id: int, door_polygon: "shapely.Polygon"):
        """Register the stages and the journey for one doorway.

        The journey is: waiting set -> three queue waypoints -> exit. The
        resulting ids are stored in ``self.doorway_system[door_id]``.

        Raises:
            ValueError: if no waiting position can be placed for the door.
        """
        waiting_area = self.get_waiting_area(door_polygon)
        # add_waiting_set_stage takes a list of positions, not a polygon, so
        # sample positions from the part of the band inside the walkable area.
        # NOTE(review): sampling is one possible placement policy - confirm
        # the intended waiting positions.
        reachable = waiting_area.intersection(self.setup.walkable_area)
        waiting_positions = self._sample_points(
            reachable, max(1, self.setup.door_capacity))
        if not waiting_positions:
            raise ValueError(
                f"No waiting position could be placed for door {door_id}")
        waiting_set_id = self.simulation.add_waiting_set_stage(waiting_positions)
        # NOTE(review): JuPedSim waiting sets presumably hold agents until the
        # stage is switched to inactive; nothing in this class releases them.
        # Verify the intended release logic.

        door_centroid = door_polygon.centroid
        queue_waypoints = [
            self.simulation.add_waypoint_stage(
                (door_centroid.x, door_centroid.y + offset), 0.5)
            for offset in (-1.0, 0.0, 1.0)
        ]

        journey_stages = [waiting_set_id] + queue_waypoints + [self.exit_id]
        journey = jps.JourneyDescription(journey_stages)
        # Link each stage to its successor; without transitions agents have
        # no next stage to move to.
        for stage_id, next_stage_id in zip(journey_stages, journey_stages[1:]):
            journey.set_transition_for_stage(
                stage_id,
                jps.Transition.create_fixed_transition(next_stage_id))
        journey_id = self.simulation.add_journey(journey)

        self.doorway_system[door_id] = {
            "waiting_area": waiting_area,
            "waiting_positions": waiting_positions,
            "waiting_set_id": waiting_set_id,
            "queue_waypoints": queue_waypoints,
            "journey_id": journey_id,
            "door_polygon": door_polygon
        }

    def _sample_points(self, polygon, num_points: int) -> List[Tuple[float, float]]:
        """Rejection-sample up to ``num_points`` points inside ``polygon``.

        Accepted points are at least ``setup.min_spacing`` apart. Sampling
        stops after ``num_points * 100`` attempts, so fewer points than
        requested may be returned.
        """
        if num_points <= 0 or polygon.is_empty:
            return []
        min_x, min_y, max_x, max_y = polygon.bounds
        min_spacing = self.setup.min_spacing
        points = []
        for _ in range(num_points * 100):
            if len(points) >= num_points:
                break
            x = float(self._rng.uniform(min_x, max_x))
            y = float(self._rng.uniform(min_y, max_y))
            if not polygon.contains(shapely.Point(x, y)):
                continue
            if all(np.hypot(x - px, y - py) >= min_spacing for px, py in points):
                points.append((x, y))
        return points

    def get_spawn_point(self, door: int, num_points: int) -> List[Tuple[float, float]]:
        """Sample up to ``num_points`` spawn positions inside door ``door``."""
        return self._sample_points(self.setup.doorways[door], num_points)

    def get_spawn_events(self) -> List[Dict]:
        """Build the time-sorted list of spawn events, one per placed agent.

        Each event is a dict with the keys ``"time"``, ``"agent"``,
        ``"position"``, ``"grade"`` and ``"door"``. The chosen position is
        also stored on the agent's ``spawn`` attribute.
        """
        # Group by the agent's own grade; several grades share one door, so
        # the door alone cannot identify the grade.
        agents_by_grade = {}
        for agent in self.all_agents:
            agents_by_grade.setdefault(agent.grade, []).append(agent)

        events = []
        for grade_name, grade_info in self.setup.grades.items():
            grade_agents = agents_by_grade.get(grade_name, [])
            if not grade_agents:
                continue
            door_id = int(grade_info["Door"])
            spawn_time = grade_info["Spawn Time"]
            spawn_positions = self.get_spawn_point(door_id, len(grade_agents))
            if len(spawn_positions) < len(grade_agents):
                # zip() below drops the agents without a position; say so
                # instead of losing them silently.
                print(f"Warning: only {len(spawn_positions)} of "
                      f"{len(grade_agents)} agents of {grade_name} "
                      f"could be placed at door {door_id}")
            for agent, position in zip(grade_agents, spawn_positions):
                agent.spawn = position
                events.append({
                    "time": spawn_time,
                    "agent": agent,
                    "position": position,
                    "grade": grade_name,
                    "door": door_id
                })
        events.sort(key=lambda event: event["time"])
        return events

    def _count_agents_in_door_areas(self) -> Dict[int, int]:
        """Count the simulated agents currently inside each door polygon."""
        counts = {door_id: 0 for door_id in self.setup.doorways}
        for sim_agent in self.simulation.agents():
            location = shapely.Point(sim_agent.position)
            for door_id, door_polygon in self.setup.doorways.items():
                if door_polygon.contains(location):
                    counts[door_id] += 1
                    break
        return counts

    def run_sim(self):
        """Iterate the simulation until ``setup.total_sim_time`` is reached.

        Before each iteration, spawn events that are due are processed;
        events whose door is at capacity stay pending and are retried on
        later iterations.
        """
        spawned_event_indices = set()
        agents_in_door_area = {door_id: 0 for door_id in self.setup.doorways}
        event_index = 0  # index of the first event not yet spawned
        total_events = len(self.spawn_events)
        print("\nStarting Simulation Loop...")
        print(f"Total Simulation Time: {self.setup.total_sim_time}s")
        print(f"Door Capacity: {self.setup.door_capacity} agents per door")
        while self.simulation.elapsed_time() < self.setup.total_sim_time:
            current_time = self.simulation.elapsed_time()
            # Events are time-sorted: if the first pending one is not due,
            # no pending event is, and the occupancy scan can be skipped.
            if (event_index < total_events
                    and self.spawn_events[event_index]["time"] <= current_time):
                # Recount so agents that walked out of a door area free up
                # capacity for the pending spawns.
                agents_in_door_area.update(self._count_agents_in_door_areas())
                event_index = self.process_spawn_events(
                    current_time,
                    event_index,
                    spawned_event_indices,
                    agents_in_door_area)
            self.simulation.iterate()
        print(f"\nSimulation completed at {self.simulation.elapsed_time():.2f} seconds")

    def process_spawn_events(
            self,
            current_time: float,
            event_index: int,
            spawned_events: set,
            agents_in_door_area: Dict
    ):
        """Spawn every due, not-yet-spawned event whose door has room.

        Args:
            current_time: current simulation time.
            event_index: index in ``self.spawn_events`` to start scanning at.
            spawned_events: indices of already spawned events; updated in place.
            agents_in_door_area: door id -> agent count; incremented in place
                for every agent spawned here.

        Returns:
            The index of the first event at or after ``event_index`` that has
            not been spawned (``len(self.spawn_events)`` when none is left).
        """
        events = self.spawn_events
        total_events = len(events)
        index = event_index
        while index < total_events and events[index]["time"] <= current_time:
            if index not in spawned_events:
                event = events[index]
                door_id = event["door"]
                if agents_in_door_area[door_id] < self.setup.door_capacity:
                    # NOTE(review): the position was sampled in advance; if
                    # another agent stands there now, add_agent may reject it.
                    self.spawn_agent(event, door_id, event["agent"])
                    agents_in_door_area[door_id] += 1
                    spawned_events.add(index)
            # Keep scanning: a full door must not block events of other doors.
            index += 1

        first_pending = event_index
        while first_pending < total_events and first_pending in spawned_events:
            first_pending += 1
        return first_pending

    def spawn_agent(self, event: Dict, door_id: int, agent: "AgentSetup"):
        """Add ``agent`` to the simulation at the event's position.

        The agent starts on the journey of door ``door_id`` at that door's
        waiting-set stage. Returns the id assigned by the simulation.
        """
        door_info = self.doorway_system[door_id]
        agent_params = jps.CollisionFreeSpeedModelAgentParameters(
            position=event["position"],
            journey_id=door_info["journey_id"],
            stage_id=door_info["waiting_set_id"],
            radius=agent.radius,
            v0=float(agent.speed),
        )
        agent_id = self.simulation.add_agent(agent_params)
        # Optional: Log spawning
        if agent_id % 50 == 0:  # Log every 50th agent
            print(f" Spawned agent {agent_id} (group: {event['grade']}, door: {door_id})")
        return agent_id
def start_sim_run():
    """Build the evacuation scenario, run it and return the simulation.

    The geometry and the door polygons come from ``geo_current``; the grade
    table below supplies, per grade, the door used, the population and the
    speed/radius parameters.

    Returns:
        The simulation object held by the ``EvacSim`` instance after the run,
        so the caller can query it (for example its elapsed time).
    """
    print("Evacuation Simulation")
    print("-" * 40)
    geometry, [door0, door1, door2, exit_door] = geo_current(full_plot=True)
    # Keys must match the "Door" values used in the grade table (0, 1, 2).
    door_polygons = {
        0: door0,
        1: door1,
        2: door2
    }
    grade_data = {
        "Kindergarden": {
            "Door": 0,
            "Pop Current": 34,
            "Pop Mean": 31.43,
            "Pop Std Dev": 5.65,
            "Speed Mean": 1.21,
            "Speed Std Dev": 0.24,
            "Radius": 0.407,
            "Spawn Time": None
        },
        "Grade 1": {
            "Door": 0,
            "Pop Current": 26,
            "Pop Mean": 32.57,
            "Pop Std Dev": 6.27,
            "Speed Mean": 1.35,
            "Speed Std Dev": 0.26,
            "Radius": 0.407,
            "Spawn Time": None
        },
        "Grade 2": {
            "Door": 0,
            "Pop Current": 42,
            "Pop Mean": 34.43,
            "Pop Std Dev": 6.80,
            "Speed Mean": 1.42,
            "Speed Std Dev": 0.28,
            "Radius": 0.407,
            "Spawn Time": None
        },
        "Grade 3": {
            "Door": 0,
            "Pop Current": 39,
            "Pop Mean": 35.43,
            "Pop Std Dev": 5.19,
            "Speed Mean": 1.48,
            "Speed Std Dev": 0.23,
            "Radius": 0.407,
            "Spawn Time": None
        },
        "Grade 4": {
            "Door": 1,
            "Pop Current": 30,
            "Pop Mean": 34.86,
            "Pop Std Dev": 6.77,
            "Speed Mean": 1.58,
            "Speed Std Dev": 0.26,
            "Radius": 0.417,
            "Spawn Time": None
        },
        "Grade 5": {
            "Door": 1,
            "Pop Current": 43,
            "Pop Mean": 36.71,
            "Pop Std Dev": 7.09,
            "Speed Mean": 1.59,
            "Speed Std Dev": 0.24,
            "Radius": 0.434,
            "Spawn Time": None
        },
        "Grade 6": {
            "Door": 1,
            "Pop Current": 29,
            "Pop Mean": 37.71,
            "Pop Std Dev": 6.99,
            "Speed Mean": 1.65,
            "Speed Std Dev": 0.24,
            "Radius": 0.454,
            "Spawn Time": None
        },
        "Grade 7": {
            "Door": 2,
            "Pop Current": 45,
            "Pop Mean": 40.43,
            "Pop Std Dev": 6.02,
            "Speed Mean": 1.61,
            "Speed Std Dev": 0.25,
            "Radius": 0.471,
            "Spawn Time": None
        },
        "Grade 8": {
            "Door": 2,
            "Pop Current": 36,
            "Pop Mean": 40.43,
            "Pop Std Dev": 5.50,
            "Speed Mean": 1.66,
            "Speed Std Dev": 0.24,
            "Radius": 0.488,
            "Spawn Time": None
        },
        "Grade 9": {
            "Door": 2,
            "Pop Current": 44,
            "Pop Mean": 44.14,
            "Pop Std Dev": 4.85,
            "Speed Mean": 1.60,
            "Speed Std Dev": 0.24,
            "Radius": 0.500,
            "Spawn Time": None
        },
        "Grade 10": {
            "Door": 2,
            "Pop Current": 36,
            "Pop Mean": 46.29,
            "Pop Std Dev": 6.29,
            "Speed Mean": 1.57,
            "Speed Std Dev": 0.23,
            "Radius": 0.507,
            "Spawn Time": None
        },
        "Grade 11": {
            "Door": 2,
            "Pop Current": 54,
            "Pop Mean": 48.29,
            "Pop Std Dev": 3.30,
            "Speed Mean": 1.51,
            "Speed Std Dev": 0.22,
            "Radius": 0.515,
            "Spawn Time": None
        },
        "Grade 12": {
            "Door": 2,
            "Pop Current": 46,
            "Pop Mean": 43.71,
            "Pop Std Dev": 6.02,
            "Speed Mean": 1.54,
            "Speed Std Dev": 0.23,
            "Radius": 0.520,
            "Spawn Time": None
        }}
    # Named ``setup`` rather than ``config`` so the imported ``config``
    # module is not shadowed inside this function.
    setup = SimSetup(
        doorways=door_polygons,
        grades=grade_data,
        total_sim_time=180.0,  # SimSetup's field is total_sim_time
        door_capacity=10,
        walkable_area=geometry,
        exit_area=exit_door
    )
    sim = EvacSim(setup)
    sim.run()
    # Return the simulation itself; the script entry point queries it.
    return sim.simulation
if __name__ == "__main__":
    # Script entry point: run the scenario, then report the simulated time
    # that had elapsed when the run stopped.
    simulation = start_sim_run()
    print("\nFinal simulation state:")
    print(" Elapsed time: {:.2f}s".format(simulation.elapsed_time()))