Category: LINUX

2022-11-24 01:16:35

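1. Introduction

The rte_mempool library is one of DPDK's core libraries and one of the mechanisms behind DPDK's performance; almost every DPDK application that drives a device uses it. Understanding it helps when tracking down performance problems and gives a deeper understanding of DPDK itself. The core of rte_mempool lives in the lib/mempool/ directory of the source tree.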

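2. The rte_mempool structure

2.1 struct rte_mempool

Before looking at how rte_mempool works, it helps to know the layout of struct rte_mempool, which is defined in rte_mempool.h.

Meaning of each field:

  • name: the name of the memory pool. Names must be unique within a process, otherwise creation fails (the check happens when the memzone is reserved). Because names are unique, rte_mempool_lookup() can find a pool's address in the global rte_mempool_list by name.
  • *pool_data or pool_id: these two share a union. pool_data points to the start of the rte_ring used by this mempool.
  • pool_config: opaque data that the application passes to the ops functions. The DPDK framework layer does not use it at present; the cnxk and mlx5 drivers do.
  • mz: the memzone that backs the memory pool.
  • flags: the flags the pool was created with. The producer/consumer mode (multi or single) is selected through these flags, which determines the type of rte_mempool_ops.
  • socket_id: the socket on which the pool is allocated.
  • size: the number of objects in the pool (mbufs, for a pktmbuf pool).
  • cache_size: the size of each core's local cache.
  • elt_size: the size of one element. For a pktmbuf pool it equals sizeof(struct rte_mbuf) + private data size + mbuf_data_room_size.
  • header_size and trailer_size: the sizes of the object header and trailer.
  • private_data_size: the size of the private data area appended after the rte_mempool structure. For a network pktmbuf pool this is the size of struct rte_pktmbuf_pool_private.
  • ops_index: an rte_mempool selects its rte_mempool_ops by name. An rte_mempool_ops provides callbacks for alloc and free, enqueue and dequeue, getting the number of available objects, populating the pool, querying pool information, and computing the memory size needed to store a given number of objects. DPDK ships several rte_mempool_ops, for example ops_mp_mc, ops_sp_sc, ops_mp_sc, ops_sp_mc, ops_mt_rts and ops_mt_hts. Users can also define their own ops and register them in the global rte_mempool_ops_table variable, which holds an array of ops. Once registered, an ops occupies one index; ops_index is the index of the pool's rte_mempool_ops in that array.
  • local_cache: points to the per-lcore cache memory of the rte_mempool; details follow later in this article.
  • populated_size: the number of objects that have been populated.
  • elt_list: the list that links together all objects in the pool.
  • nb_mem_chunks: the number of memory chunks.
  • mem_list: a list of struct rte_mempool_memhdr entries. Each entry records the iova, va and length of one chunk, and a tailq links all memory chunks of the mempool together. The objects' memory comes from these chunks.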

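2.2 Structure of a mempool

The basic concepts of the rte_mempool library are also introduced elsewhere. A mempool is built from three parts:
  1. The mempool object node: identified uniquely by name and linked into the global list static struct rte_tailq_elem rte_mempool_tailq when the pool is created. The node can be found by name, and it stores the address of the rte_mempool.
  2. The mempool memory area: the mz field of rte_mempool records the contiguous memory that was actually reserved. mz->addr is the address of the rte_mempool and holds the mempool entity, which consists of three parts: the rte_mempool structure, the private data and the local caches (one per core).
  3. The lock-free ring: a struct rte_ring whose memory contains an array of pointers to all objects of the mempool.
The local caches, the rte_ring and the objects of an rte_mempool relate to each other as follows.

The local_cache introduced by rte_mempool is an object buffer, not a hardware cache. DPDK worker threads are normally pinned to cores, so the per-core cache exists to reduce contention on the shared ring when several cores access it. Like the rte_ring, a local_cache holds an array of pointers to objects. An application running on core X accesses the objects in its own local_cache first: objects are put into the local_cache first and taken from it first. When the cache is empty, objects are fetched from the rte_ring; when the cache is full, the surplus objects are moved to the rte_ring.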

3. Creating an rte_mempool

The creation flow of a pktmbuf pool is used below to illustrate how an rte_mempool is created.

3.1 Computing the pktmbuf pool private data

    /* size of each mbuf element */
    elt_size = sizeof(struct rte_mbuf) + (unsigned)priv_size + (unsigned)data_room_size;
    /* data room size of each mbuf */
    mbp_priv.mbuf_data_room_size = data_room_size;
    mbp_priv.mbuf_priv_size = priv_size;

An mbuf object consists of three parts: the rte_mbuf header structure, the private area (priv_size bytes) and the data room.
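The size arithmetic above can be tried out in isolation with a small sketch. Everything here is illustrative: pktmbuf_elt_size is a hypothetical helper rather than a DPDK function, the 128-byte header stands in for sizeof(struct rte_mbuf), and the 8-byte private-area alignment is likewise an assumption about the build.

```c
#include <assert.h>
#include <stdint.h>

/* Assumed stand-in for sizeof(struct rte_mbuf); the real value depends on the build. */
#define ASSUMED_MBUF_HDR_SIZE 128u
/* Assumed alignment required of the private area. */
#define ASSUMED_PRIV_ALIGN 8u

/* Hypothetical helper: header + private area + data room, as in the listing above. */
static inline uint32_t
pktmbuf_elt_size(uint16_t priv_size, uint16_t data_room_size)
{
    assert(priv_size % ASSUMED_PRIV_ALIGN == 0);
    return ASSUMED_MBUF_HDR_SIZE + (uint32_t)priv_size + (uint32_t)data_room_size;
}
```

With these assumed constants, a pool with no private area and a 2176-byte data room would use 2304-byte elements.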

3.2 Creating an empty mempool

An empty mempool is created with rte_mempool_create_empty(), which does the following:

  1. Computes the size of one mempool object with rte_mempool_calc_obj_size(). An object is laid out as header + element + trailer, where the header is a struct rte_mempool_objhdr recording the mp that owns the object and the object's IOVA.
  2. Allocates a struct rte_tailq_entry and inserts it into the global static struct rte_tailq_elem rte_mempool_tailq.

    mempool_list = RTE_TAILQ_CAST(rte_mempool_tailq.head, rte_mempool_list);
    struct rte_tailq_entry *te = rte_zmalloc("MEMPOOL_TAILQ_ENTRY", sizeof(*te), 0);
    te->data = mp;
    TAILQ_INSERT_TAIL(mempool_list, te, next);

  3. Computes the size of the mempool: sizeof(struct rte_mempool) + sizeof(struct rte_mempool_cache) * RTE_MAX_LCORE + private_data_size, rounded up to RTE_MEMPOOL_ALIGN.

    mempool_size = RTE_MEMPOOL_HEADER_SIZE(mp, cache_size);
    mempool_size += private_data_size;
    mempool_size = RTE_ALIGN_CEIL(mempool_size, RTE_MEMPOOL_ALIGN);

  4. Reserves the memory for the mempool once its size is known.

    mz = rte_memzone_reserve(mz_name, mempool_size, socket_id, mz_flags);
    if (mz == NULL)
        goto exit_unlock;

    /* init the mempool structure */
    mp = mz->addr;
    memset(mp, 0, RTE_MEMPOOL_HEADER_SIZE(mp, cache_size));
    ret = strlcpy(mp->name, name, sizeof(mp->name));
    if (ret < 0 || ret >= (int)sizeof(mp->name)) {
        rte_errno = ENAMETOOLONG;
        goto exit_unlock;
    }
    mp->mz = mz;
    mp->size = n;
    mp->flags = flags;
    mp->socket_id = socket_id;
    mp->elt_size = objsz.elt_size;
    mp->header_size = objsz.header_size;
    mp->trailer_size = objsz.trailer_size;
    /* Size of default caches, zero means disabled. */
    mp->cache_size = cache_size;
    mp->private_data_size = private_data_size;
    STAILQ_INIT(&mp->elt_list);
    STAILQ_INIT(&mp->mem_list);

    /*
     * local_cache pointer is set even if cache_size is zero.
     * The local_cache points to just past the elt_pa[] array.
     */
    mp->local_cache = (struct rte_mempool_cache *)
        RTE_PTR_ADD(mp, RTE_MEMPOOL_HEADER_SIZE(mp, 0));

    /* Init all default caches. */
    if (cache_size != 0) {
        for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++)
            mempool_cache_init(&mp->local_cache[lcore_id],
                     cache_size);
    }

  5. Initializes the mempool structure, including every per-lcore local_cache (the loop that closes the listing above).
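The size calculation in step 3 can be sketched with plain integers. All four constants below are made-up stand-ins for values that depend on the DPDK build, mempool_alloc_size is a hypothetical helper, and the sketch assumes (as the cache_size argument of RTE_MEMPOOL_HEADER_SIZE suggests) that the per-lcore caches are only counted when cache_size is nonzero.

```c
#include <assert.h>
#include <stddef.h>

/* All constants are assumptions for illustration only. */
#define ASSUMED_MEMPOOL_STRUCT_SIZE 192u   /* stands in for sizeof(struct rte_mempool) */
#define ASSUMED_CACHE_STRUCT_SIZE   4160u  /* stands in for sizeof(struct rte_mempool_cache) */
#define ASSUMED_MAX_LCORE           128u   /* stands in for RTE_MAX_LCORE */
#define ASSUMED_ALIGN               64u    /* stands in for RTE_MEMPOOL_ALIGN */

/* Round v up to the next multiple of align (align must be a power of two). */
static inline size_t
align_ceil(size_t v, size_t align)
{
    assert(align != 0 && (align & (align - 1)) == 0);
    return (v + align - 1) & ~(align - 1);
}

/* Hypothetical helper mirroring the three-line size calculation above. */
static inline size_t
mempool_alloc_size(unsigned int cache_size, size_t private_data_size)
{
    size_t sz = ASSUMED_MEMPOOL_STRUCT_SIZE;

    /* Per-lcore caches are only accounted for when caching is enabled. */
    if (cache_size != 0)
        sz += (size_t)ASSUMED_CACHE_STRUCT_SIZE * ASSUMED_MAX_LCORE;
    sz += private_data_size;
    return align_ceil(sz, ASSUMED_ALIGN);
}
```

The point of the sketch is the order of operations: header (with or without caches), then private data, then one final round-up.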

3.3 Setting the mempool ops

rte_mempool_set_ops_byname() is called to set the mempool ops by name.

    int rte_mempool_set_ops_byname(struct rte_mempool *mp, const char *name,
        void *pool_config)
    {
        struct rte_mempool_ops *ops = NULL;
        unsigned i;

        /* too late, the mempool is already populated. */
        if (mp->flags & RTE_MEMPOOL_F_POOL_CREATED)
            return -EEXIST;

        for (i = 0; i < rte_mempool_ops_table.num_ops; i++) {
            if (!strcmp(name,
                    rte_mempool_ops_table.ops[i].name)) {
                ops = &rte_mempool_ops_table.ops[i];
                break;
            }
        }

        if (ops == NULL)
            return -EINVAL;

        mp->ops_index = i;
        mp->pool_config = pool_config;
        rte_mempool_trace_set_ops_byname(mp, name, pool_config);
        return 0;
    }
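The register-then-look-up-by-name pattern behind ops_index can be modelled without DPDK. The toy_ types and functions below are hypothetical and only mimic the shape of the table walk in the listing above; the real table also stores the callbacks and is protected by a lock.

```c
#include <assert.h>
#include <string.h>

#define TOY_MAX_OPS 16

/* Toy ops entry: only the name matters for the lookup. */
struct toy_ops {
    const char *name;
};

struct toy_ops_table {
    unsigned int num_ops;
    struct toy_ops ops[TOY_MAX_OPS];
};

/* Append an ops entry; returns its index, or -1 if the table is full. */
static inline int
toy_ops_register(struct toy_ops_table *t, const char *name)
{
    assert(name != NULL);
    if (t->num_ops >= TOY_MAX_OPS)
        return -1;
    t->ops[t->num_ops].name = name;
    return (int)t->num_ops++;
}

/* Linear search by name; returns the index to store in the pool, or -1. */
static inline int
toy_ops_lookup(const struct toy_ops_table *t, const char *name)
{
    for (unsigned int i = 0; i < t->num_ops; i++)
        if (strcmp(name, t->ops[i].name) == 0)
            return (int)i;
    return -1;
}
```

Storing an index instead of a pointer is presumably what lets the same pool be used from several processes, where the table may be mapped at different addresses.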

3.4 Initializing the pool private data

rte_pktmbuf_pool_init() is called to initialize the private data structure of the pool.

    void
    rte_pktmbuf_pool_init(struct rte_mempool *mp, void *opaque_arg)
    {
        struct rte_pktmbuf_pool_private *user_mbp_priv, *mbp_priv;
        struct rte_pktmbuf_pool_private default_mbp_priv;
        uint16_t roomsz;

        RTE_ASSERT(mp->private_data_size >=
             sizeof(struct rte_pktmbuf_pool_private));
        RTE_ASSERT(mp->elt_size >= sizeof(struct rte_mbuf));

        /* if no structure is provided, assume no mbuf private area */
        user_mbp_priv = opaque_arg;
        if (user_mbp_priv == NULL) {
            memset(&default_mbp_priv, 0, sizeof(default_mbp_priv));
            if (mp->elt_size > sizeof(struct rte_mbuf))
                roomsz = mp->elt_size - sizeof(struct rte_mbuf);
            else
                roomsz = 0;
            default_mbp_priv.mbuf_data_room_size = roomsz;
            user_mbp_priv = &default_mbp_priv;
        }

        RTE_ASSERT(mp->elt_size >= sizeof(struct rte_mbuf) +
            ((user_mbp_priv->flags & RTE_PKTMBUF_POOL_F_PINNED_EXT_BUF) ?
                sizeof(struct rte_mbuf_ext_shared_info) :
                user_mbp_priv->mbuf_data_room_size) +
            user_mbp_priv->mbuf_priv_size);
        RTE_ASSERT((user_mbp_priv->flags &
             ~RTE_PKTMBUF_POOL_F_PINNED_EXT_BUF) == 0);

        mbp_priv = rte_mempool_get_priv(mp);
        memcpy(mbp_priv, user_mbp_priv, sizeof(*mbp_priv));
    }

3.5 Populating the mempool

The mempool is populated as follows:

    int
    rte_mempool_populate_default(struct rte_mempool *mp)
    {
        unsigned int mz_flags = RTE_MEMZONE_1GB|RTE_MEMZONE_SIZE_HINT_ONLY;
        char mz_name[RTE_MEMZONE_NAMESIZE];
        const struct rte_memzone *mz;
        ssize_t mem_size;
        size_t align, pg_sz, pg_shift = 0;
        rte_iova_t iova;
        unsigned mz_id, n;
        int ret;
        bool need_iova_contig_obj;
        size_t max_alloc_size = SIZE_MAX;

        ret = mempool_ops_alloc_once(mp);
        if (ret != 0)
            return ret;

        /* mempool must not be populated */
        if (mp->nb_mem_chunks != 0)
            return -EEXIST;

        /*
         * the following section calculates page shift and page size values.
         *
         * these values impact the result of calc_mem_size operation, which
         * returns the amount of memory that should be allocated to store the
         * desired number of objects. when not zero, it allocates more memory
         * for the padding between objects, to ensure that an object does not
         * cross a page boundary. in other words, page size/shift are to be set
         * to zero if mempool elements won't care about page boundaries.
         * there are several considerations for page size and page shift here.
         *
         * if we don't need our mempools to have physically contiguous objects,
         * then just set page shift and page size to 0, because the user has
         * indicated that there's no need to care about anything.
         *
         * if we do need contiguous objects (if a mempool driver has its
         * own calc_size() method returning min_chunk_size = mem_size),
         * there is also an option to reserve the entire mempool memory
         * as one contiguous block of memory.
         *
         * if we require contiguous objects, but not necessarily the entire
         * mempool reserved space to be contiguous, pg_sz will be != 0,
         * and the default ops->populate() will take care of not placing
         * objects across pages.
         *
         * if our IO addresses are physical, we may get memory from bigger
         * pages, or we might get memory from smaller pages, and how much of it
         * we require depends on whether we want bigger or smaller pages.
         * However, requesting each and every memory size is too much work, so
         * what we'll do instead is walk through the page sizes available, pick
         * the smallest one and set up page shift to match that one. We will be
         * wasting some space this way, but it's much nicer than looping around
         * trying to reserve each and every page size.
         *
         * If we fail to get enough contiguous memory, then we'll go and
         * reserve space in smaller chunks.
         */

        need_iova_contig_obj = !(mp->flags & RTE_MEMPOOL_F_NO_IOVA_CONTIG);
        ret = rte_mempool_get_page_size(mp, &pg_sz);
        if (ret < 0)
            return ret;

        if (pg_sz != 0)
            pg_shift = rte_bsf32(pg_sz);

        for (mz_id = 0, n = mp->size; n > 0; mz_id++, n -= ret) {
            size_t min_chunk_size;

            mem_size = rte_mempool_ops_calc_mem_size(
                mp, n, pg_shift, &min_chunk_size, &align);

            if (mem_size < 0) {
                ret = mem_size;
                goto fail;
            }

            ret = snprintf(mz_name, sizeof(mz_name),
                RTE_MEMPOOL_MZ_FORMAT "_%d", mp->name, mz_id);
            if (ret < 0 || ret >= (int)sizeof(mz_name)) {
                ret = -ENAMETOOLONG;
                goto fail;
            }

            /* if we're trying to reserve contiguous memory, add appropriate
             * memzone flag.
             */
            if (min_chunk_size == (size_t)mem_size)
                mz_flags |= RTE_MEMZONE_IOVA_CONTIG;

            /* Allocate a memzone, retrying with a smaller area on ENOMEM */
            do {
                mz = rte_memzone_reserve_aligned(mz_name,
                    RTE_MIN((size_t)mem_size, max_alloc_size),
                    mp->socket_id, mz_flags, align);

                if (mz != NULL || rte_errno != ENOMEM)
                    break;

                max_alloc_size = RTE_MIN(max_alloc_size,
                            (size_t)mem_size) / 2;
            } while (mz == NULL && max_alloc_size >= min_chunk_size);

            if (mz == NULL) {
                ret = -rte_errno;
                goto fail;
            }

            if (need_iova_contig_obj)
                iova = mz->iova;
            else
                iova = RTE_BAD_IOVA;

            if (pg_sz == 0 || (mz_flags & RTE_MEMZONE_IOVA_CONTIG))
                ret = rte_mempool_populate_iova(mp, mz->addr,
                    iova, mz->len,
                    rte_mempool_memchunk_mz_free,
                    (void *)(uintptr_t)mz);
            else
                ret = rte_mempool_populate_virt(mp, mz->addr,
                    mz->len, pg_sz,
                    rte_mempool_memchunk_mz_free,
                    (void *)(uintptr_t)mz);
            if (ret == 0) /* should not happen */
                ret = -ENOBUFS;
            if (ret < 0) {
                rte_memzone_free(mz);
                goto fail;
            }
        }

        rte_mempool_trace_populate_default(mp);
        return mp->size;

    fail:
        rte_mempool_free_memchunks(mp);
        return ret;
    }
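The "retry with a smaller area" loop in the middle of rte_mempool_populate_default() is easier to follow in isolation. The sketch below is a simplified model, not DPDK code: the toy_ names are hypothetical, the allocator is a caller-supplied callback, and every failure is treated as out-of-memory (the real loop stops immediately on any error other than ENOMEM).

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical allocator callback: returns nonzero when size bytes can be reserved. */
typedef int (*toy_reserve_fn)(size_t size, void *ctx);

/*
 * Try to reserve want bytes; on failure halve the request until it would
 * drop below min_chunk. Returns the size actually reserved, or 0.
 */
static inline size_t
toy_reserve_with_backoff(size_t want, size_t min_chunk,
                         toy_reserve_fn reserve, void *ctx)
{
    size_t max_alloc = (size_t)-1;

    assert(min_chunk > 0);
    do {
        size_t req = want < max_alloc ? want : max_alloc;

        if (reserve(req, ctx))
            return req;
        max_alloc = req / 2;
    } while (max_alloc >= min_chunk);
    return 0;
}

/* Example callback: succeeds only when the request fits in *(size_t *)ctx bytes. */
static inline int
toy_fits(size_t size, void *ctx)
{
    return size <= *(size_t *)ctx;
}
```

When a smaller chunk is obtained, the outer for loop in the real function comes around again and reserves another memzone for the objects that are still missing.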
  • Creating the rte_ring
    Inside the populate function above, the rte_ring of the pool is created through rte_mempool_ops and its address is stored in mp->pool_data. The flow is shown in the following code:

    static int
    mempool_ops_alloc_once(struct rte_mempool *mp)
    {
        int ret;

        /* create the internal ring if not already done */
        if ((mp->flags & RTE_MEMPOOL_F_POOL_CREATED) == 0) {
            ret = rte_mempool_ops_alloc(mp);
            if (ret != 0)
                return ret;
            mp->flags |= RTE_MEMPOOL_F_POOL_CREATED;
        }
        return 0;
    }

    int
    rte_mempool_ops_alloc(struct rte_mempool *mp)
    {
        struct rte_mempool_ops *ops;

        rte_mempool_trace_ops_alloc(mp);
        ops = rte_mempool_get_ops(mp->ops_index);
        return ops->alloc(mp);
    }

    static int
    ring_alloc(struct rte_mempool *mp, uint32_t rg_flags)
    {
        int ret;
        char rg_name[RTE_RING_NAMESIZE];
        struct rte_ring *r;

        ret = snprintf(rg_name, sizeof(rg_name),
            RTE_MEMPOOL_MZ_FORMAT, mp->name);
        if (ret < 0 || ret >= (int)sizeof(rg_name)) {
            rte_errno = ENAMETOOLONG;
            return -rte_errno;
        }

        /*
         * Allocate the ring that will be used to store objects.
         * Ring functions will return appropriate errors if we are
         * running as a secondary process etc., so no checks made
         * in this function for that condition.
         */
        r = rte_ring_create(rg_name, rte_align32pow2(mp->size + 1),
            mp->socket_id, rg_flags);
        if (r == NULL)
            return -rte_errno;

        mp->pool_data = r;

        return 0;
    }
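One more note: the pool's rte_ring is ultimately created by rte_ring_create_elem(). That function reserves the ring's memory from a memzone (laid out as the struct rte_ring followed by a void * array with one slot per ring entry, i.e. rte_align32pow2(mp->size + 1) slots according to the ring_alloc() call above), stores the address of the rte_ring and of its memzone in a struct rte_tailq_entry, and inserts that entry into the global rte_ring_tailq. See the implementation of rte_ring_create_elem() for details.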
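The ring size passed to rte_ring_create() is mp->size + 1 rounded up to a power of two. A portable sketch of that rounding is shown below; toy_align32pow2 and toy_ring_slots are hypothetical names, written here only to show what rte_align32pow2() is expected to compute.

```c
#include <assert.h>
#include <stdint.h>

/* Round x up to the next power of two (x itself if it already is one). */
static inline uint32_t
toy_align32pow2(uint32_t x)
{
    x--;
    x |= x >> 1;
    x |= x >> 2;
    x |= x >> 4;
    x |= x >> 8;
    x |= x >> 16;
    return x + 1;
}

/* Ring slots needed for a pool of n objects, following the ring_alloc() call above. */
static inline uint32_t
toy_ring_slots(uint32_t n)
{
    assert(n < UINT32_MAX);
    return toy_align32pow2(n + 1);
}
```

A pool of 8191 objects gets an 8192-slot ring, while a pool of 8192 objects needs 16384 slots, which is why pool sizes of the form 2^k - 1 waste the least ring memory.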
  • Populating the objects: page_size and page_shift are obtained first, since they govern how the memory that will hold all the mbufs is laid out. The size of the chunk that can currently be reserved is computed and the chunk is reserved. The information about each memory chunk is saved as a struct rte_mempool_memhdr and inserted into mp->mem_list, and the number of chunks is kept in mp->nb_mem_chunks. Within the chunk's virtual memory the objects are carved out one after another: the rte_mempool_ops populate hook rte_mempool_ops_populate() calls mempool_add_elem() to insert each object into the mp->elt_list list. The key call is:

    i = rte_mempool_ops_populate(mp, mp->size - mp->populated_size,
            (char *)vaddr + off,
            (iova == RTE_BAD_IOVA) ? RTE_BAD_IOVA : (iova + off),
            len - off, mempool_add_elem, NULL);

The return value i is the number of objects populated in this memory chunk. mempool_add_elem() is implemented as follows:

    static void
    mempool_add_elem(struct rte_mempool *mp, __rte_unused void *opaque,
             void *obj, rte_iova_t iova)
    {
        struct rte_mempool_objhdr *hdr;
        struct rte_mempool_objtlr *tlr __rte_unused;

        /* set mempool ptr in header */
        hdr = RTE_PTR_SUB(obj, sizeof(*hdr));
        hdr->mp = mp;
        hdr->iova = iova;
        STAILQ_INSERT_TAIL(&mp->elt_list, hdr, next);
        mp->populated_size++;

    #ifdef RTE_LIBRTE_MEMPOOL_DEBUG
        hdr->cookie = RTE_MEMPOOL_HEADER_COOKIE2;
        tlr = rte_mempool_get_trailer(obj);
        tlr->cookie = RTE_MEMPOOL_TRAILER_COOKIE;
    #endif
    }
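How a chunk is carved into objects, and how the header is recovered from an object pointer, can be sketched with a toy layout. The toy_ structures are hypothetical and much smaller than the real ones: there is no trailer, no IOVA and no page-boundary handling, only a header placed immediately before each element.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Toy object header; the real struct rte_mempool_objhdr has more fields. */
struct toy_objhdr {
    struct toy_objhdr *next;  /* link in the pool's element list */
    void *pool;               /* owning pool */
};

struct toy_pool {
    struct toy_objhdr *head, *tail;
    uint32_t populated;
};

/* Recover the header that sits immediately before an object. */
static inline struct toy_objhdr *
toy_obj_hdr(void *obj)
{
    return (struct toy_objhdr *)((char *)obj - sizeof(struct toy_objhdr));
}

/*
 * Carve [base, base + len) into header+element slots and append each
 * header to the pool's list. Returns the number of objects placed.
 */
static inline uint32_t
toy_populate(struct toy_pool *p, void *base, size_t len, size_t elt_size)
{
    size_t slot = sizeof(struct toy_objhdr) + elt_size;
    uint32_t n = 0;

    assert(elt_size % sizeof(void *) == 0); /* keeps every header aligned */
    for (size_t off = 0; off + slot <= len; off += slot) {
        struct toy_objhdr *hdr = (struct toy_objhdr *)((char *)base + off);

        hdr->next = NULL;
        hdr->pool = p;
        if (p->tail != NULL)
            p->tail->next = hdr;
        else
            p->head = hdr;
        p->tail = hdr;
        p->populated++;
        n++;
    }
    return n;
}
```

Because the header always precedes the element, any object pointer handed to the application can be mapped back to its pool with one pointer subtraction, which is the idea behind the RTE_PTR_SUB() call in mempool_add_elem().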

3.6 Initializing the pkt mbufs

rte_mempool_obj_iter() is called to walk all objects in the rte_mempool, invoking rte_pktmbuf_init() to initialize each one.
The function that iterates over all objects:

    uint32_t
    rte_mempool_obj_iter(struct rte_mempool *mp,
        rte_mempool_obj_cb_t *obj_cb, void *obj_cb_arg)
    {
        struct rte_mempool_objhdr *hdr;
        void *obj;
        unsigned n = 0;

        STAILQ_FOREACH(hdr, &mp->elt_list, next) {
            obj = (char *)hdr + sizeof(*hdr);
            obj_cb(mp, obj_cb_arg, obj, n);
            n++;
        }

        return n;
    }

The function that initializes each object:

    void
    rte_pktmbuf_init(struct rte_mempool *mp,
             __rte_unused void *opaque_arg,
             void *_m,
             __rte_unused unsigned i)
    {
        struct rte_mbuf *m = _m;
        uint32_t mbuf_size, buf_len, priv_size;

        RTE_ASSERT(mp->private_data_size >=
             sizeof(struct rte_pktmbuf_pool_private));

        priv_size = rte_pktmbuf_priv_size(mp);
        mbuf_size = sizeof(struct rte_mbuf) + priv_size;
        buf_len = rte_pktmbuf_data_room_size(mp);

        RTE_ASSERT(RTE_ALIGN(priv_size, RTE_MBUF_PRIV_ALIGN) == priv_size);
        RTE_ASSERT(mp->elt_size >= mbuf_size);
        RTE_ASSERT(buf_len <= UINT16_MAX);

        memset(m, 0, mbuf_size);
        /* start of buffer is after mbuf structure and priv data */
        m->priv_size = priv_size;
        m->buf_addr = (char *)m + mbuf_size;
        m->buf_iova = rte_mempool_virt2iova(m) + mbuf_size;
        m->buf_len = (uint16_t)buf_len;

        /* keep some headroom between start of buffer and data */
        m->data_off = RTE_MIN(RTE_PKTMBUF_HEADROOM, (uint16_t)m->buf_len);

        /* init some constant fields */
        m->pool = mp;
        m->nb_segs = 1;
        m->port = RTE_MBUF_PORT_INVALID;
        rte_mbuf_refcnt_set(m, 1);
        m->next = NULL;
    }

At this point the rte_mempool is fully set up.

4. Using rte_mempool

The mbufs in a pktmbuf pool are used by NIC ports for receiving packets and by applications for transmitting them.
Allocating a raw mbuf from the pool:

    static inline struct rte_mbuf *rte_mbuf_raw_alloc(struct rte_mempool *mp)

Internally the allocation function calls rte_mempool_get_bulk() to fetch n mbufs from mp (n is 1 here; the function supports bulk allocation and is shown below). Objects are taken from the local core's cache; if the cache does not hold enough, mbufs are first dequeued from the rte_ring into the local cache.

    static __rte_always_inline int
    rte_mempool_get_bulk(struct rte_mempool *mp, void **obj_table, unsigned int n)
    {
        struct rte_mempool_cache *cache;
        cache = rte_mempool_default_cache(mp, rte_lcore_id());
        rte_mempool_trace_get_bulk(mp, obj_table, n, cache);
        return rte_mempool_generic_get(mp, obj_table, n, cache);
    }

    static __rte_always_inline int
    rte_mempool_generic_get(struct rte_mempool *mp, void **obj_table,
                unsigned int n, struct rte_mempool_cache *cache)
    {
        int ret;
        ret = rte_mempool_do_generic_get(mp, obj_table, n, cache);
        if (ret == 0)
            RTE_MEMPOOL_CHECK_COOKIES(mp, obj_table, n, 1);
        rte_mempool_trace_generic_get(mp, obj_table, n, cache);
        return ret;
    }

    static __rte_always_inline int
    rte_mempool_do_generic_get(struct rte_mempool *mp, void **obj_table,
                 unsigned int n, struct rte_mempool_cache *cache)
    {
        int ret;
        uint32_t index, len;
        void **cache_objs;

        /* No cache provided or cannot be satisfied from cache */
        if (unlikely(cache == NULL || n >= cache->size))
            goto ring_dequeue;

        cache_objs = cache->objs;

        /* Can this be satisfied from the cache? */
        if (cache->len < n) {
            /* No. Backfill the cache first, and then fill from it */
            uint32_t req = n + (cache->size - cache->len);

            /* How many do we require i.e. number to fill the cache + the request */
            ret = rte_mempool_ops_dequeue_bulk(mp,
                &cache->objs[cache->len], req);
            if (unlikely(ret < 0)) {
                /*
                 * In the off chance that we are buffer constrained,
                 * where we are not able to allocate cache + n, go to
                 * the ring directly. If that fails, we are truly out of
                 * buffers.
                 */
                goto ring_dequeue;
            }

            cache->len += req;
        }

        /* Now fill in the response ... */
        for (index = 0, len = cache->len - 1; index < n; ++index, len--, obj_table++)
            *obj_table = cache_objs[len];

        cache->len -= n;

        RTE_MEMPOOL_STAT_ADD(mp, get_success_bulk, 1);
        RTE_MEMPOOL_STAT_ADD(mp, get_success_objs, n);

        return 0;

    ring_dequeue:

        /* get remaining objects from ring */
        ret = rte_mempool_ops_dequeue_bulk(mp, obj_table, n);

        if (ret < 0) {
            RTE_MEMPOOL_STAT_ADD(mp, get_fail_bulk, 1);
            RTE_MEMPOOL_STAT_ADD(mp, get_fail_objs, n);
        } else {
            RTE_MEMPOOL_STAT_ADD(mp, get_success_bulk, 1);
            RTE_MEMPOOL_STAT_ADD(mp, get_success_objs, n);
        }

        return ret;
    }
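The get path can be replayed on a toy model to see how the cache length and the shared pool evolve. This is only a sketch under simplifying assumptions: the toy_ types are hypothetical, the shared ring is modelled as a plain stack, and there is no concurrency or statistics.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define TOY_CACHE_MAX 64

/* Shared pool modelled as a simple stack of object pointers (stands in for the ring). */
struct toy_ring {
    void **objs;
    uint32_t len;
};

struct toy_cache {
    uint32_t size;   /* nominal cache size */
    uint32_t len;    /* objects currently cached */
    void *objs[TOY_CACHE_MAX * 2];
};

/* All-or-nothing bulk dequeue from the shared pool; 0 on success, -1 otherwise. */
static inline int
toy_ring_dequeue_bulk(struct toy_ring *r, void **out, uint32_t n)
{
    if (r->len < n)
        return -1;
    for (uint32_t i = 0; i < n; i++)
        out[i] = r->objs[--r->len];
    return 0;
}

/* Get n objects, preferring the local cache and refilling it from the ring on a miss. */
static inline int
toy_get_bulk(struct toy_ring *r, struct toy_cache *c, void **out, uint32_t n)
{
    assert(c == NULL || c->size <= TOY_CACHE_MAX);
    if (c == NULL || n >= c->size)
        return toy_ring_dequeue_bulk(r, out, n);

    if (c->len < n) {
        /* Refill: the request plus enough to top the cache back up to size. */
        uint32_t req = n + (c->size - c->len);

        if (toy_ring_dequeue_bulk(r, &c->objs[c->len], req) < 0)
            return toy_ring_dequeue_bulk(r, out, n); /* fall back to the ring */
        c->len += req;
    }
    /* Serve from the top of the cache, most recently cached object first. */
    for (uint32_t i = 0; i < n; i++)
        out[i] = c->objs[--c->len];
    return 0;
}
```

With a cache size of 4 and an empty cache, a request for one object pulls five from the shared pool and leaves the cache full, so the next few requests never touch the ring.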
Returning an mbuf to the pool:

    void rte_mbuf_raw_free(struct rte_mbuf *m)

Internally the free function calls rte_mempool_put_bulk() to release n mbufs to the pool (n is 1 here; the function supports bulk release and is shown below). Objects are released to the local core's cache first; once the local cache is full, the surplus is released to the rte_ring.

    static __rte_always_inline void
    rte_mempool_put_bulk(struct rte_mempool *mp, void * const *obj_table,
             unsigned int n)
    {
        struct rte_mempool_cache *cache;
        cache = rte_mempool_default_cache(mp, rte_lcore_id());
        rte_mempool_trace_put_bulk(mp, obj_table, n, cache);
        rte_mempool_generic_put(mp, obj_table, n, cache);
    }

    static __rte_always_inline void
    rte_mempool_generic_put(struct rte_mempool *mp, void * const *obj_table,
                unsigned int n, struct rte_mempool_cache *cache)
    {
        rte_mempool_trace_generic_put(mp, obj_table, n, cache);
        RTE_MEMPOOL_CHECK_COOKIES(mp, obj_table, n, 0);
        rte_mempool_do_generic_put(mp, obj_table, n, cache);
    }

    static __rte_always_inline void
    rte_mempool_do_generic_put(struct rte_mempool *mp, void * const *obj_table,
                 unsigned int n, struct rte_mempool_cache *cache)
    {
        void **cache_objs;

        /* increment stat now, adding in mempool always success */
        RTE_MEMPOOL_STAT_ADD(mp, put_bulk, 1);
        RTE_MEMPOOL_STAT_ADD(mp, put_objs, n);

        /* No cache provided or if put would overflow mem allocated for cache */
        if (unlikely(cache == NULL || n > RTE_MEMPOOL_CACHE_MAX_SIZE))
            goto ring_enqueue;

        cache_objs = &cache->objs[cache->len];

        /*
         * The cache follows the following algorithm
         * 1. Add the objects to the cache
         * 2. Anything greater than the cache min value (if it crosses the
         * cache flush threshold) is flushed to the ring.
         */

        /* Add elements back into the cache */
        rte_memcpy(&cache_objs[0], obj_table, sizeof(void *) * n);

        cache->len += n;

        if (cache->len >= cache->flushthresh) {
            rte_mempool_ops_enqueue_bulk(mp, &cache->objs[cache->size],
                    cache->len - cache->size);
            cache->len = cache->size;
        }

        return;

    ring_enqueue:

        /* push remaining objects in ring */
    #ifdef RTE_LIBRTE_MEMPOOL_DEBUG
        if (rte_mempool_ops_enqueue_bulk(mp, obj_table, n) < 0)
            rte_panic("cannot put objects in mempool\n");
    #else
        rte_mempool_ops_enqueue_bulk(mp, obj_table, n);
    #endif
    }
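The put path, including the flush threshold, can be modelled the same way. The sketch below uses hypothetical toy_ types, models the ring as an array with a fill level, and takes the flush threshold as a plain field; how DPDK derives flushthresh from the cache size is not covered here.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define TOY_CACHE_MAX 64

/* Shared pool modelled as an array with a fill level (stands in for the ring). */
struct toy_ring {
    void **objs;
    uint32_t len;
    uint32_t capacity;
};

struct toy_cache {
    uint32_t size;          /* nominal cache size */
    uint32_t flushthresh;   /* flush once len reaches this value */
    uint32_t len;           /* objects currently cached */
    void *objs[TOY_CACHE_MAX * 3];
};

/* Append n pointers to the shared pool; the caller guarantees there is room. */
static inline void
toy_ring_enqueue_bulk(struct toy_ring *r, void * const *objs, uint32_t n)
{
    assert(r->len + n <= r->capacity);
    memcpy(&r->objs[r->len], objs, sizeof(void *) * n);
    r->len += n;
}

/* Put n objects: cache first, then flush everything above size past the threshold. */
static inline void
toy_put_bulk(struct toy_ring *r, struct toy_cache *c, void * const *objs, uint32_t n)
{
    if (c == NULL || n > TOY_CACHE_MAX) {
        toy_ring_enqueue_bulk(r, objs, n);
        return;
    }
    assert(c->size <= c->flushthresh && c->flushthresh <= 2 * TOY_CACHE_MAX);

    /* 1. Always add the objects to the cache first. */
    memcpy(&c->objs[c->len], objs, sizeof(void *) * n);
    c->len += n;

    /* 2. Once the threshold is reached, return the surplus above size to the ring. */
    if (c->len >= c->flushthresh) {
        toy_ring_enqueue_bulk(r, &c->objs[c->size], c->len - c->size);
        c->len = c->size;
    }
}
```

Keeping the threshold above the nominal size gives the cache some slack, so a core that alternates between small gets and puts does not bounce objects to and from the ring on every call.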

5. Querying rte_mempool information

The state of an rte_mempool can be queried with rte_mempool_dump(FILE *f, struct rte_mempool *mp), which dumps the following information:

    void
    rte_mempool_dump(FILE *f, struct rte_mempool *mp)
    {
    #ifdef RTE_LIBRTE_MEMPOOL_DEBUG
        struct rte_mempool_info info;
        struct rte_mempool_debug_stats sum;
        unsigned lcore_id;
    #endif
        struct rte_mempool_memhdr *memhdr;
        struct rte_mempool_ops *ops;
        unsigned common_count;
        unsigned cache_count;
        size_t mem_len = 0;

        RTE_ASSERT(f != NULL);
        RTE_ASSERT(mp != NULL);

        fprintf(f, "mempool <%s>@%p\n", mp->name, mp);
        fprintf(f, " flags=%x\n", mp->flags);
        fprintf(f, " socket_id=%d\n", mp->socket_id);
        fprintf(f, " pool=%p\n", mp->pool_data);
        fprintf(f, " iova=0x%" PRIx64 "\n", mp->mz->iova);
        fprintf(f, " nb_mem_chunks=%u\n", mp->nb_mem_chunks);
        fprintf(f, " size=%"PRIu32"\n", mp->size);
        fprintf(f, " populated_size=%"PRIu32"\n", mp->populated_size);
        fprintf(f, " header_size=%"PRIu32"\n", mp->header_size);
        fprintf(f, " elt_size=%"PRIu32"\n", mp->elt_size);
        fprintf(f, " trailer_size=%"PRIu32"\n", mp->trailer_size);
        fprintf(f, " total_obj_size=%"PRIu32"\n",
         mp->header_size + mp->elt_size + mp->trailer_size);

        fprintf(f, " private_data_size=%"PRIu32"\n", mp->private_data_size);

        fprintf(f, " ops_index=%d\n", mp->ops_index);
        ops = rte_mempool_get_ops(mp->ops_index);
        fprintf(f, " ops_name: <%s>\n", (ops != NULL) ? ops->name : "NA");

        STAILQ_FOREACH(memhdr, &mp->mem_list, next)
            mem_len += memhdr->len;
        if (mem_len != 0) {
            fprintf(f, " avg bytes/object=%#Lf\n",
                (long double)mem_len / mp->size);
        }

        cache_count = rte_mempool_dump_cache(f, mp);
        common_count = rte_mempool_ops_get_count(mp);
        if ((cache_count + common_count) > mp->size)
            common_count = mp->size - cache_count;
        fprintf(f, " common_pool_count=%u\n", common_count);

        /* sum and dump statistics */
    #ifdef RTE_LIBRTE_MEMPOOL_DEBUG
        rte_mempool_ops_get_info(mp, &info);
        memset(&sum, 0, sizeof(sum));
        for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
            sum.put_bulk += mp->stats[lcore_id].put_bulk;
            sum.put_objs += mp->stats[lcore_id].put_objs;
            sum.put_common_pool_bulk += mp->stats[lcore_id].put_common_pool_bulk;
            sum.put_common_pool_objs += mp->stats[lcore_id].put_common_pool_objs;
            sum.get_common_pool_bulk += mp->stats[lcore_id].get_common_pool_bulk;
            sum.get_common_pool_objs += mp->stats[lcore_id].get_common_pool_objs;
            sum.get_success_bulk += mp->stats[lcore_id].get_success_bulk;
            sum.get_success_objs += mp->stats[lcore_id].get_success_objs;
            sum.get_fail_bulk += mp->stats[lcore_id].get_fail_bulk;
            sum.get_fail_objs += mp->stats[lcore_id].get_fail_objs;
            sum.get_success_blks += mp->stats[lcore_id].get_success_blks;
            sum.get_fail_blks += mp->stats[lcore_id].get_fail_blks;
        }
        fprintf(f, " stats:\n");
        fprintf(f, " put_bulk=%"PRIu64"\n", sum.put_bulk);
        fprintf(f, " put_objs=%"PRIu64"\n", sum.put_objs);
        fprintf(f, " put_common_pool_bulk=%"PRIu64"\n", sum.put_common_pool_bulk);
        fprintf(f, " put_common_pool_objs=%"PRIu64"\n", sum.put_common_pool_objs);
        fprintf(f, " get_common_pool_bulk=%"PRIu64"\n", sum.get_common_pool_bulk);
        fprintf(f, " get_common_pool_objs=%"PRIu64"\n", sum.get_common_pool_objs);
        fprintf(f, " get_success_bulk=%"PRIu64"\n", sum.get_success_bulk);
        fprintf(f, " get_success_objs=%"PRIu64"\n", sum.get_success_objs);
        fprintf(f, " get_fail_bulk=%"PRIu64"\n", sum.get_fail_bulk);
        fprintf(f, " get_fail_objs=%"PRIu64"\n", sum.get_fail_objs);
        if (info.contig_block_size > 0) {
            fprintf(f, " get_success_blks=%"PRIu64"\n",
                sum.get_success_blks);
            fprintf(f, " get_fail_blks=%"PRIu64"\n", sum.get_fail_blks);
        }
    #else
        fprintf(f, " no statistics available\n");
    #endif

        rte_mempool_audit(mp);
    }

rte_mempool also keeps statistics in mp->stats that are worth watching. They are controlled by RTE_LIBRTE_MEMPOOL_DEBUG, which is normally not enabled.
Two other functions in the rte_mempool library are very useful: rte_mempool_dump_cache(FILE *f, const struct rte_mempool *mp) returns the number of available objects held in the per-core local caches of the given pool,
and rte_mempool_ops_get_count(const struct rte_mempool *mp) returns the number of available objects in the rte_ring.
From mp->size and these two values, the number of mbufs currently in use by the application can be computed, a metric that some products watch closely.
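The in-use calculation described above is a simple subtraction. The helper below is a hypothetical sketch that takes the three numbers as plain integers; it clamps the available count in the same spirit as the check in rte_mempool_dump(), since the two counts are not sampled at the same instant.

```c
#include <assert.h>
#include <stdint.h>

/*
 * Objects currently held by the application: pool size minus what is still
 * available (per-lcore caches plus the common pool). Hypothetical helper.
 */
static inline uint32_t
toy_in_use(uint32_t pool_size, uint32_t cache_count, uint32_t common_count)
{
    uint64_t avail = (uint64_t)cache_count + common_count;

    /* The counts can momentarily add up to more than the pool size, so clamp. */
    if (avail > pool_size)
        avail = pool_size;
    return pool_size - (uint32_t)avail;
}
```

DPDK also provides rte_mempool_avail_count() and rte_mempool_in_use_count(), which report these numbers directly for a pool.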


