Exercises Writeup

Use this link to find the day 5 exercises.

finite_state_example.py

Source: finite_state_example.py

#!/usr/bin/env python3

""" This code implements the finite state controller described in https://comprobo20.github.io/in-class/day05.

    There are three states: moving forward, moving backward, and turning left.  The initial state is moving_forward.
    The rules for transitioning between these states are as follows.

    moving_forward:
        - if a bump sensor is activated transition to moving backward
    moving_backward:
        - once the obstacle in front of the robot is greater than a threshold, transition to turning left
    turning_left:
        - once 1 second has elapsed, transition to moving_forward
"""


import rospy
from geometry_msgs.msg import Twist
from std_msgs.msg import Int8MultiArray
from sensor_msgs.msg import LaserScan

class FiniteStateController():
    """ This class encompasses the finite state controller ROS Node"""
    def __init__(self):
        rospy.init_node('finite_state_example')
        # the angular velocity when in the "turning_left" state
        self.angular_velocity = 0.5
        # the forward speed when in the "moving_forward" state.  The speed is reversed in the "moving_backward" state.
        self.forward_speed = 0.1
        # the distance threshold to use to transition from "moving_backward" to turning left
        self.distance_threshold = 0.8
        # a class attribute to track the distance to the obstacle in front of the robot.  The last valid measurement
        # is always stored (where valid means not equal to 0).
        self.distance_to_obstacle = 0.0
        # keeps track of whether the robot's bump sensors are active
        self.bump_active = False
        # holds the current state (note: the type of self.state is actually a function!)
        self.state = self.moving_forward

        self.vel_pub = rospy.Publisher("cmd_vel", Twist, queue_size=10)
        rospy.Subscriber('bump', Int8MultiArray, self.process_bump)
        rospy.Subscriber('scan', LaserScan, self.process_scan)

    def process_bump(self, msg):
        """ Handle bump messages by noting whether any of the sensors are active """
        self.bump_active = msg.data[0] == 1 or msg.data[1] == 1 or msg.data[2] == 1 or msg.data[3] == 1

    def process_scan(self, msg):
        """ Handle laser can messages by storing the distance to the obstacle immediately in front of the robot """
        if msg.ranges[0] != 0:
            self.distance_to_obstacle = msg.ranges[0]

    def run(self):
        """ The run loop repeatedly executes the current state function.  Each state function will return a function
            corresponding to the next state to run. """
        # this sleep is to allow any subscribers to cmd_vel to establish a connection to our publisher.  This is only
        # needed in the case where you send the velocity commands once (in some ways sending the commands repeatedly is
        # more robust.
        rospy.sleep(1)
        while not rospy.is_shutdown():
            self.state = self.state()

    def moving_forward(self):
        """ Implements the moving forward state.  The next state to run is returned by the function. """
        m = Twist()
        m.linear.x = self.forward_speed
        self.vel_pub.publish(m)
        r = rospy.Rate(10)
        while not rospy.is_shutdown():
            if self.bump_active:
                return self.moving_backward
            r.sleep()

    def moving_backward(self):
        """ Implements the moving backward state.  The next state to run is returned by the function. """
        m = Twist()
        m.linear.x = -self.forward_speed
        self.vel_pub.publish(m)
        r = rospy.Rate(10)
        while not rospy.is_shutdown():
            # check distance to forward obstacle to decide when we've reversed "enough"
            if self.distance_to_obstacle > self.distance_threshold:
                return self.rotating_left
            r.sleep()

    def rotating_left(self):
        """ Implements the rotating left state.  The next state to run is returned by the function. """
        m = Twist()
        m.angular.z = self.angular_velocity
        self.vel_pub.publish(m)
        # the transition to the next state is purely time-based, so we can use rospy.sleep
        rospy.sleep(1.0)
        return self.moving_forward

if __name__ == '__main__':
    node = FiniteStateController()
    node.run()

finite_state_example_smach.py (example of using the smach library)

Source: finite_state_example_smach.py

#!/usr/bin/env python3

""" This code implements the finite state controller described in https://comprobo20.github.io/in-class/day05.
    In this version we will use the ROS smach library
    There are three states: moving forward, moving backward, and turning left.  The initial state is moving_forward.
    The rules for transitioning between these states are as follows.

    moving_forward:
        - if a bump sensor is activated transition to moving backward
    moving_backward:
        - once the obstacle in front of the robot is greater than a threshold, transition to turning left
    turning_left:
        - once 1 second has elapsed, transition to moving_forward
"""


