Exercises Writeup
Use this link to find the day 5 exercises.
finite_state_example.py
Source: finite_state_example.py
#!/usr/bin/env python3
""" This code implements the finite state controller described in https://comprobo20.github.io/in-class/day05.
There are three states: moving forward, moving backward, and turning left. The initial state is moving_forward.
The rules for transitioning between these states are as follows.
moving_forward:
- if a bump sensor is activated transition to moving backward
moving_backward:
- once the obstacle in front of the robot is greater than a threshold, transition to turning left
turning_left:
- once 1 second has elapsed, transition to moving_forward
"""
import rospy
from geometry_msgs.msg import Twist
from std_msgs.msg import Int8MultiArray
from sensor_msgs.msg import LaserScan
class FiniteStateController():
""" This class encompasses the finite state controller ROS Node"""
def __init__(self):
rospy.init_node('finite_state_example')
# the angular velocity when in the "turning_left" state
self.angular_velocity = 0.5
# the forward speed when in the "moving_forward" state. The speed is reversed in the "moving_backward" state.
self.forward_speed = 0.1
# the distance threshold to use to transition from "moving_backward" to turning left
self.distance_threshold = 0.8
# a class attribute to track the distance to the obstacle in front of the robot. The last valid measurement
# is always stored (where valid means not equal to 0).
self.distance_to_obstacle = 0.0
# keeps track of whether the robot's bump sensors are active
self.bump_active = False
# holds the current state (note: the type of self.state is actually a function!)
self.state = self.moving_forward
self.vel_pub = rospy.Publisher("cmd_vel", Twist, queue_size=10)
rospy.Subscriber('bump', Int8MultiArray, self.process_bump)
rospy.Subscriber('scan', LaserScan, self.process_scan)
def process_bump(self, msg):
""" Handle bump messages by noting whether any of the sensors are active """
self.bump_active = msg.data[0] == 1 or msg.data[1] == 1 or msg.data[2] == 1 or msg.data[3] == 1
def process_scan(self, msg):
""" Handle laser can messages by storing the distance to the obstacle immediately in front of the robot """
if msg.ranges[0] != 0:
self.distance_to_obstacle = msg.ranges[0]
def run(self):
""" The run loop repeatedly executes the current state function. Each state function will return a function
corresponding to the next state to run. """
# this sleep is to allow any subscribers to cmd_vel to establish a connection to our publisher. This is only
# needed in the case where you send the velocity commands once (in some ways sending the commands repeatedly is
# more robust.
rospy.sleep(1)
while not rospy.is_shutdown():
self.state = self.state()
def moving_forward(self):
""" Implements the moving forward state. The next state to run is returned by the function. """
m = Twist()
m.linear.x = self.forward_speed
self.vel_pub.publish(m)
r = rospy.Rate(10)
while not rospy.is_shutdown():
if self.bump_active:
return self.moving_backward
r.sleep()
def moving_backward(self):
""" Implements the moving backward state. The next state to run is returned by the function. """
m = Twist()
m.linear.x = -self.forward_speed
self.vel_pub.publish(m)
r = rospy.Rate(10)
while not rospy.is_shutdown():
# check distance to forward obstacle to decide when we've reversed "enough"
if self.distance_to_obstacle > self.distance_threshold:
return self.rotating_left
r.sleep()
def rotating_left(self):
""" Implements the rotating left state. The next state to run is returned by the function. """
m = Twist()
m.angular.z = self.angular_velocity
self.vel_pub.publish(m)
# the transition to the next state is purely time-based, so we can use rospy.sleep
rospy.sleep(1.0)
return self.moving_forward
if __name__ == '__main__':
node = FiniteStateController()
node.run()
finite_state_example_smach.py
(example of using the smach
library)
Source: finite_state_example_smach.py
#!/usr/bin/env python3
""" This code implements the finite state controller described in https://comprobo20.github.io/in-class/day05.
In this version we will use the ROS smach library
There are three states: moving forward, moving backward, and turning left. The initial state is moving_forward.
The rules for transitioning between these states are as follows.
moving_forward:
- if a bump sensor is activated transition to moving backward
moving_backward:
- once the obstacle in front of the robot is greater than a threshold, transition to turning left
turning_left:
- once 1 second has elapsed, transition to moving_forward
"""
import rospy
import smach
# if you want to try to the smach viewer, consider uncommenting these lines
# import smach_ros
from geometry_msgs.msg import Twist
from std_msgs.msg import Int8MultiArray
from sensor_msgs.msg import LaserScan
class FiniteStateControllerSmach():
""" This class encompasses the finite state controller ROS Node"""
def __init__(self):
rospy.init_node('finite_state_example')
# the angular velocity when in the "turning_left" state
self.angular_velocity = 0.5
# the forward speed when in the "moving_forward" state. The speed is reversed in the "moving_backward" state.
self.forward_speed = 0.1
# the distance threshold to use to transition from "moving_backward" to turning left
self.distance_threshold = 0.8
self.vel_pub = rospy.Publisher("cmd_vel", Twist, queue_size=10)
def run(self):
# needed in the case where you send the velocity commands once (in some ways sending the commands repeatedly is
# more robust.
rospy.sleep(1)
# Create a SMACH state machine
sm = smach.StateMachine(outcomes=[])
# Open the container
with sm:
# Add states to the container
smach.StateMachine.add('moving_forward', MovingForward(self.forward_speed, self.vel_pub),
transitions={'bumped object': 'moving_backward'})
smach.StateMachine.add('moving_backward', MovingBackward(self.vel_pub, self.forward_speed, self.distance_threshold),
transitions={'obstacle far enough away': 'rotating_left'})
smach.StateMachine.add('rotating_left', RotatingLeft(self.vel_pub, self.angular_velocity),
transitions={'rotated for 1 second': 'moving_forward'})
# Create and start the introspection server
# if you want to try smach viewer, consider uncommenting these lines
# sis = smach_ros.IntrospectionServer('server_name', sm, '/SM_ROOT')
# sis.start()
# Execute SMACH plan
outcome = sm.execute()
class MovingForward(smach.State):
""" Implements the moving forward state. """
def __init__(self, forward_speed, vel_pub):
self.forward_speed = forward_speed
self.vel_pub = vel_pub
# keeps track of whether the robot's bump sensors are active
self.bump_active = False
rospy.Subscriber('bump', Int8MultiArray, self.process_bump)
smach.State.__init__(self, outcomes=['bumped object'])
def process_bump(self, msg):
""" Handle bump messages by noting whether any of the sensors are active """
self.bump_active = msg.data[0] == 1 or msg.data[1] == 1 or msg.data[2] == 1 or msg.data[3] == 1
def execute(self, user_data):
m = Twist()
m.linear.x = self.forward_speed
self.vel_pub.publish(m)
r = rospy.Rate(10)
while not rospy.is_shutdown():
if self.bump_active:
return 'bumped object'
r.sleep()
class MovingBackward(smach.State):
""" Implements the moving backward state. """
def __init__(self, vel_pub, forward_speed, distance_threshold):
self.vel_pub = vel_pub
self.forward_speed = forward_speed
self.distance_threshold = distance_threshold
# a class attribute to track the distance to the obstacle in front of the robot. The last valid measurement
# is always stored (where valid means not equal to 0).
self.distance_to_obstacle = 0.0
rospy.Subscriber('scan', LaserScan, self.process_scan)
smach.State.__init__(self, outcomes=['obstacle far enough away'])
def process_scan(self, msg):
""" Handle laser can messages by storing the distance to the obstacle immediately in front of the robot """
if msg.ranges[0] != 0:
self.distance_to_obstacle = msg.ranges[0]
def execute(self, user_data):
m = Twist()
m.linear.x = -self.forward_speed
self.vel_pub.publish(m)
r = rospy.Rate(10)
while not rospy.is_shutdown():
# check distance to forward obstacle to decide when we've reversed "enough"
if self.distance_to_obstacle > self.distance_threshold:
return 'obstacle far enough away'
r.sleep()
class RotatingLeft(smach.State):
""" Implements the rotating left state. The next state to run is returned by the function. """
def __init__(self, vel_pub, angular_velocity):
self.vel_pub = vel_pub
self.angular_velocity = angular_velocity
smach.State.__init__(self, outcomes=['rotated for 1 second'])
def execute(self, user_data):
m = Twist()
m.angular.z = self.angular_velocity
self.vel_pub.publish(m)
# the transition to the next state is purely time-based, so we can use rospy.sleep
rospy.sleep(1.0)
return 'rotated for 1 second'
if __name__ == '__main__':
node = FiniteStateControllerSmach()
node.run()