Chinaunix首页 | 论坛 | 博客
  • 博客访问: 673334
  • 博文数量: 404
  • 博客积分: 10
  • 博客等级: 民兵
  • 技术积分: 1237
  • 用 户 组: 普通用户
  • 注册时间: 2011-03-03 10:45
文章分类

全部博文(404)

文章存档

2017年(1)

2016年(27)

2015年(39)

2014年(55)

2013年(66)

2012年(216)

分类: LINUX

2016-07-04 14:45:11

浅析dbus-session后台daemon如何处理client们发送过来的mesage
《浅析应用程序dbus_bus_get函数在lib库中的实际执行流程》

应用程序首先调用dbus_bus_get()于dbus-session建立连接
dbus_bus_register
==>message = dbus_message_new_method_call (DBUS_SERVICE_DBUS,
                                          DBUS_PATH_DBUS,
                                          DBUS_INTERFACE_DBUS,
                                          "Hello");
// 等待peer发送reply回来
==>reply = dbus_connection_send_with_reply_and_block (connection, message, -1, error);
==>dbus_message_get_args (reply, error,
                          DBUS_TYPE_STRING, &name, // 获取peer返回回来的unique_name数据
                          DBUS_TYPE_INVALID);

这时dbus-session后台daemon发现了该应用程序打算与自己建立连接,随后触发socket_handle_watch()
回调函数,为该应用程序建立该应用程序对应的transport和connection存储体[luther.gliethttp]

socket_handle_watch
==>client_fd = _dbus_accept (listen_fd);
==>handle_new_client_fd_and_unlock (server, client_fd);
==*>为client的连接创建transport,该transport的auth_mechanisms等于server->auth_mechanisms,
    然后在transport之上继续创建connection,最终位于server中的为了该client建立的独立transport,
    也将执行与应用程序端建立的transport同样的_dbus_connection_handle_watch()数据处理函数.


下面是daemon程序dbus-session对来自网络上client们的message进行细化处理的整体代码流程.[luther.gliethttp]
main
==>_dbus_loop_run
==*>_dbus_loop_iterate
    1.添加loop->callbacks中DBUS_WATCH_READABLE﹑DBUS_WATCH_WRITABLE和TIMEOUT_CALLBACK到poll的fds中
    2.n_ready = _dbus_poll (fds, n_fds, timeout); //等待众多事件之一发生[luther.gliethttp]
    3.如果timeout了,那么调用timeout的callback回调函数,如果数据来了,比如:_dbus_connection_handle_watch.

接下来看看_dbus_connection_handle_watch是如何处理来自网络的数据的[luther.gliethttp]
_dbus_connection_handle_watch
==>_dbus_transport_handle_watch (connection->transport, watch, condition);
==>(* transport->vtable->handle_watch) (transport, watch, condition);
即:socket_vtable.socket_handle_watch()处理函数.
==>socket_handle_watch
如果为read,则
==*>do_reading (transport)
将数据读取到transport->loader->data字符串存储空间中.
==**>_dbus_transport_queue_messages
dbus_bool_t
_dbus_transport_queue_messages (DBusTransport *transport)
{
  DBusDispatchStatus status;

#if 0
  _dbus_verbose ("_dbus_transport_queue_messages()\n");
#endif
 
  /* Queue any messages */
  // 将零碎的data片段数据,慢慢堆积到一个message大小,如果已经接收到该message所需的len长度数据
  // 即生成了一封完整的message,
  // 那么_dbus_transport_get_dispatch_status将始终返回DBUS_DISPATCH_DATA_REMAINS,源码见下面.
  while ((status = _dbus_transport_get_dispatch_status (transport)) == DBUS_DISPATCH_DATA_REMAINS)
    {
      DBusMessage *message;
      DBusList *link;
// 从网络上收到了一封完整的message数据.
      link = _dbus_message_loader_pop_message_link (transport->loader); // 从transport->loader弹出该封完整的message
      _dbus_assert (link != NULL);
     
      message = link->data;
           
      _dbus_verbose ("queueing received message %p\n", message);

      if (!_dbus_message_add_size_counter (message, transport->live_messages_size))
// 申请
// link->data->value += message->size_counter_delta;这里link->data指向transport->live_messages_size.
// 所以最终等效于transport->live_messages_size->value += message->size_counter_delta;
// 然后将link添加到message->size_counters链表中,这样作的目的在于,
// 当message处理完毕之后,message释放函数_dbus_message_remove_size_counter将调用
// _dbus_counter_adjust (counter, - message->size_counter_delta);
// 这样当message处理完毕之后,transport->live_messages_size的计数将再减去message->size_counter_delta
// 即: transport->live_messages_size->value -= message->size_counter_delta;[luther.gliethttp]
        {
          _dbus_message_loader_putback_message_link (transport->loader,
                                                     link);
          status = DBUS_DISPATCH_NEED_MEMORY;
          break;
        }
      else
        {
          /* pass ownership of link and message ref to connection */
          // 添加到connection->incoming_messages链表中,源码见下面[luther.gliethttp]
          _dbus_connection_queue_received_message_link (transport->connection,
                                                        link);
        }
    }

  if (_dbus_message_loader_get_is_corrupted (transport->loader))
    {
      _dbus_verbose ("Corrupted message stream, disconnecting\n");
      _dbus_transport_disconnect (transport);
    }

  return status != DBUS_DISPATCH_NEED_MEMORY;
}

void
_dbus_connection_queue_received_message_link (DBusConnection  *connection,
                                              DBusList        *link)
{
  DBusPendingCall *pending;
  dbus_uint32_t reply_serial;
  DBusMessage *message;
 
  _dbus_assert (_dbus_transport_get_is_authenticated (connection->transport));
 
  _dbus_list_append_link (&connection->incoming_messages,
                          link); // 将link添加到connection->incoming_messages中,link->data就等于message.
  message = link->data;

  /* If this is a reply we're waiting on, remove timeout for it */
  reply_serial = dbus_message_get_reply_serial (message); // 获取刚刚接收到的message的serial id号.
  if (reply_serial != 0)
    {
      pending = _dbus_hash_table_lookup_int (connection->pending_replies,
                                             reply_serial);
// 查看pending_replies哈西表中是否有等待以该serial id号作为reply回应数据的pending操作.
      if (pending != NULL)
    {
      if (_dbus_pending_call_is_timeout_added_unlocked (pending))
// 如果添加了timeout到pending中,那么移除它[luther.gliethttp]
            _dbus_connection_remove_timeout_unlocked (connection,
                                                      _dbus_pending_call_get_timeout_unlocked (pending));

      _dbus_pending_call_set_timeout_added_unlocked (pending, FALSE); // 修改状态为timeout未添加[luther.gliethttp]
    }
    }
 
 

  connection->n_incoming += 1; // 来自网络的报文个数加1.

  _dbus_connection_wakeup_mainloop (connection); // 唤醒主loop
 
}

DBusDispatchStatus
_dbus_transport_get_dispatch_status (DBusTransport *transport)
{
  if (_dbus_counter_get_value (transport->live_messages_size) >= transport->max_live_messages_size)
    return DBUS_DISPATCH_COMPLETE; /* complete for now */

  if (!_dbus_transport_get_is_authenticated (transport))
    {
      if (_dbus_auth_do_work (transport->auth) ==
          DBUS_AUTH_STATE_WAITING_FOR_MEMORY)
        return DBUS_DISPATCH_NEED_MEMORY;
      else if (!_dbus_transport_get_is_authenticated (transport))
        return DBUS_DISPATCH_COMPLETE;
    }

  if (!transport->unused_bytes_recovered &&
      !recover_unused_bytes (transport))
    return DBUS_DISPATCH_NEED_MEMORY;

  transport->unused_bytes_recovered = TRUE;
 
  if (!_dbus_message_loader_queue_messages (transport->loader))
// 尝试从网络数据区transport->loader->data组建一封完整message,源码见下面
    return DBUS_DISPATCH_NEED_MEMORY;

  if (_dbus_message_loader_peek_message (transport->loader) != NULL) // 查看loader->messages链表是否有完整的message了.
    return DBUS_DISPATCH_DATA_REMAINS;
  else
    return DBUS_DISPATCH_COMPLETE;
}

