Создадим простой пример с двумя нодами:
- Первая нода будет публиковать сообщения типа String в топик "chatter"
- Вторая нода будет подписываться на этот топик и выводить полученные сообщения
Нода-издатель (Publisher)
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class MinimalPublisher(Node):
def __init__(self):
super().__init__('minimal_publisher') # Имя ноды
# Создаем паблишер, который будет отправлять сообщения типа String в топик 'chatter'
self.publisher_ = self.create_publisher(String, 'chatter', 10) # 10 - размер очереди
timer_period = 1.0 # секунды
# Создаем таймер, который будет вызывать функцию timer_callback каждую секунду
self.timer = self.create_timer(timer_period, self.timer_callback)
self.i = 0
def timer_callback(self):
# Создаем сообщение
msg = String()
msg.data = f'Hello World: {self.i}'
# Публикуем сообщение
self.publisher_.publish(msg)
# Выводим информацию о публикации
self.get_logger().info(f'Publishing: "{msg.data}"')
self.i += 1
def main(args=None):
# Инициализация ROS2
rclpy.init(args=args)
# Создаем экземпляр ноды
minimal_publisher = MinimalPublisher()
# Запускаем ноду
rclpy.spin(minimal_publisher)
# Уничтожаем ноду и завершаем работу
minimal_publisher.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Нода-подписчик (Subscriber)
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class MinimalSubscriber(Node):
def __init__(self):
super().__init__('minimal_subscriber') # Имя ноды
# Создаем подписчика на топик 'chatter', который будет вызывать
# функцию listener_callback при получении сообщения
self.subscription = self.create_subscription(
String, # Тип сообщения
'chatter', # Имя топика
self.listener_callback, # Функция обратного вызова
10) # Размер очереди
self.subscription # Предотвращает предупреждение о неиспользуемой переменной
def listener_callback(self, msg):
# Обрабатываем полученное сообщение
self.get_logger().info(f'I heard: "{msg.data}"')
def main(args=None):
# Инициализация ROS2
rclpy.init(args=args)
# Создаем экземпляр ноды
minimal_subscriber = MinimalSubscriber()
# Запускаем ноду
rclpy.spin(minimal_subscriber)
# Уничтожаем ноду и завершаем работу
minimal_subscriber.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Как это работает
- Инициализация ROS2:
rclpy.init(args=args)инициализирует систему ROS2. - Создание нод:
-MinimalPublisherиMinimalSubscriber— это классы, наследующие от базового классаNode.
- При создании ноды мы задаем ей имя:super().__init__('minimal_publisher'). - Паблишер:
- Создается с помощьюself.create_publisher(String, 'chatter', 10), где:
-String— тип сообщения
-'chatter'— имя топика
-10— размер очереди сообщений - Подписчик:
- Создается с помощьюself.create_subscription(String, 'chatter', self.listener_callback, 10), где:
-String— тип сообщения
-'chatter'— имя топика
-self.listener_callback— функция, которая будет вызвана при получении сообщения
-10— размер очереди сообщений - Обмен сообщениями:
- Паблишер отправляет сообщения черезself.publisher_.publish(msg)
- Подписчик автоматически получает сообщения через функцию обратного вызоваlistener_callback - Запуск нод:
-rclpy.spin(node)запускает обработку сообщений и колбэков ноды
- Нода работает до тех пор, пока не будет вызванrclpy.shutdown()
Запуск примера
Чтобы запустить этот пример, нужно:1. Создать пакет ROS2:
source /opt/ros/jazzy/setup.bash
ros2 pkg create --build-type ament_python my_package

- Поместить код в файлы
my_package/my_package/publisher_node.pyГде создадим Python файлы:
# Перейдем в директорию пакета
cd my_package
# Создадим файлы в подкаталоге my_package
touch my_package/publisher_node.py
touch my_package/subscriber_node.py

Добавим код в соответствующие файлы:
nano publisher_node.py
nano subscriber_node.py
- Настроим точки входа в
setup.py:Когда мы создаем пакет ROS2 с помощью командыros2 pkg create, в корневой директории пакета автоматически создается файлsetup.py. Нужно открыть этот файл в любом текстовом редакторе:
nano setup.py
Так выглядит setup.py по умолчанию:
from setuptools import find_packages, setup
package_name = 'my_package'
setup(
name=package_name,
version='0.0.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='wtf',
maintainer_email='wtf@todo.todo',
description='TODO: Package description',
license='TODO: License declaration',
tests_require=['pytest'],
entry_points={
'console_scripts': [],
},
)
Нужно добавить или изменить раздел entry_points в функции setup(). Найдем этот раздел (если он уже есть) или добавим его перед закрывающей скобкой функции setup():
entry_points={
'console_scripts': [
'talker = my_package.publisher_node:main',
'listener = my_package.subscriber_node:main',
],
},
Полный файл setup.py после редактирования должен выглядеть так:
from setuptools import find_packages, setup
package_name = 'my_package'
setup(
name=package_name,
version='0.0.0',
packages=find_packages(exclude=['test']),
data_files=[
(
'share/ament_index/resource_index/packages',
['resource/' + package_name],
),
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='wtf',
maintainer_email='wtf@todo.todo',
description='TODO: Package description',
license='TODO: License declaration',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'talker = my_package.publisher_node:main',
'listener = my_package.subscriber_node:main',
],
},
)
Что означают эти точки входа
Каждая строка в разделе 'console_scripts' имеет формат:
'имя_команды = путь_к_модулю:функция'
Где:
- имя_команды — это имя, которое будет использоваться для запуска программы через командную строку (например, ros2 run my_package talker)
- путь_к_модулю — это путь к Python-модулю относительно корня пакета
- функция — это имя функции в модуле, которая будет вызвана при запуске команды
В нашем примере:
- talker = my_package.publisher_node:main означает, что команда ros2 run my_package talker будет вызывать функцию main() из файла my_package/publisher_node.py
- listener = my_package.subscriber_node:main означает, что команда ros2 run my_package listener будет вызывать функцию main() из файла my_package/subscriber_node.py
Сохраним файл и соберем пакет
После редактирования сохраним файл и соберем пакет:
# Соберем пакет
colcon build --packages-select my_package
# Загрузим настройки окружения
source install/setup.bash

Запуск
Запустим в первом терминале
# В первом терминале:
source install/setup.bash
ros2 run my_package talker
Запустим во втором терминале:
# Во втором терминале:
source install/setup.bash
export ROS_AUTOMATIC_DISCOVERY_RANGE=LOCALHOST
ros2 run my_package listener
Команда export ROS_AUTOMATIC_DISCOVERY_RANGE=LOCALHOST используется для ограничения области обнаружения узлов ROS2 только локальным компьютером.

Удаление пакета
Удалим папку с пакетом:
rm -rf my_package
Перестроим рабочее пространство без этого пакета:
colcon build

Основные причины перестройки рабочего пространства
1. Обновление индексов пакетов
- ROS2 поддерживает индексы доступных пакетов
- После удаления пакета нужно обновить эти индексы, чтобы система «знала», что пакет больше не доступен
1. Обновление зависимостей
- Если другие пакеты зависели от удаленного пакета, они должны быть перестроены
- colcon build проверит зависимости и перестроит только то, что необходимо
1. Очистка ссылок
- В директориях install и build остаются ссылки на удаленный пакет
- Перестройка обновит эти директории, удалив неактуальные ссылки
1. Обновление overlay
- ROS2 использует систему overlay для пакетов
- Перестройка обновляет overlay, чтобы удаленный пакет не был доступен при использовании команды source install/setup.bash
Перестройку можно пропустить, если:
- Удаляем весь рабочий каталог целиком
- Не планируется использовать оставшиеся пакеты из этого рабочего пространства
- Собираемся создать новое рабочее пространство с нуля