"""Pedestrian simulation with door queues and scheduled spawning (uses jupedsim and shapely)."""
import jupedsim as jps
|
|
import shapely
|
|
import random
|
|
from dataclasses import dataclass
|
|
from typing import List, Dict, Tuple, Optional
|
|
import numpy as np
|
|
|
|
@dataclass
class AgentConfig:
    """Per-agent attributes generated before the simulation starts."""

    # Sequential number assigned while the agent list is built.
    id: int
    # Letter label; the generator draws it from "A", "B", "C", "D", "F".
    grade: str
    # Identifier of the door (key of the door polygon dict) the agent uses.
    door: int
    # Speed handed to the model as ``v0``; the generator comments it as m/s.
    speed: float
    # Body radius handed to the model; the generator comments it as meters.
    radius: float
|
|
|
|
@dataclass
class SimulationConfig:
    """Settings for one simulation run.

    Attributes:
        door_polygons: Door id -> polygon in which agents of that door are
            spawned.
        groups: Group name -> dict with the keys ``"door"`` (door id),
            ``"spawn_time"`` (simulation time at which the group becomes due)
            and ``"size"`` (number of agents in the group).
        total_simulation_time: The run stops once the simulation's elapsed
            time reaches this value.
        door_capacity: Maximum number of agents counted per door area before
            further spawns at that door are held back.
        min_spacing: Minimum distance between two generated spawn points.
        walkable_area_coords: Outline of the walkable area; ``None`` selects
            a 50 x 30 rectangle.
        exit_polygon_coords: Outline of the exit zone; ``None`` selects a
            default rectangle inside that walkable area.
    """

    # The polygon type is quoted so that building the dataclass does not need
    # to resolve ``shapely`` at class-creation time.
    door_polygons: Dict[int, "shapely.Polygon"]
    groups: Dict[str, Dict]
    total_simulation_time: float = 300.0
    door_capacity: int = 10
    min_spacing: float = 0.6
    # Both geometry fields are read by the simulation setup, so they must
    # exist on every config; they are appended last to keep the positional
    # order of the earlier fields unchanged.
    walkable_area_coords: Optional[List[Tuple[float, float]]] = None
    exit_polygon_coords: Optional[List[Tuple[float, float]]] = None

    def __post_init__(self):
        """Set default walkable area and exit polygon if not provided."""
        if self.walkable_area_coords is None:
            self.walkable_area_coords = [(0, 0), (50, 0), (50, 30), (0, 30)]
        if self.exit_polygon_coords is None:
            self.exit_polygon_coords = [(45, 10), (48, 10), (48, 20), (45, 20)]
