Files
SRS_evac_sim/archive/prompt_full-run-sim2.txt
Varyngoth 676659e5b9 First
2026-01-28 13:31:49 -04:00

691 lines
24 KiB
Plaintext

The text file includes all the files which makeup my simulation configuration. could you please run through them and fix any potential errors, delete any unused or unnecessary variables?
===================
(geometry_setup.py)
===================
import sys
import matplotlib
matplotlib.use('QtAgg')
import matplotlib.pyplot as plt
from matplotlib.figure import Figure
from matplotlib.axes import Axes
from shapely import Polygon
from shapely.plotting import plot_polygon
from typing import Tuple
class GeometrySetup:
def __init__(self,current_setup:bool):
self.current_setup = current_setup
self.spawn_area = self.spawn()
self.queue_area = self.queue()
self.grass_area = self.grass()
self.crosswalk_area = self.crosswalk()
self.entry_area = self.entry_polygon()
self.exit_area = self.exit_polygon()
def spawn(self)->Polygon:
spawn_area = Polygon([
(25.0,16.823),
(25.0,4.569),
(26.163,4.569),
(28.214,5.07),
(28.214,17.761),
(25.787,17.761),
])
return spawn_area
def queue(self)->Polygon:
if self.current_setup is True:
queue_area = Polygon([
(18.767,9.395),
(16.924,9.395),
(15.214,9.395), #prev trigger area
(15.214,0), # ||
(16.924,0), # ||
(16.924,4.569),
(25.0,4.569),
(25.0,16.823)
])
else:
queue_area = Polygon([
(19.531,10.306),
(15.214,10.306),
(13.88,10.306), #prev trigger area
(12.98,9.896), # ||
(12.98,0), # ||
(15.214,0), #
(15.214,0),
(16.924,0),
(16.924,4.569),
(25.0,4.569),
(25.0,16.823)
])
return queue_area
def grass(self)->Polygon:
if self.current_setup is True:
grass_area = Polygon([
(4,0),
(0,0),
(0,17.761),
(4,17.761)
])
else:
grass_area = Polygon([
(6.23,0),
(0,0),
(0,17.761),
(4,17.761),
(4,10.306),
(5.33,10.306),
(6.23,9.896)
])
return grass_area
def crosswalk(self)->Polygon:
if self.current_setup is True:
crosswalk_area = Polygon([
(4,6.068),
(4,7.896),
(4,7.896),
(15.214,7.896),
(15.214,6.068),
(4,6.068)
])
else:
crosswalk_area = Polygon([
(6.23,4.982),
(6.23,8.982),
(12.98,8.982),
(12.98,4.982)
])
return crosswalk_area
def entry_polygon(self)->Polygon:
if self.current_setup is True:
entry_area = Polygon([ # x: 2.9m, y: 3.428m
(15.314,5.268), # dx: 0.1m, dy: 0.8m
(15.314,8.696),
(18.214,8.696),
(18.214,5.268)
])
else:
entry_area = Polygon([ # x: 2.9m, y: 5.6m
(15.98,9.782),
(15.98,4.182),
(13.08,4.182),
(13.08,9.782)
])
return entry_area
def exit_polygon(self)->Polygon:
if self.current_setup is True:
exit_area = Polygon([ # x: 2.9m, y: 3.428m
(1,5.268),
(1,8.696),
(3.9,8.696),
(3.9,5.268)
])
else:
exit_area = Polygon([ # x: 2.9m, y: 5.6m
(3.23,4.182),
(3.23,9.782),
(6.13,9.782),
(6.13,4.182)
])
return exit_area
def plot_all(self)->Tuple[Figure,Axes]:
plot_polygon(self.spawn_area,color="green",add_points=False)
plot_polygon(self.queue_area,color="blue",add_points=False)
plot_polygon(self.grass_area,color="blue",add_points=False)
plot_polygon(self.crosswalk_area,color="red",add_points=False)
plot_polygon(self.entry_area,color="black",add_points=False)
plot_polygon(self.exit_area,color="black",add_points=False)
return
if __name__ == "__main__":
from PyQt6 import QtWidgets
app = QtWidgets.QApplication(sys.argv)
GeometrySetup(False).plot_all()
plt.show(block=False)
sys.exit(app.exec())
===================
(agent_setup.py)
===================
from agent_data import grade_data
from dataclasses import dataclass
import numpy as np
from typing import List
@dataclass
class AgentSetup:
id:int
grade:str
speed:float
radius:float
def AgentConfig()->List[List[AgentSetup],List[int]]:
agent_id = 1
pop_list = []
all_agents = []
for idx,key in enumerate(grade_data.keys()):
grade = grade_data[key]
pop_list.append(grade["Pop Current"])
for i in range(pop_list[idx]):
rng = np.random.default_rng()
agent = AgentSetup(
id=agent_id,
grade=key,
speed=rng.normal(
grade["Speed Mean"],
grade["Speed Std Dev"]
),
radius=grade["Radius"]
)
all_agents.append(agent)
agent_id +=1
return all_agents, pop_list
===================
(crosswalk_setup.py)
===================
from dataclasses import dataclass,field
from enum import Enum
from typing import List,Dict,Tuple,Set
from shapely import Polygon,Point
from geometry_setup import GeometrySetup
import shapely
import numpy as np
class CrosswalkStatus(Enum):
INACTIVE = "inactive"
ACTIVE = "active"
COOLDOWN = "cooldown"
@dataclass(frozen=True)
class CrosswalkConfig:
trigger_dist:float
activation_time:float
cooldown_time:float
min_agents_activation:int
activation_delay:float
crosswalk_area:Polygon
trigger_area:Polygon
@dataclass
class CrosswalkState:
status:CrosswalkStatus
is_active:bool
time_active:float
time_remaining:float
agents_in_trigger:int
agents_waiting:int
activation_count:int
class CrosswalkController:
def __init__(self,config:CrosswalkConfig,current_setup:bool):
self.config = config
self.crosswalk_area = config.crosswalk_area
self.trigger_area = config.trigger_area
self.status = CrosswalkStatus.INACTIVE
self.state_start_time = 0.0
self.current_time = 0.0
self.activation_count = 0
self.agents_in_trigger: Set[int] = set()
self.agents_waiting: Set[int] = set()
self.total_active_time = 0.0
self.agents_served = 0
def update(
self,
agent_pos:Dict[int,Tuple[float,float]],
time_step:float,
current_time:float=None
)->CrosswalkState:
if current_time is not None:
self.current_time = current_time
else:
self.current_time +=time_step
self.update_agents_in_trigger(agent_pos)
self.update_state(time_step)
self.update_statistics(time_step)
return self.get_state()
def update_agents_in_trigger(
self,
agent_pos:Dict[int,Tuple[float,float]]
)->None:
self.agents_in_trigger.clear()
if not agent_pos:
return
agent_ids = list(agent_pos.keys())
positions = np.array(list(agent_pos.values()),dtype=np.float32)
in_trigger = self.points_in_area(positions,self.trigger_area)
for i, agent_id in enumerate(agent_ids):
if in_trigger[i]:
self.agents_in_trigger.add(agent_id)
@staticmethod
def points_in_area(points:np.ndarray,polygon:Polygon)->np.ndarray:
if len(points) ==0:
return np.array([],dtype=bool)
x,y = points[:,0],points[:,1]
vertices = shapely.get_coordinates(polygon)
n = len(vertices)
inside = np.zeros(len(points),dtype=bool)
j = n-1
for i in range(n):
xi, yi = vertices[i]
xj, yj = vertices[j]
mask = ((yi>y) != (yj>y)) & (x<(xj-xi)*(y-yi)/(yj-yi)+xi)
inside ^= mask
j = i
return inside
def update_state(self,time_step:float)->None:
elapsed = self.current_time - self.state_start_time
if self.status == CrosswalkStatus.ACTIVE:
if elapsed >= self.config.activation_time:
self.deactivate()
elif self.status == CrosswalkStatus.COOLDOWN:
if elapsed >= self.config.cooldown_time:
self.status = CrosswalkStatus.INACTIVE
self.state_start_time = self.current_time
elif self.status ==CrosswalkStatus.INACTIVE:
if (len(self.agents_in_trigger)>=self.config.min_agents_activation and \
elapsed >= self.config.activation_delay):
self.activate()
def activate(self)->None:
self.status = CrosswalkStatus.ACTIVE
self.state_start_time = self.current_time
self.activation_count +=1
self.agents_served += len(self.agents_waiting)
self.agents_waiting.clear()
def deactivate(self)->None:
self.status = CrosswalkStatus.COOLDOWN
self.state_start_time = self.current_time
def update_statistics(self,time_step:float)->None:
if self.status == CrosswalkStatus.ACTIVE:
self.total_active_time += time_step
def get_state(self)->CrosswalkState:
elapsed = self.current_time-self.state_start_time
if self.status == CrosswalkStatus.ACTIVE:
time_remaining = max(0.0,self.config.activation_time-elapsed)
elif self.status == CrosswalkStatus.COOLDOWN:
time_remaining = max(0.0,self.config.cooldown_duration-elapsed)
else:
time_remaining = 0.0
return CrosswalkState(
status=self.status,
is_active=(self.status == CrosswalkStatus.ACTIVE),
time_active=elapsed if self.status == CrosswalkStatus.ACTIVE else 0.0,
time_remaining=time_remaining,
agents_in_trigger=len(self.agents_in_trigger),
agents_waiting=len(self.agents_waiting),
activation_count = self.activation_count
)
def can_agent_cross(self,agent_id:int)->bool:
if self.status == CrosswalkStatus.ACTIVE:
return True
else:
if agent_id in self.agents_in_trigger:
self.agents_waiting.add(agent_id)
return False
@property
def is_active(self)->bool:
return self.status == CrosswalkStatus.ACTIVE
def get_statistics(self)->Dict[str,float]:
return {
"total_activations":self.activation_count,
"total_active_time":self.total_active_time,
"agents_served":self.agents_served,
"current_agents_in_trigger":len(self.agents_in_trigger),
"current_agents_waiting":len(self.agents_waiting)
}
===================
(spawning.py)
===================
from shapely.geometry import Polygon,Point
from typing import Tuple, List, Dict, Set
from scipy.spatial import cKDTree
import sys
import math
import numpy as np
import config
sys.path.insert(0,str(config.AGENTS_DIR))
from agents.agent_setup import AgentSetup
class SpawnManager:
def __init__(self,spawn_area:Polygon,min_spacing:float=0.55):
"""
self.spawn_area: geometry where agents may spawn
self.min_spacing: minimum spacing for spawn_points
self.spawn_coords: all spawn points available
self.filled_coords: spawn points currently filled by agents
self.spawned_agents: all agents already spawned in sim
self.agent_pos: connects agent_id to spawn_index
self.rng: random number generator object
"""
self.spawn_area = spawn_area
self.min_spacing = min_spacing
self.spawn_coords: np.ndarray = np.array([])
self.filled_coords: np.ndarray = np.array([])
self.spawned_agents: set = set()
self.agent_pos:dict={}
self.rng = np.random.default_rng()
def generate_coords(self,max_samples:int=1000)->None:
min_x,min_y,max_x,max_y = self.spawn_area.bounds
points = []
while len(points) ==0:
x = self.rng.uniform(min_x,max_x)
y = self.rng.uniform(min_y,max_y)
if self.spawn_area.contains(Point(x,y)):
points.append([x,y])
for _ in range(max_samples):
idx = self.rng.integers(0,len(points),endpoint=True)
base = points[idx]
for _ in range(25):
angle = self.rng.uniform(0,2*np.pi)
radius = self.rng.uniform(self.min_spacing,2*self.min_spacing)
x = base[0]+radius*np.cos(angle)
y = base[1]+radius*np.sin(angle)
if not self.spawn_area.contains(Point(x,y)):
continue
if len(points)>0:
tree = cKDTree(points)
distance,_ = tree.query([[x,y]],k=1)
if distance[0]<self.min_spacing:
continue
points.append([x,y])
break
self.spawn_coords = np.array(points)
self.filled_coords = np.zeros(len(points),dtype=bool)
#return points
def get_coords(self)->Tuple[float,float]|None:
'''
points = [
pt for pt in self.generate_coords if pt \
not in self.filled_coords.values()
]
return self.rng.choice(points) if points else None
'''
free_idx = np.where(~self.filled_coords)[0]
if len(free_idx) == 0:
return None
idx = self.rng.choice(free_idx)
return tuple(self.spawn_coords[idx])
def spawn_agent(self,all_agents:List[AgentSetup])->AgentSetup|None:
if len(self.spawned_agents) >= len(all_agents):
return None
spawn_point = self.get_coords()
if not spawn_point:
return None
free_agents = [
agent for agent in all_agents \
if agent.id not in self.spawned_agents
]
if not free_agents:
return None
agent = self.rng.choice(free_agents)
self.spawned_agents.add(agent.id)
distances = np.linalg.norm(self.spawn_coords-spawn_point,axis=1)
spawn_idx = np.argmin(distances)
self.filled_coords[spawn_idx] = True
self.agent_pos[agent.id] = spawn_idx
return agent
def unfill_coords(self,agent_id:int)->None:
if agent_id in self.agent_pos:
spawn_idx = self.agent_pos[agent_id]
self.filled_coords[spawn_idx] = False
del self.agent_pos[agent_id]
self.spawned_agents.discard(agent_id)
def get_agent_pos(self,agent_id:int)->Tuple[float,float]:
if agent_id in self.agent_pos:
spawn_idx = self.agent_pos[agent_id]
return tuple(self.spawn_coords[spawn_idx])
return (0,0)
def check_spawn_complete(self,all_agents:List[AgentSetup])->bool:
return (len(self.spawned_agents)<len(all_agents) and\
np.sum(~self.filled_coords) > 0)
===================
(simulation.py)
===================
from typing import Tuple,List,Dict
import sys
import matplotlib
matplotlib.use('QtAgg')
import matplotlib.pyplot as plt
from shapely import Polygon,Point,GeometryCollection,get_coordinates
from shapely.plotting import plot_polygon
import jupedsim as jps
import numpy as np
import pathlib
from geometry_setup import GeometrySetup
from spawning import SpawnManager
from crosswalk_setup import CrosswalkStatus,CrosswalkController,CrosswalkState,CrosswalkConfig
from agents.agent_setup import AgentConfig
from dataclasses import dataclass
@dataclass
class SimulationSetup:
simulation_time:float=200.0
time_step:float=0.01
max_iterations:int=50000
trajectory_file:str="crosswalk_sim.sqlite"
current_setup:bool=True
trigger_dist:float=1.0
activation_time:float=30.0
cooldown_time:float=10.0
min_agents_activation:int=1
activation_delay:float=2.0
class CrosswalkSimulation:
def __init__(self,config:SimulationSetup):
self.config = config
print("\nInitializing Geometry...")
self.geo = GeometrySetup(self.config.current_setup)
self.spawn_area = self.geo.spawn()
self.queue_area = self.geo.queue()
self.grass_area = self.geo.grass()
self.crosswalk_area = self.geo.crosswalk()
self.entry_area = self.geo.entry_polygon()
self.exit_area = self.geo.exit_polygon()
self.walkable_area = GeometryCollection([
self.spawn_area,
self.queue_area,
self.grass_area,
self.crosswalk_area,
]).unary_union
print("Geometry Configuration Complete!")
print("\nInitializing Spawn Manager...")
self.spawn_manager = SpawnManager(self.spawn_area,min_spacing=0.55)
self.spawn_manager.generate_coords()
print("Spawn Manager Setup Complete!")
print("\nInitializing Crosswalk Controller...")
self.crosswalk_controller = CrosswalkController(
CrosswalkConfig(
trigger_dist=self.config.trigger_dist,
activation_time=self.config.activation_time,
cooldown_time=self.config.cooldown_time,
min_agents_activation=self.config.min_agents_activation,
activation_delay=self.config.activation_delay,
crosswalk_area=self.crosswalk_area,
trigger_area=self.entry_area
),
self.config.current_setup
)
print("Crosswalk Controller Setup Complete!")
print("\nInitializing Agent Setup...")
self.all_agents, pop_list = AgentConfig()
self.total_agents = len(self.all_agents)
print(f"Created {self.total_agents} Agents...")
print("Agent Setup Complete!")
print("\nInitializing JuPedSim Model...")
self.trajectory_file = self.config.trajectory_file
self.simulation = jps.Simulation(
model=jps.CollisionFreeSpeedModel(),
geometry=self.walkable_area,
trajectory_writer=jps.SqliteTrajectoryWriter(
output_file=pathlib.Path(self.trajectory_file)
),
dt=self.config.time_step
)
self.journey_setup()
self.jps_agent_ids = {}
# unsure whether these are necessary
#
#self.agent_targets = {}
#self.active_agents = {}
self.iteration_count = 0
self.current_time = 0.0
print(f"\nJuPedSim Model Initialized with:")
print(f"\tTime Step: {self.config.time_step} s")
print(f"\tMax Iterations: {self.config.max_iterations}")
print(f"\tCrosswalk Activation Length: {self.config.activation_time} s")
print(f"\tCrosswalk Cooldown Length: {self.config.cooldown_time} s")
def journey_setup(self):
self.queue_waiting_coords = self.waiting_coords(
self.queue_area,self.spawn_manager.min_spacing
)
self.queue_stage_id = self.simulation.add_waiting_set_stage(
self.queue_waiting_coords
)
self.queue_stage = self.simulation.get_stage(self.queue_stage_id)
self.queue_stage.state = jps.WaitingSetState.ACTIVE
self.crosswalk_entry_id = self.simulation.add_waypoint_stage(
self.entry_area,
self.spawn_manager.min_spacing
)
self.crosswalk_exit_id = self.simulation.add_waypoint_stage(
self.exit_area,
self.spawn_manager.min_spacing
)
self.journey = jps.JourneyDescription([
self.queue_stage_id,
self.crosswalk_entry_id,
self.crosswalk_exit_id,
])
self.journey.set_transition_for_stage(
self.queue_stage_id,
jps.Transition.create_fixed_transition(
self.crosswalk_entry_id
))
self.journey.set_transition_for_stage(
self.crosswalk_entry_id,
jps.Transition.create_fixed_transition(
self.crosswalk_exit_id
))
self.journey_id = self.simulation.add_journey(
self.journey)
def waiting_coords(self,area:Polygon,spacing:float)->List[Tuple[float,float]]:
min_x,min_y,max_x,max_y = area.bounds
points = []
x = (min_x+spacing)/2
while x<max_x:
y = (min_y+spacing)/2
while y < max_y:
if area.contains(Point(x, y)):
points.append((x,y))
y += spacing
x += spacing
return points if points else [(min_x+0.5, min_y+0.5)]
def run(self,max_iterations:int):
print("\n\nStarting Simulation")
print("Target Agents: {len(self.all_agents)}")
while self.iteration_count < max_iterations:
if self.spawn_manager.check_spawn_complete(self.all_agents):
new_agent = self.spawn_manager.spawn_agent(self.all_agents)
if new_agent:
jps_agent_params = jps.CollisionFreeSpeedModelAgentParameters(
journey_id=self.journey_id,
stage_id=self.queue_stage_id,
position=self.spawn_manager.get_agent_pos(new_agent.id),
v0=new_agent.speed,
radius=new_agent.radius
)
jps_agent_id = self.simulation.add_agent(jps_agent_params)
self.jps_agent_ids[new_agent.id] = jps_agent_id
# unsure whether this is necessary
#
# self.agent_targets[new_agent.id] = None
print(f"Iter {self.iteration_count:05d}: Spawned agent {new_agent.id} (JPS-ID: {jps_agent_id})")
current_agent_pos = {}
for spawn_id,jps_id in self.jps_agent_ids.items():
agent = self.simulation.agent(jps_id)
current_agent_pos[spawn_id] = (agent.position[0],agent.position[1])
crosswalk_state = self.crosswalk_controller.update(
current_agent_pos,self.config.time_step,self.current_time
)
if crosswalk_state.is_active:
if self.queue_stage.stage != jps.WaitingSetState.INACTIVE:
self.queue_stage.stage = jps.WaitingSetState.INACTIVE
print(f"Iter {self.iteration_count:05d}: Crosswalk ACTIVATED, releasing queue.")
else:
if self.queue_stage.stage != jps.WaitingSetState.ACTIVE:
self.queue_stage.stage = jps.WaitingSetState.INACTIVE
print(f"Iter {self.iteration_count:05d}: Crosswalk INACTIVE, holding agents.")
self.simulation.iterate()
self.current_time += self.config.time_step
self.iteration_count +=1
agents_to_remove = []
for spawn_id, jps_id in list(self.jps_agent_ids.items()):
try:
_ = self.simulation.agent(jps_id)
except RuntimeError:
agents_to_remove.append(spawn_id)
self.spawn_manager.unfill_coords(spawn_id)
for spawn_id in agents_to_remove:
del self.jps_agent_ids[spawn_id]
print(f"Iter {self.iteration_count:05d}: Agent {spawn_id} reached exit and was removed.")
if (len(self.jps_agent_ids) ==0 and not self.spawn_manager.check_spawn_complete(self.all_agents)):
print(f"\nSimulation Completed at Iteration: {self.iteration_count}")
print(f"Total Simulation Time: {self.current_time:.2f} seconds.")
break
print("\n" + "="*50)
print("Simulation finished.")
stats = self.crosswalk_controller.get_statistics()
print(f"Crosswalk was activated {stats['total_activations']} times.")
print(f"Agents served by crosswalk: {stats['agents_served']}")
print(f"Trajectory saved to: {self.trajectory_file.absolute()}")