Files
SRS_evac_sim/_source_/simulation.py
Varyngoth 676659e5b9 First
2026-01-28 13:31:49 -04:00

253 lines
11 KiB
Python

from typing import Tuple,List,Dict
import sys
from datetime import datetime as dt
import matplotlib
matplotlib.use('QtAgg')
import matplotlib.pyplot as plt
from shapely import Polygon,Point,GeometryCollection,get_coordinates
from shapely.plotting import plot_polygon
import jupedsim as jps
import numpy as np
import pathlib
from geometry_setup import GeometrySetup
from spawning import SpawnManager
from crosswalk_setup import CrosswalkStatus,CrosswalkController,CrosswalkState,CrosswalkConfig
from agent_setup import AgentConfig
from dataclasses import dataclass
@dataclass
class SimulationSetup:
simulation_time:float=200.0
time_step:float=0.01
max_iterations:int=50000
trajectory_file:str="crosswalk_sim.sqlite"
current_setup:bool=True
trigger_dist:float=1.0
activation_time:float=30.0
cooldown_time:float=10.0
min_agents_activation:int=1
activation_delay:float=2.0
class CrosswalkSimulation:
def __init__(self,config:SimulationSetup):
self.config = config
self.RED = "\033[1;31m"
self.GREEN = "\033[1;32m"
self.YELLOW = "\033[1;33m"
self.BLUE = "\033[1;34m"
self.PURPLE = "\033[1;35m"
self.CYAN = "\033[1;36m"
self.RESET = "\033[0m"
print(f"{self.YELLOW}\nInitializing Geometry...")
self.geo = GeometrySetup(self.config.current_setup)
self.spawn_area = self.geo.spawn()
self.queue_area = self.geo.queue()
self.grass_area = self.geo.grass()
self.crosswalk_area = self.geo.crosswalk()
self.entry_area = self.geo.entry_polygon()
self.exit_area = self.geo.exit_polygon()
self.walkable_area = GeometryCollection([
self.spawn_area,
self.queue_area,
self.grass_area,
self.crosswalk_area,
])
print(f"{self.GREEN}Geometry Configuration Complete!")
print(f"{self.YELLOW}\nInitializing Agent Setup...")
self.all_agents, pop_count = AgentConfig()
self.total_agents = len(self.all_agents)
self.req_spacing = (max([agent.radius for agent in self.all_agents])*2.2)
print(f"{self.GREEN}Agent Setup Complete!")
print(f"{self.RESET}Generated {self.total_agents} Agents")
print(f"Population Breakdown:\n"+"".join(f"\t{k}: {v}\n" for k,v in pop_count.items()))
print(f"{self.YELLOW}Initializing Spawn Manager...")
self.spawn_manager = SpawnManager(self.spawn_area,min_spacing=self.req_spacing)
self.all_coords = self.spawn_manager.generate_coords()
print(f"{self.GREEN}Spawn Manager Setup Complete!")
print(f"{self.RESET}Generated {len(self.all_coords)} Spawn Coordinates")
print(f"{self.YELLOW}\nInitializing Crosswalk Controller...")
self.crosswalk_controller = CrosswalkController(
CrosswalkConfig(
trigger_dist=self.config.trigger_dist,
activation_time=self.config.activation_time,
cooldown_time=self.config.cooldown_time,
min_agents_activation=self.config.min_agents_activation,
activation_delay=self.config.activation_delay,
crosswalk_area=self.crosswalk_area,
trigger_area=self.entry_area
),
self.config.current_setup
)
print(f"{self.GREEN}Crosswalk Controller Setup Complete!")
print(f"{self.YELLOW}\nInitializing JuPedSim Model...")
self.trajectory_file = self.config.trajectory_file
self.simulation = jps.Simulation(
model=jps.CollisionFreeSpeedModel(),
geometry=self.walkable_area,
trajectory_writer=jps.SqliteTrajectoryWriter(
output_file=pathlib.Path(self.trajectory_file)
),
dt=self.config.time_step
)
self.journey_setup()
self.jps_agent_ids:Dict[int,int] = {}
self.iteration_count = 0
self.current_time = 0.0
print(f"{self.GREEN}JuPedSim Model Initialized{self.RESET}")
print(f"\tTime Step: {self.config.time_step}s")
print(f"\tMax Iterations: {self.config.max_iterations}")
print(f"\tCrosswalk Activation Length: {self.config.activation_time}s")
print(f"\tCrosswalk Cooldown Length: {self.config.cooldown_time}s")
def journey_setup(self):
self.queue_waiting_coords = self.waiting_coords(
self.queue_area,
self.spawn_manager.min_spacing
)
self.queue_stage_id = self.simulation.add_waiting_set_stage(
self.queue_waiting_coords
)
self.queue_stage = self.simulation.get_stage(self.queue_stage_id)
self.queue_stage.state = jps.WaitingSetState.ACTIVE
self.crosswalk_entry_id = self.simulation.add_waypoint_stage((
self.entry_area.centroid.x,
self.entry_area.centroid.y),
self.spawn_manager.min_spacing
)
self.crosswalk_exit_id = self.simulation.add_waypoint_stage((
self.exit_area.centroid.x,
self.exit_area.centroid.y),
self.spawn_manager.min_spacing
)
self.exit_stage_id = self.simulation.add_waypoint_stage((
self.grass_area.centroid.x,
self.grass_area.centroid.y),
self.spawn_manager.min_spacing
)
self.journey = jps.JourneyDescription([
self.queue_stage_id,
self.crosswalk_entry_id,
self.crosswalk_exit_id,
self.exit_stage_id
])
self.journey.set_transition_for_stage(
self.queue_stage_id,
jps.Transition.create_fixed_transition(
self.crosswalk_entry_id
))
self.journey.set_transition_for_stage(
self.crosswalk_entry_id,
jps.Transition.create_fixed_transition(
self.crosswalk_exit_id
))
self.journey.set_transition_for_stage(
self.crosswalk_exit_id,
jps.Transition.create_fixed_transition(
self.exit_stage_id
))
self.journey_id = self.simulation.add_journey(
self.journey)
def waiting_coords(self,area:Polygon,spacing:float)->List[Tuple[float,float]]:
min_x,min_y,max_x,max_y = area.bounds
points = []
x = (min_x+spacing)/2
while x<max_x:
y = (min_y+spacing)/2
while y < max_y:
if area.contains(Point(x, y)):
points.append((x,y))
y += spacing
x += spacing
return points if points else [(min_x+0.5, min_y+0.5)]
def run(self):
print(f"{self.CYAN}\nStarting Simulation")
print(f"Target Agents: {len(self.all_agents)}\n{self.RESET}")
while self.iteration_count < self.config.max_iterations:
if self.spawn_manager.check_spawn_complete(self.all_agents):
new_agent = self.spawn_manager.spawn_agent(self.all_agents)
if new_agent:
jps_agent_params = jps.CollisionFreeSpeedModelAgentParameters(
journey_id=self.journey_id,
stage_id=self.queue_stage_id,
position=self.spawn_manager.get_agent_pos(new_agent.id),
desired_speed=new_agent.speed,
radius=new_agent.radius
)
jps_agent_id = self.simulation.add_agent(jps_agent_params)
self.jps_agent_ids[new_agent.id] = jps_agent_id
#if self.iteration_count % 100 ==0:
# print(f"{self.YELLOW}[Iteration: {self.iteration_count:05d}]{self.RESET} Total Agents Spawned {len(self.jps_agent_ids)}")
print(f"{self.YELLOW}[Iteration: {self.iteration_count:05d}]{self.RESET} Total Agents Spawned {len(self.jps_agent_ids)}")
current_agent_pos = {}
for spawn_id,jps_id in self.jps_agent_ids.items():
try:
agent = self.simulation.agent(jps_id)
current_agent_pos[spawn_id] = (agent.position[0],agent.position[1])
except RuntimeError:
continue
crosswalk_state = self.crosswalk_controller.update(
current_agent_pos,
self.config.time_step,
self.current_time
)
if crosswalk_state.is_active:
if self.queue_stage.state != jps.WaitingSetState.INACTIVE:
self.queue_stage.state = jps.WaitingSetState.INACTIVE
print(f"{self.YELLOW}[Iteration: {self.iteration_count:05d}]{self.RESET} Crosswalk ACTIVATED")
else:
if self.queue_stage.state != jps.WaitingSetState.ACTIVE:
self.queue_stage.state = jps.WaitingSetState.ACTIVE
self.simulation.iterate()
self.current_time += self.config.time_step
self.iteration_count +=1
agents_to_remove = []
for spawn_id, jps_id in list(self.jps_agent_ids.items()):
try:
_ = self.simulation.agent(jps_id)
except RuntimeError:
agents_to_remove.append(spawn_id)
for spawn_id in agents_to_remove:
self.spawn_manager.unfill_coords(spawn_id)
del self.jps_agent_ids[spawn_id]
print(f"{self.YELLOW}[Iteration: {self.iteration_count:05d}]{self.RESET} JOURNEY COMPLETE: Agent {spawn_id}")
if (len(self.jps_agent_ids) ==0 and not self.spawn_manager.check_spawn_complete(self.all_agents)):
print(f"{self.GREEN}\nSimulation Completed at Iteration: {self.iteration_count}")
print(f"Total Simulation Time: {self.current_time:.2f} s")
break
print("\n")
print(f"{self.GREEN}=== SIMULATION COMPLETE ===")
stats = self.crosswalk_controller.get_statistics()
print(f"{self.YELLOW}Crosswalk Activations: {self.RESET}{stats['total_activations']}")
print(f"{self.YELLOW}Agents Served: {self.RESET}{stats['agents_served']}")
print(f"{self.PURPLE}Trajectory File:\n{self.trajectory_file}{self.RESET}\n")
return
def main():
config = SimulationSetup(
simulation_time=200.0,
time_step=0.01,
max_iterations=50000,
trajectory_file=f"_SRS_sim_{dt.now().strftime("%H%M%S")}_.sqlite",
current_setup=True,
trigger_dist=1.0,
activation_time=30.0,
cooldown_time=10.0,
min_agents_activation=1,
activation_delay=2.0,
)
sim = CrosswalkSimulation(config)
sim.run()
if __name__ == "__main__":
main()