Ark
spaces.ObservationSpace Class Reference

A class representing an observation space that listens for observations over LCM and processes them.

Inherits from spaces.Space.

Public Member Functions

 __init__ (self, List observation_channels, Callable observation_unpacking, LCM lcm_instance)
 Create an observation space.
Any unpack_message (self, Dict observation_dict)
 Unpack a raw observation dictionary.
 check_readiness (self)
 Check whether fresh observations are available.
 wait_until_observation_space_is_ready (self)
 Block until a complete observation has been received.
 empty_data (self)
 Clear cached observation data.
Any get_observation (self)
 Return the latest processed observation.
Public Member Functions inherited from spaces.Space
None shutdown (self)
 Abstract method to shut down the space, ensuring any resources are released.

Public Attributes

 observation_space_listener = MultiChannelListener(observation_channels, lcm_instance)
 observation_unpacking = observation_unpacking
bool is_ready = False
 data = self.observation_space_listener.get()

Detailed Description

A class representing an observation space that listens for observations over LCM and processes them.

Parameters
observation_channels: Names of the channels on which to listen for observations. Type: List
observation_unpacking: Function that converts the observation dictionary into a processed observation of any type. Type: Callable
lcm_instance: LCM instance used for communication. Type: LCM

Constructor & Destructor Documentation

◆ __init__()

spaces.ObservationSpace.__init__ ( self,
List observation_channels,
Callable observation_unpacking,
LCM lcm_instance )

Create an observation space.

Parameters
observation_channels: Channels to listen for observations.
observation_unpacking: Callback used to deserialize messages.
lcm_instance: LCM instance used for communication.

Member Function Documentation

◆ check_readiness()

spaces.ObservationSpace.check_readiness ( self)

Check whether fresh observations are available.

◆ empty_data()

spaces.ObservationSpace.empty_data ( self)

Clear cached observation data.

◆ get_observation()

Any spaces.ObservationSpace.get_observation ( self)

Return the latest processed observation.

◆ unpack_message()

Any spaces.ObservationSpace.unpack_message ( self,
Dict observation_dict )

Unpack a raw observation dictionary.

Parameters
observation_dict: Dictionary mapping channel names to raw LCM messages.
Returns
The processed observation. Return type: Any
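
To illustrate the dictionary shape described above, here is a minimal sketch of an unpacking callback. It is not taken from the Ark sources. The function name `unpack_observation`, the channel names `robot/joint_states` and `target/pose`, and the message fields are all hypothetical. `SimpleNamespace` objects stand in for LCM messages, on the assumption that the callback can read message fields as attributes.

```python
from types import SimpleNamespace
from typing import Any, Dict


def unpack_observation(observation_dict: Dict[str, Any]) -> Dict[str, Any]:
    """Convert a channel-name -> message dictionary into a plain observation.

    Channel names and message fields are hypothetical placeholders.
    """
    joints = observation_dict["robot/joint_states"]
    target = observation_dict["target/pose"]
    return {
        "joint_positions": list(joints.position),
        "target_xyz": (target.x, target.y, target.z),
    }


# Stand-in messages (assumption: fields are readable as attributes).
raw = {
    "robot/joint_states": SimpleNamespace(position=[0.0, 0.5, -0.5]),
    "target/pose": SimpleNamespace(x=1.0, y=2.0, z=3.0),
}
observation = unpack_observation(raw)
```

A callable of this shape is what the `observation_unpacking` constructor argument expects: one dictionary in, one processed observation of any type out.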

◆ wait_until_observation_space_is_ready()

spaces.ObservationSpace.wait_until_observation_space_is_ready ( self)

Block until a complete observation has been received.
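
The blocking behaviour can be pictured as a polling loop around `check_readiness()`. The sketch below shows that general pattern only, not the Ark implementation. `StandInSpace` and `wait_until_ready` are hypothetical names, the boolean return value of `check_readiness()` is an assumption, and the `timeout` argument is an addition for the example (the documented method takes no arguments).

```python
import threading
import time


class StandInSpace:
    """Hypothetical stand-in exposing only the documented names."""

    def __init__(self) -> None:
        self.is_ready = False

    def check_readiness(self) -> bool:
        # Assumption: readiness is reported as a boolean.
        return self.is_ready


def wait_until_ready(space, poll_interval: float = 0.01, timeout: float = 1.0) -> bool:
    """Poll until the space reports ready or the timeout expires."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if space.check_readiness():
            return True
        time.sleep(poll_interval)
    # One final check so a late arrival is not missed.
    return space.check_readiness()


space = StandInSpace()
# Simulate an observation arriving shortly after waiting begins.
timer = threading.Timer(0.05, lambda: setattr(space, "is_ready", True))
timer.start()
became_ready = wait_until_ready(space)
timer.join()
```

Bounding the wait with a timeout is a design choice for the sketch: it lets a caller report a missing publisher instead of hanging indefinitely.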

Member Data Documentation

◆ data

spaces.ObservationSpace.data = self.observation_space_listener.get()

◆ is_ready

spaces.ObservationSpace.is_ready = False

◆ observation_space_listener

spaces.ObservationSpace.observation_space_listener = MultiChannelListener(observation_channels, lcm_instance)

◆ observation_unpacking

spaces.ObservationSpace.observation_unpacking = observation_unpacking

The documentation for this class was generated from the following file: