首先要知道的是有一个算法:叫做 Token Bucket算法,用来做流量控制(Rate limiting)。基本思想就是:(摘自wiki)
The algorithm can be conceptually understood as follows:
- A token is added to the bucket every 1 / r seconds.
- The bucket can hold at the most b tokens. If a token arrives when the bucket is full, it is discarded.
- When a packet (network layer PDU) of n bytes arrives, n tokens are removed from the bucket, and the packet is sent to the network.
- If fewer than n tokens are available, no tokens are removed from the bucket, and the packet is considered to be non-conformant.
The algorithm allows bursts of up to b bytes, but over the long run the output of conformant packets is limited to the constant rate, r. Non-conformant packets can be treated in various ways:
- They may be dropped.
- They may be enqueued for subsequent transmission when sufficient tokens have accumulated in the bucket.
- They may be transmitted, but marked as being non-conformant, possibly to be dropped subsequently if the network is overloaded.
Implementers of this algorithm on platforms lacking the clock resolution necessary to add a single token to the bucket every 1 / r
seconds may want to consider an alternative formulation. Given the
ability to update the token bucket every S milliseconds, the number of
tokens to add every S milliseconds = (r * S) / 1000.
但是看了filezilla的实现,没有这么复杂,filezilla的实现算法大概:
1. 如果有n个thread,那么就有n被监控的object
2. 主thread有一个timer,250ms触发一次,每次OnTimer被触发后,都从新读取用户对总体
流量的设定的上限。包括输出和读入2个限速值,设分别为maxIn, maxOut
3. 通过一定的计算,算出每一次timer触发,每一个thread平均允许获得的输出及输入值,即让maxOut/一秒钟触发timer的次数 = 每次触发允许发送的字节量。同理可以得到读入量. 并分别保存在数组中。允许maxOut和maxIn一定程度的加倍,以处理burst的情况。
比如一个thread上次没有将自己的份额用完,那么我们这次在检查他的数组时,我们不能直接将其允许下次发送的值修改为平均值,因为这等于使该thread放弃了它应该获得的速度,我们在他当前剩下的数值的基础上加上这次又允许的平均值,但是这个值是有上限的,即不能超过一定程度。
4. 如果有些thread因为读/写完了他的允许的数据量,那么它就会设置一个readwait/writewait标记,并不是真正的去wait,而是当再次让他去读/写的时候,即调用read/write的时候,就什么都不作。而只有等到OnTimer操作去将他的wait标记清除。
具体实现:
1. class CRateLimiter
class CRateLimiterObject
CRateLimiter会创建自己的一个静态实例,它含有一个CRateLimiterObject列表,每个CRateLimiterObject其实对应了一个CBackend对象,而一个CBackend对象对应了一个thread.
thread的读写操作都经过CBackend过滤,进而根据要读写的数据量和当前允许的数值比较,如果当前允许数据〉0那么就可以进行一定的读写,如果==0, 那么就只能设置自己的wait标记为真。如果进行了一定的读写,就要更新自己的允许量,使其减少一定数值。如果==0了,也要置wait标记。
2. 具体的读写的触发:
貌似wxWidget的worker thread没有event loop,因此不能向worker thread发送event。即你不可能写一个OnXXX()让一个thread去响应一个event。而只能写OnXXX让main thread去响应。
main thread创建了一个timer,他的OnTimer就只能在mainthread运行。它做上面描述的工作,工作完毕后,就看是否有thread的wait标记需要被清除,如果有,就清除他们,然后发送event,即Read和write event,告知可以进行读写了。但是响应者还是main thread. 由于event在创建的时候就指定了响应对象,即CSocketEventHandler对象,其实指向的是CRealSocket对象。那么也就是说,event发送给了不同的对象,也可看作不同thread对应的对象,但注意,其执行还是在mainthread里。这个不同的对对象执行OnSocketEvent()函数,他们会调用OnRead()或者OnWrite()去做真正的socket读写操作,(在main thread)里。
除了main thread的rate timer会触发出读写event的发送,在各个thread里的Entry()循环里面,也会根据他的状态和任务,根据select的结果,设置m_triggered标记,然后调用sendEvents()来发送event。所以thread也会触发读写操作,当然响应者是main thread。
阅读(3448) | 评论(0) | 转发(0) |