Chinaunix首页 | 论坛 | 博客
  • 博客访问: 312043
  • 博文数量: 144
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 493
  • 用 户 组: 普通用户
  • 注册时间: 2013-08-14 17:08
文章分类

全部博文(144)

文章存档

2015年(42)

2014年(19)

2013年(83)

我的朋友

分类: LINUX

2013-08-14 17:28:54

原文地址:Linux内核中流量控制(3) 作者:Godbach

Linux内核中流量控制(3)

本文档的Copyleft归yfydz所有,使用GPL发布,可以自由拷贝,转载,转载时请保持文档的完整性,
严禁用于任何商业用途。
msn:
来源:http://yfydz.cublog.cn

5.2 FIFO

FIFO算法在net/sched/sch_fifo.c中定义, 既可以单独使用, 也可以被其他流控算法(如TBF)作为内
部流控算法使用.

5.2.1 流控结构
FIFO分两种, 一种是PFIFO, 一种是BFIFO, 分别按包数和字节数进行FIFO的流量控制.
 
// 私有数据结构, 就是流量参数
struct fifo_sched_data
{
 u32 limit;
};
// 包数流控
struct Qdisc_ops pfifo_qdisc_ops = {
 .id  = "pfifo",
 .priv_size = sizeof(struct fifo_sched_data),
 .enqueue = pfifo_enqueue,
 .dequeue = qdisc_dequeue_head,
 .requeue = qdisc_requeue,
 .drop  = qdisc_queue_drop,
 .init  = fifo_init,
 .reset  = qdisc_reset_queue,
 .change  = fifo_init,
 .dump  = fifo_dump,
 .owner  = THIS_MODULE,
};

// 字节数流控
struct Qdisc_ops bfifo_qdisc_ops = {
 .id  = "bfifo",
 .priv_size = sizeof(struct fifo_sched_data),
 .enqueue = bfifo_enqueue,
 .dequeue = qdisc_dequeue_head,
 .requeue = qdisc_requeue,
 .drop  = qdisc_queue_drop,
 .init  = fifo_init,
 .reset  = qdisc_reset_queue,
 .change  = fifo_init,
 .dump  = fifo_dump,
 .owner  = THIS_MODULE,
};
在这两个结构中, 处理enqueue, init, reset, dump外, 其他操作函数都是用流控缺省操作函数.

5.2.2 初始化

static int fifo_init(struct Qdisc *sch, struct rtattr *opt)
{
// FIFO私有数据
 struct fifo_sched_data *q = qdisc_priv(sch);
 if (opt == NULL) {
// 如果没提供流量参数情况
// limit缺省取网卡的发送队列长度, 而且至少为1
  u32 limit = sch->dev->tx_queue_len ? : 1;
// 如果是字节流控, 用的是队列长度*MTU作为限制值
  if (sch->ops == &bfifo_qdisc_ops)
   limit *= sch->dev->mtu;
// 如果是包流控, 直接用队列长度作为限制值
  q->limit = limit;
 } else {
// 提供合法流量参数时用提供的流控值作为流控限制参数
  struct tc_fifo_qopt *ctl = RTA_DATA(opt);
  if (RTA_PAYLOAD(opt) < sizeof(*ctl))
   return -EINVAL;
  q->limit = ctl->limit;
 }
 return 0;
}

5.2.3 入队

// 字节流控入队
static int bfifo_enqueue(struct sk_buff *skb, struct Qdisc* sch)
{
 struct fifo_sched_data *q = qdisc_priv(sch);
// 如果当前队列数据总长加数据包长度不超过限制值, 将数据包添加到队列尾
 if (likely(sch->qstats.backlog + skb->len <= q->limit))
  return qdisc_enqueue_tail(skb, sch);
 return qdisc_reshape_fail(skb, sch);
}