dbus_bool_t
_dbus_message_loader_queue_messages (DBusMessageLoader *loader)
{
  while (!loader->corrupted &&
         _dbus_string_get_length (&loader->data) >= DBUS_MINIMUM_HEADER_SIZE) // 当前网络接收到的数据至少保证存放头部控制信息的16字节
    {
      DBusValidity validity;
      int byte_order, fields_array_len, header_len, body_len;

      if (_dbus_header_have_message_untrusted (loader->max_message_size,
                                               &validity,
                                               &byte_order,
                                               &fields_array_len,
                                               &header_len,
                                               &body_len,
                                               &loader->data, 0,
                                               _dbus_string_get_length (&loader->data)))
//  当前网络接收到的数据个数如果已经超过了该message下header+body_len长度,那么说明该message已经完整接收
//  所以执行下面的函数进行message重建,最后将重建完成的有效的message追加到loader->messages链表中[luther.gliethttp]
        {
          DBusMessage *message;

          _dbus_assert (validity == DBUS_VALID);

          message = dbus_message_new_empty_header ();
          if (message == NULL)
            return FALSE;
// 进行message重建,最后将重建完成的有效的message追加到loader->messages链表中[luther.gliethttp]
          if (!load_message (loader, message,
                             byte_order, fields_array_len,
                             header_len, body_len))
            {
              dbus_message_unref (message);
              /* load_message() returns false if corrupted or OOM; if
               * corrupted then return TRUE for not OOM
               */
              return loader->corrupted;
            }

          _dbus_assert (loader->messages != NULL);
          _dbus_assert (_dbus_list_find_last (&loader->messages, message) != NULL);
    }
      else
        {
          _dbus_verbose ("Initial peek at header says we don't have a whole message yet, or data broken with invalid code %d\n",
                         validity);
          if (validity != DBUS_VALID)
            {
              loader->corrupted = TRUE;
              loader->corruption_reason = validity;
            }
          return TRUE;
        }
    }

  return TRUE;
}
// 上面_dbus_connection_handle_watch将调用
// dispatch_status_function将connection添加到loop->need_dispatch中
_dbus_connection_update_dispatch_status_and_unlock
==>connection->dispatch_status_function

connection->dispatch_status_function = dispatch_status_function;
server->new_connection_function = bus_connections_setup_connection; // 当有新连接时socket_handle_watch将添加新连接对应的transport和对应的connection,这时将调用server->new_connection_function()回调函数.

dispatch_status_function或者bus_connections_setup_connection
==>_dbus_loop_queue_dispatch
==>_dbus_list_append (&loop->need_dispatch, connection)




好了上面_dbus_connection_handle_watch()回调处理函数,已经从网络数据中解析出一封message消息,接下来就是调用
_dbus_loop_dispatch()函数进行message细分处理了[luther.gliethttp].

==>_dbus_loop_dispatch
==*>DBusConnection *connection = _dbus_list_pop_first (&loop->need_dispatch);
    dbus_connection_dispatch(connection); // 处理该connection接收到的incoming_messages数据包[luther.gliethttp].
==**>message_link = _dbus_connection_pop_message_link_unlocked (connection); // 从incoming_messages弹出一个message.
     reply_serial = dbus_message_get_reply_serial (message); // 读取message的serial id号
     // 查找是否pending_replies中有人正在等待该serial id号消息到来[luther.gliethttp]
     pending = _dbus_hash_table_lookup_int (connection->pending_replies,
                                         reply_serial);
     // 如果有pending,即正在等待reply消息返回,那么执行
     // complete_pending_call_and_unlock (connection, pending, message); // 设置pending->reply = message;
     // 然后goto out;完成退出.
     // 否则继续执行下面的代码[luther.gliethttp]
     // 先检查该message是否可被builtin的东西处理,主要是处理interface为"org.freedesktop.DBus.Peer",
     // 同时method为"Ping"或"GetMachineId"的消息.
     result = _dbus_connection_run_builtin_filters_unlocked_no_update (connection, message);
     // 接下来执行connection->filter_list链表上的所有过滤函数[luther.gliethttp]
     // 首先,_dbus_message_filter_ref增减引用计数filter->refcount++;
      _dbus_list_foreach (&filter_list_copy,
              (DBusForeachFunction)_dbus_message_filter_ref,
              NULL);
     // 接下来执行 connection->filter_list链表上的所有过滤函数
     // 所有connection->filter_list链表添加都是由dbus_connection_add_filter()完成.
     // 只有一个地方调用了dbus_connection_add_filter()函数,就是bus_dispatch_add_connection()
     // 它设置的回调函数为bus_dispatch_message_filter();
     //
     // 整个流程是这样的
     // server_watch_callback
