Publisher & Subscriber



Summary

This tutorial walks you through the process of setting up a basic publisher-subscriber system using Ark. You'll learn how to create a publisher to send messages and a subscriber to listen for them, all within a simple server-client architecture.


Writing the Publisher Node

A “node” in Ark is a script that is connected to the Ark network and can publish and subscribe to messages.

Here we will create the publisher (“talker”) node, which continually broadcasts a message.

The Code

The code is located at examples/basics/publisher_subscriber/talker.py

from ark.client.comm_infrastructure.base_node import BaseNode, main
from arktypes import string_t
from ark.tools.log import log
from typing import Dict, Any, Optional
import time

class TalkerNode(BaseNode):

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        super().__init__("Talker")
        self.pub = self.create_publisher("chatter", string_t)
        self.create_stepper(10, self.step) # 10 Hz

    def step(self):
        msg = string_t()
        msg.data = f"Hello World {time.time()}"
        self.pub.publish(msg)
        log.info(f"Published message data: {msg.data}")


if __name__ == "__main__":
    main(TalkerNode)

The Code Explained

from ark.client.comm_infrastructure.base_node import BaseNode, main
from arktypes import string_t
from ark.tools.log import log
from typing import Dict, Any, Optional
import time

To create a node you need to import BaseNode and main. The string_t import from arktypes provides the message type we will use for publishing.

super().__init__("Talker")
self.pub = self.create_publisher("chatter", string_t)

The line super().__init__("Talker") tells Ark the name of the node. This call must come first, because it sets the node up for communication. In this case the node is named “Talker”.

The line self.pub = self.create_publisher("chatter", string_t) defines the talker's interface with Ark. It says that the node will publish to the “chatter” channel using the message type string_t.

self.create_stepper(10, self.step) # 10 Hz

This line creates a stepper that runs the self.step callback function 10 times per second, as long as each call takes less than 1/10th of a second to process.
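The timing caveat above can be illustrated with a small standalone sketch. This is not how Ark implements create_stepper; run_fixed_rate, its parameters, and the injectable clock and sleep arguments are hypothetical names chosen for illustration. It only shows the general idea of a fixed-rate loop: wait out the remainder of each period, and fall behind when the callback takes longer than the period.

```python
import time
from typing import Callable, List


def run_fixed_rate(
    rate_hz: float,
    callback: Callable[[], None],
    num_steps: int,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> List[float]:
    """Call `callback` num_steps times, aiming for rate_hz calls per second.

    Hypothetical helper, not part of Ark. Returns the clock reading taken
    just before each call so the achieved rate can be inspected.
    """
    period = 1.0 / rate_hz
    next_deadline = clock()
    start_times: List[float] = []
    for _ in range(num_steps):
        start_times.append(clock())
        callback()
        next_deadline += period
        remaining = next_deadline - clock()
        if remaining > 0:
            # Callback finished early: wait out the rest of the period.
            sleep(remaining)
        else:
            # Callback overran the period: restart the schedule from now,
            # which means the effective rate drops below rate_hz.
            next_deadline = clock()
    return start_times
```

With a callback that returns quickly, successive start times are about 1/rate_hz apart. With a callback that takes longer than the period, they are spaced by the callback's own run time instead.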

  def step(self):
      msg = string_t()
      msg.data = f"Hello World {time.time()}"
      self.pub.publish(msg)
      log.info(f"Published message data: {msg.data}")

The string_t message is a simple message type. Its definition, which shows how the message is packed, is the string_t entry in default_structs.lcm. Here we only need to set the data field to the string we want to send.

We then publish the message using the publisher we defined earlier, with self.pub.publish(msg).

This callback also calls log.info(str), which prints the published message to the screen.
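To try the logic of step without a running Ark network, one option is to swap in simple stand-ins. The sketch below is an assumption-laden illustration, not Ark code: StringMessage is a hypothetical substitute for string_t (assumed here to carry only a data field), RecordingPublisher is a hypothetical test double that stores messages instead of sending them, and the timestamp is passed in explicitly so the result is predictable.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class StringMessage:
    """Hypothetical stand-in for string_t, assumed to hold one `data` field."""
    data: str = ""


class RecordingPublisher:
    """Hypothetical test double: records messages instead of sending them."""

    def __init__(self) -> None:
        self.sent: List[StringMessage] = []

    def publish(self, msg: StringMessage) -> None:
        self.sent.append(msg)


def step(pub: RecordingPublisher, now: float) -> StringMessage:
    """Mirror the body of TalkerNode.step, with the time passed in."""
    msg = StringMessage()
    msg.data = f"Hello World {now}"
    pub.publish(msg)
    return msg


pub = RecordingPublisher()
step(pub, 1.5)
print(pub.sent[0].data)  # Hello World 1.5
```

Passing the time in as an argument, rather than calling time.time() inside the function, is a design choice made only so the example is deterministic.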

Writing the Subscriber Node

The Code

The code is located at examples/basics/publisher_subscriber/listener.py

from ark.client.comm_infrastructure.base_node import BaseNode, main
from ark.tools.log import log
from arktypes import string_t
import time
from typing import Dict, Any, Optional

class ListenerNode(BaseNode):

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        super().__init__("Listener")
        self.create_subscriber("chatter", string_t, self.callback)

    def callback(self, t, channel_name, msg):
        log.info(f"at {t} I heard {msg.data=} on channel '{channel_name}'")

if __name__ == "__main__":
    main(ListenerNode)

The Code Explained

The code for listener.py is similar to talker.py, except that it introduces a callback mechanism for subscribing to messages.

super().__init__("Listener")
self.create_subscriber("chatter", string_t, self.callback)

This declares that the node subscribes to the chatter channel, whose messages are of type string_t. Whenever a new message is heard on the chatter channel, the callback function is triggered.
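The link between the two nodes is the channel name: a callback only fires for messages published on the channel it subscribed to. The following in-memory sketch illustrates that dispatch idea. It is not Ark's transport layer; LocalBus and its methods are hypothetical, and the message is a plain SimpleNamespace standing in for string_t.

```python
import time
from collections import defaultdict
from types import SimpleNamespace
from typing import Any, Callable, DefaultDict, List

# Same parameter order as the listener callback: (t, channel_name, msg).
Callback = Callable[[float, str, Any], None]


class LocalBus:
    """Hypothetical in-memory stand-in for a message network (not Ark)."""

    def __init__(self) -> None:
        self._subscribers: DefaultDict[str, List[Callback]] = defaultdict(list)

    def subscribe(self, channel_name: str, callback: Callback) -> None:
        self._subscribers[channel_name].append(callback)

    def publish(self, channel_name: str, msg: Any) -> int:
        """Deliver msg to every callback on channel_name; return the count."""
        received_at = time.time()
        callbacks = self._subscribers.get(channel_name, [])
        for callback in callbacks:
            callback(received_at, channel_name, msg)
        return len(callbacks)


bus = LocalBus()
heard = []
bus.subscribe("chatter", lambda t, channel_name, msg: heard.append(msg.data))
bus.publish("chatter", SimpleNamespace(data="Hello World"))  # delivered
bus.publish("other", SimpleNamespace(data="ignored"))        # no subscriber
print(heard)  # ['Hello World']
```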

The callback function is called with three parameters: t, channel_name, and msg. t is the time the message was received, channel_name is the channel the message was heard on, and msg is the message heard on that channel.
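The log line in the callback uses the f-string form {msg.data=}, which (in Python 3.8 and later) prints the expression text followed by the repr of its value. The sketch below reproduces that formatting on its own; describe is a hypothetical helper, the value 12.5 is an arbitrary stand-in for whatever Ark passes as t, and SimpleNamespace stands in for a string_t message.

```python
from types import SimpleNamespace


def describe(t, channel_name, msg) -> str:
    """Build the same text the listener logs, without needing Ark."""
    return f"at {t} I heard {msg.data=} on channel '{channel_name}'"


example = describe(12.5, "chatter", SimpleNamespace(data="Hello World"))
print(example)  # at 12.5 I heard msg.data='Hello World' on channel 'chatter'
```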