Chinaunix首页 | 论坛 | 博客
  • 博客访问: 321244
  • 博文数量: 239
  • 博客积分: 481
  • 博客等级: 下士
  • 技术积分: 1170
  • 用 户 组: 普通用户
  • 注册时间: 2008-07-15 17:28
文章分类

全部博文(239)

文章存档

2014年(13)

2013年(6)

2012年(122)

2011年(98)

分类: LINUX

2014-02-17 11:30:37

原文地址:Linux内核中流量控制(1) 作者:Godbach

本文档的Copyleft归yfydz所有,使用GPL发布,可以自由拷贝,转载,转载时请保持文档的完整性,
严禁用于任何商业用途。
msn:
来源:http://yfydz.cublog.cn

1. 前言

linux内核中提供了流量控制的相关处理功能,相关代码在net/sched目录下;而应用层上的控制是通
过iproute2软件包中的tc来实现,tc和sched的关系就好象iptables和netfilter的关系一样,一个是
用户层接口,一个是具体实现,关于tc的使用方法可详将Linux Advanced Routing HOWTO,本文主要
分析内核中的具体实现。

流控包括几个部分: 流控算法, 通常在net/sched/sch_*.c中实现, 缺省的是FIFO, 是比较典型的黑
盒模式, 对外只看到入队和出对两个操作; 流控结构的操作处理; 和用户空间的控制接口, 是通过
rtnetlink实现的。

以下内核代码版本为2.6.19.2。

2. 控制入口

2.1 控制入口

linux流控功能反映为网卡设备的属性,表明是网络最底层的处理部分, 已经和上层的网络协议栈无
关了:

/* include/linux/netdevice.h */
struct net_device
{
......
/*
 * Cache line mostly used on queue transmit path (qdisc)
 */
 /* device queue lock */
 spinlock_t  queue_lock ____cacheline_aligned_in_smp;
// 这是发送数据时的队列处理
 struct Qdisc  *qdisc;
// 网卡停止时保存网卡活动时的队列处理方法
 struct Qdisc  *qdisc_sleeping;
// 网卡处理的数据队列链表
 struct list_head qdisc_list;
// 最大队列长度
 unsigned long  tx_queue_len; /* Max frames per queue allowed */
 /* Partially transmitted GSO packet. */
 struct sk_buff  *gso_skb;
 /* ingress path synchronizer */
// 输入流控锁
 spinlock_t  ingress_lock;
// 这是对于接收数据时的队列处理
 struct Qdisc  *qdisc_ingress;
......

2.1.2 输出流控

数据发出流控处理时,上层的所有处理已经完成,数据包已经交到网卡设备进行发送,在数据发送时
进行相关的流控处理网络数据的出口函数为dev_queue_xmit(); 如果是接收流控, 数据只是刚从网卡
设备中收到, 还未交到网络上层处理, 不过网卡的输入流控不是必须的, 缺省情况下并不进行流控,
输入流控入口函数为ing_filter()函数,该函数被skb_receive_skb()调用:

/* net/core/dev.c */
int dev_queue_xmit(struct sk_buff *skb)
{
 struct net_device *dev = skb->dev;
 struct Qdisc *q;
 int rc = -ENOMEM;
......
 /* Updates of qdisc are serialized by queue_lock.
  * The struct Qdisc which is pointed to by qdisc is now a
  * rcu structure - it may be accessed without acquiring
  * a lock (but the structure may be stale.) The freeing of the
  * qdisc will be deferred until it's known that there are no
  * more references to it.
  *
  * If the qdisc has an enqueue function, we still need to
  * hold the queue_lock before calling it, since queue_lock
  * also serializes access to the device queue.
  */
// 获取网卡的qdisc指针, 此出不需要锁, 是各个CPU的私有数据
 q = rcu_dereference(dev->qdisc);
#ifdef CONFIG_NET_CLS_ACT
 skb->tc_verd = SET_TC_AT(skb->tc_verd,AT_EGRESS);
#endif
// 如果队列输入非空, 将数据包入队
// 对于物理网卡设备, 缺省使用的是FIFO qdisc, 该成员函数非空, 只有逻辑网卡
// 才可能为空
 if (q->enqueue) {
  /* Grab device queue */
// 加锁
  spin_lock(&dev->queue_lock);
// 可以直接访问dev->qdisc了
  q = dev->qdisc;
  if (q->enqueue) {
// 入队处理
   rc = q->enqueue(skb, q);
// 运行流控, 出队列操作
   qdisc_run(dev);
   spin_unlock(&dev->queue_lock);
   rc = rc == NET_XMIT_BYPASS ? NET_XMIT_SUCCESS : rc;
   goto out;
  }
  spin_unlock(&dev->queue_lock);
 }
......
}
 

// 出队操作
static inline void qdisc_run(struct net_device *dev)
{
 if (!netif_queue_stopped(dev) &&
     !test_and_set_bit(__LINK_STATE_QDISC_RUNNING, &dev->state))
  __qdisc_run(dev);
}

/* net/sched/sch_generic.c */
void __qdisc_run(struct net_device *dev)
{
// 如果是noop_qdisc流控, 实际是丢包
 if (unlikely(dev->qdisc == &noop_qdisc))
  goto out;
 while (qdisc_restart(dev) < 0 && !netif_queue_stopped(dev))
  /* NOTHING */;
out:
 clear_bit(__LINK_STATE_QDISC_RUNNING, &dev->state);
}

/* Kick device.
   Note, that this procedure can be called by a watchdog timer, so that
   we do not check dev->tbusy flag here.
   Returns:  0  - queue is empty.
            >0  - queue is not empty, but throttled.
     <0  - queue is not empty. Device is throttled, if dev->tbusy != 0.
   NOTE: Called under dev->queue_lock with locally disabled BH.
*/
static inline int qdisc_restart(struct net_device *dev)
{
 struct Qdisc *q = dev->qdisc;
 struct sk_buff *skb;
 /* Dequeue packet */
// 数据包出队
 if (((skb = dev->gso_skb)) || ((skb = q->dequeue(q)))) {
  unsigned nolock = (dev->features & NETIF_F_LLTX);
  dev->gso_skb = NULL;
......
}
2.1.3 输入流控

输入流控好象不是必须的,目前内核需要配置CONFIG_NET_CLS_ACT选项才起作用:

/* net/core/dev.c */
int netif_receive_skb(struct sk_buff *skb)
{
......
#ifdef CONFIG_NET_CLS_ACT
 if (pt_prev) {
  ret = deliver_skb(skb, pt_prev, orig_dev);
  pt_prev = NULL; /* noone else should process this after*/
 } else {
  skb->tc_verd = SET_TC_OK2MUNGE(skb->tc_verd);
 }
 ret = ing_filter(skb);
 if (ret == TC_ACT_SHOT || (ret == TC_ACT_STOLEN)) {
  kfree_skb(skb);
  goto out;
 }
 skb->tc_verd = 0;
ncls:
#endif
......
}

static int ing_filter(struct sk_buff *skb)
{
 struct Qdisc *q;
 struct net_device *dev = skb->dev;
 int result = TC_ACT_OK;
// 如果网卡设备有输入流控处理
 if (dev->qdisc_ingress) {
  __u32 ttl = (__u32) G_TC_RTTL(skb->tc_verd);
  if (MAX_RED_LOOP < ttl++) {
   printk(KERN_WARNING "Redir loop detected Dropping packet (%s->%
s)\n",
    skb->input_dev->name, skb->dev->name);
   return TC_ACT_SHOT;
  }
// 设置数据包的TC参数
  skb->tc_verd = SET_TC_RTTL(skb->tc_verd,ttl);
  skb->tc_verd = SET_TC_AT(skb->tc_verd,AT_INGRESS);
  spin_lock(&dev->ingress_lock);
  if ((q = dev->qdisc_ingress) != NULL)
// 数据入队
   result = q->enqueue(skb, q);
  spin_unlock(&dev->ingress_lock);
 }
 return result;
}

2.2 初始化

本节先跟踪一下网卡设备的qdisc指针是如何被赋值的, 缺省赋值为何值.

在网卡设备的初始化函数register_netdevice()函数中调用dev_init_scheduler()函数对网卡设备的
流控队列处理进行了初始化, 也就是说每个网络网卡设备的qdisc指针都不会是空的:
/* net/sched/sch_gentric.c */
void dev_init_scheduler(struct net_device *dev)
{
 qdisc_lock_tree(dev);
// 处理发出数据的qdisc是必须的,而处理输入数据的qdisc_ingress则不是必须的
// 缺省情况下的qdisc
 dev->qdisc = &noop_qdisc;
 dev->qdisc_sleeping = &noop_qdisc;
 INIT_LIST_HEAD(&dev->qdisc_list);
 qdisc_unlock_tree(dev);
 dev_watchdog_init(dev);
}

当然noop_qdisc中的调度是不可用的, 只进行丢包处理;网卡在激活(dev_open)时会调用
dev_activate()函数重新对qdisc指针赋值,但未对qdisc_ingress赋值:
/* net/sched/sch_generic.c */
void dev_activate(struct net_device *dev)
{
 /* No queueing discipline is attached to device;
    create default one i.e. pfifo_fast for devices,
    which need queueing and noqueue_qdisc for
    virtual interfaces
  */
// 如果当前的qdisc_sleeping是noop_qdisc,重新找一个流控操作指针
 if (dev->qdisc_sleeping == &noop_qdisc) {
  struct Qdisc *qdisc;
// 前提条件是发送队列长度非0, 这正常情况肯定满足的
  if (dev->tx_queue_len) {
// 对dev设备建立fifo处理, 只是缺省的网卡发送策略: FIFO, 先入先出
   qdisc = qdisc_create_dflt(dev, &pfifo_fast_ops);
   if (qdisc == NULL) {
    printk(KERN_INFO "%s: activation failed\n", dev->name);
    return;
   }
   write_lock(&qdisc_tree_lock);
   list_add_tail(&qdisc->list, &dev->qdisc_list);
   write_unlock(&qdisc_tree_lock);
  } else {
   qdisc =  &noqueue_qdisc;
  }
  write_lock(&qdisc_tree_lock);
// 对qdisc_sleeping赋值
  dev->qdisc_sleeping = qdisc;
  write_unlock(&qdisc_tree_lock);
 }
// 如果现在网线没插, 返回
 if (!netif_carrier_ok(dev))
  /* Delay activation until next carrier-on event */
  return;
 spin_lock_bh(&dev->queue_lock);
// 将网卡当前的qdisc赋值为qdisc_sleeping所指的qdisc
 rcu_assign_pointer(dev->qdisc, dev->qdisc_sleeping);
 if (dev->qdisc != &noqueue_qdisc) {
// 启动
  dev->trans_start = jiffies;
  dev_watchdog_up(dev);
 }
 spin_unlock_bh(&dev->queue_lock);
}

qdisc_sleeping用于保存在网卡起效时使用的qdisc, 因为在网卡down或网线拔除不可用时, 网卡设
备的qdisc指针会指向noqueue_qdisc, 该qdisc操作就只是丢包, 这就是为什么在没插网线情况下也
可以调用发包函数处理的原因, 结果就是直接丢包。
/* net/sched/sch_generic.c */
void dev_deactivate(struct net_device *dev)
{
 struct Qdisc *qdisc;
 spin_lock_bh(&dev->queue_lock);
 qdisc = dev->qdisc;
// 将网卡当前qdisc设置为noop_qdisc
 dev->qdisc = &noop_qdisc;
// 释放原来的qdisc
 qdisc_reset(qdisc);
 spin_unlock_bh(&dev->queue_lock);
 dev_watchdog_down(dev);
 /* Wait for outstanding dev_queue_xmit calls. */
 synchronize_rcu();
 /* Wait for outstanding qdisc_run calls. */
 while (test_bit(__LINK_STATE_QDISC_RUNNING, &dev->state))
  yield();
 if (dev->gso_skb) {
  kfree_skb(dev->gso_skb);
  dev->gso_skb = NULL;
 }
}
 
3. 数据结构

流控处理对外表现是一个黑盒,外部只能看到数据入队和出队,但内部队列是如何操作和管理外面是
不知道的;另外处理队列处理外,流控还有一个调度器,该调度器将数据进行分类,然后对不同类型
的数据采取不同的流控处理,所分的类型可能是多级的,形成一个树型的分类树。

流控的基本数据结构是struct Qdisc(queueing discipline,直译是“排队纪律”,意译为“流控”
),这是内核中为数不多的以大写字母开头结构名称之一:
/* include/net/sch_generic.h */
struct Qdisc
{
// 入队操作
 int    (*enqueue)(struct sk_buff *skb, struct Qdisc *dev);
// 出队操作
 struct sk_buff * (*dequeue)(struct Qdisc *dev);
// 标志
 unsigned  flags;
#define TCQ_F_BUILTIN 1
#define TCQ_F_THROTTLED 2
#define TCQ_F_INGRESS 4
 int   padded;
// Qdisc的基本操作结构
 struct Qdisc_ops *ops;
// 句柄
 u32   handle;
 u32   parent;
 atomic_t  refcnt;
// 数据包链表头
 struct sk_buff_head q;
// 网卡设备
 struct net_device *dev;
 struct list_head list;
// 统计信息
 struct gnet_stats_basic bstats;
 struct gnet_stats_queue qstats;
// 速率估计
 struct gnet_stats_rate_est rate_est;
// 流控锁
 spinlock_t  *stats_lock;
 struct rcu_head  q_rcu;
 int   (*reshape_fail)(struct sk_buff *skb,
     struct Qdisc *q);
 /* This field is deprecated, but it is still used by CBQ
  * and it will live until better solution will be invented.
  */
// 父节点, 但基本已经被淘汰了
 struct Qdisc  *__parent;
};

流控队列的基本操作结构:
struct Qdisc_ops
{
// 链表中的下一个
 struct Qdisc_ops *next;
// 类别操作结构
 struct Qdisc_class_ops *cl_ops;
// Qdisc的名称, 从数组大小看应该就是网卡名称
 char   id[IFNAMSIZ];
// 私有数据大小
 int   priv_size;
// 入队
 int    (*enqueue)(struct sk_buff *, struct Qdisc *);
// 出队
 struct sk_buff * (*dequeue)(struct Qdisc *);
// 将数据包重新排队
 int    (*requeue)(struct sk_buff *, struct Qdisc *);
// 丢弃
 unsigned int  (*drop)(struct Qdisc *);
// 初始化
 int   (*init)(struct Qdisc *, struct rtattr *arg);
// 复位为初始状态,释放缓冲,删除定时器,清空计数器
 void   (*reset)(struct Qdisc *);
// 释放
 void   (*destroy)(struct Qdisc *);
// 更改Qdisc参数
 int   (*change)(struct Qdisc *, struct rtattr *arg);
// 输出
 int   (*dump)(struct Qdisc *, struct sk_buff *);
 int   (*dump_stats)(struct Qdisc *, struct gnet_dump *);
 struct module  *owner;
};
流控队列类别操作结构:
struct Qdisc_class_ops
{
 /* Child qdisc manipulation */
// 减子节点
 int   (*graft)(struct Qdisc *, unsigned long cl,
     struct Qdisc *, struct Qdisc **);
// 增加子节点
 struct Qdisc *  (*leaf)(struct Qdisc *, unsigned long cl);
 /* Class manipulation routines */
// 获取, 增加使用计数
 unsigned long  (*get)(struct Qdisc *, u32 classid);
// 释放, 减少使用计数
 void   (*put)(struct Qdisc *, unsigned long);
// 改变
 int   (*change)(struct Qdisc *, u32, u32,
     struct rtattr **, unsigned long *);
// 删除
 int   (*delete)(struct Qdisc *, unsigned long);
// 遍历
 void   (*walk)(struct Qdisc *, struct qdisc_walker * arg);
 /* Filter manipulation */
 struct tcf_proto ** (*tcf_chain)(struct Qdisc *, unsigned long);
// tc捆绑
 unsigned long  (*bind_tcf)(struct Qdisc *, unsigned long,
     u32 classid);
// tc解除
 void   (*unbind_tcf)(struct Qdisc *, unsigned long);
 /* rtnetlink specific */
// 输出
 int   (*dump)(struct Qdisc *, unsigned long,
     struct sk_buff *skb, struct tcmsg*);
 int   (*dump_stats)(struct Qdisc *, unsigned long,
     struct gnet_dump *);
};
// 流控速率控制表结构
struct qdisc_rate_table
{
 struct tc_ratespec rate;
 u32  data[256];
 struct qdisc_rate_table *next;
 int  refcnt;
};

4. 基本操作

各种流控算法是通过流控操作结构实现,然后这些操作结构登记到内核的流控链表,在使用时可为网
卡构造新的流控结构,将该结构和某种流控操作结构挂钩,这样就实现网卡采用某种策略发送数据进
行流控,所有操作在用户空间都可通过tc程序设置。

4.1 Qdisc的一些基本操作

4.1.1 分配新的流控结构
/* net/sched/sch_generic.c */
// 分配新的Qdisc结构, Qdisc的操作结构由函数参数指定
struct Qdisc *qdisc_alloc(struct net_device *dev, struct Qdisc_ops *ops)
{
 void *p;
 struct Qdisc *sch;
 unsigned int size;
 int err = -ENOBUFS;
 /* ensure that the Qdisc and the private data are 32-byte aligned */
// Qdisc空间按32字节对齐
 size = QDISC_ALIGN(sizeof(*sch));
// 增加私有数据空间
 size += ops->priv_size + (QDISC_ALIGNTO - 1);
 p = kzalloc(size, GFP_KERNEL);
 if (!p)
  goto errout;
// 确保从缓冲区中的sch到缓冲区结束点空间是32字节对齐的
 sch = (struct Qdisc *) QDISC_ALIGN((unsigned long) p);
// 填充字节的数量
 sch->padded = (char *) sch - (char *) p;
//  +------------------------------------+
//  |________|___________________________|
//  ^        ^                           ^
//  |  pad   | <------ 32*N ------------>|
//  p        sch                         

// 初始化链表, 将用于挂接到dev的Qdisc链表
 INIT_LIST_HEAD(&sch->list);
// 初始化数据包链表
 skb_queue_head_init(&sch->q);
// Qdisc结构参数
 sch->ops = ops;
 sch->enqueue = ops->enqueue;
 sch->dequeue = ops->dequeue;
 sch->dev = dev;
// 网卡使用计数增加
 dev_hold(dev);
 sch->stats_lock = &dev->queue_lock;
 atomic_set(&sch->refcnt, 1);
 return sch;
errout:
 return ERR_PTR(-err);
}

struct Qdisc * qdisc_create_dflt(struct net_device *dev, struct Qdisc_ops *ops)
{
 struct Qdisc *sch;
// 分配Qdisc结构 
 sch = qdisc_alloc(dev, ops);
 if (IS_ERR(sch))
  goto errout;
// 如果没有初始化函数或者初始化成功, 返回Qdisc结构
 if (!ops->init || ops->init(sch, NULL) == 0)
  return sch;
// 初始化失败, 释放Qdisc
 qdisc_destroy(sch);
errout:
 return NULL;
}
/* Under dev->queue_lock and BH! */
// 调用Qdisc操作函数中的reset函数
void qdisc_reset(struct Qdisc *qdisc)
{
 struct Qdisc_ops *ops = qdisc->ops;
 if (ops->reset)
  ops->reset(qdisc);
}
/* this is the rcu callback function to clean up a qdisc when there
 * are no further references to it */
// 真正释放Qdisc缓冲区
static void __qdisc_destroy(struct rcu_head *head)
{
 struct Qdisc *qdisc = container_of(head, struct Qdisc, q_rcu);
// qdisc-padded就是缓冲区头的位置
 kfree((char *) qdisc - qdisc->padded);
}
/* Under dev->queue_lock and BH! */
// 释放Qdisc
void qdisc_destroy(struct Qdisc *qdisc)
{
 struct Qdisc_ops  *ops = qdisc->ops;
// 检查Qdisc的使用计数
 if (qdisc->flags & TCQ_F_BUILTIN ||
     !atomic_dec_and_test(&qdisc->refcnt))
  return;
// 将Qdisc从网卡设备的Qdisc链表中断开
 list_del(&qdisc->list);
#ifdef CONFIG_NET_ESTIMATOR
 gen_kill_estimator(&qdisc->bstats, &qdisc->rate_est);
#endif
// 复位操作
 if (ops->reset)
  ops->reset(qdisc);
// 内部释放操作
 if (ops->destroy)
  ops->destroy(qdisc);
// 减少操作结构的模块计数
 module_put(ops->owner);
// 减少网卡使用计数
 dev_put(qdisc->dev);
// 对每个CPU的数据进行具体空间释放
 call_rcu(&qdisc->q_rcu, __qdisc_destroy);
}

/* include/net/sch_generic.h */
// 将skb包添加到数据队列最后
static inline int __qdisc_enqueue_tail(struct sk_buff *skb, struct Qdisc *sch,
           struct sk_buff_head *list)
{
// 将数据包连接到数据包链表尾
 __skb_queue_tail(list, skb);
// 更新统计信息
// 当前队列中数据包的数据长度增加
 sch->qstats.backlog += skb->len;
// Qdisc处理的数据包数字节数增加
 sch->bstats.bytes += skb->len;
 sch->bstats.packets++;
 return NET_XMIT_SUCCESS;
}
static inline int qdisc_enqueue_tail(struct sk_buff *skb, struct Qdisc *sch)
{
 return __qdisc_enqueue_tail(skb, sch, &sch->q);
}
// 将队列头的数据包出队列
static inline struct sk_buff *__qdisc_dequeue_head(struct Qdisc *sch,
         struct sk_buff_head *list)
{
// 从skb链表中头skb包出队列
 struct sk_buff *skb = __skb_dequeue(list);
// 减少当前数据队列数据长度计数
 if (likely(skb != NULL))
  sch->qstats.backlog -= skb->len;
 return skb;
}
static inline struct sk_buff *qdisc_dequeue_head(struct Qdisc *sch)
{
 return __qdisc_dequeue_head(sch, &sch->q);
}
// 将队列尾的数据包出队列
static inline struct sk_buff *__qdisc_dequeue_tail(struct Qdisc *sch,
         struct sk_buff_head *list)
{
// 从链表为提出数据包
 struct sk_buff *skb = __skb_dequeue_tail(list);
 if (likely(skb != NULL))
  sch->qstats.backlog -= skb->len;
 return skb;
}
static inline struct sk_buff *qdisc_dequeue_tail(struct Qdisc *sch)
{
 return __qdisc_dequeue_tail(sch, &sch->q);
}
// 将数据包重新入队
static inline int __qdisc_requeue(struct sk_buff *skb, struct Qdisc *sch,
      struct sk_buff_head *list)
{
// 添加到队列头
 __skb_queue_head(list, skb);
// 增加队列数据长度计数, 但不增加Qdisc处理的数据包数字节数
 sch->qstats.backlog += skb->len;
 sch->qstats.requeues++;
 return NET_XMIT_SUCCESS;
}
static inline int qdisc_requeue(struct sk_buff *skb, struct Qdisc *sch)
{
 return __qdisc_requeue(skb, sch, &sch->q);
}
// 复位Qdisc队列
static inline void __qdisc_reset_queue(struct Qdisc *sch,
           struct sk_buff_head *list)
{
 /*
  * We do not know the backlog in bytes of this list, it
  * is up to the caller to correct it
  */
// 释放Qdisc当前数据包队列中的所有数据包
 skb_queue_purge(list);
}
static inline void qdisc_reset_queue(struct Qdisc *sch)
{
 __qdisc_reset_queue(sch, &sch->q);
 sch->qstats.backlog = 0;
}
// 丢弃Qdisc数据队列尾的数据包
static inline unsigned int __qdisc_queue_drop(struct Qdisc *sch,
           struct sk_buff_head *list)
{
// 取队列尾数据包
 struct sk_buff *skb = __qdisc_dequeue_tail(sch, list);
 if (likely(skb != NULL)) {
// 释放该数据包
  unsigned int len = skb->len;
  kfree_skb(skb);
  return len;
 }
 return 0;
}
static inline unsigned int qdisc_queue_drop(struct Qdisc *sch)
{
 return __qdisc_queue_drop(sch, &sch->q);
}
// 丢弃数据包
static inline int qdisc_drop(struct sk_buff *skb, struct Qdisc *sch)
{
// 释放数据包
 kfree_skb(skb);
// 丢包计数增加
 sch->qstats.drops++;
 return NET_XMIT_DROP;
}
// 整形失败丢包
static inline int qdisc_reshape_fail(struct sk_buff *skb, struct Qdisc *sch)
{
 sch->qstats.drops++;
#ifdef CONFIG_NET_CLS_POLICE
 if (sch->reshape_fail == NULL || sch->reshape_fail(skb, sch))
  goto drop;
 return NET_XMIT_SUCCESS;
drop:
#endif
 kfree_skb(skb);
 return NET_XMIT_DROP;
}

/* net/sched/sch_api.c */
/* We know handle. Find qdisc among all qdisc's attached to device
   (root qdisc, all its children, children of children etc.)
 */
// 根据句柄查找Qdisc, 句柄是个32位整数用于标识Qdisc的
struct Qdisc *qdisc_lookup(struct net_device *dev, u32 handle)
{
 struct Qdisc *q;
 read_lock(&qdisc_tree_lock);
// 遍历dev设备Qdisc链表
 list_for_each_entry(q, &dev->qdisc_list, list) {
// 句柄相同,返回Qdisc
  if (q->handle == handle) {
   read_unlock(&qdisc_tree_lock);
   return q;
  }
 }
 read_unlock(&qdisc_tree_lock);
 return NULL;
}
// 返回指定类别的Qdisc叶节点
static struct Qdisc *qdisc_leaf(struct Qdisc *p, u32 classid)
{
 unsigned long cl;
 struct Qdisc *leaf;
// Qdisc类别操作
 struct Qdisc_class_ops *cops = p->ops->cl_ops;
 if (cops == NULL)
  return NULL;
// 获取指定classid类型的类别句柄
 cl = cops->get(p, classid);
 if (cl == 0)
  return NULL;
// 调用类别操作结构的left(Godbach注:应为leaf)成员函数获取叶Qdisc节点
 leaf = cops->leaf(p, cl);
 cops->put(p, cl);
 return leaf;
}
 
/* Graft qdisc "new" to class "classid" of qdisc "parent" or
   to device "dev".
   Old qdisc is not destroyed but returned in *old.
 */
// "嫁接"Qdisc, 将新的Qdisc节点添加到父节点作为叶节点
static int qdisc_graft(struct net_device *dev, struct Qdisc *parent,
         u32 classid,
         struct Qdisc *new, struct Qdisc **old)
{
 int err = 0;
 struct Qdisc *q = *old;

 if (parent == NULL) {
// 父qdisc节点为空, 将新节点作为dev的基本qdisc, 返回dev原来的老的qdisc
  if (q && q->flags&TCQ_F_INGRESS) {
   *old = dev_graft_qdisc(dev, q);
  } else {
   *old = dev_graft_qdisc(dev, new);
  }
 } else {
// 父qdisc非空情况
// 将使用Qdisc类操作结构中的相关成员函数来完成操作
  struct Qdisc_class_ops *cops = parent->ops->cl_ops;
  err = -EINVAL;
  if (cops) {
// 获取类别句柄值
   unsigned long cl = cops->get(parent, classid);
   if (cl) {
// 类别有效, 调用graft成员函数将新节点插入qdisc树中
    err = cops->graft(parent, cl, new, old);
    if (new)
     new->parent = classid;
    cops->put(parent, cl);
   }
  }
 }
 return err;
}

/* Attach toplevel qdisc to device dev */
// 将qdisc作为顶层Qdisc节点附着于dev设备
static struct Qdisc *
dev_graft_qdisc(struct net_device *dev, struct Qdisc *qdisc)
{
 struct Qdisc *oqdisc;
// 如果网卡设备是启动的, 先停掉
 if (dev->flags & IFF_UP)
  dev_deactivate(dev);
// 加树锁
 qdisc_lock_tree(dev);
// 是处理输入的流控节点
 if (qdisc && qdisc->flags&TCQ_F_INGRESS) {
  oqdisc = dev->qdisc_ingress;
  /* Prune old scheduler */
  if (oqdisc && atomic_read(&oqdisc->refcnt) <= 1) {
   /* delete */
   qdisc_reset(oqdisc);
   dev->qdisc_ingress = NULL;
  } else {  /* new */
   dev->qdisc_ingress = qdisc;
  }
 } else {
// 是处理输入(Godbach注:应为输出)的流控节点
// 备份一下原来的流控Qdisc指针
  oqdisc = dev->qdisc_sleeping;
  /* Prune old scheduler */
// 如果老的Qdisc引用数不超过一个, 也就是最多只被当前设备dev所使用, 复位该Qdisc
  if (oqdisc && atomic_read(&oqdisc->refcnt) <= 1)
   qdisc_reset(oqdisc);
  /* ... and graft new one */
// 如果新qdisc是空的,用noop_qdisc
  if (qdisc == NULL)
   qdisc = &noop_qdisc;
// 将睡眠qdisc(dev启动时将使用的qdisc)赋值为新的qdisc
  dev->qdisc_sleeping = qdisc;
  dev->qdisc = &noop_qdisc;
 }
 qdisc_unlock_tree(dev);
// 激活网卡
 if (dev->flags & IFF_UP)
  dev_activate(dev);
// 返回dev设备的老的qdisc
 return oqdisc;
}
 

/*
   Allocate and initialize new qdisc.
   Parameters are passed via opt.
 */
// 在指定的网卡设备上创建新的Qdisc结构
static struct Qdisc *
qdisc_create(struct net_device *dev, u32 handle, struct rtattr **tca, int *errp)
{
 int err;
 struct rtattr *kind = tca[TCA_KIND-1];
 struct Qdisc *sch;
 struct Qdisc_ops *ops;
// 查找相关的Qdisc操作结构
 ops = qdisc_lookup_ops(kind);
#ifdef CONFIG_KMOD
// 如果没在当前内核中找到,加载相关名称的Qdisc操作内核模块
 if (ops == NULL && kind != NULL) {
  char name[IFNAMSIZ];
  if (rtattr_strlcpy(name, kind, IFNAMSIZ) < IFNAMSIZ) {
   /* We dropped the RTNL semaphore in order to
    * perform the module load.  So, even if we
    * succeeded in loading the module we have to
    * tell the caller to replay the request.  We
    * indicate this using -EAGAIN.
    * We replay the request because the device may
    * go away in the mean time.
    */
   rtnl_unlock();
// 加载模块
   request_module("sch_%s", name);
   rtnl_lock();
// 重新查找
   ops = qdisc_lookup_ops(kind);
// 如果找到还是会返回失败,不过错误号是EAGAIN,要求重新执行相关操作
   if (ops != NULL) {
    /* We will try again qdisc_lookup_ops,
     * so don't keep a reference.
     */
    module_put(ops->owner);
    err = -EAGAIN;
    goto err_out;
   }
  }
 }
#endif
 err = -ENOENT;
 if (ops == NULL)
  goto err_out;
// 分配新的Qdisc结构
 sch = qdisc_alloc(dev, ops);
 if (IS_ERR(sch)) {
  err = PTR_ERR(sch);
  goto err_out2;
 }
 if (handle == TC_H_INGRESS) {
// 是针对输入进行流控
  sch->flags |= TCQ_F_INGRESS;
  handle = TC_H_MAKE(TC_H_INGRESS, 0);
 } else if (handle == 0) {
// 分配个新的句柄
  handle = qdisc_alloc_handle(dev);
  err = -ENOMEM;
  if (handle == 0)
   goto err_out3;
 }
 sch->handle = handle;
// 调用Qdisc操作结构的初始化函数进行初始化
 if (!ops->init || (err = ops->init(sch, tca[TCA_OPTIONS-1])) == 0) {
#ifdef CONFIG_NET_ESTIMATOR
  if (tca[TCA_RATE-1]) {
// 创建预估器
   err = gen_new_estimator(&sch->bstats, &sch->rate_est,
      sch->stats_lock,
      tca[TCA_RATE-1]);
   if (err) {
    /*
     * Any broken qdiscs that would require
     * a ops->reset() here? The qdisc was never
     * in action so it shouldn't be necessary.
     */
    if (ops->destroy)
     ops->destroy(sch);
    goto err_out3;
   }
  }
#endif
  qdisc_lock_tree(dev);
// 将Qdisc结构添加到网卡设备的流控链表
  list_add_tail(&sch->list, &dev->qdisc_list);
  qdisc_unlock_tree(dev);
  return sch;
 }
err_out3:
 dev_put(dev);
 kfree((char *) sch - sch->padded);
err_out2:
 module_put(ops->owner);
err_out:
 *errp = err;
 return NULL;
}
/* Allocate an unique handle from space managed by kernel */
// 分配新句柄
static u32 qdisc_alloc_handle(struct net_device *dev)
{
// 最多循环65536次查找
 int i = 0x10000;
// 注意这是个静态量
 static u32 autohandle = TC_H_MAKE(0x80000000U, 0);
 do {
// 每次为句柄值增加一个增量
  autohandle += TC_H_MAKE(0x10000U, 0);
// 溢出处理
  if (autohandle == TC_H_MAKE(TC_H_ROOT, 0))
   autohandle = TC_H_MAKE(0x80000000U, 0);
// qdisc_lookup是根据句柄查找Qdisc结构,成功返回Qdisc结构指针, 失败返回NULL
 } while (qdisc_lookup(dev, autohandle) && --i > 0);
 return i>0 ? autohandle : 0;
}

// 修改Qdisc参数
static int qdisc_change(struct Qdisc *sch, struct rtattr **tca)
{
 if (tca[TCA_OPTIONS-1]) {
  int err;
  if (sch->ops->change == NULL)
   return -EINVAL;
// 调用Qdisc操作结构的change函数完成修改参数工作
  err = sch->ops->change(sch, tca[TCA_OPTIONS-1]);
  if (err)
   return err;
 }
#ifdef CONFIG_NET_ESTIMATOR
 if (tca[TCA_RATE-1])
  gen_replace_estimator(&sch->bstats, &sch->rate_est,
   sch->stats_lock, tca[TCA_RATE-1]);
#endif
 return 0;
}

4.2 Qdisc操作结构的一些基本操作

/* net/sched/sch_api.c */
// 登记Qdisc操作结构, 每种排队算法都是通过Qdisc操作结构实现的
int register_qdisc(struct Qdisc_ops *qops)
{
 struct Qdisc_ops *q, **qp;
 int rc = -EEXIST;
 write_lock(&qdisc_mod_lock);
// qdisc_base是全局变量, 系统的Qdisc操作结构链表头
// 遍历Qdisc操作链表
 for (qp = &qdisc_base; (q = *qp) != NULL; qp = &q->next)
// 如果ID相同, 返回已经存在错误
  if (!strcmp(qops->id, q->id))
   goto out;
// 如果操作结构中没有定义入队,出队和重入队操作的话, 用系统缺省的
 if (qops->enqueue == NULL)
  qops->enqueue = noop_qdisc_ops.enqueue;
 if (qops->requeue == NULL)
  qops->requeue = noop_qdisc_ops.requeue;
 if (qops->dequeue == NULL)
  qops->dequeue = noop_qdisc_ops.dequeue;
// 将结构节点添加到链表, 注意这里没使用内核里最常见的list链表操作
// 这是个单向链表
 qops->next = NULL;
 *qp = qops;
 rc = 0;
out:
 write_unlock(&qdisc_mod_lock);
 return rc;
}

// 拆除Qdisc操作结构
int unregister_qdisc(struct Qdisc_ops *qops)
{
 struct Qdisc_ops *q, **qp;
 int err = -ENOENT;
 write_lock(&qdisc_mod_lock);
// 由于没有用list, 必须遍历链表找到节点在链表中的位置
 for (qp = &qdisc_base; (q=*qp)!=NULL; qp = &q->next)
  if (q == qops)
   break;
 if (q) {
// 将结构节点从链表中断开
  *qp = q->next;
  q->next = NULL;
  err = 0;
 }
 write_unlock(&qdisc_mod_lock);
 return err;
}
/* Find queueing discipline by name */
// 根据名称查找;流控操作结构
static struct Qdisc_ops *qdisc_lookup_ops(struct rtattr *kind)
{
 struct Qdisc_ops *q = NULL;
 if (kind) {
  read_lock(&qdisc_mod_lock);
// 遍历Qdisc操作链表
  for (q = qdisc_base; q; q = q->next) {
// 比较ID
   if (rtattr_strcmp(kind, q->id) == 0) {
// 增加Qdisc操作模块的计数
    if (!try_module_get(q->owner))
     q = NULL;
    break;
   }
  }
  read_unlock(&qdisc_mod_lock);
 }
 return q;
}

...... 待续 ......
阅读(987) | 评论(0) | 转发(0) |
0

上一篇:Linux内核中流量控制(2)

下一篇:VI命令集锦

给主人留下些什么吧!~~