|
|
|
|
class PedestrianSimulation:
    """
    Run a pedestrian simulation with door queues and scheduled spawning.

    Architecture flow:
        1. Configuration setup
        2. Agent configuration generation
        3. Geometry preparation (walkable area, doors, exit)
        4. Door system setup (waiting areas, queues, journeys)
        5. Spawn event precomputation
        6. Simulation execution with dynamic spawning
        7. Results analysis/visualization
    """

    def __init__(self, config: "SimulationConfig"):
        """Store the configuration and initialise empty run state."""
        self.config = config
        self.simulation = None
        self.door_systems = {}
        self.all_agents = []
        self.spawn_events = []
        self.exit_id = None
        # Agent id -> name of the group the agent was generated for. Needed
        # because several groups may share one door, so the door alone does
        # not identify the group.
        self._agent_groups = {}

    def run(self) -> "jps.Simulation":
        """
        Main orchestrator: run the complete simulation.

        Steps:
            1. Create agent configurations
            2. Setup simulation environment
            3. Precompute spawn events
            4. Execute simulation with dynamic spawning
            5. Return completed simulation object
        """
        print("=" * 60)
        print("PEDESTRIAN SIMULATION STARTING")
        print("=" * 60)

        self.all_agents = self._create_agent_configurations()
        print(f"Created {len(self.all_agents)} agent configurations")

        self._setup_simulation_environment()
        print("Simulation environment setup complete")

        self.spawn_events = self._precompute_spawn_events()
        print(f"Precomputed {len(self.spawn_events)} spawn events")

        self._execute_simulation()
        print("Simulation execution complete")
        return self.simulation

    def _create_agent_configurations(self) -> List["AgentConfig"]:
        """Create one AgentConfig per agent of every configured group.

        Ids are assigned sequentially across groups. Grade, speed and radius
        are drawn at random. The group of each agent is recorded in
        ``self._agent_groups``.

        Returns:
            The agent configurations (id, grade, door, speed, radius).
        """
        all_agents = []
        self._agent_groups = {}
        agent_id = 0

        for group_name, group_info in self.config.groups.items():
            door = group_info["door"]
            size = group_info["size"]

            for _ in range(size):
                grade = random.choice(["A", "B", "C", "D", "F"])
                speed = random.uniform(1.0, 1.5)  # m/s
                radius = random.uniform(0.2, 0.3)  # meters

                all_agents.append(AgentConfig(
                    id=agent_id,
                    grade=grade,
                    door=door,
                    speed=speed,
                    radius=radius,
                ))
                self._agent_groups[agent_id] = group_name
                agent_id += 1

        return all_agents

    def _group_agents(self) -> Dict[str, List["AgentConfig"]]:
        """Return the agents of ``self.all_agents`` keyed by group name.

        Agents are assigned through the group recorded at creation time. An
        agent without a recorded group falls back to the first configured
        group that uses the agent's door.
        """
        grouped = {name: [] for name in self.config.groups}
        for agent in self.all_agents:
            group_name = self._agent_groups.get(agent.id)
            if group_name is None:
                group_name = next(
                    (name for name, info in self.config.groups.items()
                     if info["door"] == agent.door),
                    None,
                )
            if group_name is not None:
                grouped.setdefault(group_name, []).append(agent)
        return grouped

    def _generate_spawn_points(self, polygon: "shapely.Polygon",
                               num_points: int) -> List[Tuple[float, float]]:
        """Generate random points inside ``polygon`` that keep a minimum distance.

        Candidates are drawn uniformly from the polygon's bounding box and
        kept when they lie inside the polygon and are at least
        ``config.min_spacing`` away from every point kept so far.

        Returns:
            Up to ``num_points`` (x, y) tuples. Fewer are returned when the
            attempt budget (100 tries per requested point) runs out.
        """
        points = []
        if num_points <= 0:
            return points

        min_x, min_y, max_x, max_y = polygon.bounds
        min_spacing = self.config.min_spacing

        for _ in range(num_points * 100):
            if len(points) >= num_points:
                break

            x = random.uniform(min_x, max_x)
            y = random.uniform(min_y, max_y)
            if not polygon.contains(shapely.Point(x, y)):
                continue

            if all(np.hypot(x - px, y - py) >= min_spacing for px, py in points):
                points.append((x, y))

        return points

    def _create_waiting_area(self, door_polygon: "shapely.Polygon") -> "shapely.Polygon":
        """Return the 2.0-wide band around ``door_polygon`` (door itself removed).

        If the difference splits into several parts, the largest one is kept.
        """
        waiting_area = door_polygon.buffer(2.0, join_style=2)
        waiting_area = waiting_area.difference(door_polygon)

        if waiting_area.geom_type == 'MultiPolygon':
            waiting_area = max(waiting_area.geoms, key=lambda p: p.area)

        return waiting_area

    def _setup_simulation_environment(self):
        """Create the simulation, the exit stage and one door system per door."""
        # Create walkable area geometry
        walkable_area = shapely.Polygon(self.config.walkable_area_coords)

        # Create model and simulation
        model = jps.CollisionFreeSpeedModel()
        self.simulation = jps.Simulation(model=model, geometry=walkable_area)

        # Define exit zone
        exit_polygon = shapely.Polygon(self.config.exit_polygon_coords)
        self.exit_id = self.simulation.add_exit_stage(exit_polygon)

        # Create door systems
        for door_id, door_polygon in self.config.door_polygons.items():
            self._setup_door_system(door_id, door_polygon)

    def _setup_door_system(self, door_id: int, door_polygon: "shapely.Polygon"):
        """Register waiting area, waypoints and journey for one door.

        The resulting ids and geometries are stored in
        ``self.door_systems[door_id]``.
        """
        # Create waiting area
        waiting_area = self._create_waiting_area(door_polygon)
        # NOTE(review): the JuPedSim reference describes the argument of
        # add_waiting_set_stage as a list of (x, y) waiting positions; confirm
        # that a polygon is accepted by the installed version.
        waiting_set_id = self.simulation.add_waiting_set_stage(waiting_area)

        # Create queue waypoints: below, at and above the door centroid
        door_centroid = door_polygon.centroid
        queue_waypoints = [
            self.simulation.add_waypoint_stage((door_centroid.x, door_centroid.y - 1.0), 0.5),
            self.simulation.add_waypoint_stage((door_centroid.x, door_centroid.y), 0.5),
            self.simulation.add_waypoint_stage((door_centroid.x, door_centroid.y + 1.0), 0.5),
        ]

        # Create journey
        # NOTE(review): no transitions between the stages are configured here;
        # confirm against the JourneyDescription documentation that agents
        # advance from stage to stage as intended.
        journey_stages = [waiting_set_id] + queue_waypoints + [self.exit_id]
        journey = jps.JourneyDescription(journey_stages)
        journey_id = self.simulation.add_journey(journey)

        # Store door system
        self.door_systems[door_id] = {
            "waiting_area": waiting_area,
            "waiting_set_id": waiting_set_id,
            "queue_waypoints": queue_waypoints,
            "journey_id": journey_id,
            "door_polygon": door_polygon,
        }

    def _precompute_spawn_events(self) -> List[Dict]:
        """Build the time-sorted list of spawn events for all groups.

        Each event is a dict with the keys ``"time"``, ``"agent_config"``,
        ``"position"``, ``"group"`` and ``"door"``. Agents for which no spawn
        position could be generated get no event; a warning is printed in
        that case.
        """
        events = []
        agents_by_group = self._group_agents()

        for group_name, group_info in self.config.groups.items():
            door_id = group_info["door"]
            spawn_time = group_info["spawn_time"]
            group_agents = agents_by_group.get(group_name, [])

            if not group_agents:
                continue

            # Generate spawn positions inside the group's door polygon
            door_polygon = self.config.door_polygons[door_id]
            spawn_positions = self._generate_spawn_points(
                door_polygon,
                len(group_agents),
            )

            if len(spawn_positions) < len(group_agents):
                print(f"Warning: only {len(spawn_positions)} of {len(group_agents)} "
                      f"spawn positions found for group {group_name} (door {door_id})")

            for agent, position in zip(group_agents, spawn_positions):
                events.append({
                    "time": spawn_time,
                    "agent_config": agent,
                    "position": position,
                    "group": group_name,
                    "door": door_id,
                })

        # Stable sort: events of one group keep their relative order
        events.sort(key=lambda event: event["time"])
        return events

    def _count_agents_in_door_areas(self) -> Dict[int, int]:
        """Count, per door, the simulated agents currently inside its polygon."""
        counts = {door_id: 0 for door_id in self.config.door_polygons}
        for agent in self.simulation.agents():
            location = shapely.Point(agent.position)
            for door_id, door_polygon in self.config.door_polygons.items():
                if door_polygon.contains(location):
                    counts[door_id] += 1
        return counts

    def _execute_simulation(self):
        """Iterate the simulation until the configured end time, spawning agents on the way.

        Before every iteration in which at least one spawn event is due, the
        door occupancy is re-read from the simulation so that capacity freed
        by agents leaving a door area can be used by waiting events.
        """
        spawned_event_indices = set()
        agents_in_door_area = {door_id: 0 for door_id in self.config.door_polygons}
        # Index of the first event that has not been spawned yet
        event_index = 0
        total_events = len(self.spawn_events)

        while self.simulation.elapsed_time() < self.config.total_simulation_time:
            current_time = self.simulation.elapsed_time()

            # Events are sorted by time, so if the first unspawned event is
            # not due yet, no later one is either.
            if (event_index < total_events and
                    self.spawn_events[event_index]["time"] <= current_time):
                agents_in_door_area.update(self._count_agents_in_door_areas())
                # NOTE(review): a freshly spawned agent may be placed close to
                # an agent still standing in the door area; confirm how
                # add_agent reacts to that in the installed JuPedSim version.
                self._process_spawn_events(current_time, event_index,
                                           spawned_event_indices,
                                           agents_in_door_area)

                while (event_index < total_events and
                       event_index in spawned_event_indices):
                    event_index += 1

            # Iterate simulation
            self.simulation.iterate()

        pending = total_events - len(spawned_event_indices)
        if pending:
            print(f"Warning: {pending} spawn events were not executed before the end of the simulation")

        print(f"\nSimulation completed at {self.simulation.elapsed_time():.2f} seconds")

    def _process_spawn_events(self, current_time: float, event_index: int,
                              spawned_event_indices: set, agents_in_door_area: Dict):
        """Spawn every due, not yet spawned event whose door has room.

        Args:
            current_time: Events with ``"time"`` up to this value are due.
            event_index: Index in ``self.spawn_events`` at which to start.
            spawned_event_indices: Indices already spawned; updated in place.
            agents_in_door_area: Door id -> current agent count; incremented
                in place for each spawn.

        An event blocked by a full door is skipped, not waited for, so due
        events at other doors further down the list are still handled.
        """
        for index in range(event_index, len(self.spawn_events)):
            event = self.spawn_events[index]
            if event["time"] > current_time:
                break  # list is time-sorted: nothing further is due
            if index in spawned_event_indices:
                continue

            door_id = event["door"]
            if agents_in_door_area[door_id] >= self.config.door_capacity:
                continue  # door full; retried on a later call

            self._spawn_agent(event, door_id, event["agent_config"])
            agents_in_door_area[door_id] += 1
            spawned_event_indices.add(index)

    def _spawn_agent(self, event: Dict, door_id: int, agent_config: "AgentConfig"):
        """Add one agent to the simulation at the event's position.

        The agent starts on the door's journey at the door's waiting-set stage.
        """
        door_system = self.door_systems[door_id]

        agent_params = jps.CollisionFreeSpeedModelAgentParameters(
            position=event["position"],
            journey_id=door_system["journey_id"],
            stage_id=door_system["waiting_set_id"],
            radius=agent_config.radius,
            v0=agent_config.speed,
        )

        agent_id = self.simulation.add_agent(agent_params)

        # Optional: log every 50th agent
        if agent_id % 50 == 0:
            print(f" Spawned agent {agent_id} (group: {event['group']}, door: {door_id})")
