diff --git a/README.md b/README.md index 52600394..cefdffa7 100644 --- a/README.md +++ b/README.md @@ -10,6 +10,7 @@ The tools in this package are provided as composable ROS 2 component nodes, so t - [Relay](#relay): Subscribes to a topic and republishes to another. - [RelayField](#relayfield): Republishes data in a different message type. +- [FilterRelay](#filterrelay): Republishes messages on different topics depending on matching filters. - [Transform](#transform): Manipulates a topic or a field and outputs data on another topic. - [Throttle](#throttle): Republishes data with bandwidth or rate limit. - [Drop](#drop): Republishes by dropping X out of every Y incoming messages. @@ -64,6 +65,24 @@ E.g. publish the contents of the `data` field in a `std_msgs/msg/String` onto th ros2 run topic_tools relay_field /chatter /header std_msgs/Header "{stamp: {sec: 0, nanosec: 0}, frame_id: m.data}" ``` +### FilterRelay + +FilterRelay is a ROS 2 node that subscribes to a topic, and depending on conditional filters can republish a message on different topics. + +#### Usage + +```shell +ros2 run topic_tools filter_relay [--import ] +``` + +Subscribe to `input topic` and if incoming message matches output filter N, republish on output topic N. + +E.g. Split a pointcloud topic into two different topics depending on the frame_id: + +```shell +ros2 run topic_tools filter_relay /chatter /one "m.header.frame_id == 'one'" /two "m.header.frame_id == 'two'" +``` + ### Transform Transform is a ROS 2 node that allows to take a topic or one of it fields and output it on another topic. diff --git a/topic_tools/topic_tools/filter_relay.py b/topic_tools/topic_tools/filter_relay.py new file mode 100644 index 00000000..3cb7897e --- /dev/null +++ b/topic_tools/topic_tools/filter_relay.py @@ -0,0 +1,124 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Copyright 2025 Alon Nusem +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Usage summary. + +@author: Alon Nusem +Allows to republish messages on a different topic based on a matching filter. +* Examples +$ ros2 run topic_tools filter_relay /chatter /one "m.data == 1" /two "m.data == 2" +""" # noqa + +import argparse +import importlib +import os +import sys + +import rclpy +from rclpy.utilities import remove_ros_args +from ros2topic.api import get_msg_class + +from .tool_base_node import ToolBase, add_qos_args + +class FilterRelay(ToolBase): + + def __init__(self, args, rules): + super().__init__(f'filter_relay_{os.getpid()}') + + input_class = get_msg_class( + self, args.input, blocking=args.wait_for_start, include_hidden_topics=True) + + if input_class is None: + raise RuntimeError(f'ERROR: Wrong input topic: {args.input}') + if len(rules) % 2 != 0 : + raise RuntimeError(f'ERROR: Odd number of extra args, format should come in "topic" "filter" pairs: {args.input}') + + self.modules = {} + for module in args.modules: + try: + mod = importlib.import_module(module) + except ImportError: + print(f'Failed to import module: {module}', file=sys.stderr) + else: + self.modules[module] = mod + + qos_profile = self.choose_qos(args, args.input) + + self.pubs = [] + self.filters = [] + for topic, filter in zip(rules[::2], rules[1::2]): + try: + self.filters.append(eval(f'lambda m: {filter}', self.modules)) + except NameError as e: + print(f"Expression using variables other than 'm': {e.message}", file=sys.stderr) + raise + except UnboundLocalError as e: + print(f'Wrong expression: {e.message}', file=sys.stderr) + raise + except Exception: + raise + self.pubs.append(self.create_publisher(input_class, topic, qos_profile)) + + self.sub = self.create_subscription( + input_class, args.input, self.callback, qos_profile) + + def callback(self, m): + for filter, publisher in zip(self.filters, self.pubs): + try: + match = filter(m) + except Exception: + raise + if match: + publisher.publish(m) + +def main(argv=sys.argv[1:]): + parser = argparse.ArgumentParser( + formatter_class=argparse.RawTextHelpFormatter, + description=( + 'Allows to relay messages from one topic to others based on filter rules.' + 'Usage:\n\tros2 run topic_tools transform ' + ' ' + '[] [--import numpy tf] [--field ]\n\n' + 'Example:\n\tros2 run topic_tools filter_relay /chatter ' + '/one "m.data == 1" /two "m.data == 2"') + ) + parser.add_argument('input', help='Input topic or topic field.') + parser.add_argument( + '-i', '--import', dest='modules', nargs='+', default=['numpy'], + help='List of Python modules to import.' + ) + parser.add_argument( + '--wait-for-start', action='store_true', + help='Wait for input messages.' + ) + add_qos_args(parser) + + # get and strip out ros args first + args, rules = parser.parse_known_args(remove_ros_args(args=argv)) + rclpy.init(args=argv) + node = FilterRelay(args, rules) + try: + rclpy.spin(node) + except KeyboardInterrupt: + print('relay_field stopped cleanly') + except BaseException: + print('exception in relay_field:', file=sys.stderr) + raise + finally: + node.destroy_node() + rclpy.shutdown() diff --git a/topic_tools/topic_tools/relay_field.py b/topic_tools/topic_tools/relay_field.py index f6440d88..640e9133 100644 --- a/topic_tools/topic_tools/relay_field.py +++ b/topic_tools/topic_tools/relay_field.py @@ -30,19 +30,15 @@ import sys import rclpy -from rclpy.node import Node -from rclpy.qos import QoSDurabilityPolicy -from rclpy.qos import QoSPresetProfiles -from rclpy.qos import QoSReliabilityPolicy from rclpy.utilities import remove_ros_args from ros2topic.api import get_msg_class -from ros2topic.api import qos_profile_from_short_keys from rosidl_runtime_py import set_message_fields from rosidl_runtime_py.utilities import get_message import yaml +from .tool_base_node import ToolBase, add_qos_args -class RelayField(Node): +class RelayField(ToolBase): def __init__(self, args): super().__init__(f'relay_field_{os.getpid()}') @@ -80,65 +76,6 @@ def _eval_in_dict_impl(self, dict_, globals_, locals_): pass return res - def choose_qos(self, args, topic_name): - - if (args.qos_profile is not None or - args.qos_reliability is not None or - args.qos_durability is not None or - args.qos_depth is not None or - args.qos_history is not None): - - if args.qos_profile is None: - args.qos_profile = 'sensor_data' - return qos_profile_from_short_keys(args.qos_profile, - reliability=args.qos_reliability, - durability=args.qos_durability, - depth=args.qos_depth, - history=args.qos_history) - - qos_profile = QoSPresetProfiles.get_from_short_key('sensor_data') - reliability_reliable_endpoints_count = 0 - durability_transient_local_endpoints_count = 0 - - pubs_info = self.get_publishers_info_by_topic(topic_name) - publishers_count = len(pubs_info) - if publishers_count == 0: - return qos_profile - - for info in pubs_info: - if (info.qos_profile.reliability == QoSReliabilityPolicy.RELIABLE): - reliability_reliable_endpoints_count += 1 - if (info.qos_profile.durability == QoSDurabilityPolicy.TRANSIENT_LOCAL): - durability_transient_local_endpoints_count += 1 - - # If all endpoints are reliable, ask for reliable - if reliability_reliable_endpoints_count == publishers_count: - qos_profile.reliability = QoSReliabilityPolicy.RELIABLE - else: - if reliability_reliable_endpoints_count > 0: - print( - 'Some, but not all, publishers are offering ' - 'QoSReliabilityPolicy.RELIABLE. Falling back to ' - 'QoSReliabilityPolicy.BEST_EFFORT as it will connect ' - 'to all publishers' - ) - qos_profile.reliability = QoSReliabilityPolicy.BEST_EFFORT - - # If all endpoints are transient_local, ask for transient_local - if durability_transient_local_endpoints_count == publishers_count: - qos_profile.durability = QoSDurabilityPolicy.TRANSIENT_LOCAL - else: - if durability_transient_local_endpoints_count > 0: - print( - 'Some, but not all, publishers are offering ' - 'QoSDurabilityPolicy.TRANSIENT_LOCAL. Falling back to ' - 'QoSDurabilityPolicy.VOLATILE as it will connect ' - 'to all publishers' - ) - qos_profile.durability = QoSDurabilityPolicy.VOLATILE - - return qos_profile - def callback(self, m): try: pub_args = self.msg_generation_lambda(m) @@ -177,34 +114,7 @@ def main(argv=sys.argv[1:]): '--wait-for-start', action='store_true', help='Wait for input messages.' ) - parser.add_argument( - '--qos-profile', - choices=rclpy.qos.QoSPresetProfiles.short_keys(), - help='Quality of service preset profile to subscribe with (default: sensor_data)' - ) - default_profile = rclpy.qos.QoSPresetProfiles.get_from_short_key('sensor_data') - parser.add_argument( - '--qos-depth', metavar='N', type=int, - help='Queue size setting to subscribe with ' - '(overrides depth value of --qos-profile option)') - parser.add_argument( - '--qos-history', - choices=rclpy.qos.QoSHistoryPolicy.short_keys(), - help='History of samples setting to subscribe with ' - '(overrides history value of --qos-profile option, default: {})' - .format(default_profile.history.short_key)) - parser.add_argument( - '--qos-reliability', - choices=rclpy.qos.QoSReliabilityPolicy.short_keys(), - help='Quality of service reliability setting to subscribe with ' - '(overrides reliability value of --qos-profile option, default: ' - 'Automatically match existing publishers )') - parser.add_argument( - '--qos-durability', - choices=rclpy.qos.QoSDurabilityPolicy.short_keys(), - help='Quality of service durability setting to subscribe with ' - '(overrides durability value of --qos-profile option, default: ' - 'Automatically match existing publishers )') + add_qos_args(parser) # get and strip out ros args first args = parser.parse_args(remove_ros_args(args=argv)) diff --git a/topic_tools/topic_tools/setup.cfg b/topic_tools/topic_tools/setup.cfg index 0e120fee..182d6dd6 100644 --- a/topic_tools/topic_tools/setup.cfg +++ b/topic_tools/topic_tools/setup.cfg @@ -4,5 +4,6 @@ script_dir=$base/lib/topic_tools install_scripts=$base/lib/topic_tools [options.entry_points] console_scripts = + filter_relay = topic_tools.filter_relay:main relay_field = topic_tools.relay_field:main transform = topic_tools.transform:main diff --git a/topic_tools/topic_tools/tool_base_node.py b/topic_tools/topic_tools/tool_base_node.py new file mode 100644 index 00000000..ed52e77a --- /dev/null +++ b/topic_tools/topic_tools/tool_base_node.py @@ -0,0 +1,115 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Copyright 2021 Daisuke Nishimatsu +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from rclpy.node import Node +from rclpy.qos import QoSDurabilityPolicy +from rclpy.qos import QoSPresetProfiles +from rclpy.qos import QoSHistoryPolicy +from rclpy.qos import QoSReliabilityPolicy +from ros2topic.api import qos_profile_from_short_keys + + +class ToolBase(Node): + + def choose_qos(self, args, topic_name): + + if (args.qos_profile is not None or + args.qos_reliability is not None or + args.qos_durability is not None or + args.qos_depth is not None or + args.qos_history is not None): + + if args.qos_profile is None: + args.qos_profile = 'sensor_data' + return qos_profile_from_short_keys(args.qos_profile, + reliability=args.qos_reliability, + durability=args.qos_durability, + depth=args.qos_depth, + history=args.qos_history) + + qos_profile = QoSPresetProfiles.get_from_short_key('sensor_data') + reliability_reliable_endpoints_count = 0 + durability_transient_local_endpoints_count = 0 + + pubs_info = self.get_publishers_info_by_topic(topic_name) + publishers_count = len(pubs_info) + if publishers_count == 0: + return qos_profile + + for info in pubs_info: + if (info.qos_profile.reliability == QoSReliabilityPolicy.RELIABLE): + reliability_reliable_endpoints_count += 1 + if (info.qos_profile.durability == QoSDurabilityPolicy.TRANSIENT_LOCAL): + durability_transient_local_endpoints_count += 1 + + # If all endpoints are reliable, ask for reliable + if reliability_reliable_endpoints_count == publishers_count: + qos_profile.reliability = QoSReliabilityPolicy.RELIABLE + else: + if reliability_reliable_endpoints_count > 0: + print( + 'Some, but not all, publishers are offering ' + 'QoSReliabilityPolicy.RELIABLE. Falling back to ' + 'QoSReliabilityPolicy.BEST_EFFORT as it will connect ' + 'to all publishers' + ) + qos_profile.reliability = QoSReliabilityPolicy.BEST_EFFORT + + # If all endpoints are transient_local, ask for transient_local + if durability_transient_local_endpoints_count == publishers_count: + qos_profile.durability = QoSDurabilityPolicy.TRANSIENT_LOCAL + else: + if durability_transient_local_endpoints_count > 0: + print( + 'Some, but not all, publishers are offering ' + 'QoSDurabilityPolicy.TRANSIENT_LOCAL. Falling back to ' + 'QoSDurabilityPolicy.VOLATILE as it will connect ' + 'to all publishers' + ) + qos_profile.durability = QoSDurabilityPolicy.VOLATILE + + return qos_profile + +def add_qos_args(parser): + parser.add_argument( + '--qos-profile', + choices=QoSPresetProfiles.short_keys(), + help='Quality of service preset profile to subscribe with (default: sensor_data)' + ) + default_profile = QoSPresetProfiles.get_from_short_key('sensor_data') + parser.add_argument( + '--qos-depth', metavar='N', type=int, + help='Queue size setting to subscribe with ' + '(overrides depth value of --qos-profile option)') + parser.add_argument( + '--qos-history', + choices=QoSHistoryPolicy.short_keys(), + help='History of samples setting to subscribe with ' + '(overrides history value of --qos-profile option, default: {})' + .format(default_profile.history.short_key)) + parser.add_argument( + '--qos-reliability', + choices=QoSReliabilityPolicy.short_keys(), + help='Quality of service reliability setting to subscribe with ' + '(overrides reliability value of --qos-profile option, default: ' + 'Automatically match existing publishers )') + parser.add_argument( + '--qos-durability', + choices=QoSDurabilityPolicy.short_keys(), + help='Quality of service durability setting to subscribe with ' + '(overrides durability value of --qos-profile option, default: ' + 'Automatically match existing publishers )') diff --git a/topic_tools/topic_tools/transform.py b/topic_tools/topic_tools/transform.py index 624c2f31..463410f5 100755 --- a/topic_tools/topic_tools/transform.py +++ b/topic_tools/topic_tools/transform.py @@ -35,17 +35,13 @@ import sys import rclpy -from rclpy.node import Node -from rclpy.qos import QoSDurabilityPolicy -from rclpy.qos import QoSPresetProfiles -from rclpy.qos import QoSReliabilityPolicy from rclpy.utilities import remove_ros_args from ros2topic.api import get_msg_class -from ros2topic.api import qos_profile_from_short_keys from rosidl_runtime_py.utilities import get_message +from .tool_base_node import ToolBase, add_qos_args -class Transform(Node): +class Transform(ToolBase): def __init__(self, args): super().__init__(f'transform_{os.getpid()}') @@ -92,65 +88,6 @@ def __init__(self, args): self.sub = self.create_subscription( input_class, args.input, self.callback, qos_profile) - def choose_qos(self, args, topic_name): - - if (args.qos_profile is not None or - args.qos_reliability is not None or - args.qos_durability is not None or - args.qos_depth is not None or - args.qos_history is not None): - - if args.qos_profile is None: - args.qos_profile = 'sensor_data' - return qos_profile_from_short_keys(args.qos_profile, - reliability=args.qos_reliability, - durability=args.qos_durability, - depth=args.qos_depth, - history=args.qos_history) - - qos_profile = QoSPresetProfiles.get_from_short_key('sensor_data') - reliability_reliable_endpoints_count = 0 - durability_transient_local_endpoints_count = 0 - - pubs_info = self.get_publishers_info_by_topic(topic_name) - publishers_count = len(pubs_info) - if publishers_count == 0: - return qos_profile - - for info in pubs_info: - if (info.qos_profile.reliability == QoSReliabilityPolicy.RELIABLE): - reliability_reliable_endpoints_count += 1 - if (info.qos_profile.durability == QoSDurabilityPolicy.TRANSIENT_LOCAL): - durability_transient_local_endpoints_count += 1 - - # If all endpoints are reliable, ask for reliable - if reliability_reliable_endpoints_count == publishers_count: - qos_profile.reliability = QoSReliabilityPolicy.RELIABLE - else: - if reliability_reliable_endpoints_count > 0: - print( - 'Some, but not all, publishers are offering ' - 'QoSReliabilityPolicy.RELIABLE. Falling back to ' - 'QoSReliabilityPolicy.BEST_EFFORT as it will connect ' - 'to all publishers' - ) - qos_profile.reliability = QoSReliabilityPolicy.BEST_EFFORT - - # If all endpoints are transient_local, ask for transient_local - if durability_transient_local_endpoints_count == publishers_count: - qos_profile.durability = QoSDurabilityPolicy.TRANSIENT_LOCAL - else: - if durability_transient_local_endpoints_count > 0: - print( - 'Some, but not all, publishers are offering ' - 'QoSDurabilityPolicy.TRANSIENT_LOCAL. Falling back to ' - 'QoSDurabilityPolicy.VOLATILE as it will connect ' - 'to all publishers' - ) - qos_profile.durability = QoSDurabilityPolicy.VOLATILE - - return qos_profile - def callback(self, m): if self.field is not None: for field in self.field: @@ -198,34 +135,7 @@ def main(argv=sys.argv[1:]): '--wait-for-start', action='store_true', help='Wait for input messages.' ) - parser.add_argument( - '--qos-profile', - choices=rclpy.qos.QoSPresetProfiles.short_keys(), - help='Quality of service preset profile to subscribe with (default: sensor_data)' - ) - default_profile = rclpy.qos.QoSPresetProfiles.get_from_short_key('sensor_data') - parser.add_argument( - '--qos-depth', metavar='N', type=int, - help='Queue size setting to subscribe with ' - '(overrides depth value of --qos-profile option)') - parser.add_argument( - '--qos-history', - choices=rclpy.qos.QoSHistoryPolicy.short_keys(), - help='History of samples setting to subscribe with ' - '(overrides history value of --qos-profile option, default: {})' - .format(default_profile.history.short_key)) - parser.add_argument( - '--qos-reliability', - choices=rclpy.qos.QoSReliabilityPolicy.short_keys(), - help='Quality of service reliability setting to subscribe with ' - '(overrides reliability value of --qos-profile option, default: ' - 'Automatically match existing publishers )') - parser.add_argument( - '--qos-durability', - choices=rclpy.qos.QoSDurabilityPolicy.short_keys(), - help='Quality of service durability setting to subscribe with ' - '(overrides durability value of --qos-profile option, default: ' - 'Automatically match existing publishers )') + add_qos_args(parser) parser.add_argument( '--field', type=str, default=None, help='Transform a selected field of a message. '