// 包数流控入队
static int pfifo_enqueue(struct sk_buff *skb, struct Qdisc* sch)
{
 struct fifo_sched_data *q = qdisc_priv(sch);
// 如果缓存队列数据包数小于限制值, 将数据包添加到队列尾
 if (likely(skb_queue_len(&sch->q) < q->limit))
  return qdisc_enqueue_tail(skb, sch);
 return qdisc_reshape_fail(skb, sch);
}
>>>
Godbach 注:
(1)字节流控入队时, 是采用该队列统计到的入队字节总数sch->qstats.backlog,并加上当前skb->len,和队列规则配置的limit进行比较的。如果可以入队,skb入队之后,sch->qstats.backlog的值会进行修改,要加上当前数据包的长度:sch->qstats.backlog += skb->len。这在函数qdisc_enqueue_tail()->__qdisc_enqueue_tail()实现。
(2)包数流控入队时,是采用当前队列中记录的struct sk_buff_head 结构体的成员qlen和队列规则配置的limit进行比较的。如果可以入队,则skb入队之后,qlen会进行1,这在函数qdisc_enqueue_tail()->__qdisc_enqueue_tail()->__skb_queue_tail()中通过list->qlen++来实现。
(3)bfifo和pfifo的出队函数都是qdisc_dequeue_head()。因为该函数既实现了bfifo队列的sch->qstats.backlog -= skb->len,又实现了pfifo队列的list->qlen--。
>>>
5.2.4 输出

// 就是将私有数据limit作为参数返回, 用的应该是netlink套接口
static int fifo_dump(struct Qdisc *sch, struct sk_buff *skb)
{
 struct fifo_sched_data *q = qdisc_priv(sch);
 struct tc_fifo_qopt opt = { .limit = q->limit };
 RTA_PUT(skb, TCA_OPTIONS, sizeof(opt), &opt);
 return skb->len;
rtattr_failure:
 return -1;
}
 
5.3 TBF(Token Bucket Filter queue, 令牌桶过滤队列)

TBF算法是一种限制流量算法,基本原理是将入队的数据包缓存在队列中,同时按指定的速度产生令
牌,只有拥有令牌才能数据包出队,这样就控制了出口流量,目前netfilter中的limit匹配实际也是
用TBF算法, 该算法在net/sched/sch_tbf.c中定义,在该文件的注释中定义的算法如下:
/*
 Description.
 ------------
 A data flow obeys TBF with rate R and depth B, if for any
 time interval t_i...t_f the number of transmitted bits
 does not exceed B + R*(t_f-t_i).
 Packetized version of this definition:
 The sequence of packets of sizes s_i served at moments t_i
 obeys TBF, if for any i<=k:
 s_i+....+s_k <= B + R*(t_k - t_i)
 Algorithm.
 ----------
 Let N(t_i) be B/R initially and N(t) grow continuously with time as:
 N(t+delta) = min{B/R, N(t) + delta}
 If the first packet in queue has length S, it may be
 transmitted only at the time t_* when S/R <= N(t_*),
 and in this case N(t) jumps:
 N(t_* + 0) = N(t_* - 0) - S/R.
 
 Actually, QoS requires two TBF to be applied to a data stream.
 One of them controls steady state burst size, another
 one with rate P (peak rate) and depth M (equal to link MTU)
 limits bursts at a smaller time scale.
 It is easy to see that P>R, and B>M. If P is infinity, this double
 TBF is equivalent to a single one.
 When TBF works in reshaping mode, latency is estimated as:
 lat = max ((L-B)/R, (L-M)/P)
*/
 
5.3.1 操作结构定义

