Publisher & Subscriber
Summary
This tutorial walks you through the process of setting up a basic publisher-subscriber system using Ark. You'll learn how to create a publisher to send messages and a subscriber to listen for them, all within a simple server-client architecture.
Writing the Publisher Node
“Node” in Ark is a script that is connected to the Ark Network, that has the ability to publish messages.
Here we will create the publisher (”talker”) node which will continually broadcast a message
The Code
Viewing the code located at examples/basics/publisher_subscriber/talker.py
from ark.client.comm_infrastructure.base_node import BaseNode, main
from arktypes import string_t
from ark.tools.log import log
from typing import Dict, Any, Optional
import time
class TalkerNode(BaseNode):
def __init__(self, config: Optional[Dict[str, Any]] = None):
super().__init__("Talker")
self.pub = self.create_publisher("chatter", string_t)
self.create_stepper(10, self.step) # 10 Hz
def step(self):
msg = string_t()
msg.data = f"Hello World {time.time()}"
self.pub.publish(msg)
log.info(f"Published message data: {msg.data}")
if __name__ == "__main__":
main(TalkerNode)
The Code Explained
from ark.client.comm_infrastructure.base_node import BaseNode, main
from arktypes import string_t
from ark.tools.log import log
from typing import Dict, Any, Optional
import time
You need to import BaseNode
, main
to create a node. The ark types
sting_t
is so that we can use the string_t
message type for publishing.
super().__init__("Talker")
self.pub = self.create_publisher("chatter", string_t)
The line super().__init__("Talker")
tells Ark the name of the node, this is required first to set up the node to allow communication. In this case the node has the name “Talker”
The line self.pub = self.create_publisher("chatter", string_t)
defines the talkers interface with Ark. It says that the node will publish to the “chatter” channel using the message type string_t
.
self.create_stepper(10, self.step) # 10 Hz
This line creates a stepper that will run the self.step
callback function at the frequency of 10 times per second. (as long as our processing time does not exceed 1/10th of a second).
def step(self):
msg = string_t()
msg.data = f"Hello World {time.time()}"
self.pub.publish(msg)
log.info(f"Published message data: {msg.data}")
The string_t
message is a simple message type. You can see the way to pack this message in default_structs.lcm string_t
. Here we just need to set the data parameter to the string we want to send.
We can publish the message using the publisher we defined earlier self.pub.publish(msg)
This callback also calls log.info(str)
which prints the message that has been published to the screen.
Writing the Subscriber Node
The Code
Viewing the code located at examples/basics/publisher_subscriber/listener.py
from ark.client.comm_infrastructure.base_node import BaseNode, main
from ark.tools.log import log
from arktypes import string_t
import time
from typing import Dict, Any, Optional
class ListenerNode(BaseNode):
def __init__(self, config: Optional[Dict[str, Any]] = None):
super().__init__("Listener")
self.create_subscriber("chatter", string_t, self.callback)
def callback(self, t, channel_name, msg):
log.info(f"at {t} I heard {msg.data=} on channel '{channel_name}'")
if __name__ == "__main__":
main(ListenerNode)
The Code Explained
The code for listener.py
is similar to talker.py
. Except we have introduced a callback-mechanism for subscribing to messages.
super().__init__("Listener")
self.create_subscriber("chatter", string_t, self.callback)
This declares that the node subscribes to the chatter topic which is of the type string_t
. When new messages are heard on the channel chatter the callback
function will be triggered.
The callback function will be triggered with the parameters of t, channel_name, msg
. t
is the time the message was received. channel_name
was the channel the message was heard on. msg
is the message heard on that channel.