The text file includes all the files which makeup my simulation configuration. could you please run through them and fix any potential errors, delete any unused or unnecessary variables? =================== (geometry_setup.py) =================== import sys import matplotlib matplotlib.use('QtAgg') import matplotlib.pyplot as plt from matplotlib.figure import Figure from matplotlib.axes import Axes from shapely import Polygon from shapely.plotting import plot_polygon from typing import Tuple class GeometrySetup: def __init__(self,current_setup:bool): self.current_setup = current_setup self.spawn_area = self.spawn() self.queue_area = self.queue() self.grass_area = self.grass() self.crosswalk_area = self.crosswalk() self.entry_area = self.entry_polygon() self.exit_area = self.exit_polygon() def spawn(self)->Polygon: spawn_area = Polygon([ (25.0,16.823), (25.0,4.569), (26.163,4.569), (28.214,5.07), (28.214,17.761), (25.787,17.761), ]) return spawn_area def queue(self)->Polygon: if self.current_setup is True: queue_area = Polygon([ (18.767,9.395), (16.924,9.395), (15.214,9.395), #prev trigger area (15.214,0), # || (16.924,0), # || (16.924,4.569), (25.0,4.569), (25.0,16.823) ]) else: queue_area = Polygon([ (19.531,10.306), (15.214,10.306), (13.88,10.306), #prev trigger area (12.98,9.896), # || (12.98,0), # || (15.214,0), # (15.214,0), (16.924,0), (16.924,4.569), (25.0,4.569), (25.0,16.823) ]) return queue_area def grass(self)->Polygon: if self.current_setup is True: grass_area = Polygon([ (4,0), (0,0), (0,17.761), (4,17.761) ]) else: grass_area = Polygon([ (6.23,0), (0,0), (0,17.761), (4,17.761), (4,10.306), (5.33,10.306), (6.23,9.896) ]) return grass_area def crosswalk(self)->Polygon: if self.current_setup is True: crosswalk_area = Polygon([ (4,6.068), (4,7.896), (4,7.896), (15.214,7.896), (15.214,6.068), (4,6.068) ]) else: crosswalk_area = Polygon([ (6.23,4.982), (6.23,8.982), (12.98,8.982), (12.98,4.982) ]) return crosswalk_area def entry_polygon(self)->Polygon: if self.current_setup is True: entry_area = Polygon([ # x: 2.9m, y: 3.428m (15.314,5.268), # dx: 0.1m, dy: 0.8m (15.314,8.696), (18.214,8.696), (18.214,5.268) ]) else: entry_area = Polygon([ # x: 2.9m, y: 5.6m (15.98,9.782), (15.98,4.182), (13.08,4.182), (13.08,9.782) ]) return entry_area def exit_polygon(self)->Polygon: if self.current_setup is True: exit_area = Polygon([ # x: 2.9m, y: 3.428m (1,5.268), (1,8.696), (3.9,8.696), (3.9,5.268) ]) else: exit_area = Polygon([ # x: 2.9m, y: 5.6m (3.23,4.182), (3.23,9.782), (6.13,9.782), (6.13,4.182) ]) return exit_area def plot_all(self)->Tuple[Figure,Axes]: plot_polygon(self.spawn_area,color="green",add_points=False) plot_polygon(self.queue_area,color="blue",add_points=False) plot_polygon(self.grass_area,color="blue",add_points=False) plot_polygon(self.crosswalk_area,color="red",add_points=False) plot_polygon(self.entry_area,color="black",add_points=False) plot_polygon(self.exit_area,color="black",add_points=False) return if __name__ == "__main__": from PyQt6 import QtWidgets app = QtWidgets.QApplication(sys.argv) GeometrySetup(False).plot_all() plt.show(block=False) sys.exit(app.exec()) =================== (agent_setup.py) =================== from agent_data import grade_data from dataclasses import dataclass import numpy as np from typing import List @dataclass class AgentSetup: id:int grade:str speed:float radius:float def AgentConfig()->List[List[AgentSetup],List[int]]: agent_id = 1 pop_list = [] all_agents = [] for idx,key in enumerate(grade_data.keys()): grade = grade_data[key] pop_list.append(grade["Pop Current"]) for i in range(pop_list[idx]): rng = np.random.default_rng() agent = AgentSetup( id=agent_id, grade=key, speed=rng.normal( grade["Speed Mean"], grade["Speed Std Dev"] ), radius=grade["Radius"] ) all_agents.append(agent) agent_id +=1 return all_agents, pop_list =================== (crosswalk_setup.py) =================== from dataclasses import dataclass,field from enum import Enum from typing import List,Dict,Tuple,Set from shapely import Polygon,Point from geometry_setup import GeometrySetup import shapely import numpy as np class CrosswalkStatus(Enum): INACTIVE = "inactive" ACTIVE = "active" COOLDOWN = "cooldown" @dataclass(frozen=True) class CrosswalkConfig: trigger_dist:float activation_time:float cooldown_time:float min_agents_activation:int activation_delay:float crosswalk_area:Polygon trigger_area:Polygon @dataclass class CrosswalkState: status:CrosswalkStatus is_active:bool time_active:float time_remaining:float agents_in_trigger:int agents_waiting:int activation_count:int class CrosswalkController: def __init__(self,config:CrosswalkConfig,current_setup:bool): self.config = config self.crosswalk_area = config.crosswalk_area self.trigger_area = config.trigger_area self.status = CrosswalkStatus.INACTIVE self.state_start_time = 0.0 self.current_time = 0.0 self.activation_count = 0 self.agents_in_trigger: Set[int] = set() self.agents_waiting: Set[int] = set() self.total_active_time = 0.0 self.agents_served = 0 def update( self, agent_pos:Dict[int,Tuple[float,float]], time_step:float, current_time:float=None )->CrosswalkState: if current_time is not None: self.current_time = current_time else: self.current_time +=time_step self.update_agents_in_trigger(agent_pos) self.update_state(time_step) self.update_statistics(time_step) return self.get_state() def update_agents_in_trigger( self, agent_pos:Dict[int,Tuple[float,float]] )->None: self.agents_in_trigger.clear() if not agent_pos: return agent_ids = list(agent_pos.keys()) positions = np.array(list(agent_pos.values()),dtype=np.float32) in_trigger = self.points_in_area(positions,self.trigger_area) for i, agent_id in enumerate(agent_ids): if in_trigger[i]: self.agents_in_trigger.add(agent_id) @staticmethod def points_in_area(points:np.ndarray,polygon:Polygon)->np.ndarray: if len(points) ==0: return np.array([],dtype=bool) x,y = points[:,0],points[:,1] vertices = shapely.get_coordinates(polygon) n = len(vertices) inside = np.zeros(len(points),dtype=bool) j = n-1 for i in range(n): xi, yi = vertices[i] xj, yj = vertices[j] mask = ((yi>y) != (yj>y)) & (x<(xj-xi)*(y-yi)/(yj-yi)+xi) inside ^= mask j = i return inside def update_state(self,time_step:float)->None: elapsed = self.current_time - self.state_start_time if self.status == CrosswalkStatus.ACTIVE: if elapsed >= self.config.activation_time: self.deactivate() elif self.status == CrosswalkStatus.COOLDOWN: if elapsed >= self.config.cooldown_time: self.status = CrosswalkStatus.INACTIVE self.state_start_time = self.current_time elif self.status ==CrosswalkStatus.INACTIVE: if (len(self.agents_in_trigger)>=self.config.min_agents_activation and \ elapsed >= self.config.activation_delay): self.activate() def activate(self)->None: self.status = CrosswalkStatus.ACTIVE self.state_start_time = self.current_time self.activation_count +=1 self.agents_served += len(self.agents_waiting) self.agents_waiting.clear() def deactivate(self)->None: self.status = CrosswalkStatus.COOLDOWN self.state_start_time = self.current_time def update_statistics(self,time_step:float)->None: if self.status == CrosswalkStatus.ACTIVE: self.total_active_time += time_step def get_state(self)->CrosswalkState: elapsed = self.current_time-self.state_start_time if self.status == CrosswalkStatus.ACTIVE: time_remaining = max(0.0,self.config.activation_time-elapsed) elif self.status == CrosswalkStatus.COOLDOWN: time_remaining = max(0.0,self.config.cooldown_duration-elapsed) else: time_remaining = 0.0 return CrosswalkState( status=self.status, is_active=(self.status == CrosswalkStatus.ACTIVE), time_active=elapsed if self.status == CrosswalkStatus.ACTIVE else 0.0, time_remaining=time_remaining, agents_in_trigger=len(self.agents_in_trigger), agents_waiting=len(self.agents_waiting), activation_count = self.activation_count ) def can_agent_cross(self,agent_id:int)->bool: if self.status == CrosswalkStatus.ACTIVE: return True else: if agent_id in self.agents_in_trigger: self.agents_waiting.add(agent_id) return False @property def is_active(self)->bool: return self.status == CrosswalkStatus.ACTIVE def get_statistics(self)->Dict[str,float]: return { "total_activations":self.activation_count, "total_active_time":self.total_active_time, "agents_served":self.agents_served, "current_agents_in_trigger":len(self.agents_in_trigger), "current_agents_waiting":len(self.agents_waiting) } =================== (spawning.py) =================== from shapely.geometry import Polygon,Point from typing import Tuple, List, Dict, Set from scipy.spatial import cKDTree import sys import math import numpy as np import config sys.path.insert(0,str(config.AGENTS_DIR)) from agents.agent_setup import AgentSetup class SpawnManager: def __init__(self,spawn_area:Polygon,min_spacing:float=0.55): """ self.spawn_area: geometry where agents may spawn self.min_spacing: minimum spacing for spawn_points self.spawn_coords: all spawn points available self.filled_coords: spawn points currently filled by agents self.spawned_agents: all agents already spawned in sim self.agent_pos: connects agent_id to spawn_index self.rng: random number generator object """ self.spawn_area = spawn_area self.min_spacing = min_spacing self.spawn_coords: np.ndarray = np.array([]) self.filled_coords: np.ndarray = np.array([]) self.spawned_agents: set = set() self.agent_pos:dict={} self.rng = np.random.default_rng() def generate_coords(self,max_samples:int=1000)->None: min_x,min_y,max_x,max_y = self.spawn_area.bounds points = [] while len(points) ==0: x = self.rng.uniform(min_x,max_x) y = self.rng.uniform(min_y,max_y) if self.spawn_area.contains(Point(x,y)): points.append([x,y]) for _ in range(max_samples): idx = self.rng.integers(0,len(points),endpoint=True) base = points[idx] for _ in range(25): angle = self.rng.uniform(0,2*np.pi) radius = self.rng.uniform(self.min_spacing,2*self.min_spacing) x = base[0]+radius*np.cos(angle) y = base[1]+radius*np.sin(angle) if not self.spawn_area.contains(Point(x,y)): continue if len(points)>0: tree = cKDTree(points) distance,_ = tree.query([[x,y]],k=1) if distance[0]Tuple[float,float]|None: ''' points = [ pt for pt in self.generate_coords if pt \ not in self.filled_coords.values() ] return self.rng.choice(points) if points else None ''' free_idx = np.where(~self.filled_coords)[0] if len(free_idx) == 0: return None idx = self.rng.choice(free_idx) return tuple(self.spawn_coords[idx]) def spawn_agent(self,all_agents:List[AgentSetup])->AgentSetup|None: if len(self.spawned_agents) >= len(all_agents): return None spawn_point = self.get_coords() if not spawn_point: return None free_agents = [ agent for agent in all_agents \ if agent.id not in self.spawned_agents ] if not free_agents: return None agent = self.rng.choice(free_agents) self.spawned_agents.add(agent.id) distances = np.linalg.norm(self.spawn_coords-spawn_point,axis=1) spawn_idx = np.argmin(distances) self.filled_coords[spawn_idx] = True self.agent_pos[agent.id] = spawn_idx return agent def unfill_coords(self,agent_id:int)->None: if agent_id in self.agent_pos: spawn_idx = self.agent_pos[agent_id] self.filled_coords[spawn_idx] = False del self.agent_pos[agent_id] self.spawned_agents.discard(agent_id) def get_agent_pos(self,agent_id:int)->Tuple[float,float]: if agent_id in self.agent_pos: spawn_idx = self.agent_pos[agent_id] return tuple(self.spawn_coords[spawn_idx]) return (0,0) def check_spawn_complete(self,all_agents:List[AgentSetup])->bool: return (len(self.spawned_agents) 0) =================== (simulation.py) =================== from typing import Tuple,List,Dict import sys import matplotlib matplotlib.use('QtAgg') import matplotlib.pyplot as plt from shapely import Polygon,Point,GeometryCollection,get_coordinates from shapely.plotting import plot_polygon import jupedsim as jps import numpy as np import pathlib from geometry_setup import GeometrySetup from spawning import SpawnManager from crosswalk_setup import CrosswalkStatus,CrosswalkController,CrosswalkState,CrosswalkConfig from agents.agent_setup import AgentConfig from dataclasses import dataclass @dataclass class SimulationSetup: simulation_time:float=200.0 time_step:float=0.01 max_iterations:int=50000 trajectory_file:str="crosswalk_sim.sqlite" current_setup:bool=True trigger_dist:float=1.0 activation_time:float=30.0 cooldown_time:float=10.0 min_agents_activation:int=1 activation_delay:float=2.0 class CrosswalkSimulation: def __init__(self,config:SimulationSetup): self.config = config print("\nInitializing Geometry...") self.geo = GeometrySetup(self.config.current_setup) self.spawn_area = self.geo.spawn() self.queue_area = self.geo.queue() self.grass_area = self.geo.grass() self.crosswalk_area = self.geo.crosswalk() self.entry_area = self.geo.entry_polygon() self.exit_area = self.geo.exit_polygon() self.walkable_area = GeometryCollection([ self.spawn_area, self.queue_area, self.grass_area, self.crosswalk_area, ]).unary_union print("Geometry Configuration Complete!") print("\nInitializing Spawn Manager...") self.spawn_manager = SpawnManager(self.spawn_area,min_spacing=0.55) self.spawn_manager.generate_coords() print("Spawn Manager Setup Complete!") print("\nInitializing Crosswalk Controller...") self.crosswalk_controller = CrosswalkController( CrosswalkConfig( trigger_dist=self.config.trigger_dist, activation_time=self.config.activation_time, cooldown_time=self.config.cooldown_time, min_agents_activation=self.config.min_agents_activation, activation_delay=self.config.activation_delay, crosswalk_area=self.crosswalk_area, trigger_area=self.entry_area ), self.config.current_setup ) print("Crosswalk Controller Setup Complete!") print("\nInitializing Agent Setup...") self.all_agents, pop_list = AgentConfig() self.total_agents = len(self.all_agents) print(f"Created {self.total_agents} Agents...") print("Agent Setup Complete!") print("\nInitializing JuPedSim Model...") self.trajectory_file = self.config.trajectory_file self.simulation = jps.Simulation( model=jps.CollisionFreeSpeedModel(), geometry=self.walkable_area, trajectory_writer=jps.SqliteTrajectoryWriter( output_file=pathlib.Path(self.trajectory_file) ), dt=self.config.time_step ) self.journey_setup() self.jps_agent_ids = {} # unsure whether these are necessary # #self.agent_targets = {} #self.active_agents = {} self.iteration_count = 0 self.current_time = 0.0 print(f"\nJuPedSim Model Initialized with:") print(f"\tTime Step: {self.config.time_step} s") print(f"\tMax Iterations: {self.config.max_iterations}") print(f"\tCrosswalk Activation Length: {self.config.activation_time} s") print(f"\tCrosswalk Cooldown Length: {self.config.cooldown_time} s") def journey_setup(self): self.queue_waiting_coords = self.waiting_coords( self.queue_area,self.spawn_manager.min_spacing ) self.queue_stage_id = self.simulation.add_waiting_set_stage( self.queue_waiting_coords ) self.queue_stage = self.simulation.get_stage(self.queue_stage_id) self.queue_stage.state = jps.WaitingSetState.ACTIVE self.crosswalk_entry_id = self.simulation.add_waypoint_stage( self.entry_area, self.spawn_manager.min_spacing ) self.crosswalk_exit_id = self.simulation.add_waypoint_stage( self.exit_area, self.spawn_manager.min_spacing ) self.journey = jps.JourneyDescription([ self.queue_stage_id, self.crosswalk_entry_id, self.crosswalk_exit_id, ]) self.journey.set_transition_for_stage( self.queue_stage_id, jps.Transition.create_fixed_transition( self.crosswalk_entry_id )) self.journey.set_transition_for_stage( self.crosswalk_entry_id, jps.Transition.create_fixed_transition( self.crosswalk_exit_id )) self.journey_id = self.simulation.add_journey( self.journey) def waiting_coords(self,area:Polygon,spacing:float)->List[Tuple[float,float]]: min_x,min_y,max_x,max_y = area.bounds points = [] x = (min_x+spacing)/2 while x