// TBF属性结构
struct tc_tbf_qopt
{
// 速率
 struct tc_ratespec rate;
// 峰值速率
 struct tc_ratespec peakrate;
// 限制值
 __u32  limit;
// 缓冲区大小
 __u32  buffer;
// MTU参数
 __u32  mtu;
};
// TBF算法私有数据结构, 这个结构让人想起limit匹配结构, 也是分两部分, 固定参数和可变参数
struct tbf_sched_data
{
/* Parameters */
// 以下是固定参数
// 流量限制值
 u32  limit;  /* Maximal length of backlog: bytes */
// 桶深, 也就是缓冲区大小
 u32  buffer;  /* Token bucket depth/rate: MUST BE >= MTU/B */
// 网卡MTU
 u32  mtu;
// 允许的最大包长
 u32  max_size;
 struct qdisc_rate_table *R_tab;
 struct qdisc_rate_table *P_tab;
/* Variables */
// 以下是变化数据, 在每次算法计算时会发生改变
// B型token的数量
 long tokens;   /* Current number of B tokens */
// P型token的数量
 long ptokens;  /* Current number of P tokens */
// 时间
 psched_time_t t_c;  /* Time check-point */
// 定时器
 struct timer_list wd_timer; /* Watchdog timer */
// 内部流控结构, 缺省使用bfifo
 struct Qdisc *qdisc;  /* Inner qdisc, default - bfifo queue */
};

// TBF流控算法操作结构
static struct Qdisc_ops tbf_qdisc_ops = {
 .next  = NULL,
 .cl_ops  = &tbf_class_ops,
 .id  = "tbf",
 .priv_size = sizeof(struct tbf_sched_data),
 .enqueue = tbf_enqueue,
 .dequeue = tbf_dequeue,
 .requeue = tbf_requeue,
 .drop  = tbf_drop,
 .init  = tbf_init,
 .reset  = tbf_reset,
 .destroy = tbf_destroy,
 .change  = tbf_change,
 .dump  = tbf_dump,
 .owner  = THIS_MODULE,
};
// TBF类别操作结构
static struct Qdisc_class_ops tbf_class_ops =
{
 .graft  = tbf_graft,
 .leaf  = tbf_leaf,
 .get  = tbf_get,
 .put  = tbf_put,
 .change  = tbf_change_class,
 .delete  = tbf_delete,
 .walk  = tbf_walk,
 .tcf_chain = tbf_find_tcf,
 .dump  = tbf_dump_class,
};

5.3.2 初始化

static int tbf_init(struct Qdisc* sch, struct rtattr *opt)
{
// TBF私有数据
 struct tbf_sched_data *q = qdisc_priv(sch);
 if (opt == NULL)
  return -EINVAL;
// 获取当前实际
 PSCHED_GET_TIME(q->t_c);
// 初始化定时器
 init_timer(&q->wd_timer);
// 定时函数
 q->wd_timer.function = tbf_watchdog;
// 数据就是流控结构本身
 q->wd_timer.data = (unsigned long)sch;
// 内部流控初始化为noop_qdisc
 q->qdisc = &noop_qdisc;
// 调用tbf_change设置TBF流控结构参数
 return tbf_change(sch, opt);
}

// 定时器函数, 功能就是清除阻塞标识, 让网卡重新调度
static void tbf_watchdog(unsigned long arg)
{
 struct Qdisc *sch = (struct Qdisc*)arg;
// 清除阻塞标志
 sch->flags &= ~TCQ_F_THROTTLED;
// 重新进行网卡调度
 netif_schedule(sch->dev);
}

5.3.3 入队

// TBF是根据数据包长度而不是个数来进行流控的, 缺省内部流控结构是bfifo
static int tbf_enqueue(struct sk_buff *skb, struct Qdisc* sch)
{
// TBF私有数据
 struct tbf_sched_data *q = qdisc_priv(sch);
 int ret;
// 如果数据包长度超过TBF允许最大长度, 丢包
 if (skb->len > q->max_size) {
  sch->qstats.drops++;
#ifdef CONFIG_NET_CLS_POLICE
  if (sch->reshape_fail == NULL || sch->reshape_fail(skb, sch))
#endif
   kfree_skb(skb);
  return NET_XMIT_DROP;
 }
// 调用内部流控结构的入队操作, 失败丢包
 if ((ret = q->qdisc->enqueue(skb, q->qdisc)) != 0) {
  sch->qstats.drops++;
  return ret;
 }
// 入队成功增加相关统计量
 sch->q.qlen++;
 sch->bstats.bytes += skb->len;
 sch->bstats.packets++;
 return 0;
}