|
|
|
|
|
|
# Example usage function
|
|
def create_and_run_simulation() -> PedestrianSimulation:
    """
    Build the example scenario (three doors, four groups) and run it.

    Returns:
        PedestrianSimulation: the runner object after ``run()`` has finished.
    """
    # Three 5 x 10 door rectangles whose left edges sit at x = 5, 20 and 35.
    door_polygons = {
        door_id: shapely.Polygon([(left, 5), (left + 5, 5), (left + 5, 15), (left, 15)])
        for door_id, left in enumerate((5, 20, 35), start=1)
    }

    # Example with 4 groups; extend to 13 as needed.
    groups = {
        "group_1": dict(door=1, spawn_time=0.0, size=40),
        "group_2": dict(door=2, spawn_time=5.0, size=35),
        "group_3": dict(door=3, spawn_time=10.0, size=30),
        "group_4": dict(door=1, spawn_time=15.0, size=25),
        # Add 9 more groups to reach 13 total
    }

    sim_runner = PedestrianSimulation(
        SimulationConfig(
            door_polygons=door_polygons,
            groups=groups,
            total_simulation_time=200.0,  # Adjust as needed
            door_capacity=10,
            min_spacing=0.6,
        )
    )
    # The simulation object returned by run() is also kept on the runner.
    sim_runner.run()
    return sim_runner
