from warnings import simplefilter, warn
from controller import Supervisor
from deepbots.supervisor.controllers.deepbots_supervisor_env import \
DeepbotsSupervisorEnv
class EmitterReceiverSupervisorEnv(DeepbotsSupervisorEnv):
    """
    This is the base class for the emitter - receiver scheme.

    Subclasses implement a variety of communication formats such as CSV
    messages, by providing handle_emitter and handle_receiver.
    """

    def __init__(self,
                 emitter_name="emitter",
                 receiver_name="receiver",
                 timestep=None):
        """
        The constructor sets up the timestep and calls the method that
        initializes the emitter and receiver devices with the names provided.

        :param emitter_name: The name of the emitter device on the
            supervisor node
        :param receiver_name: The name of the receiver device on the
            supervisor node
        :param timestep: The supervisor controller timestep. When None,
            the value returned by getBasicTimeStep() is used
        """
        super(EmitterReceiverSupervisorEnv, self).__init__()

        # The timestep property setter converts whatever is assigned to
        # int, so no explicit conversion is needed here.
        self.timestep = (self.getBasicTimeStep()
                         if timestep is None else timestep)

        self.emitter, self.receiver = self.initialize_comms(
            emitter_name, receiver_name)

    def initialize_comms(self, emitter_name, receiver_name):
        """
        Initializes the emitter and receiver devices with the names provided.

        The receiver is enabled with the controller timestep; the emitter
        is only looked up.

        :param emitter_name: The name of the emitter device on the
            supervisor node
        :param receiver_name: The name of the receiver device on the
            supervisor node
        :return: The initialized emitter and receiver references
        """
        emitter = self.getDevice(emitter_name)
        receiver = self.getDevice(receiver_name)
        receiver.enable(self.timestep)
        return emitter, receiver

    def step(self, action):
        """
        The basic step method that sends the action through the emitter,
        steps the controller and returns the
        (observations, reward, done, info) object.

        :param action: Whatever the use-case uses as an action, e.g.
            an integer representing discrete actions
        :type action: Defined by the implementation of handle_emitter
        :return: (observations, reward, done, info) as provided by the
            corresponding methods as implemented for the use-case
        """
        self.handle_emitter(action)

        # super(Supervisor, self) starts the method lookup *after*
        # Supervisor in the MRO, so this does not re-enter this step().
        # NOTE(review): presumably this reaches the Webots controller step,
        # where -1 signals that the simulation is terminating - confirm
        # against the controller API.
        if super(Supervisor, self).step(self.timestep) == -1:
            # exit() is a helper injected by the site module for
            # interactive use; raising SystemExit directly ends the
            # controller with the same exception.
            raise SystemExit

        return (
            self.get_observations(),
            self.get_reward(action),
            self.is_done(),
            self.get_info(),
        )

    def handle_emitter(self, action):
        """
        This method is implemented by subclasses depending on the
        communication format used.

        :param action: The action that is sent through the emitter device
            to the robot, e.g. an integer representing discrete actions
        :raises NotImplementedError: Always, unless overridden
        """
        raise NotImplementedError

    def handle_receiver(self):
        """
        This method is implemented by subclasses depending on the
        communication format used.

        :raises NotImplementedError: Always, unless overridden
        """
        raise NotImplementedError

    def get_timestep(self):
        """
        Deprecated accessor of the controller timestep, use the timestep
        property instead. Emits a DeprecationWarning when called.

        :return: The timestep of the controller in milliseconds
        """
        # The filter is required so as to not ignore the Deprecation warning.
        # It is limited to DeprecationWarning so that the filters configured
        # for every other warning category are left untouched.
        simplefilter("once", DeprecationWarning)
        # stacklevel=2 attributes the warning to the caller of get_timestep,
        # which is the code that has to be updated.
        warn("get_timestep is deprecated, use .timestep instead",
             DeprecationWarning,
             stacklevel=2)
        return self.timestep

    @property
    def timestep(self):
        """
        Getter of _timestep field. Timestep is defined in milliseconds

        :return: The timestep of the controller in milliseconds
        """
        return self._timestep

    @timestep.setter
    def timestep(self, value):
        """
        Setter of timestep field. Automatically converts to int as
        required by Webots.

        :param value: The new controller timestep in milliseconds
        """
        self._timestep = int(value)