1729 lines
68 KiB
Plaintext
1729 lines
68 KiB
Plaintext
===============================
|
|
(written code script)
|
|
===============================
|
|
from typing import Tuple,List,Dict
|
|
import sys
|
|
import matplotlib
|
|
matplotlib.use('QtAgg')
|
|
import matplotlib.pyplot as plt
|
|
from shapely import Polygon,Point,GeometryCollection,get_coordinates
|
|
from shapely.plotting import plot_polygon
|
|
import jupedsim as jps
|
|
import numpy as np
|
|
import pathlib
|
|
from geometry_setup import GeometrySetup
|
|
from spawning import SpawnManager
|
|
from crosswalk_setup import CrosswalkStatus,CrosswalkController,CrosswalkState,CrosswalkConfig
|
|
from agents.agent_setup import AgentConfig
|
|
from dataclasses import dataclass
|
|
|
|
@dataclass
|
|
class SimulationSetup:
|
|
simulation_time:float=200.0
|
|
time_step:float=0.01
|
|
max_iterations:int=50000
|
|
trajectory_file:str="crosswalk_sim.sqlite"
|
|
current_setup:bool=True
|
|
trigger_dist:float=1.0
|
|
activation_time:float=30.0
|
|
cooldown_time:float=10.0
|
|
min_agents_activation:int=1
|
|
activation_delay:float=2.0
|
|
|
|
class CrosswalkSimulation:
|
|
def __init__(self,config:SimulationSetup):
|
|
self.config = config
|
|
print("\nInitializing Geometry...")
|
|
self.geo = GeometrySetup(self.config.current_setup)
|
|
self.spawn_area = self.geo.spawn()
|
|
self.queue_area = self.geo.queue()
|
|
self.trigger_area = self.geo.trigger()
|
|
self.crosswalk_area = self.geo.crosswalk()
|
|
self.grass_area = self.geo.grass()
|
|
self.walkable_area = GeometryCollection([
|
|
self.spawn_area,
|
|
self.queue_area,
|
|
self.trigger_area,
|
|
self.crosswalk_area,
|
|
self.grass_area
|
|
]).unary_union
|
|
print("Geometry Configuration Complete!")
|
|
print("\nInitializing Spawn Manager...")
|
|
self.spawn_manager = SpawnManager(self.spawn_area,min_spacing=0.55)
|
|
self.spawn_manager.generate_coords()
|
|
print("Spawn Manager Setup Complete!")
|
|
print("\nInitializing Crosswalk Controller...")
|
|
self.crosswalk_controller = CrosswalkController(
|
|
CrosswalkConfig(
|
|
trigger_dist=self.config.trigger_dist,
|
|
activation_time=self.config.activation_time,
|
|
cooldown_time=self.config.cooldown_time,
|
|
min_agents_activation=self.config.min_agents_activation,
|
|
activation_delay=self.config.activation_delay,
|
|
crosswalk_area=self.crosswalk_area,
|
|
trigger_area=self.trigger_area
|
|
),
|
|
self.config.current_setup
|
|
)
|
|
print("Crosswalk Controller Setup Complete!")
|
|
print("\nInitializing Agent Setup...")
|
|
self.all_agents, pop_list = AgentConfig()
|
|
self.total_agents = len(self.all_agents)
|
|
print(f"Created {self.total_agents} Agents...")
|
|
print("Agent Setup Complete!")
|
|
print("\nInitializing JuPedSim Model...")
|
|
self.trajectory_file = self.config.trajectory_file
|
|
self.simulation = jps.Simulation(
|
|
model=jps.CollisionFreeSpeedModel(),
|
|
geometry=self.walkable_area,
|
|
trajectory_writer=jps.SqliteTrajectoryWriter(
|
|
output_file=pathlib.Path(self.trajectory_file)
|
|
),
|
|
dt=self.config.time_step
|
|
)
|
|
self.queue_waiting_coords = self.waiting_coords(
|
|
self.queue_area,self.spawn_manager.min_spacing
|
|
)
|
|
self.queue_stage_id = self.simulation.add_waiting_set_stage(self.queue_waiting_coords)
|
|
self.queue_stage = self.simulation.get_stage(self.queue_stage_id)
|
|
self.queue_stage.state = jps.WaitingSetState.INACTIVE
|
|
self.crosswalk_entry_id = self.simulation.add_waypoint_stage((((
|
|
self.crosswalk_area.bounds[1]+self.crosswalk_area.bounds[3])/2),
|
|
self.crosswalk_area.bounds[2]),
|
|
distance=0.3
|
|
)
|
|
self.crosswalk_exit_id = self.simulation.add_waypoint_stage((((
|
|
self.crosswalk_area.bounds[1]+self.crosswalk_area.bounds[3])/2),
|
|
self.crosswalk_area.bounds[0]),
|
|
distance=0.3
|
|
)
|
|
self.exit_stage_id = self.simulation.add_exit_stage([
|
|
self.grass_area.centroid.x,
|
|
self.grass_area.centroid.y
|
|
])
|
|
self.journey = jps.JourneyDescription([
|
|
self.queue_stage_id,
|
|
self.crosswalk_entry_id,
|
|
self.crosswalk_exit_id,
|
|
self.exit_stage_id
|
|
])
|
|
self.journey.set_transition_for_stage(
|
|
self.queue_stage_id,
|
|
jps.Transition.create_fixed_transition(
|
|
self.crosswalk_entry_id
|
|
))
|
|
self.journey.set_transition_for_stage(
|
|
self.crosswalk_entry_id,
|
|
jps.Transition.create_fixed_transition(
|
|
self.crosswalk_exit_id
|
|
))
|
|
self.journey.set_transition_for_stage(
|
|
self.crosswalk_exit_id,
|
|
jps.Transition.create_fixed_transition(
|
|
self.exit_stage_id
|
|
))
|
|
self.journey_id = self.simulation.add_journey(
|
|
self.journey)
|
|
self.jps_agent_ids = {}
|
|
self.active_agents = {}
|
|
self.iteration_count = 0
|
|
self.current_time = 0.0
|
|
|
|
def waiting_coords(self,area:Polygon,spacing:float)->None:
|
|
min_x,min_y,max_x,max_y = area.bounds
|
|
points = []
|
|
x = (min_x+spacing)/2
|
|
while x<max_x:
|
|
y = (min_y+spacing)/2
|
|
while y < max_y:
|
|
point = jps.Vector2D(x, y)
|
|
if area.contains(Polygon([(x, y)]).buffer(0.1)):
|
|
points.append((point.x, point.y))
|
|
y += spacing
|
|
x += spacing
|
|
return points if points else [(min_x+0.5, min_y+0.5)]
|
|
|
|
def run(self,max_iterations:int):
|
|
print("\n\nStarting Simulation")
|
|
print("Target Agents: {len(self.all_agents)}")
|
|
while self.iteration_count < max_iterations:
|
|
if self.spawn_manager.check_spawn_complete(self.all_agents):
|
|
new_agent = self.spawn_manager.spawn_agent(self.all_agents)
|
|
if new_agent:
|
|
jps_agent_params = jps.CollisionFreeSpeedModelAgentParameters(
|
|
journey_id=self.journey_id,
|
|
stage_id=self.queue_stage_id,
|
|
position=self.spawn_manager.get_agent_pos(new_agent.id),
|
|
v0=new_agent.speed,
|
|
radius=new_agent.radius
|
|
)
|
|
jps_agent_id = self.simulation.add_agent(jps_agent_params)
|
|
self.jps_agent_ids[new_agent.id] = jps_agent_id
|
|
self.agent_targets[new_agent.id] = None
|
|
print(f"Iter {self.iteration_count:05d}: Spawned agent {new_agent.id} (JPS-ID: {jps_agent_id})")
|
|
current_agent_pos = {}
|
|
for spawn_id,jps_id in self.jps_agent_ids.items():
|
|
agent = self.simulation.agent(jps_id)
|
|
current_agent_pos[spawn_id] = (agent.position[0],agent.position[1])
|
|
crosswalk_state = self.crosswalk_controller.update(
|
|
current_agent_pos,self.config.time_step,self.current_time
|
|
)
|
|
if crosswalk_state.is_active:
|
|
if self.queue_stage.stage != jps.WaitingSetState.INACTIVE:
|
|
self.queue_stage.stage = jps.WaitingSetState.INACTIVE
|
|
print(f"Iter {self.iteration_count:05d}: Crosswalk ACTIVATED, releasing queue.")
|
|
else:
|
|
if self.queue_stage.stage != jps.WaitingSetState.ACTIVE:
|
|
self.queue_stage.stage = jps.WaitingSetState.INACTIVE
|
|
print(f"Iter {self.iteration_count:05d}: Crosswalk INACTIVE, holding agents.")
|
|
self.simulation.iterate()
|
|
self.current_time += self.config.time_step
|
|
self.iteration_count +=1
|
|
agents_to_remove = []
|
|
for spawn_id, jps_id in list(self.jps_agent_ids.items()):
|
|
try:
|
|
_ = self.simulation.agent(jps_id)
|
|
except RuntimeError:
|
|
agents_to_remove.append(spawn_id)
|
|
self.spawn_manager.unfill_coords(spawn_id)
|
|
for spawn_id in agents_to_remove:
|
|
del self.jps_agent_ids[spawn_id]
|
|
print(f"Iter {self.iteration_count:05d}: Agent {spawn_id} reached exit and was removed.")
|
|
if (len(self.jps_agent_ids) ==0 and not self.spawn_manager.check_spawn_complete(self.all_agents)):
|
|
print(f"\nSimulation Completed at Iteration: {self.iteration_count}")
|
|
print(f"Total Simulation Time: {self.current_time:.2f} seconds.")
|
|
break
|
|
print("\n" + "="*50)
|
|
print("Simulation finished.")
|
|
stats = self.crosswalk_controller.get_statistics()
|
|
print(f"Crosswalk was activated {stats['total_activations']} times.")
|
|
print(f"Agents served by crosswalk: {stats['agents_served']}")
|
|
print(f"Trajectory saved to: {self.trajectory_file.absolute()}")
|
|
============================================================
|
|
|
|
Based on the below examples, can you please improve the code above to ensure that the logic is correct, variables are used correctly, and the script correctly calls functions
|
|
|
|
|
|
|
|
==========================
|
|
(example 1)
|
|
===========================
|
|
|
|
|
|
"""
|
|
main_simulation.py
|
|
|
|
Core simulation file integrating JuPedSim with custom geometry, spawning,
|
|
agent setup, and crosswalk control modules. Manages the full simulation lifecycle.
|
|
|
|
Author: Simulation Engineer
|
|
Date: 2026-01-27
|
|
Version: 1.0
|
|
"""
|
|
import sys
|
|
import time
|
|
import pathlib
|
|
from typing import Dict, Tuple, List, Optional
|
|
import numpy as np
|
|
import jupedsim as jps
|
|
from shapely import Polygon, Point
|
|
|
|
# Import your custom modules
|
|
from geometry_setup import GeometrySetup
|
|
from spawning import SpawnManager
|
|
from agents.agent_setup import run_AgentSetup
|
|
from crosswalk_setup import CrosswalkController, CrosswalkConfig, CrosswalkStatus
|
|
|
|
|
|
class CrosswalkSimulation:
|
|
"""Main simulation controller integrating JuPedSim with custom components."""
|
|
|
|
def __init__(self, current_setup: bool = False, simulation_time: float = 300.0):
|
|
"""
|
|
Initialize the complete simulation.
|
|
|
|
Args:
|
|
current_setup: Boolean flag for geometry setup (True/False)
|
|
simulation_time: Maximum simulation time in seconds
|
|
"""
|
|
# 1. SETUP GEOMETRY
|
|
print("Setting up geometry...")
|
|
self.geometry_setup = GeometrySetup(current_setup)
|
|
self.spawn_area = self.geometry_setup.spawn()
|
|
self.queue_area = self.geometry_setup.queue()
|
|
self.trigger_area = self.geometry_setup.trigger()
|
|
self.crosswalk_area = self.geometry_setup.crosswalk()
|
|
self.grass_area = self.geometry_setup.grass()
|
|
|
|
# Combine all accessible areas for JuPedSim geometry (excluding spawn as it's a source)
|
|
# The walkable area is everything except obstacles (in this case, the road area outside crosswalk)
|
|
walkable_polygon = Polygon([
|
|
(0, 0),
|
|
(30, 0),
|
|
(30, 18),
|
|
(0, 18)
|
|
])
|
|
# Define holes for non-walkable areas if needed
|
|
self.jps_geometry = walkable_polygon
|
|
|
|
# 2. SETUP CROSSWALK CONTROLLER
|
|
print("Setting up crosswalk controller...")
|
|
crosswalk_config = CrosswalkConfig(
|
|
trigger_dist=1.5, # meters
|
|
activation_time=15.0, # seconds
|
|
cooldown_time=5.0, # seconds
|
|
min_agents_activation=1,
|
|
activation_delay=1.0,
|
|
crosswalk_area=self.crosswalk_area,
|
|
trigger_area=self.trigger_area
|
|
)
|
|
self.crosswalk_controller = CrosswalkController(crosswalk_config, current_setup)
|
|
|
|
# 3. SETUP AGENT SPAWNING
|
|
print("Setting up agent spawning...")
|
|
self.spawn_manager = SpawnManager(self.spawn_area, min_spacing=0.55)
|
|
self.spawn_manager.generate_coords(max_samples=200)
|
|
|
|
# 4. LOAD AGENT DEFINITIONS
|
|
print("Loading agent definitions...")
|
|
self.all_agents, self.pop_list = run_AgentSetup()
|
|
self.total_agents = len(self.all_agents)
|
|
|
|
# 5. SETUP JUPEDSIM SIMULATION [citation:1]
|
|
print("Initializing JuPedSim simulation...")
|
|
self.trajectory_file = "crosswalk_simulation.sqlite"
|
|
self.simulation = jps.Simulation(
|
|
model=jps.CollisionFreeSpeedModel(),
|
|
geometry=self.jps_geometry,
|
|
trajectory_writer=jps.SqliteTrajectoryWriter(
|
|
output_file=pathlib.Path(self.trajectory_file)
|
|
),
|
|
)
|
|
|
|
# 6. DEFINE JOURNEYS AND STAGES [citation:2]
|
|
print("Setting up routing journeys...")
|
|
self._setup_routing()
|
|
|
|
# 7. SIMULATION STATE
|
|
self.simulation_time = simulation_time
|
|
self.current_time = 0.0
|
|
self.time_step = 0.1 # seconds
|
|
self.active_agents: Dict[int, Dict] = {} # id -> {spawn_time, stage_history}
|
|
self.agents_in_queue: set = set()
|
|
self.agents_in_crosswalk: set = set()
|
|
self.simulation_complete = False
|
|
|
|
print(f"Simulation initialized with {self.total_agents} total agents.")
|
|
print(f"Crosswalk: trigger={crosswalk_config.trigger_dist}m, "
|
|
f"active={crosswalk_config.activation_time}s, "
|
|
f"cooldown={crosswalk_config.cooldown_time}s")
|
|
|
|
def _setup_routing(self) -> None:
|
|
"""
|
|
Setup JuPedSim journeys and stages for agent routing [citation:2].
|
|
Agents follow: Queue Area -> (Wait for Crosswalk) -> Crosswalk -> Grass (Exit)
|
|
"""
|
|
# 1. QUEUE AREA (Waiting Stage)
|
|
# Create a waypoint in the center of the queue area as a gathering point
|
|
queue_center = self.queue_area.centroid
|
|
self.queue_stage_id = self.simulation.add_waypoint_stage(
|
|
[queue_center.x, queue_center.y],
|
|
distance=0.5 # Acceptance radius
|
|
)
|
|
|
|
# 2. CROSSWALK (Conditional Stage)
|
|
# Create waypoints for crosswalk entry and exit
|
|
crosswalk_bounds = self.crosswalk_area.bounds
|
|
crosswalk_entry = ((crosswalk_bounds[0] + crosswalk_bounds[2]) / 2,
|
|
crosswalk_bounds[1]) # Bottom center
|
|
crosswalk_exit = ((crosswalk_bounds[0] + crosswalk_bounds[2]) / 2,
|
|
crosswalk_bounds[3]) # Top center
|
|
|
|
self.crosswalk_entry_id = self.simulation.add_waypoint_stage(
|
|
crosswalk_entry,
|
|
distance=0.3
|
|
)
|
|
self.crosswalk_exit_id = self.simulation.add_waypoint_stage(
|
|
crosswalk_exit,
|
|
distance=0.3
|
|
)
|
|
|
|
# 3. GRASS AREA (Exit Stage)
|
|
# Create exit in the grass area
|
|
grass_bounds = self.grass_area.bounds
|
|
exit_polygon = Polygon([
|
|
(grass_bounds[0], grass_bounds[1]),
|
|
(grass_bounds[2], grass_bounds[1]),
|
|
(grass_bounds[2], grass_bounds[1] + 2.0),
|
|
(grass_bounds[0], grass_bounds[1] + 2.0)
|
|
])
|
|
self.exit_stage_id = self.simulation.add_exit_stage(exit_polygon)
|
|
|
|
# 4. CREATE JOURNEYS [citation:2]
|
|
# Journey 1: For agents waiting to cross
|
|
self.waiting_journey = jps.JourneyDescription([
|
|
self.queue_stage_id,
|
|
self.crosswalk_entry_id,
|
|
self.exit_stage_id
|
|
])
|
|
|
|
# Set transitions: queue -> crosswalk entry -> exit
|
|
self.waiting_journey.set_transition_for_stage(
|
|
self.queue_stage_id,
|
|
jps.Transition.create_fixed_transition(self.crosswalk_entry_id)
|
|
)
|
|
self.waiting_journey.set_transition_for_stage(
|
|
self.crosswalk_entry_id,
|
|
jps.Transition.create_fixed_transition(self.exit_stage_id)
|
|
)
|
|
|
|
self.waiting_journey_id = self.simulation.add_journey(self.waiting_journey)
|
|
|
|
# Journey 2: For agents actively crossing (bypasses queue)
|
|
self.crossing_journey = jps.JourneyDescription([
|
|
self.crosswalk_entry_id,
|
|
self.exit_stage_id
|
|
])
|
|
self.crossing_journey.set_transition_for_stage(
|
|
self.crosswalk_entry_id,
|
|
jps.Transition.create_fixed_transition(self.exit_stage_id)
|
|
)
|
|
self.crossing_journey_id = self.simulation.add_journey(self.crossing_journey)
|
|
|
|
def _spawn_new_agents(self) -> None:
|
|
"""
|
|
Spawn new agents if there are available spawn points and agents remaining.
|
|
"""
|
|
# Get current agent positions for crosswalk controller
|
|
current_positions = {}
|
|
for agent_id in self.active_agents:
|
|
agent = self.simulation.agent(agent_id)
|
|
if agent:
|
|
current_positions[agent_id] = (agent.position[0], agent.position[1])
|
|
|
|
# Update crosswalk state with current agent positions
|
|
crosswalk_state = self.crosswalk_controller.update(
|
|
agent_pos=current_positions,
|
|
time_step=self.time_step,
|
|
current_time=self.current_time
|
|
)
|
|
|
|
# Determine how many agents to spawn (respecting queue capacity)
|
|
agents_to_spawn = min(
|
|
5, # Max spawn per iteration
|
|
self.total_agents - len(self.active_agents)
|
|
)
|
|
|
|
for _ in range(agents_to_spawn):
|
|
agent_data = self.spawn_manager.spawn_agent(self.all_agents)
|
|
if not agent_data:
|
|
break # No more agents or spawn points
|
|
|
|
# Choose journey based on crosswalk state
|
|
if crosswalk_state.is_active:
|
|
# Crosswalk is active, go directly to crossing
|
|
journey_id = self.crossing_journey_id
|
|
stage_id = self.crosswalk_entry_id
|
|
initial_target = "crosswalk"
|
|
else:
|
|
# Crosswalk inactive, go to queue area
|
|
journey_id = self.waiting_journey_id
|
|
stage_id = self.queue_stage_id
|
|
initial_target = "queue"
|
|
self.agents_in_queue.add(agent_data.id)
|
|
|
|
# Add agent to JuPedSim simulation [citation:1]
|
|
spawn_point = self.spawn_manager.get_agent_pos(agent_data.id)
|
|
agent_params = jps.CollisionFreeSpeedModelAgentParameters(
|
|
journey_id=journey_id,
|
|
stage_id=stage_id,
|
|
position=spawn_point,
|
|
desired_speed=agent_data.speed,
|
|
radius=agent_data.radius
|
|
)
|
|
|
|
jps_agent_id = self.simulation.add_agent(agent_params)
|
|
|
|
# Track agent in our system
|
|
self.active_agents[jps_agent_id] = {
|
|
'original_id': agent_data.id,
|
|
'grade': agent_data.grade,
|
|
'spawn_time': self.current_time,
|
|
'current_stage': initial_target,
|
|
'stage_history': [initial_target]
|
|
}
|
|
|
|
print(f"Spawned agent {agent_data.id} (JuPedSim ID: {jps_agent_id}) "
|
|
f"at {spawn_point}, target: {initial_target}, "
|
|
f"speed: {agent_data.speed:.2f}m/s")
|
|
|
|
def _update_agent_stages(self) -> None:
|
|
"""
|
|
Update agent stages based on crosswalk state and agent positions.
|
|
Move agents from queue to crosswalk when crosswalk becomes active.
|
|
"""
|
|
if not self.crosswalk_controller.is_active:
|
|
return
|
|
|
|
# Crosswalk is active, move queued agents to crossing
|
|
agents_to_move = []
|
|
for jps_id, agent_info in self.active_agents.items():
|
|
if agent_info['current_stage'] == 'queue' and jps_id in self.agents_in_queue:
|
|
agents_to_move.append(jps_id)
|
|
|
|
for jps_id in agents_to_move[:5]: # Move up to 5 agents per iteration
|
|
agent = self.simulation.agent(jps_id)
|
|
if agent:
|
|
# Check if agent is actually in queue area before moving
|
|
agent_point = Point(agent.position[0], agent.position[1])
|
|
if self.queue_area.contains(agent_point):
|
|
# Change agent's journey to crossing journey
|
|
self.simulation.switch_agent_journey(
|
|
jps_id,
|
|
self.crossing_journey_id,
|
|
self.crosswalk_entry_id
|
|
)
|
|
|
|
self.active_agents[jps_id]['current_stage'] = 'crosswalk'
|
|
self.active_agents[jps_id]['stage_history'].append('crosswalk')
|
|
self.agents_in_queue.discard(jps_id)
|
|
self.agents_in_crosswalk.add(jps_id)
|
|
|
|
print(f"Moved agent {self.active_agents[jps_id]['original_id']} "
|
|
f"from queue to crosswalk")
|
|
|
|
def _check_agent_progress(self) -> None:
|
|
"""
|
|
Check agent progress through stages and handle completed agents.
|
|
"""
|
|
completed_agents = []
|
|
|
|
for jps_id, agent_info in list(self.active_agents.items()):
|
|
agent = self.simulation.agent(jps_id)
|
|
if not agent:
|
|
# Agent has left the simulation (reached exit)
|
|
completed_agents.append(jps_id)
|
|
|
|
# Free up spawn point
|
|
original_id = agent_info['original_id']
|
|
self.spawn_manager.unfill_coords(original_id)
|
|
|
|
print(f"Agent {original_id} completed journey in "
|
|
f"{self.current_time - agent_info['spawn_time']:.1f}s, "
|
|
f"stages: {agent_info['stage_history']}")
|
|
|
|
# Update crosswalk controller if agent was in trigger zone
|
|
if original_id in self.crosswalk_controller.agents_in_trigger:
|
|
self.crosswalk_controller.agents_in_trigger.discard(original_id)
|
|
|
|
else:
|
|
# Update current stage based on agent's target
|
|
current_target = agent.stage_id
|
|
if current_target == self.crosswalk_entry_id and agent_info['current_stage'] != 'crosswalk':
|
|
agent_info['current_stage'] = 'crosswalk'
|
|
agent_info['stage_history'].append('crosswalk')
|
|
self.agents_in_queue.discard(jps_id)
|
|
self.agents_in_crosswalk.add(jps_id)
|
|
|
|
elif current_target == self.exit_stage_id and agent_info['current_stage'] != 'exit':
|
|
agent_info['current_stage'] = 'exit'
|
|
agent_info['stage_history'].append('exit')
|
|
self.agents_in_crosswalk.discard(jps_id)
|
|
|
|
# Remove completed agents from tracking
|
|
for jps_id in completed_agents:
|
|
original_id = self.active_agents[jps_id]['original_id']
|
|
self.agents_in_queue.discard(jps_id)
|
|
self.agents_in_crosswalk.discard(jps_id)
|
|
del self.active_agents[jps_id]
|
|
|
|
def _log_simulation_state(self, iteration: int) -> None:
|
|
"""
|
|
Log current simulation state.
|
|
"""
|
|
if iteration % 100 == 0: # Log every 10 seconds (100 iterations * 0.1s)
|
|
crosswalk_stats = self.crosswalk_controller.get_statistics()
|
|
status = self.crosswalk_controller.get_state()
|
|
|
|
print(f"\n--- Time: {self.current_time:.1f}s, Iteration: {iteration} ---")
|
|
print(f"Active agents: {len(self.active_agents)}")
|
|
print(f" In queue: {len(self.agents_in_queue)}, "
|
|
f"In crosswalk: {len(self.agents_in_crosswalk)}")
|
|
print(f"Crosswalk: {status.status.value.upper()}, "
|
|
f"Active: {status.is_active}, "
|
|
f"Agents in trigger: {status.agents_in_trigger}")
|
|
print(f"Total activations: {crosswalk_stats['total_activations']}, "
|
|
f"Agents served: {crosswalk_stats['agents_served']}")
|
|
print("-" * 50)
|
|
|
|
def run(self) -> Dict:
|
|
"""
|
|
Run the complete simulation.
|
|
|
|
Returns:
|
|
Dictionary with simulation results and statistics
|
|
"""
|
|
print("\n" + "="*60)
|
|
print("STARTING CROSSWALK SIMULATION")
|
|
print("="*60)
|
|
|
|
start_wall_time = time.time()
|
|
iteration = 0
|
|
|
|
try:
|
|
while self.current_time < self.simulation_time:
|
|
# 1. Spawn new agents if capacity allows
|
|
self._spawn_new_agents()
|
|
|
|
# 2. Update agent stages based on crosswalk state
|
|
self._update_agent_stages()
|
|
|
|
# 3. Execute one simulation iteration [citation:1]
|
|
self.simulation.iterate()
|
|
|
|
# 4. Update simulation time
|
|
self.current_time += self.time_step
|
|
iteration += 1
|
|
|
|
# 5. Check agent progress
|
|
self._check_agent_progress()
|
|
|
|
# 6. Log state periodically
|
|
self._log_simulation_state(iteration)
|
|
|
|
# 7. Check for simulation completion
|
|
if (len(self.active_agents) == 0 and
|
|
len(self.spawn_manager.spawned_agents) >= self.total_agents):
|
|
print("\nAll agents have completed their journey.")
|
|
self.simulation_complete = True
|
|
break
|
|
|
|
except KeyboardInterrupt:
|
|
print("\nSimulation interrupted by user.")
|
|
except Exception as e:
|
|
print(f"\nError during simulation: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
|
|
# Calculate final statistics
|
|
wall_time_elapsed = time.time() - start_wall_time
|
|
crosswalk_stats = self.crosswalk_controller.get_statistics()
|
|
|
|
results = {
|
|
'simulation_time': self.current_time,
|
|
'wall_time': wall_time_elapsed,
|
|
'iterations': iteration,
|
|
'total_agents': self.total_agents,
|
|
'agents_spawned': len(self.spawn_manager.spawned_agents),
|
|
'agents_completed': self.total_agents - len(self.active_agents),
|
|
'crosswalk_activations': crosswalk_stats['total_activations'],
|
|
'crosswalk_active_time': crosswalk_stats['total_active_time'],
|
|
'agents_served': crosswalk_stats['agents_served'],
|
|
'simulation_complete': self.simulation_complete,
|
|
'trajectory_file': self.trajectory_file
|
|
}
|
|
|
|
print("\n" + "="*60)
|
|
print("SIMULATION COMPLETE")
|
|
print("="*60)
|
|
print(f"Simulation time: {self.current_time:.1f}s")
|
|
print(f"Wall time: {wall_time_elapsed:.1f}s")
|
|
print(f"Iterations: {iteration}")
|
|
print(f"Agents: {results['agents_spawned']} spawned, "
|
|
f"{results['agents_completed']} completed")
|
|
print(f"Crosswalk: {results['crosswalk_activations']} activations, "
|
|
f"{results['crosswalk_active_time']:.1f}s total active time")
|
|
print(f"Trajectory data saved to: {self.trajectory_file}")
|
|
print("="*60)
|
|
|
|
return results
|
|
|
|
def visualize(self) -> None:
|
|
"""
|
|
Create a simple visualization of the simulation setup.
|
|
Requires matplotlib.
|
|
"""
|
|
try:
|
|
import matplotlib.pyplot as plt
|
|
from shapely.plotting import plot_polygon
|
|
|
|
fig, ax = plt.subplots(figsize=(12, 8))
|
|
|
|
# Plot areas
|
|
plot_polygon(self.spawn_area, ax=ax, color='purple', alpha=0.3, label='Spawn Area')
|
|
plot_polygon(self.queue_area, ax=ax, color='blue', alpha=0.3, label='Queue Area')
|
|
plot_polygon(self.trigger_area, ax=ax, color='green', alpha=0.3, label='Trigger Area')
|
|
plot_polygon(self.crosswalk_area, ax=ax, color='red', alpha=0.5, label='Crosswalk')
|
|
plot_polygon(self.grass_area, ax=ax, color='lightgreen', alpha=0.3, label='Grass Area')
|
|
|
|
# Plot spawn points
|
|
spawn_points = self.spawn_manager.spawn_coords
|
|
ax.scatter(spawn_points[:, 0], spawn_points[:, 1],
|
|
c='black', s=10, alpha=0.5, label='Spawn Points')
|
|
|
|
# Add journey waypoints
|
|
queue_center = self.queue_area.centroid
|
|
ax.scatter(queue_center.x, queue_center.y, c='blue', s=100,
|
|
marker='s', label='Queue Waypoint')
|
|
|
|
crosswalk_bounds = self.crosswalk_area.bounds
|
|
crosswalk_entry = ((crosswalk_bounds[0] + crosswalk_bounds[2]) / 2,
|
|
crosswalk_bounds[1])
|
|
crosswalk_exit = ((crosswalk_bounds[0] + crosswalk_bounds[2]) / 2,
|
|
crosswalk_bounds[3])
|
|
|
|
ax.scatter(crosswalk_entry[0], crosswalk_entry[1], c='red', s=100,
|
|
marker='^', label='Crosswalk Entry')
|
|
ax.scatter(crosswalk_exit[0], crosswalk_exit[1], c='green', s=100,
|
|
marker='v', label='Crosswalk Exit')
|
|
|
|
ax.set_xlabel('X (meters)')
|
|
ax.set_ylabel('Y (meters)')
|
|
ax.set_title('Crosswalk Simulation Setup')
|
|
ax.legend(loc='upper right')
|
|
ax.grid(True, alpha=0.3)
|
|
ax.set_aspect('equal')
|
|
|
|
plt.tight_layout()
|
|
plt.show()
|
|
|
|
except ImportError:
|
|
print("Visualization requires matplotlib. Install with: pip install matplotlib")
|
|
|
|
|
|
def main():
|
|
"""
|
|
Main entry point for the simulation.
|
|
"""
|
|
# Parse command line arguments
|
|
import argparse
|
|
parser = argparse.ArgumentParser(description='Run crosswalk simulation')
|
|
parser.add_argument('--setup', type=str, default='new',
|
|
help='Geometry setup: "current" or "new" (default)')
|
|
parser.add_argument('--time', type=float, default=300.0,
|
|
help='Maximum simulation time in seconds (default: 300)')
|
|
parser.add_argument('--visualize', action='store_true',
|
|
help='Show visualization before running')
|
|
|
|
args = parser.parse_args()
|
|
|
|
# Convert setup string to boolean
|
|
current_setup = (args.setup.lower() == 'current')
|
|
|
|
# Create and run simulation
|
|
sim = CrosswalkSimulation(
|
|
current_setup=current_setup,
|
|
simulation_time=args.time
|
|
)
|
|
|
|
# Show visualization if requested
|
|
if args.visualize:
|
|
sim.visualize()
|
|
input("Press Enter to start simulation...")
|
|
|
|
# Run simulation
|
|
results = sim.run()
|
|
|
|
# Save results to file
|
|
import json
|
|
results_file = f"simulation_results_{time.strftime('%Y%m%d_%H%M%S')}.json"
|
|
with open(results_file, 'w') as f:
|
|
json.dump(results, f, indent=2)
|
|
|
|
print(f"\nResults saved to: {results_file}")
|
|
return results
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|
|
|
|
|
|
|
|
======================================
|
|
(example 2)
|
|
======================================
|
|
|
|
import sys
|
|
import pathlib
|
|
import numpy as np
|
|
from typing import Dict, Tuple
|
|
|
|
# 1. Import JuPedSim and supporting libraries
|
|
import jupedsim as jps
|
|
from shapely import Polygon, GeometryCollection, get_coordinates
|
|
from shapely.plotting import plot_polygon
|
|
import matplotlib.pyplot as plt
|
|
|
|
# 2. Import your custom modules
|
|
from geometry_setup import GeometrySetup
|
|
from spawning import SpawnManager
|
|
from agents.agent_setup import run_AgentSetup
|
|
from crosswalk_setup import CrosswalkController, CrosswalkConfig, CrosswalkStatus
|
|
|
|
class CrosswalkSimulation:
|
|
"""
|
|
Main class to integrate custom components with JuPedSim.
|
|
Manages the simulation loop, agent lifecycle, and crosswalk-controlled routing.
|
|
"""
|
|
def __init__(self, current_setup: bool, sim_time_step: float = 0.01):
|
|
self.sim_time_step = sim_time_step
|
|
self.current_time = 0.0
|
|
|
|
# 3. Initialize Geometry and Derived Polygons
|
|
print("Initializing geometry...")
|
|
self.geo = GeometrySetup(current_setup)
|
|
self.spawn_area = self.geo.spawn()
|
|
self.queue_area = self.geo.queue()
|
|
self.trigger_area = self.geo.trigger()
|
|
self.crosswalk_area = self.geo.crosswalk()
|
|
self.grass_area = self.geo.grass()
|
|
|
|
# 4. Create the unified walkable area for JuPedSim
|
|
# The walkable area is the union of all non-obstacle polygons.
|
|
self.walkable_area = GeometryCollection([
|
|
self.spawn_area,
|
|
self.queue_area,
|
|
self.trigger_area,
|
|
self.crosswalk_area,
|
|
self.grass_area
|
|
]).unary_union # Creates a single polygon[citation:7]
|
|
|
|
# 5. Initialize your custom management systems
|
|
print("Setting up agent and spawn managers...")
|
|
self.all_agents, self.pop_list = run_AgentSetup()
|
|
self.spawn_manager = SpawnManager(self.spawn_area, min_spacing=0.55)
|
|
self.spawn_manager.generate_coords(max_samples=500) # Generate spawn points
|
|
|
|
print("Configuring crosswalk controller...")
|
|
crosswalk_config = CrosswalkConfig(
|
|
trigger_dist=2.0,
|
|
activation_time=15.0, # seconds active
|
|
cooldown_time=5.0, # seconds between activations
|
|
min_agents_activation=1,
|
|
activation_delay=0.0,
|
|
crosswalk_area=self.crosswalk_area,
|
|
trigger_area=self.trigger_area
|
|
)
|
|
self.crosswalk_controller = CrosswalkController(crosswalk_config, current_setup)
|
|
|
|
# 6. Set up the JuPedSim simulation core[citation:4]
|
|
print("Creating JuPedSim simulation object...")
|
|
self.trajectory_file = pathlib.Path("crosswalk_simulation.sqlite")
|
|
self.simulation = jps.Simulation(
|
|
model=jps.CollisionFreeSpeedModel(), # Choose a pedestrian model[citation:1]
|
|
geometry=self.walkable_area,
|
|
trajectory_writer=jps.SqliteTrajectoryWriter(
|
|
output_file=self.trajectory_file
|
|
),
|
|
dt=self.sim_time_step
|
|
)
|
|
|
|
# 7. Define JuPedSim Stages for routing[citation:3]
|
|
# Waiting Set: Agents wait here (in the queue area) for the crosswalk.
|
|
self.waiting_positions = self._generate_waiting_positions(self.queue_area, spacing=0.6)
|
|
self.waiting_stage_id = self.simulation.add_waiting_set_stage(self.waiting_positions)
|
|
self.waiting_stage = self.simulation.get_stage(self.waiting_stage_id)
|
|
self.waiting_stage.state = jps.WaitingSetState.INACTIVE # Start as a waypoint
|
|
|
|
# Exit Stage: Agents finish here (in the grass area).
|
|
# Use the centroid of the grass area as the final target.
|
|
grass_centroid = list(self.grass_area.centroid.coords[0])
|
|
self.exit_stage_id = self.simulation.add_exit_stage(
|
|
Polygon([(grass_centroid[0]-0.5, grass_centroid[1]-0.5),
|
|
(grass_centroid[0]+0.5, grass_centroid[1]-0.5),
|
|
(grass_centroid[0]+0.5, grass_centroid[1]+0.5),
|
|
(grass_centroid[0]-0.5, grass_centroid[1]+0.5)])
|
|
)
|
|
|
|
# 8. Create the main Journey[citation:3]
|
|
# Journey: Waiting Set -> Exit.
|
|
self.journey = jps.JourneyDescription([self.waiting_stage_id, self.exit_stage_id])
|
|
# Transition: Agents go from the waiting set directly to the exit.
|
|
self.journey.set_transition_for_stage(
|
|
self.waiting_stage_id,
|
|
jps.Transition.create_fixed_transition(self.exit_stage_id)
|
|
)
|
|
self.journey_id = self.simulation.add_journey(self.journey)
|
|
|
|
# 9. Tracking dictionaries
|
|
self.jps_agent_ids = {} # Maps your agent.id -> JuPedSim agent id
|
|
self.agent_targets = {} # Maps your agent.id -> current target (x, y)
|
|
|
|
def _generate_waiting_positions(self, polygon: Polygon, spacing: float):
|
|
"""Generate grid positions inside a polygon for waiting/queueing."""
|
|
min_x, min_y, max_x, max_y = polygon.bounds
|
|
positions = []
|
|
x = min_x + spacing / 2
|
|
while x < max_x:
|
|
y = min_y + spacing / 2
|
|
while y < max_y:
|
|
point = jps.Vector2D(x, y)
|
|
if polygon.contains(Polygon([(x, y)]).buffer(0.1)):
|
|
positions.append((point.x, point.y))
|
|
y += spacing
|
|
x += spacing
|
|
return positions if positions else [(min_x+0.5, min_y+0.5)]
|
|
|
|
def run(self, max_iterations: int = 50000):
|
|
"""Main simulation loop."""
|
|
print(f"\nStarting simulation. Target agents: {len(self.all_agents)}")
|
|
iteration = 0
|
|
|
|
while iteration < max_iterations:
|
|
# A. SPAWNING: Add new agents if there is space and agents left.
|
|
if self.spawn_manager.check_spawn_complete(self.all_agents):
|
|
new_agent = self.spawn_manager.spawn_agent(self.all_agents)
|
|
if new_agent:
|
|
# Add agent to JuPedSim targeting the waiting set.
|
|
jps_agent_params = jps.CollisionFreeSpeedModelAgentParameters(
|
|
journey_id=self.journey_id,
|
|
stage_id=self.waiting_stage_id,
|
|
position=self.spawn_manager.get_agent_pos(new_agent.id),
|
|
v0=new_agent.speed, # Desired speed
|
|
radius=new_agent.radius
|
|
)
|
|
jps_agent_id = self.simulation.add_agent(jps_agent_params)
|
|
self.jps_agent_ids[new_agent.id] = jps_agent_id
|
|
self.agent_targets[new_agent.id] = None
|
|
print(f"Iter {iteration:05d}: Spawned agent {new_agent.id} (JPS-ID: {jps_agent_id})")
|
|
|
|
# B. CROSSWALK CONTROL & ROUTING
|
|
# Get positions of ALL agents currently in the JuPedSim simulation.
|
|
current_agent_positions = {}
|
|
for your_id, jps_id in self.jps_agent_ids.items():
|
|
agent = self.simulation.agent(jps_id)
|
|
current_agent_positions[your_id] = (agent.position[0], agent.position[1])
|
|
|
|
# Update the crosswalk state based on agents in the trigger area.
|
|
crosswalk_state = self.crosswalk_controller.update(
|
|
current_agent_positions, self.sim_time_step, self.current_time
|
|
)
|
|
|
|
# C. CONTROL AGENT MOVEMENT BASED ON CROSSWALK STATE
|
|
if crosswalk_state.is_active:
|
|
# Crosswalk ACTIVE: Release agents from the waiting set.
|
|
if self.waiting_stage.state != jps.WaitingSetState.INACTIVE:
|
|
self.waiting_stage.state = jps.WaitingSetState.INACTIVE
|
|
print(f"Iter {iteration:05d}: Crosswalk ACTIVATED, releasing queue.")
|
|
else:
|
|
# Crosswalk INACTIVE: Hold agents in the waiting set.
|
|
if self.waiting_stage.state != jps.WaitingSetState.ACTIVE:"""
|
|
main_simulation.py
|
|
Main integration file for crosswalk simulation with JuPedSim
|
|
"""
|
|
|
|
import sys
|
|
import pathlib
|
|
from typing import Dict, Tuple, List
|
|
import jupedsim as jps
|
|
from shapely import Polygon, Point
|
|
import numpy as np
|
|
from dataclasses import dataclass
|
|
import matplotlib.pyplot as plt
|
|
|
|
# Import your existing modules
|
|
from geometry_setup import GeometrySetup
|
|
from spawning import SpawnManager
|
|
from agents.agent_setup import run_AgentSetup
|
|
from crosswalk_setup import CrosswalkController, CrosswalkConfig, CrosswalkStatus
|
|
|
|
@dataclass
|
|
class SimulationConfig:
|
|
"""Configuration for the entire simulation"""
|
|
time_step: float = 0.01 # Simulation time step in seconds
|
|
max_iterations: int = 5000 # Maximum simulation iterations
|
|
output_file: str = "crosswalk_simulation.sqlite"
|
|
current_setup: bool = True # Use current or alternative geometry
|
|
model_type: str = "collision_free" # Options: "collision_free", "anticipation"
|
|
|
|
# Crosswalk parameters
|
|
trigger_distance: float = 1.0
|
|
activation_time: float = 10.0
|
|
cooldown_time: float = 5.0
|
|
min_agents_activation: int = 1
|
|
activation_delay: float = 2.0
|
|
|
|
class CrosswalkSimulation:
|
|
"""Main simulation class integrating all components"""
|
|
|
|
def __init__(self, config: SimulationConfig):
|
|
self.config = config
|
|
self.simulation = None
|
|
self.geometry = None
|
|
self.spawn_manager = None
|
|
self.crosswalk_controller = None
|
|
self.agents_data = []
|
|
self.journey_id = None
|
|
self.exit_id = None
|
|
self.queue_stage_id = None
|
|
|
|
# Statistics
|
|
self.iteration_count = 0
|
|
self.active_agents = {}
|
|
|
|
def setup(self):
|
|
"""Setup all simulation components"""
|
|
print("Setting up simulation components...")
|
|
|
|
# 1. Setup geometry
|
|
geo_setup = GeometrySetup(self.config.current_setup)
|
|
self.geometry = self._create_geometry_polygon(geo_setup)
|
|
|
|
# 2. Setup crosswalk controller
|
|
crosswalk_config = CrosswalkConfig(
|
|
trigger_dist=self.config.trigger_distance,
|
|
activation_time=self.config.activation_time,
|
|
cooldown_time=self.config.cooldown_time,
|
|
min_agents_activation=self.config.min_agents_activation,
|
|
activation_delay=self.config.activation_delay,
|
|
crosswalk_area=geo_setup.crosswalk(),
|
|
trigger_area=geo_setup.trigger()
|
|
)
|
|
self.crosswalk_controller = CrosswalkController(
|
|
crosswalk_config,
|
|
self.config.current_setup
|
|
)
|
|
|
|
# 3. Setup spawn manager
|
|
spawn_area = geo_setup.spawn()
|
|
self.spawn_manager = SpawnManager(spawn_area, min_spacing=0.55)
|
|
self.spawn_manager.generate_coords()
|
|
|
|
# 4. Get agent data
|
|
self.agents_data, pop_list = run_AgentSetup()
|
|
print(f"Created {len(self.agents_data)} agents")
|
|
|
|
# 5. Setup JuPedSim simulation
|
|
self._setup_jupedsim_simulation()
|
|
|
|
print("Setup complete!")
|
|
|
|
def _create_geometry_polygon(self, geo_setup: GeometrySetup) -> Polygon:
|
|
"""Create a single walkable area polygon from all geometry components"""
|
|
# Combine all non-walkable areas (obstacles) as holes
|
|
walkable_exterior = Polygon([
|
|
(0, 0),
|
|
(30, 0),
|
|
(30, 20),
|
|
(0, 20)
|
|
])
|
|
|
|
# Define obstacles (non-walkable areas)
|
|
obstacles = [
|
|
geo_setup.grass(), # Grass area is non-walkable
|
|
]
|
|
|
|
return walkable_exterior
|
|
|
|
def _setup_jupedsim_simulation(self):
|
|
"""Setup JuPedSim simulation object with chosen model[citation:1]"""
|
|
|
|
# Choose pedestrian model[citation:1]
|
|
if self.config.model_type == "collision_free":
|
|
model = jps.CollisionFreeSpeedModel()
|
|
elif self.config.model_type == "anticipation":
|
|
model = jps.AnticipationVelocityModel()
|
|
else:
|
|
raise ValueError(f"Unknown model type: {self.config.model_type}")
|
|
|
|
# Create simulation object[citation:6]
|
|
self.simulation = jps.Simulation(
|
|
model=model,
|
|
geometry=self.geometry,
|
|
trajectory_writer=jps.SqliteTrajectoryWriter(
|
|
output_file=pathlib.Path(self.config.output_file)
|
|
),
|
|
)
|
|
|
|
# Setup journey with stages[citation:3]
|
|
self._setup_journey()
|
|
|
|
def _setup_journey(self):
|
|
"""Setup agent journey with stages and transitions[citation:3]"""
|
|
|
|
# Create exit stage (across the road)
|
|
exit_polygon = Polygon([
|
|
(4, 17.761),
|
|
(0, 17.761),
|
|
(0, 20),
|
|
(4, 20)
|
|
])
|
|
self.exit_id = self.simulation.add_exit_stage(exit_polygon)
|
|
|
|
# Create waiting positions before crosswalk (queue area)
|
|
queue_positions = self._generate_queue_positions()
|
|
self.queue_stage_id = self.simulation.add_queue_stage(queue_positions)
|
|
|
|
# Create waypoint at crosswalk entrance
|
|
crosswalk_entrance = (15.0, 7.0) # Near trigger area
|
|
waypoint_dist = 0.5
|
|
waypoint_id = self.simulation.add_waypoint_stage(crosswalk_entrance, waypoint_dist)
|
|
|
|
# Create journey with stages[citation:3]
|
|
journey = jps.JourneyDescription([
|
|
waypoint_id, # First go to crosswalk entrance
|
|
self.queue_stage_id, # Wait in queue
|
|
self.exit_id # Finally exit
|
|
])
|
|
|
|
# Setup transitions between stages[citation:3]
|
|
journey.set_transition_for_stage(
|
|
waypoint_id,
|
|
jps.Transition.create_fixed_transition(self.queue_stage_id)
|
|
)
|
|
|
|
# Transition from queue to exit only when crosswalk is active
|
|
# We'll control this dynamically in the simulation loop
|
|
journey.set_transition_for_stage(
|
|
self.queue_stage_id,
|
|
jps.Transition.create_fixed_transition(self.exit_id)
|
|
)
|
|
|
|
self.journey_id = self.simulation.add_journey(journey)
|
|
|
|
def _generate_queue_positions(self) -> List[Tuple[float, float]]:
|
|
"""Generate waiting positions in the queue area"""
|
|
queue_positions = []
|
|
# Create a grid of waiting positions in the queue area
|
|
for i in range(5): # 5 rows
|
|
for j in range(3): # 3 columns
|
|
x = 18.0 + j * 0.6
|
|
y = 6.0 + i * 0.6
|
|
queue_positions.append((x, y))
|
|
return queue_positions
|
|
|
|
def run(self):
|
|
"""Run the main simulation loop"""
|
|
print("Starting simulation...")
|
|
|
|
# Spawn initial agents
|
|
self._spawn_initial_agents()
|
|
|
|
# Get queue object for controlling agent release[citation:2]
|
|
queue = self.simulation.get_stage(self.queue_stage_id)
|
|
|
|
# Main simulation loop[citation:6]
|
|
while (self.simulation.agent_count() > 0 and
|
|
self.iteration_count < self.config.max_iterations):
|
|
|
|
# Get current agent positions
|
|
agent_positions = self._get_agent_positions()
|
|
|
|
# Update crosswalk controller
|
|
crosswalk_state = self.crosswalk_controller.update(
|
|
agent_positions,
|
|
self.config.time_step,
|
|
current_time=self.iteration_count * self.config.time_step
|
|
)
|
|
|
|
# Control queue based on crosswalk state[citation:2]
|
|
if crosswalk_state.is_active:
|
|
# Release agents from queue when crosswalk is active
|
|
agents_in_queue = self._count_agents_in_stage(self.queue_stage_id)
|
|
if agents_in_queue > 0:
|
|
# Release all agents that can cross
|
|
queue.pop(agents_in_queue)
|
|
|
|
# Spawn new agents if needed
|
|
self._spawn_new_agents()
|
|
|
|
# Run simulation iteration
|
|
self.simulation.iterate()
|
|
self.iteration_count += 1
|
|
|
|
# Print progress
|
|
if self.iteration_count % 100 == 0:
|
|
self._print_progress(crosswalk_state)
|
|
|
|
print(f"Simulation completed after {self.iteration_count} iterations")
|
|
print(f"Agents remaining: {self.simulation.agent_count()}")
|
|
|
|
# Print final statistics
|
|
stats = self.crosswalk_controller.get_statistics()
|
|
print("\nCrosswalk Statistics:")
|
|
for key, value in stats.items():
|
|
print(f" {key}: {value}")
|
|
|
|
def _spawn_initial_agents(self):
|
|
"""Spawn initial batch of agents"""
|
|
num_initial_agents = min(20, len(self.agents_data))
|
|
print(f"Spawning {num_initial_agents} initial agents...")
|
|
|
|
for _ in range(num_initial_agents):
|
|
agent = self.spawn_manager.spawn_agent(self.agents_data)
|
|
if agent:
|
|
self._add_agent_to_simulation(agent)
|
|
|
|
def _spawn_new_agents(self):
|
|
"""Spawn new agents during simulation if needed"""
|
|
# Spawn new agent every 10 iterations if we have capacity
|
|
if self.iteration_count % 10 == 0:
|
|
agent = self.spawn_manager.spawn_agent(self.agents_data)
|
|
if agent:
|
|
self._add_agent_to_simulation(agent)
|
|
|
|
def _add_agent_to_simulation(self, agent):
|
|
"""Add an agent to the JuPedSim simulation[citation:6]"""
|
|
|
|
# Get spawn position
|
|
spawn_pos = self.spawn_manager.get_agent_pos(agent.id)
|
|
|
|
# Choose appropriate agent parameters based on model[citation:1]
|
|
if self.config.model_type == "collision_free":
|
|
agent_params = jps.CollisionFreeSpeedModelAgentParameters(
|
|
position=spawn_pos,
|
|
desired_speed=agent.speed,
|
|
radius=agent.radius,
|
|
journey_id=self.journey_id,
|
|
stage_id=self.queue_stage_id # Start in queue stage
|
|
)
|
|
else: # anticipation model
|
|
agent_params = jps.AnticipationVelocityModelAgentParameters(
|
|
position=spawn_pos,
|
|
desired_speed=agent.speed,
|
|
radius=agent.radius,
|
|
journey_id=self.journey_id,
|
|
stage_id=self.queue_stage_id
|
|
)
|
|
|
|
# Add agent to simulation
|
|
agent_id = self.simulation.add_agent(agent_params)
|
|
self.active_agents[agent_id] = agent.id
|
|
|
|
print(f"Spawned agent {agent.id} at position {spawn_pos}")
|
|
|
|
def _get_agent_positions(self) -> Dict[int, Tuple[float, float]]:
|
|
"""Get current positions of all agents in simulation"""
|
|
positions = {}
|
|
for agent in self.simulation.agents():
|
|
positions[agent.id] = agent.position
|
|
return positions
|
|
|
|
def _count_agents_in_stage(self, stage_id: int) -> int:
|
|
"""Count agents currently targeting a specific stage"""
|
|
count = 0
|
|
for agent in self.simulation.agents():
|
|
if agent.stage_id == stage_id:
|
|
count += 1
|
|
return count
|
|
|
|
def _print_progress(self, crosswalk_state):
|
|
"""Print simulation progress"""
|
|
print(f"Iteration: {self.iteration_count}, "
|
|
f"Agents: {self.simulation.agent_count()}, "
|
|
f"Crosswalk: {crosswalk_state.status.value}, "
|
|
f"Active time remaining: {crosswalk_state.time_remaining:.1f}s")
|
|
|
|
def visualize(self):
|
|
"""Visualize simulation results"""
|
|
try:
|
|
import pedpy
|
|
from jupedsim.internal.notebook_utils import read_sqlite_file
|
|
|
|
# Read trajectory data
|
|
trajectory_data, walkable_area = read_sqlite_file(self.config.output_file)
|
|
|
|
# Create visualization
|
|
fig, axes = plt.subplots(1, 2, figsize=(15, 6))
|
|
|
|
# Plot trajectories
|
|
pedpy.plot_trajectories(
|
|
traj=trajectory_data,
|
|
axes=axes[0],
|
|
walkable_area=walkable_area
|
|
)
|
|
axes[0].set_title("Agent Trajectories")
|
|
|
|
# Plot density heatmap
|
|
density = pedpy.compute_classic_density(
|
|
traj_data=trajectory_data,
|
|
measurement_area=walkable_area.polygon,
|
|
frame_step=10
|
|
)
|
|
pedpy.plot_density(
|
|
density=density,
|
|
axes=axes[1],
|
|
walkable_area=walkable_area
|
|
)
|
|
axes[1].set_title("Density Heatmap")
|
|
|
|
plt.tight_layout()
|
|
plt.show()
|
|
|
|
except ImportError:
|
|
print("Visualization requires pedpy and matplotlib. Install with: pip install pedpy matplotlib")
|
|
|
|
def main():
|
|
"""Main entry point for the simulation"""
|
|
|
|
# Configuration
|
|
config = SimulationConfig(
|
|
current_setup=True,
|
|
time_step=0.01,
|
|
max_iterations=3000,
|
|
output_file="crosswalk_simulation_results.sqlite",
|
|
trigger_distance=1.5,
|
|
activation_time=15.0,
|
|
cooldown_time=3.0,
|
|
min_agents_activation=1,
|
|
activation_delay=1.0
|
|
)
|
|
|
|
# Create and run simulation
|
|
simulation = CrosswalkSimulation(config)
|
|
simulation.setup()
|
|
simulation.run()
|
|
|
|
# Optional: Visualize results
|
|
# simulation.visualize()
|
|
|
|
print("\nSimulation completed successfully!")
|
|
|
|
if __name__ == "__main__":
|
|
main()
|
|
self.waiting_stage.state = jps.WaitingSetState.ACTIVE
|
|
print(f"Iter {iteration:05d}: Crosswalk INACTIVE, holding agents.")
|
|
|
|
# D. ITERATE: Advance the JuPedSim simulation by one step.
|
|
self.simulation.iterate()
|
|
self.current_time += self.sim_time_step
|
|
iteration += 1
|
|
|
|
# E. REMOVAL: Check for agents that have reached the exit.
|
|
# Their JPS id will no longer be found in the simulation.
|
|
agents_to_remove = []
|
|
for your_id, jps_id in list(self.jps_agent_ids.items()):
|
|
try:
|
|
# This will fail if the agent has been removed (reached exit).
|
|
_ = self.simulation.agent(jps_id)
|
|
except RuntimeError:
|
|
agents_to_remove.append(your_id)
|
|
# Free up the spawn point for this agent.
|
|
self.spawn_manager.unfill_coords(your_id)
|
|
|
|
for your_id in agents_to_remove:
|
|
del self.jps_agent_ids[your_id]
|
|
if your_id in self.agent_targets:
|
|
del self.agent_targets[your_id]
|
|
print(f"Iter {iteration:05d}: Agent {your_id} reached exit and was removed.")
|
|
|
|
# F. COMPLETION CHECK: Stop if all agents spawned and exited.
|
|
if (len(self.jps_agent_ids) == 0 and
|
|
not self.spawn_manager.check_spawn_complete(self.all_agents)):
|
|
print(f"\nSimulation completed at iteration {iteration}.")
|
|
print(f"Total simulation time: {self.current_time:.2f} seconds.")
|
|
break
|
|
|
|
# G. FINALIZATION
|
|
print("\n" + "="*50)
|
|
print("Simulation finished.")
|
|
stats = self.crosswalk_controller.get_statistics()
|
|
print(f"Crosswalk was activated {stats['total_activations']} times.")
|
|
print(f"Agents served by crosswalk: {stats['agents_served']}")
|
|
print(f"Trajectory saved to: {self.trajectory_file.absolute()}")
|
|
|
|
def plot_final_setup(self):
|
|
"""Visualize the final geometry setup."""
|
|
fig, ax = plt.subplots(figsize=(10, 8))
|
|
for poly, color, label in [
|
|
(self.spawn_area, 'purple', 'Spawn Area'),
|
|
(self.queue_area, 'blue', 'Queue Area'),
|
|
(self.trigger_area, 'green', 'Trigger Area'),
|
|
(self.crosswalk_area, 'red', 'Crosswalk Area'),
|
|
(self.grass_area, 'lightblue', 'Grass Area')
|
|
]:
|
|
plot_polygon(poly, ax=ax, color=color, alpha=0.5, label=label, add_points=False)
|
|
ax.legend()
|
|
ax.set_title("Simulation Geometry Setup")
|
|
ax.set_xlabel("x [m]")
|
|
ax.set_ylabel("y [m]")
|
|
ax.set_aspect('equal')
|
|
plt.show()
|
|
|
|
if __name__ == "__main__":
|
|
# Run the simulation with the 'False' geometry setup.
|
|
sim = CrosswalkSimulation(current_setup=False, sim_time_step=0.05)
|
|
sim.plot_final_setup() # Optional: view geometry before running
|
|
sim.run(max_iterations=20000)
|
|
|
|
==========================
|
|
(example 3)
|
|
==========================
|
|
"""
|
|
main_simulation.py
|
|
Main integration file for crosswalk simulation with JuPedSim
|
|
"""
|
|
|
|
import sys
|
|
import pathlib
|
|
from typing import Dict, Tuple, List
|
|
import jupedsim as jps
|
|
from shapely import Polygon, Point
|
|
import numpy as np
|
|
from dataclasses import dataclass
|
|
import matplotlib.pyplot as plt
|
|
|
|
# Import your existing modules
|
|
from geometry_setup import GeometrySetup
|
|
from spawning import SpawnManager
|
|
from agents.agent_setup import run_AgentSetup
|
|
from crosswalk_setup import CrosswalkController, CrosswalkConfig, CrosswalkStatus
|
|
|
|
@dataclass
|
|
class SimulationConfig:
|
|
"""Configuration for the entire simulation"""
|
|
time_step: float = 0.01 # Simulation time step in seconds
|
|
max_iterations: int = 5000 # Maximum simulation iterations
|
|
output_file: str = "crosswalk_simulation.sqlite"
|
|
current_setup: bool = True # Use current or alternative geometry
|
|
model_type: str = "collision_free" # Options: "collision_free", "anticipation"
|
|
|
|
# Crosswalk parameters
|
|
trigger_distance: float = 1.0
|
|
activation_time: float = 10.0
|
|
cooldown_time: float = 5.0
|
|
min_agents_activation: int = 1
|
|
activation_delay: float = 2.0
|
|
|
|
class CrosswalkSimulation:
|
|
"""Main simulation class integrating all components"""
|
|
|
|
def __init__(self, config: SimulationConfig):
|
|
self.config = config
|
|
self.simulation = None
|
|
self.geometry = None
|
|
self.spawn_manager = None
|
|
self.crosswalk_controller = None
|
|
self.agents_data = []
|
|
self.journey_id = None
|
|
self.exit_id = None
|
|
self.queue_stage_id = None
|
|
|
|
# Statistics
|
|
self.iteration_count = 0
|
|
self.active_agents = {}
|
|
|
|
def setup(self):
|
|
"""Setup all simulation components"""
|
|
print("Setting up simulation components...")
|
|
|
|
# 1. Setup geometry
|
|
geo_setup = GeometrySetup(self.config.current_setup)
|
|
self.geometry = self._create_geometry_polygon(geo_setup)
|
|
|
|
# 2. Setup crosswalk controller
|
|
crosswalk_config = CrosswalkConfig(
|
|
trigger_dist=self.config.trigger_distance,
|
|
activation_time=self.config.activation_time,
|
|
cooldown_time=self.config.cooldown_time,
|
|
min_agents_activation=self.config.min_agents_activation,
|
|
activation_delay=self.config.activation_delay,
|
|
crosswalk_area=geo_setup.crosswalk(),
|
|
trigger_area=geo_setup.trigger()
|
|
)
|
|
self.crosswalk_controller = CrosswalkController(
|
|
crosswalk_config,
|
|
self.config.current_setup
|
|
)
|
|
|
|
# 3. Setup spawn manager
|
|
spawn_area = geo_setup.spawn()
|
|
self.spawn_manager = SpawnManager(spawn_area, min_spacing=0.55)
|
|
self.spawn_manager.generate_coords()
|
|
|
|
# 4. Get agent data
|
|
self.agents_data, pop_list = run_AgentSetup()
|
|
print(f"Created {len(self.agents_data)} agents")
|
|
|
|
# 5. Setup JuPedSim simulation
|
|
self._setup_jupedsim_simulation()
|
|
|
|
print("Setup complete!")
|
|
|
|
def _create_geometry_polygon(self, geo_setup: GeometrySetup) -> Polygon:
|
|
"""Create a single walkable area polygon from all geometry components"""
|
|
# Combine all non-walkable areas (obstacles) as holes
|
|
walkable_exterior = Polygon([
|
|
(0, 0),
|
|
(30, 0),
|
|
(30, 20),
|
|
(0, 20)
|
|
])
|
|
|
|
# Define obstacles (non-walkable areas)
|
|
obstacles = [
|
|
geo_setup.grass(), # Grass area is non-walkable
|
|
]
|
|
|
|
return walkable_exterior
|
|
|
|
def _setup_jupedsim_simulation(self):
|
|
"""Setup JuPedSim simulation object with chosen model[citation:1]"""
|
|
|
|
# Choose pedestrian model[citation:1]
|
|
if self.config.model_type == "collision_free":
|
|
model = jps.CollisionFreeSpeedModel()
|
|
elif self.config.model_type == "anticipation":
|
|
model = jps.AnticipationVelocityModel()
|
|
else:
|
|
raise ValueError(f"Unknown model type: {self.config.model_type}")
|
|
|
|
# Create simulation object[citation:6]
|
|
self.simulation = jps.Simulation(
|
|
model=model,
|
|
geometry=self.geometry,
|
|
trajectory_writer=jps.SqliteTrajectoryWriter(
|
|
output_file=pathlib.Path(self.config.output_file)
|
|
),
|
|
)
|
|
|
|
# Setup journey with stages[citation:3]
|
|
self._setup_journey()
|
|
|
|
def _setup_journey(self):
|
|
"""Setup agent journey with stages and transitions[citation:3]"""
|
|
|
|
# Create exit stage (across the road)
|
|
exit_polygon = Polygon([
|
|
(4, 17.761),
|
|
(0, 17.761),
|
|
(0, 20),
|
|
(4, 20)
|
|
])
|
|
self.exit_id = self.simulation.add_exit_stage(exit_polygon)
|
|
|
|
# Create waiting positions before crosswalk (queue area)
|
|
queue_positions = self._generate_queue_positions()
|
|
self.queue_stage_id = self.simulation.add_queue_stage(queue_positions)
|
|
|
|
# Create waypoint at crosswalk entrance
|
|
crosswalk_entrance = (15.0, 7.0) # Near trigger area
|
|
waypoint_dist = 0.5
|
|
waypoint_id = self.simulation.add_waypoint_stage(crosswalk_entrance, waypoint_dist)
|
|
|
|
# Create journey with stages[citation:3]
|
|
journey = jps.JourneyDescription([
|
|
waypoint_id, # First go to crosswalk entrance
|
|
self.queue_stage_id, # Wait in queue
|
|
self.exit_id # Finally exit
|
|
])
|
|
|
|
# Setup transitions between stages[citation:3]
|
|
journey.set_transition_for_stage(
|
|
waypoint_id,
|
|
jps.Transition.create_fixed_transition(self.queue_stage_id)
|
|
)
|
|
|
|
# Transition from queue to exit only when crosswalk is active
|
|
# We'll control this dynamically in the simulation loop
|
|
journey.set_transition_for_stage(
|
|
self.queue_stage_id,
|
|
jps.Transition.create_fixed_transition(self.exit_id)
|
|
)
|
|
|
|
self.journey_id = self.simulation.add_journey(journey)
|
|
|
|
def _generate_queue_positions(self) -> List[Tuple[float, float]]:
|
|
"""Generate waiting positions in the queue area"""
|
|
queue_positions = []
|
|
# Create a grid of waiting positions in the queue area
|
|
for i in range(5): # 5 rows
|
|
for j in range(3): # 3 columns
|
|
x = 18.0 + j * 0.6
|
|
y = 6.0 + i * 0.6
|
|
queue_positions.append((x, y))
|
|
return queue_positions
|
|
|
|
def run(self):
|
|
"""Run the main simulation loop"""
|
|
print("Starting simulation...")
|
|
|
|
# Spawn initial agents
|
|
self._spawn_initial_agents()
|
|
|
|
# Get queue object for controlling agent release[citation:2]
|
|
queue = self.simulation.get_stage(self.queue_stage_id)
|
|
|
|
# Main simulation loop[citation:6]
|
|
while (self.simulation.agent_count() > 0 and
|
|
self.iteration_count < self.config.max_iterations):
|
|
|
|
# Get current agent positions
|
|
agent_positions = self._get_agent_positions()
|
|
|
|
# Update crosswalk controller
|
|
crosswalk_state = self.crosswalk_controller.update(
|
|
agent_positions,
|
|
self.config.time_step,
|
|
current_time=self.iteration_count * self.config.time_step
|
|
)
|
|
|
|
# Control queue based on crosswalk state[citation:2]
|
|
if crosswalk_state.is_active:
|
|
# Release agents from queue when crosswalk is active
|
|
agents_in_queue = self._count_agents_in_stage(self.queue_stage_id)
|
|
if agents_in_queue > 0:
|
|
# Release all agents that can cross
|
|
queue.pop(agents_in_queue)
|
|
|
|
# Spawn new agents if needed
|
|
self._spawn_new_agents()
|
|
|
|
# Run simulation iteration
|
|
self.simulation.iterate()
|
|
self.iteration_count += 1
|
|
|
|
# Print progress
|
|
if self.iteration_count % 100 == 0:
|
|
self._print_progress(crosswalk_state)
|
|
|
|
print(f"Simulation completed after {self.iteration_count} iterations")
|
|
print(f"Agents remaining: {self.simulation.agent_count()}")
|
|
|
|
# Print final statistics
|
|
stats = self.crosswalk_controller.get_statistics()
|
|
print("\nCrosswalk Statistics:")
|
|
for key, value in stats.items():
|
|
print(f" {key}: {value}")
|
|
|
|
def _spawn_initial_agents(self):
|
|
"""Spawn initial batch of agents"""
|
|
num_initial_agents = min(20, len(self.agents_data))
|
|
print(f"Spawning {num_initial_agents} initial agents...")
|
|
|
|
for _ in range(num_initial_agents):
|
|
agent = self.spawn_manager.spawn_agent(self.agents_data)
|
|
if agent:
|
|
self._add_agent_to_simulation(agent)
|
|
|
|
def _spawn_new_agents(self):
|
|
"""Spawn new agents during simulation if needed"""
|
|
# Spawn new agent every 10 iterations if we have capacity
|
|
if self.iteration_count % 10 == 0:
|
|
agent = self.spawn_manager.spawn_agent(self.agents_data)
|
|
if agent:
|
|
self._add_agent_to_simulation(agent)
|
|
|
|
def _add_agent_to_simulation(self, agent):
|
|
"""Add an agent to the JuPedSim simulation[citation:6]"""
|
|
|
|
# Get spawn position
|
|
spawn_pos = self.spawn_manager.get_agent_pos(agent.id)
|
|
|
|
# Choose appropriate agent parameters based on model[citation:1]
|
|
if self.config.model_type == "collision_free":
|
|
agent_params = jps.CollisionFreeSpeedModelAgentParameters(
|
|
position=spawn_pos,
|
|
desired_speed=agent.speed,
|
|
radius=agent.radius,
|
|
journey_id=self.journey_id,
|
|
stage_id=self.queue_stage_id # Start in queue stage
|
|
)
|
|
else: # anticipation model
|
|
agent_params = jps.AnticipationVelocityModelAgentParameters(
|
|
position=spawn_pos,
|
|
desired_speed=agent.speed,
|
|
radius=agent.radius,
|
|
journey_id=self.journey_id,
|
|
stage_id=self.queue_stage_id
|
|
)
|
|
|
|
# Add agent to simulation
|
|
agent_id = self.simulation.add_agent(agent_params)
|
|
self.active_agents[agent_id] = agent.id
|
|
|
|
print(f"Spawned agent {agent.id} at position {spawn_pos}")
|
|
|
|
def _get_agent_positions(self) -> Dict[int, Tuple[float, float]]:
|
|
"""Get current positions of all agents in simulation"""
|
|
positions = {}
|
|
for agent in self.simulation.agents():
|
|
positions[agent.id] = agent.position
|
|
return positions
|
|
|
|
def _count_agents_in_stage(self, stage_id: int) -> int:
|
|
"""Count agents currently targeting a specific stage"""
|
|
count = 0
|
|
for agent in self.simulation.agents():
|
|
if agent.stage_id == stage_id:
|
|
count += 1
|
|
return count
|
|
|
|
def _print_progress(self, crosswalk_state):
|
|
"""Print simulation progress"""
|
|
print(f"Iteration: {self.iteration_count}, "
|
|
f"Agents: {self.simulation.agent_count()}, "
|
|
f"Crosswalk: {crosswalk_state.status.value}, "
|
|
f"Active time remaining: {crosswalk_state.time_remaining:.1f}s")
|
|
|
|
def visualize(self):
|
|
"""Visualize simulation results"""
|
|
try:
|
|
import pedpy
|
|
from jupedsim.internal.notebook_utils import read_sqlite_file
|
|
|
|
# Read trajectory data
|
|
trajectory_data, walkable_area = read_sqlite_file(self.config.output_file)
|
|
|
|
# Create visualization
|
|
fig, axes = plt.subplots(1, 2, figsize=(15, 6))
|
|
|
|
# Plot trajectories
|
|
pedpy.plot_trajectories(
|
|
traj=trajectory_data,
|
|
axes=axes[0],
|
|
walkable_area=walkable_area
|
|
)
|
|
axes[0].set_title("Agent Trajectories")
|
|
|
|
# Plot density heatmap
|
|
density = pedpy.compute_classic_density(
|
|
traj_data=trajectory_data,
|
|
measurement_area=walkable_area.polygon,
|
|
frame_step=10
|
|
)
|
|
pedpy.plot_density(
|
|
density=density,
|
|
axes=axes[1],
|
|
walkable_area=walkable_area
|
|
)
|
|
axes[1].set_title("Density Heatmap")
|
|
|
|
plt.tight_layout()
|
|
plt.show()
|
|
|
|
except ImportError:
|
|
print("Visualization requires pedpy and matplotlib. Install with: pip install pedpy matplotlib")
|
|
|
|
def main():
|
|
"""Main entry point for the simulation"""
|
|
|
|
# Configuration
|
|
config = SimulationConfig(
|
|
current_setup=True,
|
|
time_step=0.01,
|
|
max_iterations=3000,
|
|
output_file="crosswalk_simulation_results.sqlite",
|
|
trigger_distance=1.5,
|
|
activation_time=15.0,
|
|
cooldown_time=3.0,
|
|
min_agents_activation=1,
|
|
activation_delay=1.0
|
|
)
|
|
|
|
# Create and run simulation
|
|
simulation = CrosswalkSimulation(config)
|
|
simulation.setup()
|
|
simulation.run()
|
|
|
|
# Optional: Visualize results
|
|
# simulation.visualize()
|
|
|
|
print("\nSimulation completed successfully!")
|
|
|
|
if __name__ == "__main__":
|
|
main()
|
|
|
|
|
|
|