// DBUS_SESSION_BUS_ADDRESS=unix:abstract=/tmp/dbus-0jpvwWTzEb,guid=e4295f4ec1c581bb5a971bfb4a401c5e
// 该unix通道有client尝试连接,将执行下面的一系列函数.
     // ==>socket_handle_watch
     // ==*>handle_new_client_fd_and_unlock
     // ==**>server->new_connection_function = new_connection_callback;
     // ==***>new_connection_callback
     // ==****>bus_connections_setup_connection
     // ==*****>bus_dispatch_add_connection (connection)
     // ==******>_dbus_list_append (&connection->filter_list, bus_dispatch_message_filter);

     (* filter->function) (connection, message, filter->user_data);
     // 这之后数据处理工作就算完成了.


接下来我们就来看看唯一的一个filter_func函数bus_dispatch_message_filter()[luther.gliethttp]
bus_dispatch_message_filter
==>bus_dispatch
// 如果应用程序发送了
// dbus_message_new_method_call (DBUS_SERVICE_DBUS, DBUS_PATH_DBUS, DBUS_INTERFACE_DBUS, "Hello");
   service_name = dbus_message_get_destination (message); // 获取目的路径DBUS_SERVICE_DBUS
   transaction = bus_transaction_new (context); // 创建一个transaction操作片
// 1.如果service_name等于DBUS_SERVICE_DBUS那么
   bus_context_check_security_policy(context, transaction,
                                    connection, NULL, NULL, message, &error); // 做安全检查
   bus_driver_handle_message (connection, transaction, message, &error)
// 交由bus总线驱动处理该message[luther.gliethttp]
// 2.如果service_name非DBUS_SERVICE_DBUS,那么dbus-session将作route路由该message消息[luther.gliethttp]
   bus_registry_lookup (registry, &service_string);
// 当安全校验全部通过之后,将该transaction操作片route路由到addressed_recipient连接对应的transport上[luther.gliethttp]
   bus_transaction_send (transaction, addressed_recipient, message);

我们现在关心的只是应用程序启动注册部分,所以我们下面将着重讨论的是bus_driver_handle_message--这个bus总线
相应interface接口下method方法处理集[luther.gliethttp]
==>bus_driver_handle_message
static struct
{
  const char *name;
  const char *in_args;
  const char *out_args;
  dbus_bool_t (* handler) (DBusConnection *connection,
                           BusTransaction *transaction,
                           DBusMessage    *message,
                           DBusError      *error);
} message_handlers[] = {
  { "Hello",
    "",
    DBUS_TYPE_STRING_AS_STRING,
    bus_driver_handle_hello },
    ......
 };
==>bus_driver_handle_hello
static dbus_bool_t
bus_driver_handle_hello (DBusConnection *connection,
                         BusTransaction *transaction,
                         DBusMessage    *message,
                         DBusError      *error)
{
  DBusString unique_name;
  BusService *service;
  dbus_bool_t retval;
  BusRegistry *registry;
  BusConnections *connections;

  _DBUS_ASSERT_ERROR_IS_CLEAR (error);

  if (bus_connection_is_active (connection))
    {
      /* We already handled an Hello message for this connection. */
      dbus_set_error (error, DBUS_ERROR_FAILED,
                      "Already handled an Hello message");
      return FALSE;
    }

  /* Note that when these limits are exceeded we don't disconnect the
   * connection; we just sort of leave it hanging there until it times
   * out or disconnects itself or is dropped due to the max number of
   * incomplete connections. It's even OK if the connection wants to
   * retry the hello message, we support that.
   */
  connections = bus_connection_get_connections (connection);
  if (!bus_connections_check_limits (connections, connection,
                                     error))
    {
      _DBUS_ASSERT_ERROR_IS_SET (error);
      return FALSE;
    }
 
  if (!_dbus_string_init (&unique_name))
    {
      BUS_SET_OOM (error);
      return FALSE;
    }

  retval = FALSE;

  registry = bus_connection_get_registry (connection);
// 为连接到该dbus-session daemon的应用程序创建unique_name, 源码见下面
// unique_name将等于":1.12345678"之类[luther.gliethttp]
  if (!create_unique_client_name (registry, &unique_name))
    {
      BUS_SET_OOM (error);
      goto out_0;
    }
// d = BUS_CONNECTION_DATA (connection);
// 设置_dbus_string_copy_data (name, &d->name)
// 即:connection->slot_list[connection_data_slot]->name = ":1.12345678";
// 设置 policy
// d->policy = bus_context_create_client_policy (d->connections->context, connection, error);
  if (!bus_connection_complete (connection, &unique_name, error))
    {
      _DBUS_ASSERT_ERROR_IS_SET (error);
      goto out_0;
    }
// 设置sender域为":1.12345678"
  if (!dbus_message_set_sender (message,
                                bus_connection_get_name (connection)))
    {
      BUS_SET_OOM (error);
      goto out_0;
    }
// 发送带有参数unique_name的message给应用程序,应用程序正等待该unique_name参数到来[luther.gliethttp]
// 源码见下面
  if (!bus_driver_send_welcome_message (connection, message, transaction, error))
    goto out_0;

  /* Create the service */
  service = bus_registry_ensure (registry,
                                 &unique_name, connection, 0, transaction, error);
  if (service == NULL)
    goto out_0;
 
  _dbus_assert (bus_connection_is_active (connection));
  retval = TRUE;
 
 out_0:
  _dbus_string_free (&unique_name);
  return retval;
}

