Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
360 changes: 282 additions & 78 deletions src/scenic/domains/driving/behaviors.scenic

Large diffs are not rendered by default.

166 changes: 165 additions & 1 deletion src/scenic/domains/driving/controllers.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,14 @@
"""

from collections import deque
import math

import numpy as np
from shapely.geometry import LineString, MultiPoint, Point as ShapelyPoint

from scenic.core.regions import CircularRegion, PolylineRegion
from scenic.domains.driving.actions import *
from scenic.domains.driving.roads import ManeuverType


class PIDLongitudinalController:
Expand Down Expand Up @@ -80,7 +86,7 @@ def __init__(self, K_P=0.3, K_D=0.2, K_I=0, dt=0.1):
self.windup_guard = 20.0
self.output = 0

def run_step(self, cte):
def run_step(self, input_trajectory, ego, opposite_traffic):
"""Estimate the steering angle of the vehicle based on the PID equations.

Arguments:
Expand All @@ -89,6 +95,14 @@ def run_step(self, cte):
Returns:
a signal between -1 and 1, with -1 meaning maximum steering to the left.
"""

nearest_line_points = input_trajectory.nearestSegmentTo(ego.position)
nearest_line_segment = PolylineRegion(nearest_line_points)
cte = nearest_line_segment.signedDistanceTo(ego.position)

if opposite_traffic:
cte = -1 * cte

error = cte
delta_error = error - self.last_error
self.PTerm = self.Kp * error
Expand All @@ -107,3 +121,153 @@ def run_step(self, cte):
self.output = self.PTerm + (self.Ki * self.ITerm) + (self.Kd * self.DTerm)

return np.clip(self.output, -1, 1)


class PurePursuitLateralController:
"""
Pure Pursuit Controller

Arguments:
wb: wheelbase length
ld: lookahead distance
dt: time step

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably pull in timestep directly from the simulator (see above comment)

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Timestep is taken from simulator.timestep and then passed to the controller upon initialization. See line 49 in Scenic/src/scenic/domains/driving/simulators.py

cl: car length

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can go ahead and pull this from self in run_step instead of having it as a parameter.

@aryavenkatesan aryavenkatesan Aug 2, 2025

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've changed it so the car length gets pulled from the ego. Do you think the lookahead distance should be able to be changed as a parameter in run_step? There may be cases where it would lead to better performance but in the papers i've read usually its fixed for the algorithm.

clwbr: car length to wheel base ratio
"""

def __init__(self, cl, ld=7, dt=0.1, clwbr=0.65):
"""
Todo:
find the actual wheelbase and update the default number
- it says car length is 4.5 meters, but im not sure if thats wheelbase
experiment with the lookahead distance to see what works the best
"""
self.dt = dt
self.wb = cl * clwbr
self.clwbr = clwbr
self.ld = ld
self.past_cte = 0
self.max_steering_angle = np.arctan((2 * self.wb) / self.ld)

def run_step(self, input_trajectory, ego, opposite_traffic):
"""Estimate the steering angle of the vehicle based on the pure pursuit equations.

Arguments:
cte: cross-track error (distance to right of desired trajectory)

Returns:
a signal between -1 and 1, with -1 meaning maximum steering to the left.

Todo:
Find if the equation needs to be updated to consider negative vs positive cte
find what maximum steering distance is in radians and scale the return value accordingly
-It is not listed in the documentation


#takes current position and the path
#add plot
"""
# Define variables
lookahead_distance = self.ld
circlular_region = CircularRegion(ego.position, lookahead_distance, resolution=64)
polyline_circle = circlular_region.boundary
shapely_boundary = (
polyline_circle.lineString
) # extract shapely circle and shapely path
line = input_trajectory.lineString
distance = input_trajectory.lineString.project(
ShapelyPoint(ego.position.coordinates[0], ego.position.coordinates[1])
)
coords = []
try:
coords = list(line.coords) # lineString case
except NotImplementedError:
for geom in line.geoms: # multilinestring case
coords.extend(list(geom.coords))

# Splitting the path into two parts, the part in front of the ego and behind it

split_trajectory = []
for j, p in enumerate(coords):
pd = line.project(ShapelyPoint(p[0], p[1]))
if pd == distance:
split_trajectory = [
LineString(coords[: j + 1]),
LineString(coords[j:]),
] # shapely.errors.GEOSException: IllegalArgumentException: point array must contain 0 or >1 elements
break
if pd > distance:
cp = line.interpolate(distance)
split_trajectory = [
LineString(coords[:j] + [(cp.x, cp.y, 0)]),
LineString([(cp.x, cp.y, 0)] + coords[j:]),
]
break

# try:
# split_trajectory = []
# for j, p in enumerate(coords):
# pd = line.project(ShapelyPoint(p[0], p[1]))
# if pd == distance:
# split_trajectory = [
# LineString(coords[:j+1]),
# LineString(coords[j:])]
# break
# if pd > distance:
# cp = line.interpolate(distance)
# split_trajectory = [
# LineString(coords[:j] + [(cp.x, cp.y, 0)]),
# LineString([(cp.x, cp.y, 0)] + coords[j:])]
# break
# except Exception as e: # ValueError is the closest built-in to "illegal argument" in Python
# print(f"Illegal argument encountered while splitting trajectory: {e}")
# steering_angle = np.arctan((2 * self.wb * math.sin(self.past_cte)) / self.ld)
# rv = np.clip((steering_angle / np.radians(35)), -1, 1)

# return rv

# Get intersection points of the circle with the second half of the path

shapely_intersection = shapely_boundary.intersection(split_trajectory[1])

if isinstance(shapely_intersection, ShapelyPoint):
candidate_points = [shapely_intersection]
elif isinstance(shapely_intersection, MultiPoint):
candidate_points = list(shapely_intersection.geoms)
else:
candidate_points = []
assert all(isinstance(p, ShapelyPoint) for p in candidate_points)
Comment thread
aryavenkatesan marked this conversation as resolved.

# Find lookahead point by traversing the path

if len(candidate_points) == 0:
# print("No candidate point found")
# Sometimes no point is found, this is an error
# In that case, dont touch steering wheel
# Pure Pursuit is a self correcting algorithm so it should be fine
cte = self.past_cte

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do we handle reaching the end of a desired trajectory? We might want to add some proper handling for this. Any thoughts @psduggirala?

else:
candidate_points.sort(
key=lambda point: split_trajectory[1].project(
ShapelyPoint(point.x, point.y)
)
)
lookahead_point = candidate_points[0]
Comment thread
aryavenkatesan marked this conversation as resolved.

# Find the theta value/cte to feed to the pure pursuit algorithm

theta = math.atan2(
lookahead_point.y - ego.position.coordinates[1],
lookahead_point.x - ego.position.coordinates[0],
)
cte = (ego.heading + math.pi / 2) - theta
self.past_cte = cte

if opposite_traffic:
cte = -1 * cte

# perform pure pursuit calculation and normalize it from -1 to 1
steering_angle = np.arctan((2 * self.wb * math.sin(cte)) / self.ld)
rv = np.clip((steering_angle / np.radians(35)), -1, 1)

return rv
11 changes: 11 additions & 0 deletions src/scenic/domains/driving/simulators.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from scenic.domains.driving.controllers import (
PIDLateralController,
PIDLongitudinalController,
PurePursuitLateralController,
)


Expand Down Expand Up @@ -44,6 +45,16 @@ def getLaneFollowingControllers(self, agent):
lat_controller = PIDLateralController(K_P=0.2, K_D=0.1, K_I=0.0, dt=dt)
return lon_controller, lat_controller

def getPurePursuitControllers(self, agent, cl=4.5, ld=7, clwbr=0.65):
dt = self.timestep

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above, let's grab this directly from simulator().timestep

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed this to dt = Simulation.timestep and got AttributeError: type object 'Simulation' has no attribute 'timestep'

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, simulator is an undefined variable

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make this being passed from scenic to python


lon_controller = PIDLongitudinalController(K_P=0.5, K_D=0.1, K_I=0.7, dt=dt)
lat_controller = PurePursuitLateralController(
cl=agent.length, ld=7, dt=dt, clwbr=0.65
)

return lon_controller, lat_controller

def getTurningControllers(self, agent):
"""Get longitudinal and lateral controllers for turning."""
dt = self.timestep
Expand Down
2 changes: 2 additions & 0 deletions src/scenic/utils/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
plots/*
!plots/.gitkeep
Empty file added src/scenic/utils/__init__.py
Empty file.
Loading