691 lines
24 KiB
Plaintext
691 lines
24 KiB
Plaintext
The text file includes all the files which makeup my simulation configuration. could you please run through them and fix any potential errors, delete any unused or unnecessary variables?
|
|
|
|
|
|
|
|
===================
|
|
(geometry_setup.py)
|
|
===================
|
|
|
|
import sys
|
|
import matplotlib
|
|
matplotlib.use('QtAgg')
|
|
import matplotlib.pyplot as plt
|
|
from matplotlib.figure import Figure
|
|
from matplotlib.axes import Axes
|
|
from shapely import Polygon
|
|
from shapely.plotting import plot_polygon
|
|
from typing import Tuple
|
|
|
|
class GeometrySetup:
|
|
def __init__(self,current_setup:bool):
|
|
self.current_setup = current_setup
|
|
self.spawn_area = self.spawn()
|
|
self.queue_area = self.queue()
|
|
self.grass_area = self.grass()
|
|
self.crosswalk_area = self.crosswalk()
|
|
self.entry_area = self.entry_polygon()
|
|
self.exit_area = self.exit_polygon()
|
|
|
|
def spawn(self)->Polygon:
|
|
spawn_area = Polygon([
|
|
(25.0,16.823),
|
|
(25.0,4.569),
|
|
(26.163,4.569),
|
|
(28.214,5.07),
|
|
(28.214,17.761),
|
|
(25.787,17.761),
|
|
])
|
|
return spawn_area
|
|
|
|
def queue(self)->Polygon:
|
|
if self.current_setup is True:
|
|
queue_area = Polygon([
|
|
(18.767,9.395),
|
|
(16.924,9.395),
|
|
(15.214,9.395), #prev trigger area
|
|
(15.214,0), # ||
|
|
(16.924,0), # ||
|
|
(16.924,4.569),
|
|
(25.0,4.569),
|
|
(25.0,16.823)
|
|
])
|
|
else:
|
|
queue_area = Polygon([
|
|
(19.531,10.306),
|
|
(15.214,10.306),
|
|
(13.88,10.306), #prev trigger area
|
|
(12.98,9.896), # ||
|
|
(12.98,0), # ||
|
|
(15.214,0), #
|
|
(15.214,0),
|
|
(16.924,0),
|
|
(16.924,4.569),
|
|
(25.0,4.569),
|
|
(25.0,16.823)
|
|
])
|
|
return queue_area
|
|
|
|
def grass(self)->Polygon:
|
|
if self.current_setup is True:
|
|
grass_area = Polygon([
|
|
(4,0),
|
|
(0,0),
|
|
(0,17.761),
|
|
(4,17.761)
|
|
])
|
|
else:
|
|
grass_area = Polygon([
|
|
(6.23,0),
|
|
(0,0),
|
|
(0,17.761),
|
|
(4,17.761),
|
|
(4,10.306),
|
|
(5.33,10.306),
|
|
(6.23,9.896)
|
|
])
|
|
return grass_area
|
|
|
|
def crosswalk(self)->Polygon:
|
|
if self.current_setup is True:
|
|
crosswalk_area = Polygon([
|
|
(4,6.068),
|
|
(4,7.896),
|
|
(4,7.896),
|
|
(15.214,7.896),
|
|
(15.214,6.068),
|
|
(4,6.068)
|
|
])
|
|
else:
|
|
crosswalk_area = Polygon([
|
|
(6.23,4.982),
|
|
(6.23,8.982),
|
|
(12.98,8.982),
|
|
(12.98,4.982)
|
|
])
|
|
return crosswalk_area
|
|
|
|
def entry_polygon(self)->Polygon:
|
|
if self.current_setup is True:
|
|
entry_area = Polygon([ # x: 2.9m, y: 3.428m
|
|
(15.314,5.268), # dx: 0.1m, dy: 0.8m
|
|
(15.314,8.696),
|
|
(18.214,8.696),
|
|
(18.214,5.268)
|
|
])
|
|
else:
|
|
entry_area = Polygon([ # x: 2.9m, y: 5.6m
|
|
(15.98,9.782),
|
|
(15.98,4.182),
|
|
(13.08,4.182),
|
|
(13.08,9.782)
|
|
])
|
|
return entry_area
|
|
|
|
def exit_polygon(self)->Polygon:
|
|
if self.current_setup is True:
|
|
exit_area = Polygon([ # x: 2.9m, y: 3.428m
|
|
(1,5.268),
|
|
(1,8.696),
|
|
(3.9,8.696),
|
|
(3.9,5.268)
|
|
])
|
|
else:
|
|
exit_area = Polygon([ # x: 2.9m, y: 5.6m
|
|
(3.23,4.182),
|
|
(3.23,9.782),
|
|
(6.13,9.782),
|
|
(6.13,4.182)
|
|
])
|
|
return exit_area
|
|
|
|
def plot_all(self)->Tuple[Figure,Axes]:
|
|
plot_polygon(self.spawn_area,color="green",add_points=False)
|
|
plot_polygon(self.queue_area,color="blue",add_points=False)
|
|
plot_polygon(self.grass_area,color="blue",add_points=False)
|
|
plot_polygon(self.crosswalk_area,color="red",add_points=False)
|
|
plot_polygon(self.entry_area,color="black",add_points=False)
|
|
plot_polygon(self.exit_area,color="black",add_points=False)
|
|
return
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
from PyQt6 import QtWidgets
|
|
app = QtWidgets.QApplication(sys.argv)
|
|
GeometrySetup(False).plot_all()
|
|
plt.show(block=False)
|
|
sys.exit(app.exec())
|
|
|
|
===================
|
|
(agent_setup.py)
|
|
===================
|
|
from agent_data import grade_data
|
|
from dataclasses import dataclass
|
|
import numpy as np
|
|
from typing import List
|
|
|
|
@dataclass
|
|
class AgentSetup:
|
|
id:int
|
|
grade:str
|
|
speed:float
|
|
radius:float
|
|
|
|
def AgentConfig()->List[List[AgentSetup],List[int]]:
|
|
agent_id = 1
|
|
pop_list = []
|
|
all_agents = []
|
|
for idx,key in enumerate(grade_data.keys()):
|
|
grade = grade_data[key]
|
|
pop_list.append(grade["Pop Current"])
|
|
for i in range(pop_list[idx]):
|
|
rng = np.random.default_rng()
|
|
agent = AgentSetup(
|
|
id=agent_id,
|
|
grade=key,
|
|
speed=rng.normal(
|
|
grade["Speed Mean"],
|
|
grade["Speed Std Dev"]
|
|
),
|
|
radius=grade["Radius"]
|
|
)
|
|
all_agents.append(agent)
|
|
agent_id +=1
|
|
return all_agents, pop_list
|
|
|
|
|
|
===================
|
|
(crosswalk_setup.py)
|
|
===================
|
|
|
|
from dataclasses import dataclass,field
|
|
from enum import Enum
|
|
from typing import List,Dict,Tuple,Set
|
|
from shapely import Polygon,Point
|
|
from geometry_setup import GeometrySetup
|
|
import shapely
|
|
import numpy as np
|
|
|
|
class CrosswalkStatus(Enum):
|
|
INACTIVE = "inactive"
|
|
ACTIVE = "active"
|
|
COOLDOWN = "cooldown"
|
|
|
|
@dataclass(frozen=True)
|
|
class CrosswalkConfig:
|
|
trigger_dist:float
|
|
activation_time:float
|
|
cooldown_time:float
|
|
min_agents_activation:int
|
|
activation_delay:float
|
|
crosswalk_area:Polygon
|
|
trigger_area:Polygon
|
|
|
|
@dataclass
|
|
class CrosswalkState:
|
|
status:CrosswalkStatus
|
|
is_active:bool
|
|
time_active:float
|
|
time_remaining:float
|
|
agents_in_trigger:int
|
|
agents_waiting:int
|
|
activation_count:int
|
|
|
|
class CrosswalkController:
|
|
def __init__(self,config:CrosswalkConfig,current_setup:bool):
|
|
self.config = config
|
|
self.crosswalk_area = config.crosswalk_area
|
|
self.trigger_area = config.trigger_area
|
|
self.status = CrosswalkStatus.INACTIVE
|
|
self.state_start_time = 0.0
|
|
self.current_time = 0.0
|
|
self.activation_count = 0
|
|
self.agents_in_trigger: Set[int] = set()
|
|
self.agents_waiting: Set[int] = set()
|
|
self.total_active_time = 0.0
|
|
self.agents_served = 0
|
|
|
|
def update(
|
|
self,
|
|
agent_pos:Dict[int,Tuple[float,float]],
|
|
time_step:float,
|
|
current_time:float=None
|
|
)->CrosswalkState:
|
|
if current_time is not None:
|
|
self.current_time = current_time
|
|
else:
|
|
self.current_time +=time_step
|
|
self.update_agents_in_trigger(agent_pos)
|
|
self.update_state(time_step)
|
|
self.update_statistics(time_step)
|
|
return self.get_state()
|
|
|
|
def update_agents_in_trigger(
|
|
self,
|
|
agent_pos:Dict[int,Tuple[float,float]]
|
|
)->None:
|
|
self.agents_in_trigger.clear()
|
|
if not agent_pos:
|
|
return
|
|
agent_ids = list(agent_pos.keys())
|
|
positions = np.array(list(agent_pos.values()),dtype=np.float32)
|
|
in_trigger = self.points_in_area(positions,self.trigger_area)
|
|
for i, agent_id in enumerate(agent_ids):
|
|
if in_trigger[i]:
|
|
self.agents_in_trigger.add(agent_id)
|
|
|
|
@staticmethod
|
|
def points_in_area(points:np.ndarray,polygon:Polygon)->np.ndarray:
|
|
if len(points) ==0:
|
|
return np.array([],dtype=bool)
|
|
x,y = points[:,0],points[:,1]
|
|
vertices = shapely.get_coordinates(polygon)
|
|
n = len(vertices)
|
|
inside = np.zeros(len(points),dtype=bool)
|
|
j = n-1
|
|
for i in range(n):
|
|
xi, yi = vertices[i]
|
|
xj, yj = vertices[j]
|
|
mask = ((yi>y) != (yj>y)) & (x<(xj-xi)*(y-yi)/(yj-yi)+xi)
|
|
inside ^= mask
|
|
j = i
|
|
return inside
|
|
|
|
def update_state(self,time_step:float)->None:
|
|
elapsed = self.current_time - self.state_start_time
|
|
if self.status == CrosswalkStatus.ACTIVE:
|
|
if elapsed >= self.config.activation_time:
|
|
self.deactivate()
|
|
elif self.status == CrosswalkStatus.COOLDOWN:
|
|
if elapsed >= self.config.cooldown_time:
|
|
self.status = CrosswalkStatus.INACTIVE
|
|
self.state_start_time = self.current_time
|
|
elif self.status ==CrosswalkStatus.INACTIVE:
|
|
if (len(self.agents_in_trigger)>=self.config.min_agents_activation and \
|
|
elapsed >= self.config.activation_delay):
|
|
self.activate()
|
|
|
|
def activate(self)->None:
|
|
self.status = CrosswalkStatus.ACTIVE
|
|
self.state_start_time = self.current_time
|
|
self.activation_count +=1
|
|
self.agents_served += len(self.agents_waiting)
|
|
self.agents_waiting.clear()
|
|
|
|
def deactivate(self)->None:
|
|
self.status = CrosswalkStatus.COOLDOWN
|
|
self.state_start_time = self.current_time
|
|
|
|
def update_statistics(self,time_step:float)->None:
|
|
if self.status == CrosswalkStatus.ACTIVE:
|
|
self.total_active_time += time_step
|
|
|
|
def get_state(self)->CrosswalkState:
|
|
elapsed = self.current_time-self.state_start_time
|
|
if self.status == CrosswalkStatus.ACTIVE:
|
|
time_remaining = max(0.0,self.config.activation_time-elapsed)
|
|
elif self.status == CrosswalkStatus.COOLDOWN:
|
|
time_remaining = max(0.0,self.config.cooldown_duration-elapsed)
|
|
else:
|
|
time_remaining = 0.0
|
|
return CrosswalkState(
|
|
status=self.status,
|
|
is_active=(self.status == CrosswalkStatus.ACTIVE),
|
|
time_active=elapsed if self.status == CrosswalkStatus.ACTIVE else 0.0,
|
|
time_remaining=time_remaining,
|
|
agents_in_trigger=len(self.agents_in_trigger),
|
|
agents_waiting=len(self.agents_waiting),
|
|
activation_count = self.activation_count
|
|
)
|
|
def can_agent_cross(self,agent_id:int)->bool:
|
|
if self.status == CrosswalkStatus.ACTIVE:
|
|
return True
|
|
else:
|
|
if agent_id in self.agents_in_trigger:
|
|
self.agents_waiting.add(agent_id)
|
|
return False
|
|
|
|
@property
|
|
def is_active(self)->bool:
|
|
return self.status == CrosswalkStatus.ACTIVE
|
|
|
|
def get_statistics(self)->Dict[str,float]:
|
|
return {
|
|
"total_activations":self.activation_count,
|
|
"total_active_time":self.total_active_time,
|
|
"agents_served":self.agents_served,
|
|
"current_agents_in_trigger":len(self.agents_in_trigger),
|
|
"current_agents_waiting":len(self.agents_waiting)
|
|
}
|
|
|
|
===================
|
|
(spawning.py)
|
|
===================
|
|
from shapely.geometry import Polygon,Point
|
|
from typing import Tuple, List, Dict, Set
|
|
from scipy.spatial import cKDTree
|
|
import sys
|
|
import math
|
|
import numpy as np
|
|
import config
|
|
sys.path.insert(0,str(config.AGENTS_DIR))
|
|
from agents.agent_setup import AgentSetup
|
|
|
|
|
|
class SpawnManager:
|
|
def __init__(self,spawn_area:Polygon,min_spacing:float=0.55):
|
|
"""
|
|
self.spawn_area: geometry where agents may spawn
|
|
self.min_spacing: minimum spacing for spawn_points
|
|
self.spawn_coords: all spawn points available
|
|
self.filled_coords: spawn points currently filled by agents
|
|
self.spawned_agents: all agents already spawned in sim
|
|
self.agent_pos: connects agent_id to spawn_index
|
|
self.rng: random number generator object
|
|
"""
|
|
|
|
self.spawn_area = spawn_area
|
|
self.min_spacing = min_spacing
|
|
self.spawn_coords: np.ndarray = np.array([])
|
|
self.filled_coords: np.ndarray = np.array([])
|
|
self.spawned_agents: set = set()
|
|
self.agent_pos:dict={}
|
|
self.rng = np.random.default_rng()
|
|
|
|
def generate_coords(self,max_samples:int=1000)->None:
|
|
min_x,min_y,max_x,max_y = self.spawn_area.bounds
|
|
points = []
|
|
while len(points) ==0:
|
|
x = self.rng.uniform(min_x,max_x)
|
|
y = self.rng.uniform(min_y,max_y)
|
|
if self.spawn_area.contains(Point(x,y)):
|
|
points.append([x,y])
|
|
for _ in range(max_samples):
|
|
idx = self.rng.integers(0,len(points),endpoint=True)
|
|
base = points[idx]
|
|
for _ in range(25):
|
|
angle = self.rng.uniform(0,2*np.pi)
|
|
radius = self.rng.uniform(self.min_spacing,2*self.min_spacing)
|
|
x = base[0]+radius*np.cos(angle)
|
|
y = base[1]+radius*np.sin(angle)
|
|
if not self.spawn_area.contains(Point(x,y)):
|
|
continue
|
|
if len(points)>0:
|
|
tree = cKDTree(points)
|
|
distance,_ = tree.query([[x,y]],k=1)
|
|
if distance[0]<self.min_spacing:
|
|
continue
|
|
points.append([x,y])
|
|
break
|
|
self.spawn_coords = np.array(points)
|
|
self.filled_coords = np.zeros(len(points),dtype=bool)
|
|
#return points
|
|
|
|
|
|
|
|
def get_coords(self)->Tuple[float,float]|None:
|
|
'''
|
|
points = [
|
|
pt for pt in self.generate_coords if pt \
|
|
not in self.filled_coords.values()
|
|
]
|
|
return self.rng.choice(points) if points else None
|
|
'''
|
|
free_idx = np.where(~self.filled_coords)[0]
|
|
if len(free_idx) == 0:
|
|
return None
|
|
idx = self.rng.choice(free_idx)
|
|
return tuple(self.spawn_coords[idx])
|
|
|
|
|
|
def spawn_agent(self,all_agents:List[AgentSetup])->AgentSetup|None:
|
|
if len(self.spawned_agents) >= len(all_agents):
|
|
return None
|
|
spawn_point = self.get_coords()
|
|
if not spawn_point:
|
|
return None
|
|
free_agents = [
|
|
agent for agent in all_agents \
|
|
if agent.id not in self.spawned_agents
|
|
]
|
|
if not free_agents:
|
|
return None
|
|
agent = self.rng.choice(free_agents)
|
|
self.spawned_agents.add(agent.id)
|
|
distances = np.linalg.norm(self.spawn_coords-spawn_point,axis=1)
|
|
spawn_idx = np.argmin(distances)
|
|
self.filled_coords[spawn_idx] = True
|
|
self.agent_pos[agent.id] = spawn_idx
|
|
return agent
|
|
|
|
def unfill_coords(self,agent_id:int)->None:
|
|
if agent_id in self.agent_pos:
|
|
spawn_idx = self.agent_pos[agent_id]
|
|
self.filled_coords[spawn_idx] = False
|
|
del self.agent_pos[agent_id]
|
|
self.spawned_agents.discard(agent_id)
|
|
|
|
def get_agent_pos(self,agent_id:int)->Tuple[float,float]:
|
|
if agent_id in self.agent_pos:
|
|
spawn_idx = self.agent_pos[agent_id]
|
|
return tuple(self.spawn_coords[spawn_idx])
|
|
return (0,0)
|
|
|
|
def check_spawn_complete(self,all_agents:List[AgentSetup])->bool:
|
|
return (len(self.spawned_agents)<len(all_agents) and\
|
|
np.sum(~self.filled_coords) > 0)
|
|
|
|
|
|
===================
|
|
(simulation.py)
|
|
===================
|
|
|
|
from typing import Tuple,List,Dict
|
|
import sys
|
|
import matplotlib
|
|
matplotlib.use('QtAgg')
|
|
import matplotlib.pyplot as plt
|
|
from shapely import Polygon,Point,GeometryCollection,get_coordinates
|
|
from shapely.plotting import plot_polygon
|
|
import jupedsim as jps
|
|
import numpy as np
|
|
import pathlib
|
|
from geometry_setup import GeometrySetup
|
|
from spawning import SpawnManager
|
|
from crosswalk_setup import CrosswalkStatus,CrosswalkController,CrosswalkState,CrosswalkConfig
|
|
from agents.agent_setup import AgentConfig
|
|
from dataclasses import dataclass
|
|
|
|
@dataclass
|
|
class SimulationSetup:
|
|
simulation_time:float=200.0
|
|
time_step:float=0.01
|
|
max_iterations:int=50000
|
|
trajectory_file:str="crosswalk_sim.sqlite"
|
|
current_setup:bool=True
|
|
trigger_dist:float=1.0
|
|
activation_time:float=30.0
|
|
cooldown_time:float=10.0
|
|
min_agents_activation:int=1
|
|
activation_delay:float=2.0
|
|
|
|
class CrosswalkSimulation:
|
|
def __init__(self,config:SimulationSetup):
|
|
self.config = config
|
|
|
|
print("\nInitializing Geometry...")
|
|
self.geo = GeometrySetup(self.config.current_setup)
|
|
self.spawn_area = self.geo.spawn()
|
|
self.queue_area = self.geo.queue()
|
|
self.grass_area = self.geo.grass()
|
|
self.crosswalk_area = self.geo.crosswalk()
|
|
self.entry_area = self.geo.entry_polygon()
|
|
self.exit_area = self.geo.exit_polygon()
|
|
self.walkable_area = GeometryCollection([
|
|
self.spawn_area,
|
|
self.queue_area,
|
|
self.grass_area,
|
|
self.crosswalk_area,
|
|
]).unary_union
|
|
print("Geometry Configuration Complete!")
|
|
|
|
print("\nInitializing Spawn Manager...")
|
|
self.spawn_manager = SpawnManager(self.spawn_area,min_spacing=0.55)
|
|
self.spawn_manager.generate_coords()
|
|
print("Spawn Manager Setup Complete!")
|
|
|
|
print("\nInitializing Crosswalk Controller...")
|
|
self.crosswalk_controller = CrosswalkController(
|
|
CrosswalkConfig(
|
|
trigger_dist=self.config.trigger_dist,
|
|
activation_time=self.config.activation_time,
|
|
cooldown_time=self.config.cooldown_time,
|
|
min_agents_activation=self.config.min_agents_activation,
|
|
activation_delay=self.config.activation_delay,
|
|
crosswalk_area=self.crosswalk_area,
|
|
trigger_area=self.entry_area
|
|
),
|
|
self.config.current_setup
|
|
)
|
|
print("Crosswalk Controller Setup Complete!")
|
|
|
|
print("\nInitializing Agent Setup...")
|
|
self.all_agents, pop_list = AgentConfig()
|
|
self.total_agents = len(self.all_agents)
|
|
print(f"Created {self.total_agents} Agents...")
|
|
print("Agent Setup Complete!")
|
|
|
|
print("\nInitializing JuPedSim Model...")
|
|
self.trajectory_file = self.config.trajectory_file
|
|
self.simulation = jps.Simulation(
|
|
model=jps.CollisionFreeSpeedModel(),
|
|
geometry=self.walkable_area,
|
|
trajectory_writer=jps.SqliteTrajectoryWriter(
|
|
output_file=pathlib.Path(self.trajectory_file)
|
|
),
|
|
dt=self.config.time_step
|
|
)
|
|
self.journey_setup()
|
|
self.jps_agent_ids = {}
|
|
# unsure whether these are necessary
|
|
#
|
|
#self.agent_targets = {}
|
|
#self.active_agents = {}
|
|
self.iteration_count = 0
|
|
self.current_time = 0.0
|
|
print(f"\nJuPedSim Model Initialized with:")
|
|
print(f"\tTime Step: {self.config.time_step} s")
|
|
print(f"\tMax Iterations: {self.config.max_iterations}")
|
|
print(f"\tCrosswalk Activation Length: {self.config.activation_time} s")
|
|
print(f"\tCrosswalk Cooldown Length: {self.config.cooldown_time} s")
|
|
|
|
|
|
def journey_setup(self):
|
|
self.queue_waiting_coords = self.waiting_coords(
|
|
self.queue_area,self.spawn_manager.min_spacing
|
|
)
|
|
self.queue_stage_id = self.simulation.add_waiting_set_stage(
|
|
self.queue_waiting_coords
|
|
)
|
|
self.queue_stage = self.simulation.get_stage(self.queue_stage_id)
|
|
self.queue_stage.state = jps.WaitingSetState.ACTIVE
|
|
self.crosswalk_entry_id = self.simulation.add_waypoint_stage(
|
|
self.entry_area,
|
|
self.spawn_manager.min_spacing
|
|
)
|
|
self.crosswalk_exit_id = self.simulation.add_waypoint_stage(
|
|
self.exit_area,
|
|
self.spawn_manager.min_spacing
|
|
)
|
|
self.journey = jps.JourneyDescription([
|
|
self.queue_stage_id,
|
|
self.crosswalk_entry_id,
|
|
self.crosswalk_exit_id,
|
|
])
|
|
self.journey.set_transition_for_stage(
|
|
self.queue_stage_id,
|
|
jps.Transition.create_fixed_transition(
|
|
self.crosswalk_entry_id
|
|
))
|
|
self.journey.set_transition_for_stage(
|
|
self.crosswalk_entry_id,
|
|
jps.Transition.create_fixed_transition(
|
|
self.crosswalk_exit_id
|
|
))
|
|
self.journey_id = self.simulation.add_journey(
|
|
self.journey)
|
|
|
|
def waiting_coords(self,area:Polygon,spacing:float)->List[Tuple[float,float]]:
|
|
min_x,min_y,max_x,max_y = area.bounds
|
|
points = []
|
|
x = (min_x+spacing)/2
|
|
while x<max_x:
|
|
y = (min_y+spacing)/2
|
|
while y < max_y:
|
|
if area.contains(Point(x, y)):
|
|
points.append((x,y))
|
|
y += spacing
|
|
x += spacing
|
|
return points if points else [(min_x+0.5, min_y+0.5)]
|
|
|
|
def run(self,max_iterations:int):
|
|
print("\n\nStarting Simulation")
|
|
print("Target Agents: {len(self.all_agents)}")
|
|
while self.iteration_count < max_iterations:
|
|
if self.spawn_manager.check_spawn_complete(self.all_agents):
|
|
new_agent = self.spawn_manager.spawn_agent(self.all_agents)
|
|
if new_agent:
|
|
jps_agent_params = jps.CollisionFreeSpeedModelAgentParameters(
|
|
journey_id=self.journey_id,
|
|
stage_id=self.queue_stage_id,
|
|
position=self.spawn_manager.get_agent_pos(new_agent.id),
|
|
v0=new_agent.speed,
|
|
radius=new_agent.radius
|
|
)
|
|
jps_agent_id = self.simulation.add_agent(jps_agent_params)
|
|
self.jps_agent_ids[new_agent.id] = jps_agent_id
|
|
# unsure whether this is necessary
|
|
#
|
|
# self.agent_targets[new_agent.id] = None
|
|
print(f"Iter {self.iteration_count:05d}: Spawned agent {new_agent.id} (JPS-ID: {jps_agent_id})")
|
|
current_agent_pos = {}
|
|
for spawn_id,jps_id in self.jps_agent_ids.items():
|
|
agent = self.simulation.agent(jps_id)
|
|
current_agent_pos[spawn_id] = (agent.position[0],agent.position[1])
|
|
crosswalk_state = self.crosswalk_controller.update(
|
|
current_agent_pos,self.config.time_step,self.current_time
|
|
)
|
|
if crosswalk_state.is_active:
|
|
if self.queue_stage.stage != jps.WaitingSetState.INACTIVE:
|
|
self.queue_stage.stage = jps.WaitingSetState.INACTIVE
|
|
print(f"Iter {self.iteration_count:05d}: Crosswalk ACTIVATED, releasing queue.")
|
|
else:
|
|
if self.queue_stage.stage != jps.WaitingSetState.ACTIVE:
|
|
self.queue_stage.stage = jps.WaitingSetState.INACTIVE
|
|
print(f"Iter {self.iteration_count:05d}: Crosswalk INACTIVE, holding agents.")
|
|
self.simulation.iterate()
|
|
self.current_time += self.config.time_step
|
|
self.iteration_count +=1
|
|
agents_to_remove = []
|
|
for spawn_id, jps_id in list(self.jps_agent_ids.items()):
|
|
try:
|
|
_ = self.simulation.agent(jps_id)
|
|
except RuntimeError:
|
|
agents_to_remove.append(spawn_id)
|
|
self.spawn_manager.unfill_coords(spawn_id)
|
|
for spawn_id in agents_to_remove:
|
|
del self.jps_agent_ids[spawn_id]
|
|
print(f"Iter {self.iteration_count:05d}: Agent {spawn_id} reached exit and was removed.")
|
|
if (len(self.jps_agent_ids) ==0 and not self.spawn_manager.check_spawn_complete(self.all_agents)):
|
|
print(f"\nSimulation Completed at Iteration: {self.iteration_count}")
|
|
print(f"Total Simulation Time: {self.current_time:.2f} seconds.")
|
|
break
|
|
print("\n" + "="*50)
|
|
print("Simulation finished.")
|
|
stats = self.crosswalk_controller.get_statistics()
|
|
print(f"Crosswalk was activated {stats['total_activations']} times.")
|
|
print(f"Agents served by crosswalk: {stats['agents_served']}")
|
|
print(f"Trajectory saved to: {self.trajectory_file.absolute()}")
|
|
|
|
|