网络可编程数据面——DPDK Graph Library
——lvyilong316
Graph library是DPDK 20.05版本引入的新特性,最近抽时间把其中关键代码看了一遍,主要希望看下其实现思路是否有可以借鉴的东西。下面将其大概实现进行了总结。
DPDK中的Graph架构将数据的处理抽象成了node和link构成的graph,这种思想并不是新东西,VPP就采用了类似的思路实现,只不过DPDK这里讲graph思想实现为了一个lib框架,提供了graph的创建,删除,查找,node clone,edge update,adge shrink等操作,可供上层应用基于此框架开发。
这种graph架构有什么优势呢?首先graph架构将报文处理的相似逻辑抽象同样到一个node中,这样减少了报文处理过程中的I cache和D cache miss;其次提高了报文处理逻辑的复用性,同样的处理逻辑抽象为node可以在graph路径中多次出现复用;其次提高了报文处理模块的灵活性,通过不同node的任意顺序组合完成不同的报文处理流程。
node的构成
Graph lib中的node构成如下图所示,由以下几部分构成:
(1)process:这是一个callback function,是node的核心处理流程,包含了数据处理的实现,通过rte_graph_walk()函数遍历每个node,然后调用每个node的process函数实现数据的处理,如果数据处理完成后需要交给下一个node处理,则需要在其中调用rte_node_enqueue*()函数。
(2)init:node的初始化函数,在rte_graph_create()函数创建graph时会调用各个node的init函数;
(3)fini:node的析构函数,在rte_graph_destroy()函数销毁graph时会调用各个node的fini函数;
(4)Context memory:存放当前node所需的私有信息的内存空间,process,init,fini等函数都可能会用到;
(5)nb_edges:和当前node关联的边数;
(6)next_node[]:存放当前node的邻居节点,由于graph是一个有向图,所以这里next_node[]其实存放是其下游的邻居节点;
内置node类型
用户可以根据自身需求实现注册自己的node节点,dpdk中的rte_node库中也实现了一些内置的基础node。下面我们以ethdev_tx node为例分析一下一个node的定义和注册过程。
ethdev_tx
ethdev_tx node定义了网卡port的收包行为。在rte_node中每个node的定义是通过注册rte_node_register信息完成的,rte_node_register包含了node所需要的基本信息,包括其process处理函数, ethdev_tx的rte_node_register如下所示:
-
static struct rte_node_register ethdev_tx_node_base = {
-
.process = ethdev_tx_node_process,
-
.name = "ethdev_tx",
-
-
.init = ethdev_tx_node_init,
-
-
.nb_edges = ETHDEV_TX_NEXT_MAX,
-
.next_nodes = {
-
[ETHDEV_TX_NEXT_PKT_DROP] = "pkt_drop",
-
},
-
};
之后通过RTE_NODE_REGISTER进行注册:
RTE_NODE_REGISTER(ethdev_tx_node_base);
RTE_NODE_REGISTER的实现如下,最终调用__rte_node_register。
-
#define RTE_NODE_REGISTER(node) \
-
RTE_INIT(rte_node_register_##node) \
-
{ \
-
node.parent_id = RTE_NODE_ID_INVALID; \
-
node.id = __rte_node_register(&node); \
-
}
__rte_node_register的主要工作就是分配struct node结构和对应的nodeid,并将其加入到全局链表node_list中。
-
rte_node_t
-
__rte_node_register(const struct rte_node_register *reg)
-
{
-
struct node *node;
-
rte_edge_t i;
-
size_t sz;
-
-
graph_spinlock_lock();
-
-
sz = sizeof(struct node) + (reg->nb_edges * RTE_NODE_NAMESIZE);
-
node = calloc(1, sz);
-
if (node == NULL) {
-
rte_errno = ENOMEM;
-
goto fail;
-
}
-
-
/* Initialize the node */
-
if (rte_strscpy(node->name, reg->name, RTE_NODE_NAMESIZE) < 0) {
-
rte_errno = E2BIG;
-
goto free;
-
}
-
node->flags = reg->flags;
-
node->process = reg->process;
-
node->init = reg->init;
-
node->fini = reg->fini;
-
node->nb_edges = reg->nb_edges;
-
node->parent_id = reg->parent_id;
-
for (i = 0; i < reg->nb_edges; i++) {
-
if (rte_strscpy(node->next_nodes[i], reg->next_nodes[i],
-
RTE_NODE_NAMESIZE) < 0) {
-
rte_errno = E2BIG;
-
goto free;
-
}
-
}
-
-
node->id = node_id++;
-
-
/* Add the node at tail */
-
STAILQ_INSERT_TAIL(&node_list, node, next);
-
graph_spinlock_unlock();
-
-
return node->id;
-
free:
-
free(node);
-
fail:
-
graph_spinlock_unlock();
-
return RTE_NODE_ID_INVALID;
-
}
对应数据结构如下所示:
下面我们看一下ethdev_tx node的process函数,即ethdev_tx_node_process。
-
static uint16_t
-
ethdev_tx_node_process(struct rte_graph *graph, struct rte_node *node,
-
void **objs, uint16_t nb_objs)
-
{
-
ethdev_tx_node_ctx_t *ctx = (ethdev_tx_node_ctx_t *)node->ctx;
-
uint16_t port, queue;
-
uint16_t count;
-
-
/* Get Tx port id */
-
port = ctx->port;
-
queue = ctx->queue;
-
-
count = rte_eth_tx_burst(port, queue, (struct rte_mbuf **)objs,
-
nb_objs);
-
-
/* Redirect unsent pkts to drop node */
-
if (count != nb_objs) {
-
rte_node_enqueue(graph, node, ETHDEV_TX_NEXT_PKT_DROP,
-
&objs[count], nb_objs - count);
-
}
-
-
return count;
-
}
这里我们也可以清楚的看到node->ctx的作用,其中process所需的port和queue信息都存放在其中。
ethdev_rx
ethdev_rx是处理网卡port接收报文逻辑的node,其rte_node_register定义如下所示:
-
static struct rte_node_register ethdev_rx_node_base = {
-
.process = ethdev_rx_node_process,
-
.flags = RTE_NODE_SOURCE_F,
-
.name = "ethdev_rx",
-
-
.init = ethdev_rx_node_init,
-
-
.nb_edges = ETHDEV_RX_NEXT_MAX,
-
.next_nodes = {[ETHDEV_RX_NEXT_IP4_LOOKUP] = "ip4_lookup"},
-
};
和ethdev_tx不同的是这里多了一个 RTE_NODE_SOURCE_F的flag,这个flag标识了这是一个source node,在DPDK graph中,source node是一种特殊的node,作为这个graph的起点,当然一个graph可以有多个source node。另外注意这里静态定义这个node的邻居node为ip4_lookup。
我们再来看一下其process处理函数ethdev_rx_node_process。
-
static __rte_always_inline uint16_t
-
ethdev_rx_node_process(struct rte_graph *graph, struct rte_node *node,
-
void **objs, uint16_t cnt)
-
{
-
ethdev_rx_node_ctx_t *ctx = (ethdev_rx_node_ctx_t *)node->ctx;
-
uint16_t n_pkts = 0;
-
-
RTE_SET_USED(objs);
-
RTE_SET_USED(cnt);
-
-
n_pkts = ethdev_rx_node_process_inline(graph, node, ctx->port_id,
-
ctx->queue_id);
-
return n_pkts;
-
}
其中主要调用了 ethdev_rx_node_process_inline。
-
static __rte_always_inline uint16_t
-
ethdev_rx_node_process_inline(struct rte_graph *graph, struct rte_node *node,
-
uint16_t port, uint16_t queue)
-
{
-
uint16_t count, next_index = ETHDEV_RX_NEXT_IP4_LOOKUP;
-
-
/* Get pkts from port */
-
count = rte_eth_rx_burst(port, queue, (struct rte_mbuf **)node->objs,
-
RTE_GRAPH_BURST_SIZE);
-
-
if (!count)
-
return 0;
-
node->idx = count;
-
/* Enqueue to next node */
-
rte_node_next_stream_move(graph, node, next_index);
-
-
return count;
-
}
通过rte_eth_rx_burst接收到报文后,再通过rte_node_next_stream_move将报文交给下一个节点处理,而下一个节点已经被指定好为next_index = ETHDEV_RX_NEXT_IP4_LOOKUP,即ip4_lookup node。
ip4_rewrite
这个node为IPv4重写节点:包含IPv4和以太网报文头的重写功能,可以通过rte_node_ip4_rewrite_add 函数进行配置。
ip4_lookup
IPv4查找节点:由IPv4提取和LPM查找节点组成。路由表可以由应用程序通过rte_node_ip4_route_add 函数进行配置。
-
static struct rte_node_register ip4_lookup_node = {
-
.process = ip4_lookup_node_process,
-
.name = "ip4_lookup",
-
-
.init = ip4_lookup_node_init,
-
-
.nb_edges = RTE_NODE_IP4_LOOKUP_NEXT_MAX,
-
.next_nodes = {
-
[RTE_NODE_IP4_LOOKUP_NEXT_REWRITE] = "ip4_rewrite",
-
[RTE_NODE_IP4_LOOKUP_NEXT_PKT_DROP] = "pkt_drop",
-
},
-
};
其邻居节点有ip4_rewrite和pkt_drop。
null
空节点:定义了节点通用结构的skeleton节点。
pkt_drop
数据包丢弃节点:将各自mempool中接受到的数据包释放。
将node添加到graph的几种方法
将一个node加入图中link起来有以下几种方法:
1. 在node注册的时候提供并初始化next_node[]数组,上文提过next_node[]中用来存放当前节点的下游节点对象的,所以也体现了节点间的link关系,这种方法也称为静态方法;
2. 使用rte_node_edge_get(), rte_node_edge_update(), rte_node_edge_shrink() 几个函数来在运行时更新 next_nodes[],但是这种方式需要在graph创建之前;
3. 使用rte_node_clone()方法clone一个已有节点,这种方法适用于将系统中更多CPU加入到转发逻辑的扩展操作,clone操作会复制原有节点的所有属性,但是不会复制"context memory",context memory中包含了port和queue pair信息。
创建graph对象
graph是通过rte_graph_create函数创建的,这个函数也是理解DPDK graph架构的核心。
-
rte_graph_t
-
rte_graph_create(const char *name, struct rte_graph_param *prm)
-
{
-
rte_node_t src_node_count;
-
struct graph *graph;
-
const char *pattern;
-
uint16_t i;
-
-
graph_spinlock_lock();
-
-
/* Check arguments sanity */
-
if (prm == NULL)
-
SET_ERR_JMP(EINVAL, fail, "Param should not be NULL");
-
-
if (name == NULL)
-
SET_ERR_JMP(EINVAL, fail, "Graph name should not be NULL");
-
-
/* Check for existence of duplicate graph */
-
/* 在graph全局链表中查找看是否已有同名的graph */
-
STAILQ_FOREACH(graph, &graph_list, next)
-
if (strncmp(name, graph->name, RTE_GRAPH_NAMESIZE) == 0)
-
SET_ERR_JMP(EEXIST, fail, "Found duplicate graph %s",
-
name);
-
-
/* Create graph object */
-
/* 分配graph结构 */
-
graph = calloc(1, sizeof(*graph));
-
if (graph == NULL)
-
SET_ERR_JMP(ENOMEM, fail, "Failed to calloc graph object");
-
-
/* Initialize the graph object */
-
STAILQ_INIT(&graph->node_list);
-
if (rte_strscpy(graph->name, name, RTE_GRAPH_NAMESIZE) < 0)
-
SET_ERR_JMP(E2BIG, free, "Too big name=%s", name);
-
-
/* Expand node pattern and add the nodes to the graph */
-
for (i = 0; i < prm->nb_node_patterns; i++) {
-
pattern = prm->node_patterns[i];
-
if (expand_pattern_to_node(graph, pattern))
-
goto graph_cleanup;
-
}
-
-
/* Go over all the nodes edges and add them to the graph */
-
if (graph_node_edges_add(graph))
-
goto graph_cleanup;
-
-
/* Update adjacency list of all nodes in the graph */
-
if (graph_adjacency_list_update(graph))
-
goto graph_cleanup;
-
-
/* Make sure at least a source node present in the graph */
-
src_node_count = graph_src_nodes_count(graph);
-
if (src_node_count == 0)
-
goto graph_cleanup;
-
-
/* Make sure no node is pointing to source node */
-
if (graph_node_has_edge_to_src_node(graph))
-
goto graph_cleanup;
-
-
/* Don't allow node has loop to self */
-
if (graph_node_has_loop_edge(graph))
-
goto graph_cleanup;
-
-
/* Do BFS from src nodes on the graph to find isolated nodes */
-
if (graph_has_isolated_node(graph))
-
goto graph_cleanup;
-
-
/* Initialize graph object */
-
graph->socket = prm->socket_id;
-
graph->src_node_count = src_node_count;
-
graph->node_count = graph_nodes_count(graph);
-
graph->id = graph_id;
-
-
/* Allocate the Graph fast path memory and populate the data */
-
if (graph_fp_mem_create(graph))
-
goto graph_cleanup;
-
-
/* Call init() of the all the nodes in the graph */
-
if (graph_node_init(graph))
-
goto graph_mem_destroy;
-
-
/* All good, Lets add the graph to the list */
-
graph_id++;
-
STAILQ_INSERT_TAIL(&graph_list, graph, next);
-
-
graph_spinlock_unlock();
-
return graph->id;
-
-
graph_mem_destroy:
-
graph_fp_mem_destroy(graph);
-
graph_cleanup:
-
graph_cleanup(graph);
-
free:
-
free(graph);
-
fail:
-
graph_spinlock_unlock();
-
return RTE_GRAPH_ID_INVALID;
-
}
我将此函数的实现展开并添加注释得到下图。内嵌函数不再单独说明。
这个函数的核心是创建并初始化struct graph结构,包括其中的node和邻居关系,然后将struct graph添加到全局链表graph_list中。相关数据结构如下图所示。注意struct graph底层有对应的struct rte_graph,struct node底层有对应的struct rte_node。
graph的遍历
graph创建后需要让整个graph运行起来,这个工作主要是由rte_graph_walk函数完成的。
-
/**
-
* Perform graph walk on the circular buffer and invoke the process function
-
* of the nodes and collect the stats.
-
*
-
* @param graph
-
* Graph pointer returned from rte_graph_lookup function.
-
*
-
* @see rte_graph_lookup()
-
*/
-
__rte_experimental
-
static inline void
-
rte_graph_walk(struct rte_graph *graph)
-
{
-
const rte_graph_off_t *cir_start = graph->cir_start;
-
const rte_node_t mask = graph->cir_mask;
-
uint32_t head = graph->head;
-
struct rte_node *node;
-
uint64_t start;
-
uint16_t rc;
-
void **objs;
-
-
/*
-
* Walk on the source node(s) ((cir_start - head) -> cir_start) and then
-
* on the pending streams (cir_start -> (cir_start + mask) -> cir_start)
-
* in a circular buffer fashion.
-
*
-
* +-----+ <= cir_start - head [number of source nodes]
-
* | |
-
* | ... | <= source nodes
-
* | |
-
* +-----+ <= cir_start [head = 0] [tail = 0]
-
* | |
-
* | ... | <= pending streams
-
* | |
-
* +-----+ <= cir_start + mask
-
*/
-
while (likely(head != graph->tail)) {
-
node = RTE_PTR_ADD(graph, cir_start[(int32_t)head++]);
-
RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);
-
objs = node->objs;
-
rte_prefetch0(objs);
-
-
if (rte_graph_has_stats_feature()) { /*如果graph开启了统计功能*/
-
start = rte_rdtsc();
-
rc = node->process(graph, node, objs, node->idx);
-
node->total_cycles += rte_rdtsc() - start;
-
node->total_calls++;
-
node->total_objs += rc;
-
} else {
-
node->process(graph, node, objs, node->idx);
-
}
-
node->idx = 0;
-
head = likely((int32_t)head > 0) ? head & mask : head;
-
}
-
graph->tail = 0;
-
}
要理解这个过程需要再次拿出来rte_graph的结构,如下图所示。该函数从graph->head开始遍历graph的每个node,并调用其process函数。而graph->head在graph创建时被做了如下初始化:
graph->head = (int32_t)-_graph->src_node_count;
也就是负的src_node_count,根据rte_graph的结构图可知cir_start[head]指向的就是graph的source node的起始位置。所以函数的整个过程是先从cir_start[-src_node_count]遍历到cir_start[0]将所有的source node遍历一遍,然后再从cir_start[0]遍历到cir_start[mask]将graph中的所有node遍历一遍。为什么要先遍历source node呢?这也很容易理解,source node一般负责原始数据的处理,如ethdev_rx负责将报文从网卡收上来,如果没有source node后续node也就没有报文可处理。
graph的统计
DPDK的graph也带了统计功能,可以统计graph的每个node被调用了多少次,每个node被调用消耗了多少个cycle。可以使用rte_graph_cluster_stats_get()函数获取,获取信息如下图所示。
l3fwd-graph示例分析
l3fwd-graph是DPDK使用graph架构对l3fwd example的重实现,也是我们学习使用graph的很好的例子。
l3fwd-graph通过DPDK中的ethdev_rx, ip4_lookup, ip4_rewrite, ethdev_tx,pkt_drop等内置node,在每个转发core上创建一个graph来实现三层的转发功能。
启动参数如下所示:
-
./build/l3fwd-graph -l 1,2 -n 4 -- -p 0x3 --config="(0,0,1),(1,0,2)"
其中--config (port,queue,lcore)[,(port,queue,lcore)]表示queue,port,core三者的绑定关系。下面分段介绍这个例子的实现。
Graph Node Pre-Init Configuration
和正常的DPDK程序一样,l3fwd-graph启动会先调用rte_eth_dev_configure对每个port进行配置,然后调用rte_eth_tx_queue_setup配置tx queue,调用rte_eth_rx_queue_setup配置每个队列的tx queue,然后调用rte_eth_dev_start对每个port进行start。这些通用逻辑我们就不再展开,这里只关注和graph相关的。
其中第一点是在rte_eth_tx_queue_setup和rte_eth_rx_queue_setup后调用rte_node_eth_config。这个函数分配对ethdev_rx和ethdev_tx 两个node进行 clone ,其clone出来的节点name分别为 ethdev_rx-X-Y和ethdev_tx-X,其中X和Y分别代表port id和queue id,所以ethdev_tx是每个port一个,其queue id通过graph id指定,而ethdev_rx是每个port-queue映射对应一个。
Graph Initialization
在每个转发面上创建一个graph对象,每个core根据配置tx和rx的能力包含对应的ethdev_rx和ethdev_tx node,
-
static const char *const default_patterns[] = {
-
"ip4*",
-
"ethdev_tx-*",
-
"pkt_drop",};const char **node_patterns;uint16_t nb_pattern;
-
/* ... */
-
/* Create a graph object per lcore with common nodes and * lcore specific nodes based on application arguments */
-
nb_patterns = RTE_DIM(default_patterns);
-
node_patterns = malloc((MAX_RX_QUEUE_PER_LCORE + nb_patterns) *sizeof(*node_patterns));
-
memcpy(node_patterns, default_patterns, nb_patterns * sizeof(*node_patterns));
-
memset(&graph_conf, 0, sizeof(graph_conf));
-
-
/* Common set of nodes in every lcore's graph object */
-
graph_conf.node_patterns = node_patterns;
-
for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
-
/* ... */
-
-
/* Skip graph creation if no source exists */
-
if (!qconf->n_rx_queue)
-
continue;
-
-
/* Add rx node patterns of this lcore based on --config */
-
for (i = 0; i < qconf->n_rx_queue; i++) {
-
graph_conf.node_patterns[nb_patterns + i] =
-
qconf->rx_queue_list[i].node_name;
-
}
-
-
graph_conf.nb_node_patterns = nb_patterns + i;
-
graph_conf.socket_id = rte_lcore_to_socket_id(lcore_id);
-
-
snprintf(qconf->name, sizeof(qconf->name), "worker_%u", lcore_id);
-
-
graph_id = rte_graph_create(qconf->name, &graph_conf);
-
-
/* ... */
-
-
qconf->graph = rte_graph_lookup(qconf->name);
-
-
/* ... */
-
}
注意:如果通过shell传递的一组节点pattern不满足他们的相互依赖关系或给定的正则表达式node pattern找不到对应node则graph将会创建失败。
Forwarding data(Route, Next-Hop) addition
graph创建好之后就可以通过rte_node_ip4_route_add() 和rte_node_ip4_rewrite_add() 函数分别向ipv4_lookup和ipv4_rewrite node添加配置规则。
-
/* Add route to ip4 graph infra */for (i = 0; i < IPV4_L3FWD_LPM_NUM_ROUTES; i++) {
-
/* ... */
-
-
dst_port = ipv4_l3fwd_lpm_route_array[i].if_out;
-
next_hop = i;
-
-
/* ... */
-
ret = rte_node_ip4_route_add(ipv4_l3fwd_lpm_route_array[i].ip,
-
ipv4_l3fwd_lpm_route_array[i].depth, next_hop,
-
RTE_NODE_IP4_LOOKUP_NEXT_REWRITE);
-
-
/* ... */
-
-
memcpy(rewrite_data, val_eth + dst_port, rewrite_len);
-
-
/* Add next hop for a given destination */
-
ret = rte_node_ip4_rewrite_add(next_hop, rewrite_data,
-
rewrite_len, dst_port);
-
-
RTE_LOG(INFO, L3FWD_GRAPH, "Added route %s, next_hop %u\n",
-
route_str, next_hop);
-
}
Packet Forwarding using Graph Walk
graph配置完成后就可以启动graph的转发了,其核心函数为graph_main_loop.
-
/* Main processing loop */
-
static intgraph_main_loop(void *conf){
-
// ...
-
-
lcore_id = rte_lcore_id();
-
qconf = &lcore_conf[lcore_id];
-
graph = qconf->graph;
-
-
RTE_LOG(INFO, L3FWD_GRAPH,
-
"Entering main loop on lcore %u, graph %s(%p)\n", lcore_id,
-
qconf->name, graph);
-
-
/* Walk over graph until signal to quit */
-
while (likely(!force_quit))
-
rte_graph_walk(graph);
-
return 0;
-
}
即使用rte_graph_walk遍历当前core上的graph,其中rte_graph_walk我们前面已经分析过,其内部会先遍历指向graph的source node,也就是 ethdev_rx-X-Y,然后遍历graph中的其他node,这个执行node的process函数。
参考文档: