diff --git a/src/scenic/simulators/carla/model.scenic b/src/scenic/simulators/carla/model.scenic index 6fbb94ffb..d2fe3040a 100644 --- a/src/scenic/simulators/carla/model.scenic +++ b/src/scenic/simulators/carla/model.scenic @@ -51,6 +51,7 @@ from scenic.simulators.utils.colors import Color # Sensor imports from scenic.simulators.carla.sensors import CarlaRGBSensor as RGBSensor from scenic.simulators.carla.sensors import CarlaSSSensor as SSSensor +from scenic.simulators.carla.sensors import CarlaCollisionSensor as CollisionSensor try: from scenic.simulators.carla.simulator import CarlaSimulator # for use in scenarios diff --git a/src/scenic/simulators/carla/sensors.py b/src/scenic/simulators/carla/sensors.py index f5d64e15d..95ce224e1 100644 --- a/src/scenic/simulators/carla/sensors.py +++ b/src/scenic/simulators/carla/sensors.py @@ -72,3 +72,72 @@ def process(self, data): array = array[:, :, 2] # Take only R return array.copy() + + +class CarlaCollisionSensor(CallbackSensor): + """Collision sensor that detects collisions with other actors/objects.""" + + blueprint = "sensor.other.collision" + + def __init__(self): + super().__init__() + self.collision_history = [] + self.has_collision = False + self.last_collision_intensity = 0.0 + self.last_collision_data = None + self.frame = 0 + self._initialized = False + self._is_event_based = True + + self.offset = (0, 0, 0) + self.rotation = (0, 0, 0) + self.attributes = {} + self.convert = None + self.carla_sensor = None + + def onData(self, event): + self.frame = event.frame + self._initialized = True + + impulse = event.normal_impulse + intensity = np.sqrt(impulse.x**2 + impulse.y**2 + impulse.z**2) + + collision_info = { + "frame": event.frame, + "timestamp": event.timestamp, + "intensity": intensity, + "other_actor": event.other_actor.type_id if event.other_actor else None, + "impulse": (impulse.x, impulse.y, impulse.z), + } + + self.collision_history.append(collision_info) + self.has_collision = True + self.last_collision_intensity = intensity + self.last_collision_data = collision_info + + def process(self, data): + return self.last_collision_data + + def getObservation(self): + return { + "has_collision": self.has_collision, + "last_collision_intensity": self.last_collision_intensity, + "last_collision_data": self.last_collision_data, + "collision_history": self.collision_history, + } + + def updateFrame(self, frame): + """Advance frame number for event-based sensors that didn't fire this tick.""" + if not self._initialized: + self._initialized = True + if self.frame < frame: + self.frame = frame + + def reset(self): + """Reset collision state between simulations.""" + self.collision_history = [] + self.has_collision = False + self.last_collision_intensity = 0.0 + self.last_collision_data = None + self.frame = 0 + self._initialized = False diff --git a/src/scenic/simulators/carla/simulator.py b/src/scenic/simulators/carla/simulator.py index e69b4f860..30a24349b 100644 --- a/src/scenic/simulators/carla/simulator.py +++ b/src/scenic/simulators/carla/simulator.py @@ -19,6 +19,7 @@ from scenic.core.simulators import SimulationCreationError from scenic.domains.driving.simulators import DrivingSimulation, DrivingSimulator from scenic.simulators.carla.blueprints import oldBlueprintNames +from scenic.simulators.carla.sensors import CarlaCollisionSensor import scenic.simulators.carla.utils.utils as utils import scenic.simulators.carla.utils.visuals as visuals from scenic.syntax.veneer import verbosePrint @@ -250,6 +251,11 @@ def createObjectInSimulator(self, obj): ) obj.carlaController = controller + # Auto-attach a collision sensor to the ego so users can check + # ego.sensors['collision'].has_collision without explicit declaration. + if obj is self.scene.egoObject and "collision" not in obj.sensors: + obj.sensors["collision"] = CarlaCollisionSensor() + # Adding sensors if available if obj.sensors: for sensor_key, sensor in obj.sensors.items(): @@ -301,12 +307,18 @@ def step(self): # Run simulation for one timestep self.current_frame = self.world.tick() - # Wait for sensors to get updates + # Wait for sensors to deliver this frame's data. Event-based sensors + # (e.g. collision) don't fire every tick, so just bump their frame + # number rather than blocking on it. for obj in self.objects: if obj.sensors: for sensor in obj.sensors.values(): - while sensor.frame != self.current_frame: - pass + if getattr(sensor, "_is_event_based", False): + if hasattr(sensor, "updateFrame"): + sensor.updateFrame(self.current_frame) + else: + while sensor.frame != self.current_frame: + pass # Render simulation if self.render: @@ -352,6 +364,8 @@ def destroy(self): if sensor.carla_sensor is not None and sensor.carla_sensor.is_alive: sensor.carla_sensor.stop() sensor.carla_sensor.destroy() + if hasattr(sensor, "reset"): + sensor.reset() if obj.carlaActor is not None: if isinstance(obj.carlaActor, carla.Vehicle): obj.carlaActor.set_autopilot(False, self.tm.get_port()) diff --git a/src/scenic/simulators/carla/utils/visuals.py b/src/scenic/simulators/carla/utils/visuals.py index 4995c0ee9..540a89184 100644 --- a/src/scenic/simulators/carla/utils/visuals.py +++ b/src/scenic/simulators/carla/utils/visuals.py @@ -103,6 +103,25 @@ def tick(self, world, ego, clock, showLabels=True): "Height: % 18.0f m" % t.location.z, ] + collision = ( + getattr(ego, "sensors", {}).get("collision") + if hasattr(ego, "sensors") + else None + ) + if collision is not None: + if getattr(collision, "has_collision", False): + intensity = getattr(collision, "last_collision_intensity", 0.0) + data = getattr(collision, "last_collision_data", None) + other = data.get("other_actor") if isinstance(data, dict) else None + value = ( + f"hit {other} ({intensity:.0f})" + if other + else f"detected ({intensity:.0f})" + ) + self._info_text.append(("Collision:", value, (255, 80, 80))) + else: + self._info_text.append(("Collision:", "none", (80, 220, 80))) + try: _control_text = [ "", @@ -158,6 +177,15 @@ def render(self, display): item = None v_offset += 18 elif isinstance(item, tuple): + if len(item) == 3 and isinstance(item[2], tuple): + label_surface = self._font_mono.render(item[0], True, (255, 255, 255)) + display.blit(label_surface, (8, v_offset)) + value_surface = self._font_mono.render( + " " + str(item[1]), True, item[2] + ) + display.blit(value_surface, (8 + label_surface.get_width(), v_offset)) + v_offset += 18 + continue if isinstance(item[1], bool): rect = pygame.Rect((bar_h_offset, v_offset + 8), (6, 6)) pygame.draw.rect(display, (255, 255, 255), rect, 0 if item[1] else 1) diff --git a/src/scenic/simulators/newtonian/simulator.py b/src/scenic/simulators/newtonian/simulator.py index 4c23adaec..f67855ac6 100644 --- a/src/scenic/simulators/newtonian/simulator.py +++ b/src/scenic/simulators/newtonian/simulator.py @@ -10,6 +10,8 @@ from PIL import Image import numpy as np +import shapely.affinity +import shapely.geometry import scenic.core.errors as errors # isort: skip @@ -59,16 +61,30 @@ class NewtonianSimulator(DrivingSimulator): when not otherwise specified is still 0.1 seconds. """ - def __init__(self, network=None, render=False, debug_render=False, export_gif=False): + def __init__( + self, + network=None, + render=False, + debug_render=False, + export_gif=False, + enable_crash_physics=True, + ): super().__init__() self.export_gif = export_gif self.render = render self.debug_render = debug_render self.network = network + self.enable_crash_physics = enable_crash_physics def createSimulation(self, scene, **kwargs): simulation = NewtonianSimulation( - scene, self.network, self.render, self.export_gif, self.debug_render, **kwargs + scene, + self.network, + self.render, + self.export_gif, + self.debug_render, + self.enable_crash_physics, + **kwargs, ) if self.export_gif and self.render: simulation.generate_gif("simulation.gif") @@ -79,7 +95,15 @@ class NewtonianSimulation(DrivingSimulation): """Implementation of `Simulation` for the Newtonian simulator.""" def __init__( - self, scene, network, render, export_gif, debug_render, timestep, **kwargs + self, + scene, + network, + render, + export_gif, + debug_render, + enable_crash_physics, + timestep, + **kwargs, ): self.export_gif = export_gif self.render = render @@ -87,6 +111,7 @@ def __init__( self.screen = None self.frames = [] self.debug_render = debug_render + self.enable_crash_physics = enable_crash_physics if timestep is None: timestep = 0.1 @@ -190,11 +215,199 @@ def createObjectInSimulator(self, obj): if hasattr(obj, "elevation"): obj.elevation = 0.0 + # Default mass for crash physics (typical car ~1500 kg, pedestrian ~70 kg). + if not hasattr(obj, "mass"): + obj.mass = 1500 if getattr(obj, "isCar", False) else 70 + + obj.crashed = False + obj.crash_angular_velocity = 0.0 + def isOnScreen(self, x, y): return self.min_x <= x <= self.max_x and self.min_y <= y <= self.max_y + def handle_collisions(self): + """Detect and resolve all pairwise collisions in the current timestep.""" + for i, obj1 in enumerate(self.objects): + for obj2 in self.objects[i + 1 :]: + if self.detect_collision(obj1, obj2): + self.resolve_collision(obj1, obj2) + + def get_object_polygon(self, obj): + return shapely.geometry.Polygon(obj._corners2D) + + def detect_collision(self, obj1, obj2): + return self.get_object_polygon(obj1).intersects(self.get_object_polygon(obj2)) + + def resolve_collision(self, obj1, obj2, restitution=0.5): + """Apply 2D collision response between two objects. + + Computes the collision normal via SAT, applies an impulse along it, and + positionally separates the objects to remove penetration. + """ + poly1 = self.get_object_polygon(obj1) + poly2 = self.get_object_polygon(obj2) + if not poly1.intersects(poly2): + return + + collision_normal, penetration_depth = self.compute_collision_normal( + poly1, poly2, obj1, obj2 + ) + if collision_normal is None: + return + + nx, ny = collision_normal.x, collision_normal.y + rel_vel = obj2.velocity - obj1.velocity + vel_along_normal = rel_vel.x * nx + rel_vel.y * ny + + # Already separating; no impulse needed. + if vel_along_normal > 0: + return + + impulse_scalar = -(1 + restitution) * vel_along_normal + impulse_scalar /= 1 / obj1.mass + 1 / obj2.mass + impulse = Vector(impulse_scalar * nx, impulse_scalar * ny) + obj1.velocity -= impulse / obj1.mass + obj2.velocity += impulse / obj2.mass + + if self.enable_crash_physics: + self.apply_crash_physics(obj1, obj2, collision_normal, abs(vel_along_normal)) + + # Split the penetration evenly between the two objects. + separation_factor = 0.5 + obj1.position -= collision_normal * penetration_depth * separation_factor + obj2.position += collision_normal * penetration_depth * separation_factor + + def apply_crash_physics(self, obj1, obj2, collision_normal, impact_speed): + """Mark objects crashed and apply post-impact dynamics for cars/pedestrians.""" + CAR_CRASH_THRESHOLD = 3.0 # m/s (~10 km/h) for car-car crashes + PEDESTRIAN_HIT_THRESHOLD = 1.5 # m/s (~5 km/h) for car-pedestrian hits + + is_car1 = getattr(obj1, "isCar", False) + is_car2 = getattr(obj2, "isCar", False) + is_pedestrian1 = ( + not is_car1 and hasattr(obj1, "control") and "speed" in obj1.control + ) + is_pedestrian2 = ( + not is_car2 and hasattr(obj2, "control") and "speed" in obj2.control + ) + + # Car-pedestrian collision + if (is_car1 and is_pedestrian2) or (is_car2 and is_pedestrian1): + if impact_speed < PEDESTRIAN_HIT_THRESHOLD: + return + pedestrian = obj2 if is_pedestrian2 else obj1 + car = obj1 if is_car1 else obj2 + + hit_severity = min(impact_speed / 15.0, 1.0) + pedestrian.crashed = True + pedestrian.velocity = car.velocity * (hit_severity * 0.3) + pedestrian.crash_angular_velocity = 0.0 + + car.crashed = True + car.crash_angular_velocity = 0.0 + car.velocity *= 1.0 - hit_severity * 0.5 + return + + # Car-car collision + if impact_speed < CAR_CRASH_THRESHOLD: + return + + if is_car1: + crash_severity = min(impact_speed / 20.0, 1.0) + obj1.crashed = True + obj1.crash_angular_velocity = ( + (np.random.random() - 0.5) * crash_severity * 3.0 + ) + obj1.velocity *= 1.0 - crash_severity * 0.7 + if is_car2: + crash_severity = min(impact_speed / 20.0, 1.0) + obj2.crashed = True + obj2.crash_angular_velocity = ( + (np.random.random() - 0.5) * crash_severity * 3.0 + ) + obj2.velocity *= 1.0 - crash_severity * 0.7 + + def compute_collision_normal(self, poly1, poly2, obj1, obj2): + """Compute collision normal + penetration depth via SAT. + + Returns (Vector, float) or (None, 0) if the objects aren't actually overlapping. + Falls back to a center-to-center direction if the SAT search yields nothing. + """ + try: + intersection = poly1.intersection(poly2) + if intersection.is_empty: + return None, 0 + + min_overlap = float("inf") + best_normal = None + coords1 = list(poly1.exterior.coords) + coords2 = list(poly2.exterior.coords) + + for coords in (coords1, coords2): + for i in range(len(coords) - 1): + edge_x = coords[i + 1][0] - coords[i][0] + edge_y = coords[i + 1][1] - coords[i][1] + edge_len = math.hypot(edge_x, edge_y) + if edge_len < 0.001: + continue + normal_x = -edge_y / edge_len + normal_y = edge_x / edge_len + + proj1 = [c[0] * normal_x + c[1] * normal_y for c in coords1[:-1]] + proj2 = [c[0] * normal_x + c[1] * normal_y for c in coords2[:-1]] + overlap = min(max(proj1), max(proj2)) - max(min(proj1), min(proj2)) + if 0 < overlap < min_overlap: + min_overlap = overlap + c1 = obj1.position.x * normal_x + obj1.position.y * normal_y + c2 = obj2.position.x * normal_x + obj2.position.y * normal_y + if c2 > c1: + best_normal = Vector(normal_x, normal_y) + else: + best_normal = Vector(-normal_x, -normal_y) + + if best_normal is None: + dx = obj2.position.x - obj1.position.x + dy = obj2.position.y - obj1.position.y + distance = math.hypot(dx, dy) + if distance < 0.001: + return None, 0 + best_normal = Vector(dx / distance, dy / distance) + min_overlap = intersection.area**0.5 + return best_normal, min_overlap + except Exception: + dx = obj2.position.x - obj1.position.x + dy = obj2.position.y - obj1.position.y + distance = math.hypot(dx, dy) + if distance < 0.001: + return None, 0 + normal = Vector(dx / distance, dy / distance) + penetration = min(obj1.width, obj1.height, obj2.width, obj2.height) * 0.1 + return normal, penetration + def step(self): for obj in self.objects: + # Crashed cars: apply heavy friction and uncontrolled spin, ignore inputs. + if obj.crashed and getattr(obj, "isCar", False): + obj.velocity *= 0.92 + obj.angularSpeed = obj.crash_angular_velocity + obj.crash_angular_velocity *= 0.95 + obj.throttle = 0 + obj.brake = 1.0 + obj.steer = 0 + obj.position += obj.velocity * self.timestep + obj.heading += obj.angularSpeed * self.timestep + obj.speed = obj.velocity.norm() + continue + + # Crashed pedestrians: freeze controls and decelerate. + if obj.crashed and hasattr(obj, "control") and "speed" in obj.control: + obj.velocity *= 0.85 + obj.control["speed"] = 0 + obj.control["heading"] = obj.heading + obj.position += obj.velocity * self.timestep + obj.speed = obj.velocity.norm() + continue + current_speed = obj.velocity.norm() # 1) Pedestrians using walking controls (SetWalkingSpeed/Direction) if ( @@ -250,6 +463,9 @@ def step(self): obj.position += obj.velocity * self.timestep obj.heading += obj.angularSpeed * self.timestep + # 5) Detect and resolve any pairwise collisions for this tick. + self.handle_collisions() + if self.render: # Handle closing out pygame screen for event in pygame.event.get():