static dbus_bool_t
create_unique_client_name (BusRegistry *registry,
                           DBusString  *str)
{
  /* We never want to use the same unique client name twice, because
   * we want to guarantee that if you send a message to a given unique
   * name, you always get the same application. So we use two numbers
   * for INT_MAX * INT_MAX combinations, should be pretty safe against
   * wraparound.
   */
  /* FIXME these should be in BusRegistry rather than static vars */
  static int next_major_number = 0;
  static int next_minor_number = 0;
  int len;
 
  len = _dbus_string_get_length (str); // 这里len等于0
 
  while (TRUE)
    {
      /* start out with 1-0, go to 1-1, 1-2, 1-3,
       * up to 1-MAXINT, then 2-0, 2-1, etc.
       */
      if (next_minor_number <= 0) // minor大到了0x7fffffff,那么major加1,minor归0,进入下一轮.
        {
          next_major_number += 1;
          next_minor_number = 0;
          if (next_major_number <= 0)
            _dbus_assert_not_reached ("INT_MAX * INT_MAX clients were added");
        }

      _dbus_assert (next_major_number > 0);
      _dbus_assert (next_minor_number >= 0);

      /* appname:MAJOR-MINOR */
     
      if (!_dbus_string_append (str, ":"))
        return FALSE;
     
      if (!_dbus_string_append_int (str, next_major_number))
        return FALSE;

      if (!_dbus_string_append (str, "."))
        return FALSE;
     
      if (!_dbus_string_append_int (str, next_minor_number))
        return FALSE;

      next_minor_number += 1;
     
      /* Check if a client with the name exists */
      // 务必确认该unique_name在所有连接中确实是唯一的,没有存在过的[luther.gliethttp]
      if (bus_registry_lookup (registry, str) == NULL)
    break;

      /* drop the number again, try the next one. */
      _dbus_string_set_length (str, len);
    }

  return TRUE;
}

static dbus_bool_t
bus_driver_send_welcome_message (DBusConnection *connection,
                                 DBusMessage    *hello_message,
                                 BusTransaction *transaction,
                                 DBusError      *error)
{
  DBusMessage *welcome;
  const char *name;

  _DBUS_ASSERT_ERROR_IS_CLEAR (error);
 
  name = bus_connection_get_name (connection);
  _dbus_assert (name != NULL);
 
  welcome = dbus_message_new_method_return (hello_message); // 生成method方法调用返回message内存单元.
  if (welcome == NULL)
    {
      BUS_SET_OOM (error);
      return FALSE;
    }
 
  if (!dbus_message_append_args (welcome,
// 向该message追加参数,unique_name内容为":1.12345678", 调用dbus_bus_get()库函数的应用程序正在
// 饥渴的等着unique_name到来呢[luther.gliethttp].
                                 DBUS_TYPE_STRING, &name,
                                 DBUS_TYPE_INVALID))
    {
      dbus_message_unref (welcome);
      BUS_SET_OOM (error);
      return FALSE;
    }

  _dbus_assert (dbus_message_has_signature (welcome, DBUS_TYPE_STRING_AS_STRING));
// 将数据从dbus-session这个daemon发送到调用dbus_bus_get()库函数的应用程序[luther.gliethttp]
// 在这里会做各种合法性检查,包括该connection是否允许send发送数据,是否允许recive接收数据,selinux安全等[lutehr.gliethttp]
  if (!bus_transaction_send_from_driver (transaction, connection, welcome))
    {
      dbus_message_unref (welcome);
      BUS_SET_OOM (error);
      return FALSE;
    }
  else
    {
      dbus_message_unref (welcome);
      return TRUE;
    }
}
阅读(1453) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~