浅析dbus-session后台daemon如何处理client们发送过来的mesage
《浅析应用程序dbus_bus_get函数在lib库中的实际执行流程》应用程序首先调用dbus_bus_get()于dbus-session建立连接
dbus_bus_register
==>message = dbus_message_new_method_call (DBUS_SERVICE_DBUS,
DBUS_PATH_DBUS,
DBUS_INTERFACE_DBUS,
"Hello");
// 等待peer发送reply回来
==>reply = dbus_connection_send_with_reply_and_block (connection, message, -1, error);
==>dbus_message_get_args (reply, error,
DBUS_TYPE_STRING, &name, // 获取peer返回回来的unique_name数据
DBUS_TYPE_INVALID);
这时dbus-session后台daemon发现了该应用程序打算与自己建立连接,随后触发socket_handle_watch()
回调函数,为该应用程序建立该应用程序对应的transport和connection存储体[luther.gliethttp]
socket_handle_watch
==>client_fd = _dbus_accept (listen_fd);
==>handle_new_client_fd_and_unlock (server, client_fd);
==*>为client的连接创建transport,该transport的auth_mechanisms等于server->auth_mechanisms,
然后在transport之上继续创建connection,最终位于server中的为了该client建立的独立transport,
也将执行与应用程序端建立的transport同样的_dbus_connection_handle_watch()数据处理函数.
下面是daemon程序dbus-session对来自网络上client们的message进行细化处理的整体代码流程.[luther.gliethttp]
main
==>_dbus_loop_run
==*>_dbus_loop_iterate
1.添加loop->callbacks中DBUS_WATCH_READABLE﹑DBUS_WATCH_WRITABLE和TIMEOUT_CALLBACK到poll的fds中
2.n_ready = _dbus_poll (fds, n_fds, timeout); //等待众多事件之一发生[luther.gliethttp]
3.如果timeout了,那么调用timeout的callback回调函数,如果数据来了,比如:_dbus_connection_handle_watch.
接下来看看_dbus_connection_handle_watch是如何处理来自网络的数据的[luther.gliethttp]
_dbus_connection_handle_watch
==>_dbus_transport_handle_watch (connection->transport, watch, condition);
==>(* transport->vtable->handle_watch) (transport, watch, condition);
即:socket_vtable.socket_handle_watch()处理函数.
==>socket_handle_watch
如果为read,则
==*>do_reading (transport)
将数据读取到transport->loader->data字符串存储空间中.
==**>_dbus_transport_queue_messages
dbus_bool_t
_dbus_transport_queue_messages (DBusTransport *transport)
{
DBusDispatchStatus status;
#if 0
_dbus_verbose ("_dbus_transport_queue_messages()\n");
#endif
/* Queue any messages */
// 将零碎的data片段数据,慢慢堆积到一个message大小,如果已经接收到该message所需的len长度数据
// 即生成了一封完整的message,
// 那么_dbus_transport_get_dispatch_status将始终返回DBUS_DISPATCH_DATA_REMAINS,源码见下面.
while ((status = _dbus_transport_get_dispatch_status (transport)) == DBUS_DISPATCH_DATA_REMAINS)
{
DBusMessage *message;
DBusList *link;
// 从网络上收到了一封完整的message数据.
link = _dbus_message_loader_pop_message_link (transport->loader); // 从transport->loader弹出该封完整的message
_dbus_assert (link != NULL);
message = link->data;
_dbus_verbose ("queueing received message %p\n", message);
if (!_dbus_message_add_size_counter (message, transport->live_messages_size))
// 申请
// link->data->value += message->size_counter_delta;这里link->data指向transport->live_messages_size.
// 所以最终等效于transport->live_messages_size->value += message->size_counter_delta;
// 然后将link添加到message->size_counters链表中,这样作的目的在于,
// 当message处理完毕之后,message释放函数_dbus_message_remove_size_counter将调用
// _dbus_counter_adjust (counter, - message->size_counter_delta);
// 这样当message处理完毕之后,transport->live_messages_size的计数将再减去message->size_counter_delta
// 即: transport->live_messages_size->value -= message->size_counter_delta;[luther.gliethttp]
{
_dbus_message_loader_putback_message_link (transport->loader,
link);
status = DBUS_DISPATCH_NEED_MEMORY;
break;
}
else
{
/* pass ownership of link and message ref to connection */
// 添加到connection->incoming_messages链表中,源码见下面[luther.gliethttp]
_dbus_connection_queue_received_message_link (transport->connection,
link);
}
}
if (_dbus_message_loader_get_is_corrupted (transport->loader))
{
_dbus_verbose ("Corrupted message stream, disconnecting\n");
_dbus_transport_disconnect (transport);
}
return status != DBUS_DISPATCH_NEED_MEMORY;
}
void
_dbus_connection_queue_received_message_link (DBusConnection *connection,
DBusList *link)
{
DBusPendingCall *pending;
dbus_uint32_t reply_serial;
DBusMessage *message;
_dbus_assert (_dbus_transport_get_is_authenticated (connection->transport));
_dbus_list_append_link (&connection->incoming_messages,
link); // 将link添加到connection->incoming_messages中,link->data就等于message.
message = link->data;
/* If this is a reply we're waiting on, remove timeout for it */
reply_serial = dbus_message_get_reply_serial (message); // 获取刚刚接收到的message的serial id号.
if (reply_serial != 0)
{
pending = _dbus_hash_table_lookup_int (connection->pending_replies,
reply_serial);
// 查看pending_replies哈西表中是否有等待以该serial id号作为reply回应数据的pending操作.
if (pending != NULL)
{
if (_dbus_pending_call_is_timeout_added_unlocked (pending))
// 如果添加了timeout到pending中,那么移除它[luther.gliethttp]
_dbus_connection_remove_timeout_unlocked (connection,
_dbus_pending_call_get_timeout_unlocked (pending));
_dbus_pending_call_set_timeout_added_unlocked (pending, FALSE); // 修改状态为timeout未添加[luther.gliethttp]
}
}
connection->n_incoming += 1; // 来自网络的报文个数加1.
_dbus_connection_wakeup_mainloop (connection); // 唤醒主loop
}
DBusDispatchStatus
_dbus_transport_get_dispatch_status (DBusTransport *transport)
{
if (_dbus_counter_get_value (transport->live_messages_size) >= transport->max_live_messages_size)
return DBUS_DISPATCH_COMPLETE; /* complete for now */
if (!_dbus_transport_get_is_authenticated (transport))
{
if (_dbus_auth_do_work (transport->auth) ==
DBUS_AUTH_STATE_WAITING_FOR_MEMORY)
return DBUS_DISPATCH_NEED_MEMORY;
else if (!_dbus_transport_get_is_authenticated (transport))
return DBUS_DISPATCH_COMPLETE;
}
if (!transport->unused_bytes_recovered &&
!recover_unused_bytes (transport))
return DBUS_DISPATCH_NEED_MEMORY;
transport->unused_bytes_recovered = TRUE;
if (!_dbus_message_loader_queue_messages (transport->loader))
// 尝试从网络数据区transport->loader->data组建一封完整message,源码见下面
return DBUS_DISPATCH_NEED_MEMORY;
if (_dbus_message_loader_peek_message (transport->loader) != NULL) // 查看loader->messages链表是否有完整的message了.
return DBUS_DISPATCH_DATA_REMAINS;
else
return DBUS_DISPATCH_COMPLETE;
}
dbus_bool_t
_dbus_message_loader_queue_messages (DBusMessageLoader *loader)
{
while (!loader->corrupted &&
_dbus_string_get_length (&loader->data) >= DBUS_MINIMUM_HEADER_SIZE) // 当前网络接收到的数据至少保证存放头部控制信息的16字节
{
DBusValidity validity;
int byte_order, fields_array_len, header_len, body_len;
if (_dbus_header_have_message_untrusted (loader->max_message_size,
&validity,
&byte_order,
&fields_array_len,
&header_len,
&body_len,
&loader->data, 0,
_dbus_string_get_length (&loader->data)))
// 当前网络接收到的数据个数如果已经超过了该message下header+body_len长度,那么说明该message已经完整接收
// 所以执行下面的函数进行message重建,最后将重建完成的有效的message追加到loader->messages链表中[luther.gliethttp]
{
DBusMessage *message;
_dbus_assert (validity == DBUS_VALID);
message = dbus_message_new_empty_header ();
if (message == NULL)
return FALSE;
// 进行message重建,最后将重建完成的有效的message追加到loader->messages链表中[luther.gliethttp]
if (!load_message (loader, message,
byte_order, fields_array_len,
header_len, body_len))
{
dbus_message_unref (message);
/* load_message() returns false if corrupted or OOM; if
* corrupted then return TRUE for not OOM
*/
return loader->corrupted;
}
_dbus_assert (loader->messages != NULL);
_dbus_assert (_dbus_list_find_last (&loader->messages, message) != NULL);
}
else
{
_dbus_verbose ("Initial peek at header says we don't have a whole message yet, or data broken with invalid code %d\n",
validity);
if (validity != DBUS_VALID)
{
loader->corrupted = TRUE;
loader->corruption_reason = validity;
}
return TRUE;
}
}
return TRUE;
}
// 上面_dbus_connection_handle_watch将调用
// dispatch_status_function将connection添加到loop->need_dispatch中
_dbus_connection_update_dispatch_status_and_unlock
==>connection->dispatch_status_function
connection->dispatch_status_function = dispatch_status_function;
server->new_connection_function = bus_connections_setup_connection; // 当有新连接时socket_handle_watch将添加新连接对应的transport和对应的connection,这时将调用server->new_connection_function()回调函数.
dispatch_status_function或者bus_connections_setup_connection
==>_dbus_loop_queue_dispatch
==>_dbus_list_append (&loop->need_dispatch, connection)
好了上面_dbus_connection_handle_watch()回调处理函数,已经从网络数据中解析出一封message消息,接下来就是调用
_dbus_loop_dispatch()函数进行message细分处理了[luther.gliethttp].
==>_dbus_loop_dispatch
==*>DBusConnection *connection = _dbus_list_pop_first (&loop->need_dispatch);
dbus_connection_dispatch(connection); // 处理该connection接收到的incoming_messages数据包[luther.gliethttp].
==**>message_link = _dbus_connection_pop_message_link_unlocked (connection); // 从incoming_messages弹出一个message.
reply_serial = dbus_message_get_reply_serial (message); // 读取message的serial id号
// 查找是否pending_replies中有人正在等待该serial id号消息到来[luther.gliethttp]
pending = _dbus_hash_table_lookup_int (connection->pending_replies,
reply_serial);
// 如果有pending,即正在等待reply消息返回,那么执行
// complete_pending_call_and_unlock (connection, pending, message); // 设置pending->reply = message;
// 然后goto out;完成退出.
// 否则继续执行下面的代码[luther.gliethttp]
// 先检查该message是否可被builtin的东西处理,主要是处理interface为"org.freedesktop.DBus.Peer",
// 同时method为"Ping"或"GetMachineId"的消息.
result = _dbus_connection_run_builtin_filters_unlocked_no_update (connection, message);
// 接下来执行connection->filter_list链表上的所有过滤函数[luther.gliethttp]
// 首先,_dbus_message_filter_ref增减引用计数filter->refcount++;
_dbus_list_foreach (&filter_list_copy,
(DBusForeachFunction)_dbus_message_filter_ref,
NULL);
// 接下来执行 connection->filter_list链表上的所有过滤函数
// 所有connection->filter_list链表添加都是由dbus_connection_add_filter()完成.
// 只有一个地方调用了dbus_connection_add_filter()函数,就是bus_dispatch_add_connection()
// 它设置的回调函数为bus_dispatch_message_filter();
//
// 整个流程是这样的
// server_watch_callback
// DBUS_SESSION_BUS_ADDRESS=unix:abstract=/tmp/dbus-0jpvwWTzEb,guid=e4295f4ec1c581bb5a971bfb4a401c5e
// 该unix通道有client尝试连接,将执行下面的一系列函数.
// ==>socket_handle_watch
// ==*>handle_new_client_fd_and_unlock
// ==**>server->new_connection_function = new_connection_callback;
// ==***>new_connection_callback
// ==****>bus_connections_setup_connection
// ==*****>bus_dispatch_add_connection (connection)
// ==******>_dbus_list_append (&connection->filter_list, bus_dispatch_message_filter);
(* filter->function) (connection, message, filter->user_data);
// 这之后数据处理工作就算完成了.
接下来我们就来看看唯一的一个filter_func函数bus_dispatch_message_filter()[luther.gliethttp]
bus_dispatch_message_filter
==>bus_dispatch
// 如果应用程序发送了
// dbus_message_new_method_call (DBUS_SERVICE_DBUS, DBUS_PATH_DBUS, DBUS_INTERFACE_DBUS, "Hello");
service_name = dbus_message_get_destination (message); // 获取目的路径DBUS_SERVICE_DBUS
transaction = bus_transaction_new (context); // 创建一个transaction操作片
// 1.如果service_name等于DBUS_SERVICE_DBUS那么
bus_context_check_security_policy(context, transaction,
connection, NULL, NULL, message, &error); // 做安全检查
bus_driver_handle_message (connection, transaction, message, &error)
// 交由bus总线驱动处理该message[luther.gliethttp]
// 2.如果service_name非DBUS_SERVICE_DBUS,那么dbus-session将作route路由该message消息[luther.gliethttp]
bus_registry_lookup (registry, &service_string);
// 当安全校验全部通过之后,将该transaction操作片route路由到addressed_recipient连接对应的transport上[luther.gliethttp]
bus_transaction_send (transaction, addressed_recipient, message);
我们现在关心的只是应用程序启动注册部分,所以我们下面将着重讨论的是bus_driver_handle_message--这个bus总线
相应interface接口下method方法处理集[luther.gliethttp]
==>bus_driver_handle_message
static struct
{
const char *name;
const char *in_args;
const char *out_args;
dbus_bool_t (* handler) (DBusConnection *connection,
BusTransaction *transaction,
DBusMessage *message,
DBusError *error);
} message_handlers[] = {
{ "Hello",
"",
DBUS_TYPE_STRING_AS_STRING,
bus_driver_handle_hello },
......
};
==>bus_driver_handle_hello
static dbus_bool_t
bus_driver_handle_hello (DBusConnection *connection,
BusTransaction *transaction,
DBusMessage *message,
DBusError *error)
{
DBusString unique_name;
BusService *service;
dbus_bool_t retval;
BusRegistry *registry;
BusConnections *connections;
_DBUS_ASSERT_ERROR_IS_CLEAR (error);
if (bus_connection_is_active (connection))
{
/* We already handled an Hello message for this connection. */
dbus_set_error (error, DBUS_ERROR_FAILED,
"Already handled an Hello message");
return FALSE;
}
/* Note that when these limits are exceeded we don't disconnect the
* connection; we just sort of leave it hanging there until it times
* out or disconnects itself or is dropped due to the max number of
* incomplete connections. It's even OK if the connection wants to
* retry the hello message, we support that.
*/
connections = bus_connection_get_connections (connection);
if (!bus_connections_check_limits (connections, connection,
error))
{
_DBUS_ASSERT_ERROR_IS_SET (error);
return FALSE;
}
if (!_dbus_string_init (&unique_name))
{
BUS_SET_OOM (error);
return FALSE;
}
retval = FALSE;
registry = bus_connection_get_registry (connection);
// 为连接到该dbus-session daemon的应用程序创建unique_name, 源码见下面
// unique_name将等于":1
.12345678"之类[luther.gliethttp]
if (!create_unique_client_name (registry, &unique_name))
{
BUS_SET_OOM (error);
goto out_0;
}
// d = BUS_CONNECTION_DATA (connection);
// 设置_dbus_string_copy_data (name, &d->name)
// 即:connection->slot_list[connection_data_slot]->name = ":1
.12345678";
// 设置 policy
// d->policy = bus_context_create_client_policy (d->connections->context, connection, error);
if (!bus_connection_complete (connection, &unique_name, error))
{
_DBUS_ASSERT_ERROR_IS_SET (error);
goto out_0;
}
// 设置sender域为":1
.12345678"
if (!dbus_message_set_sender (message,
bus_connection_get_name (connection)))
{
BUS_SET_OOM (error);
goto out_0;
}
// 发送带有参数unique_name的message给应用程序,应用程序正等待该unique_name参数到来[luther.gliethttp]
// 源码见下面
if (!bus_driver_send_welcome_message (connection, message, transaction, error))
goto out_0;
/* Create the service */
service = bus_registry_ensure (registry,
&unique_name, connection, 0, transaction, error);
if (service == NULL)
goto out_0;
_dbus_assert (bus_connection_is_active (connection));
retval = TRUE;
out_0:
_dbus_string_free (&unique_name);
return retval;
}
static dbus_bool_t
create_unique_client_name (BusRegistry *registry,
DBusString *str)
{
/* We never want to use the same unique client name twice, because
* we want to guarantee that if you send a message to a given unique
* name, you always get the same application. So we use two numbers
* for INT_MAX * INT_MAX combinations, should be pretty safe against
* wraparound.
*/
/* FIXME these should be in BusRegistry rather than static vars */
static int next_major_number = 0;
static int next_minor_number = 0;
int len;
len = _dbus_string_get_length (str); // 这里len等于0
while (TRUE)
{
/* start out with 1-0, go to 1-1, 1-2, 1-3,
* up to 1-MAXINT, then 2-0, 2-1, etc.
*/
if (next_minor_number <= 0) // minor大到了0x7fffffff,那么major加1,minor归0,进入下一轮.
{
next_major_number += 1;
next_minor_number = 0;
if (next_major_number <= 0)
_dbus_assert_not_reached ("INT_MAX * INT_MAX clients were added");
}
_dbus_assert (next_major_number > 0);
_dbus_assert (next_minor_number >= 0);
/* appname:MAJOR-MINOR */
if (!_dbus_string_append (str, ":"))
return FALSE;
if (!_dbus_string_append_int (str, next_major_number))
return FALSE;
if (!_dbus_string_append (str, "."))
return FALSE;
if (!_dbus_string_append_int (str, next_minor_number))
return FALSE;
next_minor_number += 1;
/* Check if a client with the name exists */
// 务必确认该unique_name在所有连接中确实是唯一的,没有存在过的[luther.gliethttp]
if (bus_registry_lookup (registry, str) == NULL)
break;
/* drop the number again, try the next one. */
_dbus_string_set_length (str, len);
}
return TRUE;
}
static dbus_bool_t
bus_driver_send_welcome_message (DBusConnection *connection,
DBusMessage *hello_message,
BusTransaction *transaction,
DBusError *error)
{
DBusMessage *welcome;
const char *name;
_DBUS_ASSERT_ERROR_IS_CLEAR (error);
name = bus_connection_get_name (connection);
_dbus_assert (name != NULL);
welcome = dbus_message_new_method_return (hello_message); // 生成method方法调用返回message内存单元.
if (welcome == NULL)
{
BUS_SET_OOM (error);
return FALSE;
}
if (!dbus_message_append_args (welcome,
// 向该message追加参数,unique_name内容为":1
.12345678", 调用dbus_bus_get()库函数的应用程序正在
// 饥渴的等着unique_name到来呢[luther.gliethttp].
DBUS_TYPE_STRING, &name,
DBUS_TYPE_INVALID))
{
dbus_message_unref (welcome);
BUS_SET_OOM (error);
return FALSE;
}
_dbus_assert (dbus_message_has_signature (welcome, DBUS_TYPE_STRING_AS_STRING));
// 将数据从dbus-session这个daemon发送到调用dbus_bus_get()库函数的应用程序[luther.gliethttp]
// 在这里会做各种合法性检查,包括该connection是否允许send发送数据,是否允许recive接收数据,selinux安全等[lutehr.gliethttp]
if (!bus_transaction_send_from_driver (transaction, connection, welcome))
{
dbus_message_unref (welcome);
BUS_SET_OOM (error);
return FALSE;
}
else
{
dbus_message_unref (welcome);
return TRUE;
}
}