// 重入队操作
static int tbf_requeue(struct sk_buff *skb, struct Qdisc* sch)
{
// TBF私有数据
 struct tbf_sched_data *q = qdisc_priv(sch);
 int ret;
// 调用内部流控结构的重入队操作, 成功则增加统计计数
 if ((ret = q->qdisc->ops->requeue(skb, q->qdisc)) == 0) {
  sch->q.qlen++;
  sch->qstats.requeues++;
 }
 return ret;
}

5.3.4 出队

// 长度转换为相应大小的令牌数
#define L2T(q,L)   ((q)->R_tab->data[(L)>>(q)->R_tab->rate.cell_log])
// 长度转换为相应大小的P令牌数
#define L2T_P(q,L) ((q)->P_tab->data[(L)>>(q)->P_tab->rate.cell_log])

static struct sk_buff *tbf_dequeue(struct Qdisc* sch)
{
 struct tbf_sched_data *q = qdisc_priv(sch);
 struct sk_buff *skb;
// 调用内部流控结构的出队函数, 对于BFIFO来说就是直接取队列头的那个数据包
 skb = q->qdisc->dequeue(q->qdisc);
 if (skb) {
// 取到数据包的情况
  psched_time_t now;
  long toks, delay;
  long ptoks = 0;
// 长度初始化为数据包长度
  unsigned int len = skb->len;
// 获取当前时间
  PSCHED_GET_TIME(now);
// 根据当前时间和上次流控计算时间的时间差来计算可用的令牌量
  toks = PSCHED_TDIFF_SAFE(now, q->t_c, q->buffer);
// 如果也按包数流控
  if (q->P_tab) {
// 计算当前可用的P令牌数: 当前P令牌数加新增令牌量
   ptoks = toks + q->ptokens;
// 如果超过了MTU值, 限制为MTU
   if (ptoks > (long)q->mtu)
    ptoks = q->mtu;
// 当前可用P令牌数减去当前数据包长度对应的P令牌量
   ptoks -= L2T_P(q, len);
  }
// 再加上原来已经有的令牌量
  toks += q->tokens;
// 如果令牌量超过了全部缓冲区大小, 限制为缓冲区大小
  if (toks > (long)q->buffer)
   toks = q->buffer;
// 当前令牌量减去当前数据包长度对应的令牌量
  toks -= L2T(q, len);
// 如果当前令牌量大于0, 表示桶中的令牌量允许发送该数据包
  if ((toks|ptoks) >= 0) {
// 更新流控时间
   q->t_c = now;
// 更新当前令牌数
   q->tokens = toks;
// 更新P令牌数
   q->ptokens = ptoks;
// 队列长度减
   sch->q.qlen--;
// 此时没阻塞
   sch->flags &= ~TCQ_F_THROTTLED;
// 返回数据包
   return skb;
  }
// 现在是令牌数不够, 说明要发送的数据量超过了限制值, 发生了阻塞
// 计算延迟时间值
  delay = PSCHED_US2JIFFIE(max_t(long, -toks, -ptoks));
// 至少定时一个时间片
  if (delay == 0)
   delay = 1;
// 修改定时器的定时时间
  mod_timer(&q->wd_timer, jiffies+delay);
  /* Maybe we have a shorter packet in the queue,
     which can be sent now. It sounds cool,
     but, however, this is wrong in principle.
     We MUST NOT reorder packets under these circumstances.
     Really, if we split the flow into independent
     subflows, it would be a very good solution.
     This is the main idea of all FQ algorithms
     (cf. CSZ, HPFQ, HFSC)
   */
// 将数据包重新入队等待发送
  if (q->qdisc->ops->requeue(skb, q->qdisc) != NET_XMIT_SUCCESS) {
   /* When requeue fails skb is dropped */
   sch->q.qlen--;
   sch->qstats.drops++;
  }
// 设置阻塞标识
  sch->flags |= TCQ_F_THROTTLED;
  sch->qstats.overlimits++;
 }
 return NULL;
}

