387 lines
13 KiB
Python
387 lines
13 KiB
Python
import jupedsim as jps
|
|
import shapely
|
|
from dataclasses import dataclass
|
|
from typing import List, Dict, Tuple
|
|
import numpy as np
|
|
import matplotlib
|
|
matplotlib.use('QtAgg')
|
|
import matplotlib.pyplot as plt
|
|
from PyQt6 import QtWidgets
|
|
import sys
|
|
import config
|
|
sys.path.insert(0,str(config.GEO_DIR))
|
|
from geo_current import geo_current
|
|
|
|
@dataclass
|
|
class AgentSetup:
|
|
id:int
|
|
grade:str
|
|
door:int
|
|
speed:float
|
|
radius:float
|
|
spawn:Tuple[float,float]
|
|
|
|
@dataclass
|
|
class SimSetup:
|
|
doorways:Dict[int,shapely.Polygon]
|
|
grades:Dict[str,Dict]
|
|
min_spacing:float=0.6
|
|
total_sim_time:float=200.0
|
|
door_capacity:int=10
|
|
walkable_area:shapely.Polygon
|
|
exit_area:shapely.Polygon
|
|
|
|
class EvacSim:
|
|
def __init__(self,setup:SimSetup):
|
|
self.setup = setup
|
|
self.all_agents = []
|
|
self.all_spawn_events = []
|
|
self.simulation = None
|
|
self.exit_id = None
|
|
self.doorway_system = {}
|
|
|
|
def run(self):
|
|
self.all_agents = self.agent_params()
|
|
self.setup_sim_env()
|
|
self.spawn_events = self.get_spawn_events()
|
|
self.run_sim()
|
|
|
|
def agent_params(self)->List[AgentSetup]:
|
|
agent_id = 1
|
|
rng = np.random.default_rng(seed=42)
|
|
all_agents = []
|
|
for grade in self.setup.grades.keys():
|
|
spawn_time = rng.uniform(0.0,115.0)
|
|
self.setup.grades[grade]["Spawn Time"] = spawn_time
|
|
gr_agent_num = int(self.setup.grades[grade]["Pop Current"])
|
|
door = int(self.setup.grades[grade]["Door"])
|
|
current_agent = 0
|
|
for num in range(gr_agent_num):
|
|
speed = rng.normal(
|
|
loc=self.setup.grades[grade]["Speed Mean"],
|
|
scale=self.setup.grades[grade]["Speed Std Dev"],
|
|
size=1)
|
|
radius = self.setup.grades[grade]["Radius"]
|
|
new_agent = AgentSetup(
|
|
id=agent_id,
|
|
grade=grade,
|
|
door=door,
|
|
speed=speed,
|
|
radius = radius,
|
|
)
|
|
all_agents.append(new_agent)
|
|
agent_id += 1
|
|
current_agent += 1
|
|
return all_agents
|
|
|
|
def setup_sim_env(self):
|
|
walkable_area = self.setup.walkable_area
|
|
model = jps.CollisionFreeSpeedModel()
|
|
self.simulation = jps.Simulation(
|
|
model=model,geometry=walkable_area)
|
|
self.exit_id = self.simulation.add_exit_stage(
|
|
self.setup.exit_polygon)
|
|
def doorway_system(self, door_id: int, door_polygon: shapely.Polygon):
|
|
def get_waiting_area(self,door_polygon:shapely.Polygon)->shapely.Polygon:
|
|
waiting_area = door_polygon.buffer(2.0, join_style=2)
|
|
waiting_area = waiting_area.difference(door_polygon)
|
|
|
|
if waiting_area.geom_type == 'MultiPolygon':
|
|
waiting_area = max(waiting_area.geoms, key=lambda p: p.area)
|
|
return waiting_area
|
|
|
|
waiting_area = get_waiting_area(door_polygon)
|
|
waiting_set_id = self.simulation.add_waiting_set_stage(waiting_area)
|
|
|
|
door_centroid = door_polygon.centroid
|
|
queue_waypoints = [
|
|
self.simulation.add_waypoint_stage((door_centroid.x, door_centroid.y - 1.0), 0.5),
|
|
self.simulation.add_waypoint_stage((door_centroid.x, door_centroid.y), 0.5),
|
|
self.simulation.add_waypoint_stage((door_centroid.x, door_centroid.y + 1.0), 0.5)
|
|
]
|
|
journey_stages = [waiting_set_id] + queue_waypoints + [self.exit_id]
|
|
journey = jps.JourneyDescription(journey_stages)
|
|
journey_id = self.simulation.add_journey(journey)
|
|
|
|
self.doorway_info[door_id] = {
|
|
"waiting_area": waiting_area,
|
|
"waiting_set_id": waiting_set_id,
|
|
"queue_waypoints": queue_waypoints,
|
|
"journey_id": journey_id,
|
|
"door_polygon": door_polygon
|
|
}
|
|
for door_id, door_polygon in self.setup.doorways.items():
|
|
self.doorway_system(door_id, door_polygon)
|
|
|
|
def get_spawn_events(self)->List[Dict]:
|
|
events = []
|
|
agents_by_grade = {}
|
|
def get_spawn_point(self,door:int,num_points:int)->List[Tuple[float,float]]:
|
|
polygon = self.setup.doorways[door]
|
|
min_x,min_y,max_x,max_y = polygon.bounds
|
|
points = []
|
|
attempts = 0
|
|
max_attempts = num_points * 100
|
|
while len(points) < num_points and attempts < max_attempts:
|
|
x = random.uniform(min_x, max_x)
|
|
y = random.uniform(min_y, max_y)
|
|
point = shapely.Point(x, y)
|
|
if polygon.contains(point):
|
|
too_close = False
|
|
for existing in points:
|
|
if np.sqrt((x - existing[0])**2 + (y - existing[1])**2) < self.setup.min_spacing:
|
|
too_close = True
|
|
break
|
|
if not too_close:
|
|
points.append((x, y))
|
|
attempts += 1
|
|
return points[:num_points]
|
|
for agent in self.all_agents:
|
|
for grade_name, grade_info in self.setup.grades.items():
|
|
if agent.door == grade_info["Door"]:
|
|
if grade_name not in agents_by_grade:
|
|
agents_by_grade[grade_name] = []
|
|
agents_by_grade[grade_name].append(agent)
|
|
break
|
|
for grade_name, grade_info in self.setup.grades.items():
|
|
door_id = grade_info["Door"]
|
|
spawn_time = grade_info["Spawn Time"]
|
|
grade_agents = agents_by_group.get(grade_name, [])
|
|
if not grade_agents:
|
|
continue
|
|
door_polygon = self.setup.doorways[door_id]
|
|
spawn_positions = self.get_spawn_point(
|
|
door_polygon,
|
|
len(group_agents))
|
|
for agent, position in zip(group_agents, spawn_positions):
|
|
events.append({
|
|
"time": spawn_time,
|
|
"agent": agent,
|
|
"position": position,
|
|
"grade": grade_name,
|
|
"door": door_id
|
|
})
|
|
events.sort(key=lambda x: x["time"])
|
|
return events
|
|
|
|
def run_sim(self):
|
|
spawned_event_indices = set()
|
|
agents_in_door_area = {door_id: 0 for door_id in self.config.door_polygons.keys()}
|
|
event_index = 0
|
|
|
|
print("\nStarting Simulation Loop...")
|
|
print(f"Total Simulation Time: {self.config.total_simulation_time}s")
|
|
print(f"Door Capacity: {self.config.door_capacity} agents per door")
|
|
|
|
while self.simulation.elapsed_time() < self.config.total_simulation_time:
|
|
current_time = self.simulation.elapsed_time()
|
|
self._process_spawn_events(
|
|
current_time,
|
|
event_index,
|
|
spawned_event_indices,
|
|
agents_in_door_area)
|
|
while (event_index < len(self.spawn_events) and \
|
|
self.spawn_events[event_index]["time"] <= current_time and \
|
|
event_index in spawned_event_indices):
|
|
event_index += 1
|
|
self.simulation.iterate()
|
|
|
|
print(f"\nSimulation completed at {self.simulation.elapsed_time():.2f} seconds")
|
|
|
|
def process_spawn_events(
|
|
self,
|
|
current_time: float,
|
|
event_index: int,
|
|
spawned_events: set,
|
|
agents_in_door_area: Dict
|
|
):
|
|
while (event_idx < len(self.spawn_events) and \
|
|
self.spawn_events[event_idx]["time"] <= current_time and \
|
|
event_idx not in spawned_events):
|
|
event = self.spawn_events[event_idx]
|
|
door_id = event["door"]
|
|
agent = event["agent"]
|
|
if agents_in_door_area[door_id] < self.setup.door_capacity:
|
|
self.spawn_agent(event,door_id,agent)
|
|
agents_in_door_area[door_id] += 1
|
|
spawned_events.add(event_idx)
|
|
event_index += 1
|
|
|
|
def spawn_agent(self,event:Dict,door_id:int,agent:AgentSetup):
|
|
journey_id = self.doorway_systems[door_id]["journey_id"]
|
|
|
|
agent_params = jps.CollisionFreeSpeedModelAgentParameters(
|
|
position=event["position"],
|
|
journey_id=journey_id,
|
|
stage_id=self.doorway_system[door_id]["waiting_set_id"],
|
|
radius=agent.radius,
|
|
v0=agent.speed,
|
|
)
|
|
|
|
agent_id = self.simulation.add_agent(agent_params)
|
|
|
|
# Optional: Log spawning
|
|
if agent_id % 50 == 0: # Log every 50th agent
|
|
print(f" Spawned agent {agent_id} (group: {event['group']}, door: {door_id})")
|
|
|
|
|
|
def start_sim_run():
|
|
print("Evacuation Simulation")
|
|
print("-" * 40)
|
|
geometry,[door0,door1,door2,exit_door] = geo_current(full_plot = True)
|
|
door_polygons = {
|
|
1: door0,
|
|
2: door1,
|
|
3: door2
|
|
}
|
|
grade_data = {
|
|
"Kindergarden":{
|
|
"Door":0,
|
|
"Pop Current":34,
|
|
"Pop Mean":31.43,
|
|
"Pop Std Dev":5.65,
|
|
"Speed Mean":1.21,
|
|
"Speed Std Dev":0.24,
|
|
"Radius":0.407,
|
|
"Spawn Time":None
|
|
},
|
|
"Grade 1":{
|
|
"Door":0,
|
|
"Pop Current":26,
|
|
"Pop Mean":32.57,
|
|
"Pop Std Dev":6.27,
|
|
"Speed Mean":1.35,
|
|
"Speed Std Dev":0.26,
|
|
"Radius":0.407,
|
|
"Spawn Time":None
|
|
},
|
|
"Grade 2":{
|
|
"Door":0,
|
|
"Pop Current":42,
|
|
"Pop Mean":34.43,
|
|
"Pop Std Dev":6.80,
|
|
"Speed Mean":1.42,
|
|
"Speed Std Dev":0.28,
|
|
"Radius":0.407,
|
|
"Spawn Time":None
|
|
},
|
|
"Grade 3":{
|
|
"Door":0,
|
|
"Pop Current":39,
|
|
"Pop Mean":35.43,
|
|
"Pop Std Dev":5.19,
|
|
"Speed Mean":1.48,
|
|
"Speed Std Dev":0.23,
|
|
"Radius":0.407,
|
|
"Spawn Time":None
|
|
},
|
|
"Grade 4":{
|
|
"Door":1,
|
|
"Pop Current":30,
|
|
"Pop Mean":34.86,
|
|
"Pop Std Dev":6.77,
|
|
"Speed Mean":1.58,
|
|
"Speed Std Dev":0.26,
|
|
"Radius":0.417,
|
|
"Spawn Time":None
|
|
},
|
|
"Grade 5":{
|
|
"Door":1,
|
|
"Pop Current":43,
|
|
"Pop Mean":36.71,
|
|
"Pop Std Dev":7.09,
|
|
"Speed Mean":1.59,
|
|
"Speed Std Dev":0.24,
|
|
"Radius":0.434,
|
|
"Spawn Time":None
|
|
},
|
|
"Grade 6":{
|
|
"Door":1,
|
|
"Pop Current":29,
|
|
"Pop Mean":37.71,
|
|
"Pop Std Dev":6.99,
|
|
"Speed Mean":1.65,
|
|
"Speed Std Dev":0.24,
|
|
"Radius":0.454,
|
|
"Spawn Time":None
|
|
},
|
|
"Grade 7":{
|
|
"Door":2,
|
|
"Pop Current":45,
|
|
"Pop Mean":40.43,
|
|
"Pop Std Dev":6.02,
|
|
"Speed Mean":1.61,
|
|
"Speed Std Dev":0.25,
|
|
"Radius":0.471,
|
|
"Spawn Time":None
|
|
},
|
|
"Grade 8":{
|
|
"Door":2,
|
|
"Pop Current":36,
|
|
"Pop Mean":40.43,
|
|
"Pop Std Dev":5.50,
|
|
"Speed Mean":1.66,
|
|
"Speed Std Dev":0.24,
|
|
"Radius":0.488,
|
|
"Spawn Time":None
|
|
},
|
|
"Grade 9":{
|
|
"Door":2,
|
|
"Pop Current":44,
|
|
"Pop Mean":44.14,
|
|
"Pop Std Dev":4.85,
|
|
"Speed Mean":1.60,
|
|
"Speed Std Dev":0.24,
|
|
"Radius":0.500,
|
|
"Spawn Time":None
|
|
},
|
|
"Grade 10":{
|
|
"Door":2,
|
|
"Pop Current":36,
|
|
"Pop Mean":46.29,
|
|
"Pop Std Dev":6.29,
|
|
"Speed Mean":1.57,
|
|
"Speed Std Dev":0.23,
|
|
"Radius":0.507,
|
|
"Spawn Time":None
|
|
},
|
|
"Grade 11":{
|
|
"Door":2,
|
|
"Pop Current":54,
|
|
"Pop Mean":48.29,
|
|
"Pop Std Dev":3.30,
|
|
"Speed Mean":1.51,
|
|
"Speed Std Dev":0.22,
|
|
"Radius":0.515,
|
|
"Spawn Time":None
|
|
},
|
|
"Grade 12":{
|
|
"Door":2,
|
|
"Pop Current":46,
|
|
"Pop Mean":43.71,
|
|
"Pop Std Dev":6.02,
|
|
"Speed Mean":1.54,
|
|
"Speed Std Dev":0.23,
|
|
"Radius":0.520,
|
|
"Spawn Time":None
|
|
}}
|
|
|
|
config = SimSetup(
|
|
doorways=door_polygons,
|
|
grades=grade_data,
|
|
total_simulation_time=180.0,
|
|
door_capacity=10,
|
|
walkable_area=geometry,
|
|
exit_area=exit_door
|
|
)
|
|
|
|
sim = EvacSim(config)
|
|
return sim.run()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
simulation = start_sim_run()
|
|
print(f"\nFinal simulation state:")
|
|
print(f" Elapsed time: {simulation.elapsed_time():.2f}s")
|
|
|