import rospy
import smach
# if you want to try to the smach viewer, consider uncommenting these lines
# import smach_ros
from geometry_msgs.msg import Twist
from std_msgs.msg import Int8MultiArray
from sensor_msgs.msg import LaserScan

class FiniteStateControllerSmach():
    """ This class encompasses the finite state controller ROS Node"""
    def __init__(self):
        rospy.init_node('finite_state_example')
        # the angular velocity when in the "turning_left" state
        self.angular_velocity = 0.5
        # the forward speed when in the "moving_forward" state.  The speed is reversed in the "moving_backward" state.
        self.forward_speed = 0.1
        # the distance threshold to use to transition from "moving_backward" to turning left
        self.distance_threshold = 0.8

        self.vel_pub = rospy.Publisher("cmd_vel", Twist, queue_size=10)

    def run(self):
        # needed in the case where you send the velocity commands once (in some ways sending the commands repeatedly is
        # more robust.
        rospy.sleep(1)

        # Create a SMACH state machine
        sm = smach.StateMachine(outcomes=[])

        # Open the container
        with sm:
            # Add states to the container
            smach.StateMachine.add('moving_forward', MovingForward(self.forward_speed, self.vel_pub),
                                   transitions={'bumped object': 'moving_backward'})
            smach.StateMachine.add('moving_backward', MovingBackward(self.vel_pub, self.forward_speed, self.distance_threshold),
                                   transitions={'obstacle far enough away': 'rotating_left'})
            smach.StateMachine.add('rotating_left', RotatingLeft(self.vel_pub, self.angular_velocity),
                                   transitions={'rotated for 1 second': 'moving_forward'})

        # Create and start the introspection server
        # if you want to try smach viewer, consider uncommenting these lines
        # sis = smach_ros.IntrospectionServer('server_name', sm, '/SM_ROOT')
        # sis.start()
        # Execute SMACH plan
        outcome = sm.execute()

class MovingForward(smach.State):
    """ Implements the moving forward state. """

    def __init__(self, forward_speed, vel_pub):
        self.forward_speed = forward_speed
        self.vel_pub = vel_pub
        # keeps track of whether the robot's bump sensors are active
        self.bump_active = False
        rospy.Subscriber('bump', Int8MultiArray, self.process_bump)
        smach.State.__init__(self, outcomes=['bumped object'])

    def process_bump(self, msg):
        """ Handle bump messages by noting whether any of the sensors are active """
        self.bump_active = msg.data[0] == 1 or msg.data[1] == 1 or msg.data[2] == 1 or msg.data[3] == 1

    def execute(self, user_data):
        m = Twist()
        m.linear.x = self.forward_speed
        self.vel_pub.publish(m)
        r = rospy.Rate(10)
        while not rospy.is_shutdown():
            if self.bump_active:
                return 'bumped object'
            r.sleep()

class MovingBackward(smach.State):
    """ Implements the moving backward state. """

    def __init__(self, vel_pub, forward_speed, distance_threshold):
        self.vel_pub = vel_pub
        self.forward_speed = forward_speed
        self.distance_threshold = distance_threshold
        # a class attribute to track the distance to the obstacle in front of the robot.  The last valid measurement
        # is always stored (where valid means not equal to 0).
        self.distance_to_obstacle = 0.0
        rospy.Subscriber('scan', LaserScan, self.process_scan)
        smach.State.__init__(self, outcomes=['obstacle far enough away'])

    def process_scan(self, msg):
        """ Handle laser can messages by storing the distance to the obstacle immediately in front of the robot """
        if msg.ranges[0] != 0:
            self.distance_to_obstacle = msg.ranges[0]

    def execute(self, user_data):
        m = Twist()
        m.linear.x = -self.forward_speed
        self.vel_pub.publish(m)
        r = rospy.Rate(10)
        while not rospy.is_shutdown():
            # check distance to forward obstacle to decide when we've reversed "enough"
            if self.distance_to_obstacle > self.distance_threshold:
                return 'obstacle far enough away'
            r.sleep()

class RotatingLeft(smach.State):
    """ Implements the rotating left state.  The next state to run is returned by the function. """

    def __init__(self,  vel_pub, angular_velocity):
        self.vel_pub = vel_pub
        self.angular_velocity = angular_velocity
        smach.State.__init__(self, outcomes=['rotated for 1 second'])

    def execute(self, user_data):
        m = Twist()
        m.angular.z = self.angular_velocity
        self.vel_pub.publish(m)
        # the transition to the next state is purely time-based, so we can use rospy.sleep
        rospy.sleep(1.0)
        return 'rotated for 1 second'

if __name__ == '__main__':
    node = FiniteStateControllerSmach()
    node.run()