5.3.5 丢包

static unsigned int tbf_drop(struct Qdisc* sch)
{
 struct tbf_sched_data *q = qdisc_priv(sch);
 unsigned int len = 0;
// 就是调用内部流控结构的类别操作结构中的丢包操作函数
 if (q->qdisc->ops->drop && (len = q->qdisc->ops->drop(q->qdisc)) != 0) {
  sch->q.qlen--;
  sch->qstats.drops++;
 }
 return len;
}

5.3.6 复位

static void tbf_reset(struct Qdisc* sch)
{
 struct tbf_sched_data *q = qdisc_priv(sch);
// 调用内部流控结构的复位函数
 qdisc_reset(q->qdisc);
// 复位TBF流控结构参数
// 清空队列长度
 sch->q.qlen = 0;
// 获取当前时间
 PSCHED_GET_TIME(q->t_c);
// B令牌数为缓冲区大小
 q->tokens = q->buffer;
// P令牌数为MTU值
 q->ptokens = q->mtu;
// 清除被阻塞标志
 sch->flags &= ~TCQ_F_THROTTLED;
// 删除定时器
 del_timer(&q->wd_timer);
}

5.3.7 生成内部流控结构

static struct Qdisc *tbf_create_dflt_qdisc(struct net_device *dev, u32 limit)
{
// 分配一个bfifo的流控结构
 struct Qdisc *q = qdisc_create_dflt(dev, &bfifo_qdisc_ops);
        struct rtattr *rta;
 int ret;
 if (q) {
// 分配动态流控参数空间,不知道为什么不直接用静态结构而用动态分配的, 该结构也不大
  rta = kmalloc(RTA_LENGTH(sizeof(struct tc_fifo_qopt)), GFP_KERNEL);
  if (rta) {
// 设置该流控的属性, 也就是BFIFO中的流控限制值的大小
// bfifo的change函数实际就是init函数
   rta->rta_type = RTM_NEWQDISC;
   rta->rta_len = RTA_LENGTH(sizeof(struct tc_fifo_qopt));
   ((struct tc_fifo_qopt *)RTA_DATA(rta))->limit = limit;
   ret = q->ops->change(q, rta);
   kfree(rta);
   if (ret == 0)
    return q;
  }
  qdisc_destroy(q);
 }
 return NULL;
}

5.3.8 更新控制参数

// 根据opt参数设置TBF流控结构私有参数
static int tbf_change(struct Qdisc* sch, struct rtattr *opt)
{
 int err = -EINVAL;
// TBF私有数据
 struct tbf_sched_data *q = qdisc_priv(sch);
// 属性表, TCA_TBF_PTAB=3
 struct rtattr *tb[TCA_TBF_PTAB];
// TBF属性结构参数
 struct tc_tbf_qopt *qopt;
 struct qdisc_rate_table *rtab = NULL;
 struct qdisc_rate_table *ptab = NULL;
 struct Qdisc *child = NULL;
 int max_size,n;
// 解析rtattr结构属性值
 if (rtattr_parse_nested(tb, TCA_TBF_PTAB, opt) ||
     tb[TCA_TBF_PARMS-1] == NULL ||
     RTA_PAYLOAD(tb[TCA_TBF_PARMS-1]) < sizeof(*qopt))
  goto done;
// 数据转换为具体的属性参数结构指针
 qopt = RTA_DATA(tb[TCA_TBF_PARMS-1]);
// 根据新流量速率表参数生成新节点返回(系统流量速率链表为空)或
// 拷贝到当前流量速率表链表的头节点返回流量速率表节点
 rtab = qdisc_get_rtab(&qopt->rate, tb[TCA_TBF_RTAB-1]);
 if (rtab == NULL)
  goto done;
// 如果设置了峰值速率
 if (qopt->peakrate.rate) {
// 如果峰值速率大于规定速率, 新生成峰值速率节点
  if (qopt->peakrate.rate > qopt->rate.rate)
   ptab = qdisc_get_rtab(&qopt->peakrate, tb[TCA_TBF_PTAB-1]);
  if (ptab == NULL)
   goto done;
 }
// 找一个大于限制值的数组项
 for (n = 0; n < 256; n++)
  if (rtab->data[n] > qopt->buffer) break;
// 计算最大数据包长度
 max_size = (n << qopt->rate.cell_log)-1;
 if (ptab) {
// 如果峰值速率控制非空
  int size;
// 计算峰值速率发送下的最大数据包长度
  for (n = 0; n < 256; n++)
   if (ptab->data[n] > qopt->mtu) break;
  size = (n << qopt->peakrate.cell_log)-1;
  if (size < max_size) max_size = size;
 }
// 计算出来的最大值小于0, 返回错误
 if (max_size < 0)
  goto done;
 if (qopt->limit > 0) {
// 根据流量限制值生成内部BFIFO流控结构
  if ((child = tbf_create_dflt_qdisc(sch->dev, qopt->limit)) == NULL)
   goto done;
 }
 sch_tree_lock(sch);
 if (child)
  qdisc_destroy(xchg(&q->qdisc, child));
// 填充TBF流控私有结构参数
 q->limit = qopt->limit;
 q->mtu = qopt->mtu;
 q->max_size = max_size;
 q->buffer = qopt->buffer;
 q->tokens = q->buffer;
 q->ptokens = q->mtu;
 rtab = xchg(&q->R_tab, rtab);
 ptab = xchg(&q->P_tab, ptab);
 sch_tree_unlock(sch);
 err = 0;
done:
 if (rtab)
  qdisc_put_rtab(rtab);
 if (ptab)
  qdisc_put_rtab(ptab);
 return err;
}
 
