Chinaunix首页 | 论坛 | 博客
  • 博客访问: 9147875
  • 博文数量: 1726
  • 博客积分: 12961
  • 博客等级: 上将
  • 技术积分: 19850
  • 用 户 组: 普通用户
  • 注册时间: 2009-01-09 11:25
个人简介

偷得浮生半桶水(半日闲), 好记性不如抄下来(烂笔头). 信息爆炸的时代, 学习是一项持续的工作.

文章分类

全部博文(1726)

文章存档

2024年(2)

2023年(26)

2022年(112)

2021年(217)

2020年(157)

2019年(192)

2018年(81)

2017年(78)

2016年(70)

2015年(52)

2014年(40)

2013年(51)

2012年(85)

2011年(45)

2010年(231)

2009年(287)

分类: 其他平台

2020-11-18 14:10:12

引用
https://blog.csdn.net/liuerin/article/details/108749596


本想使用多线程executor执行node,没想到简单的创建Subscription之后callback funcs竟然是顺序执行的,所以仔细看了一下,原来node中还有callback_group这个东西。结论如下:


1.callback_group

为所有的callback分了组别

类型有

  • MutuallyExclusive;互斥,即这个组别中每时刻只允许1个线程,一个callback在执行时,其他只能等待
  • Reentrant;可重入,这个组别中每时刻允许多个线程,一个Callback在执行时,其他callback可开启新的线程

2.node默认的组别

  • 每个node中有一个默认的callback_group,默认是MutuallyExclusive类型,即便使用了multiThreadedExecutor,也依然默认MutuallyExclusive类型
  • 当node创建一个subscription时,可以设置选项
Node::create_subscription(
  const std::string & topic_name,  //topic名称
  const rclcpp::QoS & qos,
  CallbackT && callback,
  const SubscriptionOptionsWithAllocator & options,  //选项
  typename rclcpp::message_memory_strategy::MessageMemoryStrategy<
    typename rclcpp::subscription_traits::has_message_type::type, AllocatorT>::SharedPtr
  msg_mem_strat)
							
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
this->create_subscription("int1",qos,
			std::bind(&PathSearcherNode::int1Sub, this,
	        std::placeholders::_1),
			rclcpp::SubscriptionOptions());
							
  • 1
  • 2
  • 3
  • 4

这里的SubscriptionOptions()可以设置callback_group组别;如果不设置,默认为node初始化时的callback_group

3.使用MultiThreadedExecutor,想要多线程调用callback

3.1 可设置不同的MutuallyExclusive callback_group

#include  
#include 
using namespace std::chrono_literals;
std::string getTime(){
    time_t tick = (time_t)(rclcpp::Clock().now().seconds());
    struct tm tm;
    char s[100];
    tm = *localtime(&tick);
    strftime(s, sizeof(s), "%Y-%m-%d %H:%M:%S", &tm);
    return std::string(s);
}
std::string string_thread_id()
{
  auto hashed = std::hash()(std::this_thread::get_id());
  return std::to_string(hashed);
}
class MyNode: public rclcpp::Node{
public:
    MyNode(const rclcpp::NodeOptions & options);
    ~MyNode(){}
    void int1Sub(std_msgs::msg::Int32::SharedPtr msg);
    void int2Sub(std_msgs::msg::Int32::SharedPtr msg);

private:
    rclcpp::Subscription::SharedPtr int_sub1_;
    rclcpp::Subscription::SharedPtr int_sub2_;
    rclcpp::callback_group::CallbackGroup::SharedPtr callback_group_sub1_;
    rclcpp::callback_group::CallbackGroup::SharedPtr callback_group_sub2_;
}
MyNode::MyNode(const rclcpp::NodeOptions & options):
    rclcpp::Node("my_node",options){
//两个组别,设置组别类型为MutuallyExclusive
    callback_group_sub1_ = this->create_callback_group(rclcpp::callback_group::CallbackGroupType::MutuallyExclusive);
    callback_group_sub2_ = this->create_callback_group(rclcpp::callback_group::CallbackGroupType::MutuallyExclusive);

    auto sub1_opt = rclcpp::SubscriptionOptions();
    sub1_opt.callback_group = callback_group_sub1_;
    auto sub2_opt = rclcpp::SubscriptionOptions();
    sub2_opt.callback_group = callback_group_sub2_;
    
    int_sub1_ = nh_->create_subscription("int1",10,
        std::bind(&MyNode::int1Sub,this,std::placeholders::_1),
        sub1_opt);
    int_sub2_ = nh_->create_subscription("int2",10,
        std::bind(&MyNode::int2Sub,this,std::placeholders::_1),
        sub2_opt);
}
void MyNode::int1Sub(std_msgs::msg::Int32::SharedPtr msg){
    printf("int1 sub time:%s\n",getTime().c_str());
    printf("thread1 id:%s\n",string_thread_id().c_str());
    rclcpp::sleep_for(10s);
    printf("int1 sub time:%s\n",getTime().c_str());
}
void MyNode::int2Sub(std_msgs::msg::Int32::SharedPtr msg){
    printf("int2 sub time:%s\n",getTime().c_str());
    printf("thread2 id:%s\n",string_thread_id().c_str());
    rclcpp::sleep_for(10s);
    printf("int2 sub time:%s\n",getTime().c_str());
}
int main(int argc, char **argv) {
    rclcpp::init(argc, argv);
    rclcpp::executors::MultiThreadedExecutor executor(rclcpp::executor::ExecutorArgs(),5,true);

    auto node = std::make_shared(rclcpp::NodeOptions());
    executor.add_node(node);
    printf("threads   %d\n",executor.get_number_of_threads());
    executor.spin();
    rclcpp::shutdown();
    return EXIT_SUCCESS;
}
										
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70

这时,如果同时收到了/int1和/int2 的消息,两个callback都能执行,但如果接连收到了两个/int1,只能调1个int1Sub;

3.2 设置同一个Reentrant callback_group

//同一个组别,类型设置为Reentrant
callback_group_sub_ = this->create_callback_group(rclcpp::callback_group::CallbackGroupType::Reentrant);
auto sub_opt = rclcpp::SubscriptionOptions();
sub_opt.callback_group = callback_group_sub_;
    
    
int_sub1_ = nh_->create_subscription("int1",10,
        std::bind(&MyNode::int1Sub,this,std::placeholders::_1),
        sub_opt);
int_sub2_ = nh_->create_subscription("int2",10,
        std::bind(&MyNode::int2Sub,this,std::placeholders::_1),
        sub_opt);
												
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

这时,如果同时收到多个/int1和/int2,开启的线程数会受电脑内核和初始化MultiThreadedExecutor时设置的线程数决

阅读(3832) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~