|
|
|
|
|
|
# Quick execution function
|
|
def run_simulation_quickstart():
    """Run a small three-door, three-class scenario and return the result of ``run()``."""
    print("Pedestrian Simulation Quickstart")
    print("-" * 40)

    # Scenario parameters -- edit these to try other layouts.
    # Three 4 x 6 door rectangles whose left edges sit at x = 2, 10 and 18.
    doors = {
        door_id: shapely.Polygon([(left, 2), (left + 4, 2), (left + 4, 8), (left, 8)])
        for door_id, left in enumerate((2, 10, 18), start=1)
    }

    classes = {
        "class_a": dict(door=1, spawn_time=0.0, size=30),
        "class_b": dict(door=2, spawn_time=10.0, size=25),
        "class_c": dict(door=3, spawn_time=20.0, size=20),
    }

    scenario = SimulationConfig(
        door_polygons=doors,
        groups=classes,
        total_simulation_time=100.0,
        door_capacity=8,
    )
    return PedestrianSimulation(scenario).run()
|
|
|
|
|
|
if __name__ == "__main__":
    # Two entry points are available:
    #   1. the full example:  sim_runner = create_and_run_simulation()
    #   2. the quickstart scenario for testing (used below)
    simulation = run_simulation_quickstart()

    # Report the final state; further analysis can be added after this.
    print("\nFinal simulation state:")
    print(" Elapsed time: {:.2f}s".format(simulation.elapsed_time()))