5.3.9 释放

static void tbf_destroy(struct Qdisc *sch)
{
 struct tbf_sched_data *q = qdisc_priv(sch);
// 删除定时器
 del_timer(&q->wd_timer);
// 释放P,R流控速率控制表结构
 if (q->P_tab)
  qdisc_put_rtab(q->P_tab);
 if (q->R_tab)
  qdisc_put_rtab(q->R_tab);
// 释放流控结构
 qdisc_destroy(q->qdisc);
}

5.3.10 输出

static int tbf_dump(struct Qdisc *sch, struct sk_buff *skb)
{
 struct tbf_sched_data *q = qdisc_priv(sch);
// 数据包的数据尾用来存储流控参数
 unsigned char  *b = skb->tail;
 struct rtattr *rta;
// TBF选项结构
 struct tc_tbf_qopt opt;
// 数据包的数据尾定义为rtattr结构
 rta = (struct rtattr*)b;
// 扩展数据包数据长度
 RTA_PUT(skb, TCA_OPTIONS, 0, NULL);
// 填充TBF选项结构: 限制值和当前P, R速率值, MTU, 缓冲区大小
 opt.limit = q->limit;
 opt.rate = q->R_tab->rate;
 if (q->P_tab)
  opt.peakrate = q->P_tab->rate;
 else
  memset(&opt.peakrate, 0, sizeof(opt.peakrate));
 opt.mtu = q->mtu;
 opt.buffer = q->buffer;
// 拷贝到skb中
 RTA_PUT(skb, TCA_TBF_PARMS, sizeof(opt), &opt);
// 属性数据长度
 rta->rta_len = skb->tail - b;
// 返回数据总长
 return skb->len;
rtattr_failure:
 skb_trim(skb, b - skb->data);
 return -1;
}

// 输出类别结构
static int tbf_dump_class(struct Qdisc *sch, unsigned long cl,
     struct sk_buff *skb, struct tcmsg *tcm)
{
 struct tbf_sched_data *q = qdisc_priv(sch);
// 只有一个类别, 返回失败
 if (cl != 1)  /* only one class */
  return -ENOENT;
// 填充TCMSG结构
 tcm->tcm_handle |= TC_H_MIN(1);
 tcm->tcm_info = q->qdisc->handle;
 return 0;
}
 
...... 待续 ......
阅读(344) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~