Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ The tools in this package are provided as composable ROS 2 component nodes, so t

- [Relay](#relay): Subscribes to a topic and republishes to another.
- [RelayField](#relayfield): Republishes data in a different message type.
- [FilterRelay](#filterrelay): Republishes messages on different topics depending on matching filters.
- [Transform](#transform): Manipulates a topic or a field and outputs data on another topic.
- [Throttle](#throttle): Republishes data with bandwidth or rate limit.
- [Drop](#drop): Republishes by dropping X out of every Y incoming messages.
Expand Down Expand Up @@ -64,6 +65,24 @@ E.g. publish the contents of the `data` field in a `std_msgs/msg/String` onto th
ros2 run topic_tools relay_field /chatter /header std_msgs/Header "{stamp: {sec: 0, nanosec: 0}, frame_id: m.data}"
```

### FilterRelay

FilterRelay is a ROS 2 node that subscribes to a topic, and depending on conditional filters can republish a message on different topics.

#### Usage

```shell
ros2 run topic_tools filter_relay <input topic> <output topic 1> <output filter 1> <output topic 2> <output filter 2> [--import <modules>]
```

Subscribe to `input topic` and if incoming message matches output filter N, republish on output topic N.

E.g. Split a pointcloud topic into two different topics depending on the frame_id:

```shell
ros2 run topic_tools filter_relay /chatter /one "m.header.frame_id == 'one'" /two "m.header.frame_id == 'two'"
```

### Transform

Transform is a ROS 2 node that allows to take a topic or one of it fields and output it on another topic.
Expand Down
124 changes: 124 additions & 0 deletions topic_tools/topic_tools/filter_relay.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Copyright 2025 Alon Nusem
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Usage summary.

@author: Alon Nusem
Allows to republish messages on a different topic based on a matching filter.
* Examples
$ ros2 run topic_tools filter_relay /chatter /one "m.data == 1" /two "m.data == 2"
""" # noqa

import argparse
import importlib
import os
import sys

import rclpy
from rclpy.utilities import remove_ros_args
from ros2topic.api import get_msg_class

from .tool_base_node import ToolBase, add_qos_args

class FilterRelay(ToolBase):

def __init__(self, args, rules):
super().__init__(f'filter_relay_{os.getpid()}')

input_class = get_msg_class(
self, args.input, blocking=args.wait_for_start, include_hidden_topics=True)

if input_class is None:
raise RuntimeError(f'ERROR: Wrong input topic: {args.input}')
if len(rules) % 2 != 0 :
raise RuntimeError(f'ERROR: Odd number of extra args, format should come in "topic" "filter" pairs: {args.input}')

self.modules = {}
for module in args.modules:
try:
mod = importlib.import_module(module)
except ImportError:
print(f'Failed to import module: {module}', file=sys.stderr)
else:
self.modules[module] = mod

qos_profile = self.choose_qos(args, args.input)

self.pubs = []
self.filters = []
for topic, filter in zip(rules[::2], rules[1::2]):
try:
self.filters.append(eval(f'lambda m: {filter}', self.modules))
except NameError as e:
print(f"Expression using variables other than 'm': {e.message}", file=sys.stderr)
raise
except UnboundLocalError as e:
print(f'Wrong expression: {e.message}', file=sys.stderr)
raise
except Exception:
raise
self.pubs.append(self.create_publisher(input_class, topic, qos_profile))

self.sub = self.create_subscription(
input_class, args.input, self.callback, qos_profile)

def callback(self, m):
for filter, publisher in zip(self.filters, self.pubs):
try:
match = filter(m)
except Exception:
raise
if match:
publisher.publish(m)

def main(argv=sys.argv[1:]):
parser = argparse.ArgumentParser(
formatter_class=argparse.RawTextHelpFormatter,
description=(
'Allows to relay messages from one topic to others based on filter rules.'
'Usage:\n\tros2 run topic_tools transform '
'<input topic> <output topic> <output type> '
'[<expression on m>] [--import numpy tf] [--field <topic_field>]\n\n'
'Example:\n\tros2 run topic_tools filter_relay /chatter '
'/one "m.data == 1" /two "m.data == 2"')
)
parser.add_argument('input', help='Input topic or topic field.')
parser.add_argument(
'-i', '--import', dest='modules', nargs='+', default=['numpy'],
help='List of Python modules to import.'
)
parser.add_argument(
'--wait-for-start', action='store_true',
help='Wait for input messages.'
)
add_qos_args(parser)

# get and strip out ros args first
args, rules = parser.parse_known_args(remove_ros_args(args=argv))
rclpy.init(args=argv)
node = FilterRelay(args, rules)
try:
rclpy.spin(node)
except KeyboardInterrupt:
print('relay_field stopped cleanly')
except BaseException:
print('exception in relay_field:', file=sys.stderr)
raise
finally:
node.destroy_node()
rclpy.shutdown()
96 changes: 3 additions & 93 deletions topic_tools/topic_tools/relay_field.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,15 @@
import sys

import rclpy
from rclpy.node import Node
from rclpy.qos import QoSDurabilityPolicy
from rclpy.qos import QoSPresetProfiles
from rclpy.qos import QoSReliabilityPolicy
from rclpy.utilities import remove_ros_args
from ros2topic.api import get_msg_class
from ros2topic.api import qos_profile_from_short_keys
from rosidl_runtime_py import set_message_fields
from rosidl_runtime_py.utilities import get_message
import yaml

from .tool_base_node import ToolBase, add_qos_args

class RelayField(Node):
class RelayField(ToolBase):

def __init__(self, args):
super().__init__(f'relay_field_{os.getpid()}')
Expand Down Expand Up @@ -80,65 +76,6 @@ def _eval_in_dict_impl(self, dict_, globals_, locals_):
pass
return res

def choose_qos(self, args, topic_name):

if (args.qos_profile is not None or
args.qos_reliability is not None or
args.qos_durability is not None or
args.qos_depth is not None or
args.qos_history is not None):

if args.qos_profile is None:
args.qos_profile = 'sensor_data'
return qos_profile_from_short_keys(args.qos_profile,
reliability=args.qos_reliability,
durability=args.qos_durability,
depth=args.qos_depth,
history=args.qos_history)

qos_profile = QoSPresetProfiles.get_from_short_key('sensor_data')
reliability_reliable_endpoints_count = 0
durability_transient_local_endpoints_count = 0

pubs_info = self.get_publishers_info_by_topic(topic_name)
publishers_count = len(pubs_info)
if publishers_count == 0:
return qos_profile

for info in pubs_info:
if (info.qos_profile.reliability == QoSReliabilityPolicy.RELIABLE):
reliability_reliable_endpoints_count += 1
if (info.qos_profile.durability == QoSDurabilityPolicy.TRANSIENT_LOCAL):
durability_transient_local_endpoints_count += 1

# If all endpoints are reliable, ask for reliable
if reliability_reliable_endpoints_count == publishers_count:
qos_profile.reliability = QoSReliabilityPolicy.RELIABLE
else:
if reliability_reliable_endpoints_count > 0:
print(
'Some, but not all, publishers are offering '
'QoSReliabilityPolicy.RELIABLE. Falling back to '
'QoSReliabilityPolicy.BEST_EFFORT as it will connect '
'to all publishers'
)
qos_profile.reliability = QoSReliabilityPolicy.BEST_EFFORT

# If all endpoints are transient_local, ask for transient_local
if durability_transient_local_endpoints_count == publishers_count:
qos_profile.durability = QoSDurabilityPolicy.TRANSIENT_LOCAL
else:
if durability_transient_local_endpoints_count > 0:
print(
'Some, but not all, publishers are offering '
'QoSDurabilityPolicy.TRANSIENT_LOCAL. Falling back to '
'QoSDurabilityPolicy.VOLATILE as it will connect '
'to all publishers'
)
qos_profile.durability = QoSDurabilityPolicy.VOLATILE

return qos_profile

def callback(self, m):
try:
pub_args = self.msg_generation_lambda(m)
Expand Down Expand Up @@ -177,34 +114,7 @@ def main(argv=sys.argv[1:]):
'--wait-for-start', action='store_true',
help='Wait for input messages.'
)
parser.add_argument(
'--qos-profile',
choices=rclpy.qos.QoSPresetProfiles.short_keys(),
help='Quality of service preset profile to subscribe with (default: sensor_data)'
)
default_profile = rclpy.qos.QoSPresetProfiles.get_from_short_key('sensor_data')
parser.add_argument(
'--qos-depth', metavar='N', type=int,
help='Queue size setting to subscribe with '
'(overrides depth value of --qos-profile option)')
parser.add_argument(
'--qos-history',
choices=rclpy.qos.QoSHistoryPolicy.short_keys(),
help='History of samples setting to subscribe with '
'(overrides history value of --qos-profile option, default: {})'
.format(default_profile.history.short_key))
parser.add_argument(
'--qos-reliability',
choices=rclpy.qos.QoSReliabilityPolicy.short_keys(),
help='Quality of service reliability setting to subscribe with '
'(overrides reliability value of --qos-profile option, default: '
'Automatically match existing publishers )')
parser.add_argument(
'--qos-durability',
choices=rclpy.qos.QoSDurabilityPolicy.short_keys(),
help='Quality of service durability setting to subscribe with '
'(overrides durability value of --qos-profile option, default: '
'Automatically match existing publishers )')
add_qos_args(parser)

# get and strip out ros args first
args = parser.parse_args(remove_ros_args(args=argv))
Expand Down
1 change: 1 addition & 0 deletions topic_tools/topic_tools/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ script_dir=$base/lib/topic_tools
install_scripts=$base/lib/topic_tools
[options.entry_points]
console_scripts =
filter_relay = topic_tools.filter_relay:main
relay_field = topic_tools.relay_field:main
transform = topic_tools.transform:main
Loading
Loading