项目中要用到ringbuffer,一直都是自己造轮子,调试中才发现经常会出问题,主要是没有加内存屏障。近期自己学习了linux kernel的kfifo,才发现原来内核对于ringbuffer,早就实现了一套,而且代码之精妙,一般人很难想到。不同于我们自己造轮子的定长ringbuffer,kfifo提供的一套接口,支持变长的ringbuffer。下面细细分析:
本文分析的源代码:linux kernel 2.6.34.12 kfifo.c kfifo.h
1. Kfifo简介
Kfifo是内核里面的一个first in first out数据结构,它采用环形循环队列(ringbuffer)的数据结构来实现;它提供一个无边界的字节流服务,最重要的一点是,它使用并行无锁编程技术,即当它用于只有一个入队线程和一个出队线程的场景时,两个线程可以并发操作,而不需要任何加锁行为,就可以保证kfifo的线程安全。
2. Kfifo数据结构
-
struct kfifo {
-
unsigned char *buffer; /* the buffer holding the data */
-
unsigned int size; /* the size of the allocated buffer */
-
unsigned int in; /* data is added at offset (in % size) */
-
unsigned int out; /* data is extracted from off. (out % size) */
-
};
-
buffer, 用于存放数据的缓存
-
size, buffer空间的大小,必须为2的幂次方
-
in, out, 和buffer一起构成一个循环队列。 in指向buffer中数据队头,out指向buffer中数据的队尾
这个结构有以下特性:
-
初始状态下,in == out == 0
-
入队导致in递增,出队导致out递增。注意in和out总是递增的,从来不会减少
-
计算in对应的缓冲区索引的公式是fifo->in & (fifo->size-1),见__kfifo_off函数
-
计算缓冲区中元素数量的公式是fifo->in – fifo->out,见kfifo_size函数
根据以上特性,在运行的时候
in和out迟早会溢出。然而即使溢出,kfifo_size函数仍然是对的,这个就是kfifo的精妙之处。
下面我们来验证下这个结论:
预备知识:在32位机器上,假设N为正整数,static_cast(-N) == 2^32 – N。也就是说,-N的补码的比特位,如果强制解释成正整数,其数值为2^32-N。
分情况证明:
-
如果in和out都没有溢出,或者in和out都溢出了,是对的。
-
如果in溢出了,out没有溢出。记in在溢出之前的数值为in_nofl,则in_nofl == 2^32+in。由于fifo.in < fifo.out,fifo.in – fifo.out为负数,因此static_cast(fifo.in-fifo.out) == 2^32 -(fifo.out-fifo.in) == (2^32+fifo.in)-fifo.out == in_nofl – fifo.out。因此是正确的。
-
不可能in没有溢出同时out溢出。
3. kfifo_alloc 分配kfifo内存和初始化工作
这个接口主要是分配kfifo的buffer内核,并初始化kfifo。kfifo->size的值总是在调用者传进来的size参数的基础上向2的幂扩展,这是内核一贯的做法。这样的好处不言而喻--对kfifo->size取模运算可以转化为与运算,如下:
-
kfifo->in % kfifo->size 可以转化为 kfifo->in & (kfifo->size – 1)
4. kfifo_in
kfifo in是入队操作,其一共3个步骤:
1) 计算fifo中可写的长度
2) 将数据放入buffer里面
3) 修改in参数
上代码:
-
/**
-
* kfifo_in - puts some data into the FIFO
-
* @fifo: the fifo to be used.
-
* @from: the data to be added.
-
* @len: the length of the data to be added.
-
*
-
* This function copies at most @len bytes from the @from buffer into
-
* the FIFO depending on the free space, and returns the number of
-
* bytes copied.
-
*
-
* Note that with only one concurrent reader and one concurrent
-
* writer, you don't need extra locking to use these functions.
-
*/
-
unsigned int kfifo_in(struct kfifo *fifo, const void *from,
-
unsigned int len)
-
{
-
len = min(kfifo_avail(fifo), len);
-
-
__kfifo_in_data(fifo, from, len, 0);
-
__kfifo_add_in(fifo, len);
-
return len;
-
}
-
-
static inline void __kfifo_in_data(struct kfifo *fifo,
-
const void *from, unsigned int len, unsigned int off)
-
{
-
unsigned int l;
-
-
/*
-
* Ensure that we sample the fifo->out index -before- we
-
* start putting bytes into the kfifo.
-
*/
-
-
smp_mb();
-
-
off = __kfifo_off(fifo, fifo->in + off);
-
-
/* first put the data starting from fifo->in to buffer end */
-
l = min(len, fifo->size - off);
-
memcpy(fifo->buffer + off, from, l);
-
-
/* then put the rest (if any) at the beginning of the buffer */
-
memcpy(fifo->buffer, from + l, len - l);
-
}
-
-
/*
-
* __kfifo_add_in internal helper function for updating the in offset
-
*/
-
static inline void __kfifo_add_in(struct kfifo *fifo,
-
unsigned int off)
-
{
-
smp_wmb();
-
fifo->in += off;
-
}
从__kfifo_in_data函数中,可以明显看到,如果fifo->in小于buffer->size,那么先放完buffer->size-fifo->in这段 内存空间,剩下的部分,转移到buffer的可用空间开头存放;如果fifo->in大于buffer->size,那么直接把要入队列的数 据放到buffer可用空间开头。
5. kfifo_out
kfifo out是出队操作,其一共3个步骤:
1) 计算fifo中可读的长度
2) 将数据从buffer中移走
3) 修改out参数
-
/**
-
* kfifo_out - gets some data from the FIFO
-
* @fifo: the fifo to be used.
-
* @to: where the data must be copied.
-
* @len: the size of the destination buffer.
-
*
-
* This function copies at most @len bytes from the FIFO into the
-
* @to buffer and returns the number of copied bytes.
-
*
-
* Note that with only one concurrent reader and one concurrent
-
* writer, you don't need extra locking to use these functions.
-
*/
-
unsigned int kfifo_out(struct kfifo *fifo, void *to, unsigned int len)
-
{
-
len = min(kfifo_len(fifo), len);
-
-
__kfifo_out_data(fifo, to, len, 0);
-
__kfifo_add_out(fifo, len);
-
-
return len;
-
}
-
-
static inline void __kfifo_out_data(struct kfifo *fifo,
-
void *to, unsigned int len, unsigned int off)
-
{
-
unsigned int l;
-
-
/*
-
* Ensure that we sample the fifo->in index -before- we
-
* start removing bytes from the kfifo.
-
*/
-
-
smp_rmb();
-
-
off = __kfifo_off(fifo, fifo->out + off);
-
-
/* first get the data from fifo->out until the end of the buffer */
-
l = min(len, fifo->size - off);
-
memcpy(to, fifo->buffer + off, l);
-
-
/* then get the rest (if any) from the beginning of the buffer */
-
memcpy(to + l, fifo->buffer, len - l);
-
}
-
-
/*
-
* __kfifo_add_out internal helper function for updating the out offset
-
*/
-
static inline void __kfifo_add_out(struct kfifo *fifo,
-
unsigned int off)
-
{
-
smp_mb();
-
fifo->out += off;
-
}
6. 场景分析
in和out均没有超出fifo->size,这个很好理解。下面我们主要分析两种溢出的场景:
1) fifo->in大于fifo->size而fifo->out小于fifo->size(即只有 fifo->in“越界”)
这种情况下,先读取fifo->out到fifo->size-1的那一段,大小为L个byte,然后再读取剩下的。从buffer开头,大小为len-L个byte的数据(即先读data A段, 再读出data B段)
2) fifo->in和fifo->out都“越界”了
这种情况下,L = min(len, fifo->size - (fifo->out & (fifo->size - 1))); 这一语句便起作用了,此时fifo->out&fifo->size-1的结果即实际要读的数据所在的内存地址相对于buffer起始地址的偏移值(如下图所示,左边为实际上存在于内存中的data A段, 而右边虚线框为逻辑上的data A段的位置)
阅读(1484) | 评论(0) | 转发(0) |