概述:
相对于 tracker 服务器来说,BT客户端要复杂的多,Bram Cohen 花了一年 full time 的时间来完成 BT,我估计其中大部分时间是用在 BT 客户端的实现和调试上了。
由于 BT 客户端涉及的代码比较多,我不能再象分析 tracker 服务器那样,走上来就深入到细节之中去,那样的话,我写的晕晕糊糊,大家看起来也不知所云。所以第一篇文章先来谈谈客户端的功能、相关协议,以及客户端的总体架构和相关的类的层次结构。这样,从整体上把握之后,大家在自己分析代码的过程中,就能做到胸有成竹。
客户端的功能:
不看代码,只根据 BT 的相关原理,大致可以推测,客户端需要完成以下功能:
1、解析 torrent 文件,获取要下载的文件的详细信息,并在磁盘上创建空文件。
2、与 tracker服务器 建立连接,并交互消息。
3、根据从 tracker 得到的信息,跟其它 peers 建立连接,并下载需要的文件片断
4、监听某端口,等待其它peers 的连接,并提供文件片断的上传。
相关协议:
对客户端来说,它需要处理两种协议:
1、与 tracker 服务器交互的 track HTTP协议。
2、与其它 peers 交互的 BT 对等协议。
总体架构:
从总体上来看,BT客户端实际是以一个服务器的形式在运行。这一点似乎有些难以理解,但确实是这样。
为什么是一个服务器了?
客户端的主要功能是下载文件,但作为一种P2P软件,同时它必须提供上传服务,也就是它必须守候在某一个端口上,等待其它peers 的连接请求。从这一点上来说,它必须以一个服务器的形式运行。我们在后面实际分析代码的时候,可以看到,客户端复用了 RawServer 类用来实现网络服务器。
客户端的代码,是从 download.py 开始的,首先完成功能1,之后就进入服务器循环,在每一次循环过程中,完成功能 2、3、4。其中,Rerequester 类负责完成功能2,它通过 RawServer::add_task(),向 RawServer 添加自己的任务函数,这个任务函数,每隔一段时间与 tracker 服务器进行通信。而Encoder、Connecter 等多个类组合在一起,完成功能3和4。
类层次结构:
BT 客户端涉及的类比较多,我首先大致描述一下这些类的功能,然后给出它们的一个层次结构。
1、RawServer:负责实现网络服务器
2、Rerequester:负责和 tracker 通信。它调用 RawServer::add_task() ,向 RawServer 添加自己的任务函数 Rerequester::c()
3、Encoder:一种 Handler类(在分析 tracker 服务器时候提到),负责处理与其它peers建立连接和以及对读取的数据按照BT对等协议进行分析。
Encoder 类在Encrypter.py中,该文件中,还有一个 Connection 类,而在 Connecter.py 文件中,也有一个 Connection 类,这两个同名的 Connection 类有些蹊跷,为了区分,我把它们重新命名为 E-Connection 和 C-Connection。
3.1、E-Connection:负责 TCP 层次上的连接工作
这两个 Connection 是有区别的,这是因为BT对等协议需要在两个层次上建立连接。首先是 TCP 层次上的连接,也就是经过 TCP 的三次握手之后,建立连接,这个连接由 E-Connection 来管理。在 Encoder:: external_connection_made() 函数中可以看到,一旦有外部连接到来,则创建一个 E-Connection 类。
3.2、C-Connection:管理对等协议层次上的连接。
在 TCP 连接之上,是 BT对等协议的连接,它需要经过BT对等协议的两次“握手”,握手的细节大家去看BT对等协议。过程是这样的:
为了便于述说,我们假设一个BT客户端为 A,另一个客户端为 X。
如果是X主动向A发起连接,那么在TCP连接建立之后,A立刻利用这个连接向X发送BT对等协议的“握手”消息。同样,X在连接一旦建立之后,向 A发送BT对等协议的“握手”消息。A一旦接收到X的“握手”消息,那么它就认为“握手”成功,建立了BT对等协议层次上的连接。我把它叫做“对等连接”。A 发送了一个消息,同时接收了一个消息,所以这个握手过程是两次“握手”。
同样,对X 来说,因为连接是它主动发起的,所以它在发送完“握手”消息之后,就等待A的“握手”消息,如果收到,那么它也认为“对等连接”建立了。
一旦“对等连接”建立之后,双方就可以通过这个连接传递消息了。
这样,原来我所疑惑的一个问题也就有了答案。就是:如果 X 需要从 A 这里下载数据,那么它会同 A 建立一个连接。假如 A 又希望从 X 那里下载数据,它需不需要重新向 X 发起另外一个连接了?答案显然是不用,它会利用已有的一条连接。也就是说,不管是X主动向A发起的连接,还是 A 主动向 X发起的连接,一旦建立之后,它们的效果是一样的。这个同我们平时做 C/S结构的网络开发是有区别的。
我们可以看到在 E-Connection的初始化函数中,会主动连接的另一方发送“握手”消息,在 E-Connection::data_came_in() 中,会首先对对方的“握手”消息进行处理。这正是我上面所描述的情形。
在 E-Connection::read_peer_id() 中,是对“握手”消息的最后一项 peer id进行处理,一旦正确无误,那么就认为“对等连接”完成,
self.encoder.connecter.connection_made(self)
在 Connecter::connection_made() 函数中,就创建了管理“对等连接”的 C-Connectinon类。所以,更高一层的“对等连接”是由 C-Connection 来管理的。
3.3、Connecter:连接器,管理下载、上传、阻塞、片断选择、读写磁盘等等。
下载和上传不是孤立的,它们之间相互影响。下载需要有片断选择算法,上传的时候要考虑阻塞,片断下载之后,要写到磁盘上。上传的时候,也需要从磁盘读取。这些任务,是由 Connecter 来统一调度的。
类层次结构,我用缩进来表示一种包含关系。
Encoder:
E-Connection
C-Connection
Upload
SingleDownloader
Connecter
Choker:负责阻塞的管理
Downloader:
SingleDownloader
Picker:片断选择策略
StorageWrapper:
Storage 类
由于 Storage 类比较简单,我直接在源码基础上进行注释。掌握Storage,为进一步分析 StorageWrapper 类打下基础。
几点说明:
1、 Storage 类封装了对磁盘文件的读和写的操作。
2、 BT既支持单个文件的下载,也支持多个文件,包括可以有子目录。但是它并不是以文件为单位进行下载和上传的,而是以“文件片断”为单位。这可以在BT协议规范以及另一篇讲BT技术的文章中看到。所以,对于多个文件的情况,它也是当作一个拼接起来的“大文件”来处理的。例如,有文件 aaa和bbb,大小分别是 400和1000,那么它看作一个大小为 1400 的大文件,并以此来进行片断划分。
3、 文件在下载过程中,同时提供上传,所以是以读写方式打开的,wb+和rb+都指的读写方式。在下载完毕之后,改为只读方式。
4、 由于下载可能中断,所以在 Storage 初始化的时候,磁盘上可能已经存在文件的部分数据,必须检查一下文件的大小。为了便于描述,我们把完整文件的大小称为“实际长度”,把文件当前的大小成为“当前长度”。
class Storage:
# files 是一个二元组的列表(list),二元组包含了文件名称和长度,例如:
[(“aaa”, 100), (“bbb”, 200)]
def __init__(self, files, open, exists, getsize):
self.ranges = []
# 注意,这里是 0l,后面的l表示类型是长整形,而不是 01。
total = 0l
so_far = 0l
for file, length in files:
if length != 0:
# ranges 是一个三元组列表,三元组的格式是: 在“整个”文件的起始位置、结束位置、文件名。BT在处理多个文件的时候,是把它们看作一个拼接起来的大文件。
self.ranges.append((total, total + length, file))
total += length
# so_far 是实际存在的文件的总长度,好像没有起作用
if exists(file):
l = getsize(file)
if l > length:
l = length
so_far += l
# 如果文件长度为0, 则创建一个空文件
elif not exists(file):
open(file, 'wb').close()
# begins 是一个列表,用来保存每个文件的起始位置
self.begins = [i[0] for i in self.ranges]
self.total_length = total
self.handles = {}
self.whandles = {}
self.tops = {}
太极 2005-10-25 23:19
StorageWrapper 类
StorageWrapper 的作用:把文件片断进一步切割为子片断,并且为这些子片断发送 request消息。在获得子片断后,将数据写入磁盘。
请结合 Storage 类的分析来看。
几点说明:
1、 为了获取传输性能,BT把文件片断切割为多个子片断。
2、 BT为获取一个子片断,需要向拥有该子片断的peer发送request消息(关于 request消息,参见《BT协议规范》)。
3、 例如一个256k大小的片断,索引号是10,被划分为16个16k大小的子片断。那么需要为这16个子片断分别产生一个 request 消息。这些request消息在发出之前,以list的形式保存在 inactive_requests 这个list中。例如对这个片断,就保存在inactive_requests下标为 10(片断的索引号)的地方,值是如下的 list:[(0,16k),(16k, 16k), (32k, 16k), (48k, 16k), (64k, 16k), (80k, 16k), (96k, 16k), (112k, 16k), (128k, 16k), (144k, 16k), (160k, 16k), (176k, 16k), (192k, 16k), (208k, 16k), (224k, 16k), (240k, 16k)]。这个处理过程在 _make_inactive() 函数中。因为这些request还没有发送出去,所以叫做 inactive request(未激活的请求)。如果一个 request 发送出去了,那么叫做 active request。为每个片断已经发送出去的request个数记录在 numactive 中。如果收到一个子片断,那么 active request 个数就要减1。amount_inactive 记录了尚没有发出request的子片断总的大小。
4、 每当获得一个子片段,都要写入磁盘。如果子片断所属的片断在磁盘上还没有分配空间,那么首先需要为整个片断分配空间。如何为片断分配空间?这正是 StorageWrapper 类中最难理解的一部分代码。这个“空间分配算法”说起来很简单,但是在没有任何注释的情况下去看代码,耗费了我好几天的时间。具体的算法分析,请看 _piece_came_in() 的注释。
class StorageWrapper:
def __init__(self, storage, request_size, hashes,
piece_size, finished, failed,
statusfunc = dummy_status, flag = Event(), check_hashes = True,
data_flunked = dummy_data_flunked):
self.storage = storage # Storage 对象
self.request_size = request_size #子片断大小
self.hashes = hashes # 文件片断摘要信息
self.piece_size = piece_size # 片断大小
self.data_flunked = data_flunked # 一个函数,用来检查片断的完整性
self.total_length = storage.get_total_length() # 文件总大小
self.amount_left = self.total_length # 未下载完的文件大小
# 文件总大小的有效性检查
# 因为最后一个片断长度可能小于 piece_size
if self.total_length <= piece_size * (len(hashes) - 1):
raise ValueError, 'bad data from tracker - total too small'
if self.total_length > piece_size * len(hashes):
raise ValueError, 'bad data from tracker - total too big'
# 两个事件,分布在下载完成和下载失败的时候设置
self.finished = finished
self.failed = failed
这几个变量的作用在前面已经介绍过了。
self.numactive = [0] * len(hashes)
inactive_request
inactive_requests 的值全部被初始化为1,这表示每个片断都需要发送 request。后面在对磁盘文件检查之后,那些已经获得的片断,在 inactive_requests中对应的是 None,表示不需要再为这些片断发送 request了。
self.inactive_requests = [1] * len(hashes)
self.amount_inactive = self.total_length
# 是否进入 EndGame 模式?关于 endgame 模式,在《Incentives Build Robustness in BitTorrent 》的“片断选择算法”中有介绍。后面可以看到,在为最后一个“子片断”产生请求后,进入 endgame 模式。
self.endgame = False
self.have = Bitfield(len(hashes))
# 该片是否检查了完整性
self.waschecked = [check_hashes] * len(hashes)
这两个变量用于“空间分配算法”
self.places = { }
self.holes = [ ]
if len(hashes) == 0:
finished()
return
targets = {}
total = len(hashes)
# 检查每一个片断,,,
for i in xrange(len(hashes)):
# 如果磁盘上,还没有完全为这个片断分配空间,那么这个片断需要被下载,在 targets 字典中添加一项(如果已经存在,就不用添加了),它的关键字(key)是该片断的摘要值,它的值(value)是一个列表, 这个片断的索引号被添加到这个列表中。
这里一度让我非常迷惑,因为一直以为不同的文件片断肯定具有不同的摘要值。后来才想明白了,那就是:两个不同的文件片断,可能拥有相同的摘要值。不是么?只要这两个片断的内容是一样的。
这一点,对后面的分析非常重要。
if not self._waspre(i):
targets.setdefault(hashes[i], []).append(i)
total -= 1
numchecked = 0.0
if total and check_hashes:
statusfunc({"activity" : 'checking existing file', "fractionDone" : 0})
# 这是一个内嵌在函数中的函数。在 c++ 中,可以有内部类,不过好像没有内部函数的说法。这个函数只能在 __init__() 内部使用。
这个函数在一个片段被确认获得后调用
# piece: 片断的索引号
# pos: 这个片断在磁盘上存储的位置
例如,片断5可能存储在片断2的位置上。请参看后面的“空间分配算法”
def markgot(piece, pos, self = self, check_hashes = check_hashes):
self.places[piece] = pos
self.have[piece] = True
self.amount_left -= self._piecelen(piece)
self.amount_inactive -= self._piecelen(piece)
不用再为这个片断发送 request消息了
self.inactive_requests[piece] = None
self.waschecked[piece] = check_hashes
lastlen = self._piecelen(len(hashes) - 1) # 最后一个片断的长度
# 对每一个片断
for i in xrange(len(hashes)):
#如果磁盘上,还没有完全为这个片断分配空间,那么在 holes 中添加该片断的索引号。
if not self._waspre(i):
self.holes.append(i)
# 否则,也就是空间已经分配。但是还是不能保证这个片断已经完全获得了,正如分析 Storage 时提到的那样,可能存在“空洞”
# 如果不需要进行有效性检查,那么简单调用 markgot() 表示已经获得了该片断。这显然是一种不负责任的做法。
elif not check_hashes:
markgot(i, i)
# 如果需要进行有效性检查
else:
sha是python内置的模块,它封装了 SHA-1摘要算法。SHA-1摘要算法对一段任意长的数据进行计算,得出一个160bit (也就是20个字节)长的消息摘要。在 torrent 文件中,保存了每个片断的消息摘要。接收方在收到一个文件片断之后,再计算一次消息摘要,然后跟 torrent 文件中对应的值进行比较,如果结果不一致,那么说明数据在传输过程中发生了变化,这样的数据应该被丢弃。
这里,首先,根据片断i的起始位置开始,lastlen长的一段数据构造一个 sha 对象。
sh = sha(self.storage.read(piece_size * i, lastlen))
计算这段数据的消息摘要
sp = sh.digest()
然后,更新 sh 这个 sha 对象,注意,是根据片断 i 剩下的数据来更新的。关于 sha::update() 的功能,请看 python的帮助。如果有两段数据 a 和 b,那么
sh = sha(a)
sh.update(b),等效于
sh = sha(a+b)
所以,下面这个表达式等于
sh.update(self.storage.read(piece_size*i, self._piecelen(i)))
sh.update(self.storage.read(piece_size * i + lastlen, self._piecelen(i) - lastlen))
所以,这次计算出来的就是片断i 的摘要
(原来的困惑:为什么不直接计算 i 的摘要,要这么绕一下了?后来分析清楚“空间分配算法”之后,这后面一段代码也就没有什么问题了。)
s = sh.digest()
如果计算出来的摘要和 hashes[i] 一致(后者是从 torrent 文件中获得的),那么,这个片断有效且已经存在于磁盘上。
if s == hashes[i]:
markgot(i, i)
elif targets.get(s)
and self._piecelen(i) == self._piecelen(targets[s][-1]):
markgot(targets[s].pop(), i)
elif not self.have[len(hashes) - 1]
and sp == hashes[-1]
and (i == len(hashes) - 1 or not self._waspre(len(hashes) - 1)):
markgot(len(hashes) - 1, i)
else:
self.places[i] = i
if flag.isSet():
return
numchecked += 1
statusfunc({'fractionDone': 1 - float(self.amount_left) / self.total_length})
# 如果所有片断都下载完了,那么结束。
if self.amount_left == 0:
finished()
太极 2005-10-25 23:19
# 检查某个片断,是否已经在磁盘上分配了空间,调用的是 Storage:: was_preallocated()
def _waspre(self, piece):
return self.storage.was_preallocated(piece * self.piece_size,
self._piecelen(piece))
# 获取指定片断的长度,只有最后一个片断大小可能小于 piece_size
def _piecelen(self, piece):
if piece < len(self.hashes) - 1:
return self.piece_size
else:
return self.total_length - piece * self.piece_size
# 返回剩余文件的大小
def get_amount_left(self):
return self.amount_left
# 判断是否已经获得了一些文件片断
def do_I_have_anything(self):
return self.amount_left < self.total_length
# 将指定片断切割为“子片断”
def _make_inactive(self, index):
# 先获取该片断的长度
length = min(self.piece_size, self.total_length - self.piece_size * index)
l = []
x = 0
# 为了获得更好的传输性能,BT把每个文件片断又分为更小的“子片断”,我们可以在 download.py 文件中 default 变量中,找到“子片断”大小的定义:
'download_slice_size', 2 ** 14, "How many bytes to query for per request."
这里定义的“子片断”大小是16k。
下面这个循环,就是将一个片断进一步切割为“子片断”的过程。
while x + self.request_size < length:
l.append((x, self.request_size))
x += self.request_size
l.append((x, length - x))
# 将 l 保存到 inactive_requests 这个列表中
self.inactive_requests[index] = l
# 是否处于 endgame 模式,关于endgame模式,参加《Incentives Build Robustness in BitTorrent》
def is_endgame(self):
return self.endgame
def get_have_list(self):
return self.have.tostring()
def do_I_have(self, index):
return self.have[index]
# 判断指定的片断,是否还有 request没有发出?如果有,那么返回 true,否则返回 false。
def do_I_have_requests(self, index):
return not not self.inactive_requests[index]
为指定片断创建一个 request 消息,返回的是一个二元组,例如(32k, 16k),表示“子片断”的起始位置是 32k ,大小是 16k。
def new_request(self, index):
# returns (begin, length)
# 如果还没有为该片断创建 request。,那么调用 _make_inactive() 创建 request列表。(inactive_requests[index] 初始化的值是1)
if self.inactive_requests[index] == 1:
self._make_inactive(index)
# numactive[index] 记录了已经为该片断发出了多少个 request。
self.numactive[index] += 1
rs = self.inactive_requests[index]
# 从 inactive_request 中移出最小的那个request(也就是起始位置最小)。
r = min(rs)
rs.remove(r)
# amount_inactive 记录了尚没有发出request的子片断总的大小。
self.amount_inactive -= r[1]
# 如果这是最后一个“子片断”,那么进入 endgame 模式
if self.amount_inactive == 0:
self.endgame = T.rue
# 返回这个 request
return r
def piece_came_in(self, index, begin, piece):
try:
return self._piece_came_in(index, begin, piece)
except IOError, e:
self.failed('IO Error ' + str(e))
return True
如果获得了某个“子片断”,那么调用这个函数。
index:“子片断”所在片断的索引号,
begin:“子片断”在片断中的起始位置,
piece:实际数据
def _piece_came_in(self, index, begin, piece):
# 如果之前没有获得过该片断中任何“子片断”,那么首先需要在磁盘上为整个片断分配空间。
空间分配的算法如下:
假设一共是6个片断,现在已经为 0、1、4三个片断分配了空间,那么
holes:[2, 3, 5]
places:{0:0, 1:1, 4:4}
现在要为片断5分配空间,思路是把片断5的空间暂时先分配在片断2应该在的空间上。这样分配以后,
holes:[3, 5]
places: {0:0, 1:1, 4:4, 5:2}
假设下一步为片断2分配空间,因为2的空间已经被5占用,所以把5的数据转移到3上,2才可以使用自己的空间。这样分配之后,
holes:[5]
places:{0:0, 1:1, 2:2, 4:4, 5:3}
最后,为3分配空间,因为3的空间被5占用,所以把5的数据转移到5自己的空间上,3就可以使用自己的空间了。这样分配之后,
holes:[]
places:{0:0, 1:1, 2:2, 3:3, 4:4, 5:5}
下面这段比较晦涩的代码,实现的就是这种空间分配算法。
if not self.places.has_key(index):
n = self.holes.pop(0)
if self.places.has_key(n):
oldpos = self.places[n]
old = self.storage.read(self.piece_size * oldpos, self._piecelen(n))
if self.have[n] and sha(old).digest() != self.hashes[n]:
self.failed('data corrupted on disk - maybe you have two copies running?')
return True
self.storage.write(self.piece_size * n, old)
self.places[n] = n
if index == oldpos or index in self.holes:
self.places[index] = oldpos
else:
for p, v in self.places.items():
if v == index:
break
self.places[index] = index
self.places[p] = oldpos
old = self.storage.read(self.piece_size * index, self.piece_size)
self.storage.write(self.piece_size * oldpos, old)
elif index in self.holes or index == n:
if not self._waspre(n):
self.storage.write(self.piece_size * n,
self._piecelen(n) * chr(0xFF))
self.places[index] = n
else:
for p, v in self.places.items():
if v == index:
break
self.places[index] = index
self.places[p] = n
old = self.storage.read(self.piece_size * index, self._piecelen(n))
self.storage.write(self.piece_size * n, old)
# 调用 Stoarge::write() 将这个子片断写入磁盘,注意是写到 places[index] 所在的空间上。
self.storage.write(self.places[index] * self.piece_size + begin, piece)
# 既然获得了一个子片断,那么发出的request个数显然要减少一个。
self.numactive[index] -= 1
# 如果既没有尚未发出的 request,而且也没有已发出的request(每当获得一个子片断,numactive[index]减少1,numactive[index]为0,说明所有发出的 request都已经接收到了响应的数据),那么显然整个片断已经全部获得了。
if not self.inactive_requests[index] and not self.numactive[index]:
检查整个片断的有效性,如果通过检查
if sha(self.storage.read(self.piece_size * self.places[index],
self._piecelen(index))).digest() == self.hashes[index]:
#“我”已经拥有了这个片断
self.have[index] = True
self.inactive_requests[index] = None
# 也检查过了有效性
self.waschecked[index] = True
self.amount_left -= self._piecelen(index)
if self.amount_left == 0:
self.finished()
如果没有通过有效性检查
else:
self.data_flunked(self._piecelen(index))
得丢弃这个片断
self.inactive_requests[index] = 1
self.amount_inactive += self._piecelen(index)
return False
return True
# 如果向某个 peer 发送的获取“子片断”的请求丢失了,那么调用此函数
def request_lost(self, index, begin, length):
self.inactive_requests[index].append((begin, length))
self.amount_inactive += length
self.numactive[index] -= 1
def get_piece(self, index, begin, length):
try:
return self._get_piece(index, begin, length)
except IOError, e:
self.failed('IO Error ' + str(e))
return None
def _get_piece(self, index, begin, length):
if not self.have[index]:
return None
if not self.waschecked[index]:
# 检查片断的 hash值,如果错误,返回 None
if sha(self.storage.read(self.piece_size * self.places[index],
self._piecelen(index))).digest() != self.hashes[index]:
self.failed('told file complete on start-up, but piece failed hash check')
return None
# 通过 hash 检查
self.waschecked[index] = True
# 检查一下“子片断”长度是否越界
if begin + length > self._piecelen(index):
return None
# 调用 Storage::read() ,将该“子片断”数据从磁盘上读出来,返回值就是这段数据。
return self.storage.read(self.piece_size * self.places[index] + begin, length)
太极 2005-10-25 23:20
PiecePicker 类
PiecePicker 用于实现“片断选择算法”,片断选择算法在《Incentives Build Robustness in BitTorrent》一文中有介绍,我把相关内容列出来。
BT的片断选择算法,综合下面几种策略。
? 严格的优先级
片断选择的第一个策略是:一旦请求了某个片断的子片断,那么该片断剩下的子片断优先被请求。这样,可以尽可能快的获得一个完整的片断
? 最少的优先
每个peer都优先选择整个系统中最少的那些片断去下载,而那些在系统中相对较多的片断,放在后面下载,这样,整个系统就趋向于一种更优的状态。如果不用这种算法,大家都去下载最多的那些片断,那么这些片断就会在系统中分布的越来越多,而那些在系统中相对较少的片断仍然很少,最后,某些 peer 就不再拥有其它 peer 感兴趣的片断了,那么系统的参与者越来越少,整个系统的性能就下降。
? 随机的第一个片断
“最少优先”的一个例外是在下载刚开始的时候。此时,下载者没有任何片断可供上传,所以,需要尽快的获取一个完整的片断。而最少的片断,通常只有某一个peer拥有,所以,它可能比多个peers都拥有的那些片断下载的要慢。因此,第一个片断是随机选择的,直到第一个片断下载完成,才切换到“最少优先”的策略。
? 最后阶段模式
有时候,从一个速率很慢的peer那里请求一个片断。在下载的中间阶段,这不是什么问题,但是却可能潜在的延迟下载的完成。为了防止这种情况,在最后阶段,peer向它的所有的peers们都发送某片断的子片断的请求,一旦某些子片断到了,那么就会向其它peer发送cancel 消息,取消对这些子片断的请求,以避免带宽的浪费。实际上,用这种方法并没有浪费多少带宽,而文件的结束部分也一直下载的非常快。
下面是我在分析之前思考的两个问题:
问题1:如何实现“严格优先级”
答案:记录每个已经开始下载的片断。优先选择它们。
问题2:如何实现“最少优先”算法?也就是你如何去统计某个片断在系统中最少?
答案:通过 have 消息(have消息请参看BT对等协议)来计算。在下载过程中,会不停的收到其它 peer 发来的 have 消息,每个have消息都表明对方拥有了某个片断。那么,为每个片断维护一个计数器,每收到一个have消息,相应的计数器加1。在选择片断的时候,计数器最小的某个片断被选中。
在实际代码中,可以看到,变量started和seedstarted 是用来实现“严格优先级”的,它们记录了那些已经开始下载的片断。而变量 numinterests用来实现“最少优先”算法。
PiecePicker类的核心函数是 next() ,它综合多种策略,来计算出下一个应该被选择进行下载的片断。
PiecePicker 类的难点是三个变量 numinterests、interests、pos_in_interests的作用。因为没有任何注释,我思考了很久才明白它们的作用,特别是 pos_in_interests。所以,在分析代码之前,我结合例子来讲解这三个变量的作用。
假设有三个片断:
numinterests:
类型是list,每个片断对应一项,记录了每个片断收到的 have 消息的个数。初始化的时候,numinterests = [0, 0, 0]。
interests:
类型是 list,它的每一项又是一个 list。例如在这个例子中,初始化的时候,interests = [ [0, 1, 2] ],显然,它只有一项。
interests 的作用是什么了?嗯,有点难以表达。大概是这样的吧:所有未完成下载的片断的索引号都保存在 interests,进行片断选择的时候,要用到 interests。我们看两个例子:
1、interests = [ [0, 1], [2]]
2、interests = [ [1], [0, 2]]
在第一个例子中,片断0、1位于 interests 的第0项,片断2位于 interests的第1项。
在第二个例子中,片断1位于位于 interests 的第0项,片断0、2位于 interests的第1项。
无论哪种情况,都表明0、1、2三个片断都还没有下载完成。
那么,某个片断到底应该处于 interests 中什么位置了?答案是根据该片断收到的 have 消息个数,也就是 numinterests 的值。例如,第一个例子中,说明片断0、1收到的 have 个数都是0,所以处于 interests的第0项,而片断2收到的 have 个数是1,所以处于第1项。而初始化的时候,interests =[ [0, 1, 2]],说明片断0、1、2收到的 have个数都是0。
奇怪,为什么要这样设计了?答案就是“最少优先”的片断选择策略。我们看到,拥有越多 have 的片断,在 interests 中,位置越靠后。在进行片断选择的时候,可能会从 interests中选一个片断出来(为什么说可能了,一会可以看到,会优先采用其它策略,如果其它策略不能选一个出来,才会试图从 interests 中选)。这样,按照索引从小到大的顺序,拥有 have 越少的片断,越可能被选到。我们考虑这样一个例子:
interests = [[2, 3], [5, 0, 1], [], [], [4]]
片断2、3拥有0个 have,不能被选择。(至少要有一个 have 才被考虑)。
片断0、1、5都有1个have,所以会优先从它们中选择一个。
片断4拥有4个 have,所以最后被考虑。
pos_in_interests:
如上所述,拥有相同 have 个数的片断,处于 interests 中的同一位置。例如上面这个例子,0、1、5都处于第1个位置。那么它们又根据什么原则进行先后排列了?答案是随机排列。所以,既可能是0、1、5,也可能是 1、5、0,或者其它。为了记录某个片断的确切位置,就需要用到 pos_in_interests了。它也是一个 list,每个片断拥有一项,根据上面这个例子,应该是:
pos_in_interests = [1, 2, 0, 1, 0, 0]
看出什么来没?呵呵
它的意思是,
片断0是 [5, 0, 1] 的第1个
片断1是 [5, 0, 1] 的第2个
片断2是 [2, 3] 的第0个
片断3是 [2, 3] 的第1个
片断4是 [4] 的第0个
片断5是 [5, 0, 1] 的第0个
就是这样喽,不知道我有没有说清楚。
# 封装“片断选择算法”
class PiecePicker:
def __init__(self, numpieces, rarest_first_cutoff = 1):
self.rarest_first_cutoff = rarest_first_cutoff
self.numpieces = numpieces # 片断的个数
self.interests = [range(numpieces)]
self.pos_in_interests = range(numpieces)
self.numinterests = [0] * numpieces
self.started = []
self.seedstarted = []
self.numgot = 0 # 获得了几个片断?
self.scrambled = range(numpieces)
shuffle(self.scrambled)
收到一个 have 消息的处理
def got_have(self, piece):
if self.numinterests[piece] is None:
return
numint = self.numinterests[piece]
if numint == len(self.interests) - 1:
self.interests.append([])
numinterests 对应的值要加1。
self.numinterests[piece] += 1
调整 interests 和 pos_in_interests
self._shift_over(piece, self.interests[numint], self.interests[numint + 1])
丢失一个 have 消息?????
def lost_have(self, piece):
if self.numinterests[piece] is None:
return
numint = self.numinterests[piece]
self.numinterests[piece] -= 1
self._shift_over(piece, self.interests[numint], self.interests[numint - 1])
调整 interests 和 pos_in_interests,前面已经分析过。
def _shift_over(self, piece, l1, l2):
p = self.pos_in_interests[piece]
l1[p] = l1[-1]
self.pos_in_interests[l1[-1]] = p
del l1[-1]
newp = randrange(len(l2) + 1)
if newp == len(l2):
self.pos_in_interests[piece] = len(l2)
l2.append(piece)
else:
old = l2[newp]
self.pos_in_interests[old] = len(l2)
l2.append(old)
l2[newp] = piece
self.pos_in_interests[piece] = newp
为某个片断发送过 requested 消息,用于“严格优先级”策略
def requested(self, piece, seed = False):
if piece not in self.started:
把片断索引号添加到 started中
self.started.append(piece)
if seed and piece not in self.seedstarted:
self.seedstarted.append(piece)
# 如果某个片断已经得到,那么调用这个函数
def complete(self, piece):
assert self.numinterests[piece] is not None
self.numgot += 1
l = self.interests[self.numinterests[piece]]
p = self.pos_in_interests[piece]
l[p] = l[-1]
self.pos_in_interests[l[-1]] = p
del l[-1]
self.numinterests[piece] = None
try:
self.started.remove(piece)
self.seedstarted.remove(piece)
except ValueError:
pass
计算下一个被选择的片断
def next(self, havefunc, seed = False):
bests = None
bestnum = 2 ** 30
首先根据“严格优先级”策略,从已经开始下载的片断中选择。
if seed:
s = self.seedstarted
else:
s = self.started
“严格优先级”策略是和“最少优先”策略结合起来使用的。也就是说,在满足“严格优先”的片断中,再去选择一个满足“最少优先”的片断。注意,“最少优先”还有一个限制,就是如果一个片断如果从来没有收到过 have 消息(也就是计数是0),也不能被选择。这个判断由下面的 havefunc(i) 完成。
for i in s:
if havefunc(i):
if self.numinterests[i] < bestnum:
bests = [i]
bestnum = self.numinterests[i]
elif self.numinterests[i] == bestnum:
bests.append(i)
经过“严格优先级”和“最少优先”策略之后,可能返回多个候选片断,从中随机选择一个,返回。
if bests:
从 bests 随机返回一个值
return choice(bests)
如果以上步骤,没有选择出一个片断。那么随机选择一个。这大概就是“随机的第一个片断”的策略吧。因为 rarest_first_cutoff 默认是设置为1的。也就是说,在一个片断都没有获得的情况下,才会选择这种策略。如果 rarest_first_cutoff 设置为10,那么这个策略就可以叫做“随机的前10个片断”,呵呵。
if self.numgot < self.rarest_first_cutoff:
for i in self.scrambled:
if havefunc(i):
return i
return None
太极 2005-10-25 23:21
如果不能采用“随机的第一个片断”测率,那么, interests 终于派上用场了。到这里,终于明白 interests 为什么要用 numinterests 对应的值来进行定位了。还是“最少优先”的思想,因为那些收到 have 消息少的片断,在interests 中位置比较靠前,所以会优先被选择到。
for i in xrange(1, min(bestnum, len(self.interests))):
for j in self.interests[i]:
if havefunc(j):
return j
如果还选不出来,只好返回 None了。
return None
def am_I_complete(self):
return self.numgot == self.numpieces
谁来补充?
def bump(self, piece):
l = self.interests[self.numinterests[piece]]
pos = self.pos_in_interests[piece]
del l[pos]
l.append(piece)
for i in range(pos,len(l)):
self.pos_in_interests[l[i]] = i
Encoder 类和 Connection 类
Encoder 是一种 Handler 类(关于 Handler类,请参看前面的分析文章)。它在 download.py 中被初始化。它与 Connection类一起,完成“BT对等连接”的建立,以及“BT对等协议”的分析。
为了有助于理解,我添加了一些用圆圈括起来的序号,建议你按照这个顺序去阅读。
class Connection:
②def __init__(self, Encoder, connection, id, is_local):
self.encoder = Encoder
self.connection = connection
如果是本地发起连接,那么 id 是对方的 id,否则 id 为 None
self.id = id
如果连接是由本地发起的,那么 is_local 为 True,否则为 False
self.locally_initiated = is_local
self.complete = False
self.closed = False
self.buffer = StringIO()
self.next_len = 1
self.next_func = self.read_header_len
如果连接是由本地主动发起建立的,那么需要向对方发送一个握手消息。(如果不是由本地主动发起的,那么就是被动建立的,那么不能在这里发送握手消息,而必须在分析完对方的握手消息之后,再去回应一个握手西消息,请看read_download_id() 中的处理。
if self.locally_initiated:
connection.write(chr(len(protocol_name)) + protocol_name +
(chr(0) * 8) + self.encoder.download_id)
if self.id is not None:
connection.write(self.encoder.my_id)
def get_ip(self):
return self.connection.get_ip()
def get_id(self):
return self.id
def is_locally_initiated(self):
return self.locally_initiated
def is_flushed(self):
return self.connection.is_flushed()
⑦def read_header_len(self, s):
if ord(s) != len(protocol_name):
return None
return len(protocol_name), self.read_header
def read_header(self, s):
if s != protocol_name:
return None
return 8, self.read_reserved
def read_reserved(self, s):
return 20, self.read_download_id
def read_download_id(self, s):
if s != self.encoder.download_id:
return None
这一步很重要, 如果连接是由对方发起的,那么,给对方发送一个握手消息。为什么不在读完了 peer id 之后才发送这个消息了?这是因为 peer id 是可选的,所以只要分析完 download id 之后,就要立即发送握手消息。
if not self.locally_initiated:
self.connection.write(chr(len(protocol_name)) +
protocol_name +
(chr(0) * 8) +
self.encoder.download_id + self.encoder.my_id)
return 20, self.read_peer_id
def read_peer_id(self, s):
if not self.id:
如果 peer id 是自己,那么出错了
if s == self.encoder.my_id:
return None
for v in self.encoder.connections.values():
如果已经跟该 peer 建立了连接了,那么也出错了
if s and v.id == s:
return None
self.id = s
if self.locally_initiated:
self.connection.write(self.encoder.my_id)
else:
self.encoder.everinc = True
else:
如果 peer id 和 xxx 不符,那么出错了。
if s != self.id:
return None
“BT对等连接”的握手过程正式宣告完成,此后,双方就可以通过这个连接互相发送消息了。
self.complete = True
太极 2005-10-25 23:21
调用Connecter::connection_made(),这个函数的意义,我们到分析 Connecter 类的时候,再记得分析。
self.encoder.connecter.connection_made(self)
下面进入 BT 消息的处理过程。
return 4, self.read_len
def read_len(self, s):
l = toint(s)
if l > self.encoder.max_len:
return None
return l, self.read_message
消息处理,交给了 Connecter::got_message(),所以下一篇我们要分析 Connecter 类。
def read_message(self, s):
if s != '':
self.encoder.connecter.got_message(self, s)
return 4, self.read_len
def read_dead(self, s):
return None
def close(self):
if not self.closed:
self.connection.close()
self.sever()
def sever(self):
self.closed = True
del self.encoder.connections[self.connection]
if self.complete:
self.encoder.connecter.connection_lost(self)
def send_message(self, message):
self.connection.write(tobinary(len(message)) + message)
⑤在 Encoder::data_came_in() 中调用下面这个函数,表示某个连接上有数据可读。如果有数据可读,那么我们就按照 BT 对等协议的规范来进行分析。。。
def data_came_in(self, s):
进入协议分析循环。。。
while True:
if self.closed:
return
self.next_len表示按照BT对等协议规范,下一段要分析的数据的长度
self.buffer.tell() 表示缓冲区中剩下数据的长度
那么 i 就表示:为了完成接下来的协议分析,还需要多少数据?
i = self.next_len - self.buffer.tell()
如果 i 大于所读到的数据的长度,那表示数据还没有读够,无法继续协议分析,需要等读到足够多的数据才能继续,所以只能退出。
if i > len(s):
self.buffer.write(s)
return
否则表示这次读到的数据已经足够完成一步协议分析。
只取满足这一步协议分析的数据放入 buffer 中(因为 buffer中可能还有上一步协议分析后留下的一些数据,要加在一起),剩下的数据保留在 s 中。
self.buffer.write(s[:i])
s = s[i:]
从 buffer 中取出数据,这些数据就是这一步协议分析所需要的数据。然后把 buffer 清空。
m = self.buffer.getvalue()
self.buffer.reset()
self.buffer.truncate()
next_func 就是用于这一步协议分析的函数。
返回的 x 是一个二元组,包含了下一步协议分析的数据长度和协议分析函数。这样,就形成了一个协议分析循环。
try:
x = self.next_func(m)
except:
self.next_len, self.next_func = 1, self.read_dead
raise
if x is None:
self.close()
return
从 x 中分解出 next_len和 next_func。
self.next_len, self.next_func = x
⑥那么BT对等协议分析的第一步是什么了?
请看初始化函数:
self.next_len = 1
self.next_func = self.read_header_len
显然,第一步协议分析是由 read_header_len() 来完成的。
在BT源码中,有多处采用了这种协议分析的处理方式。
class Encoder:
def __init__(self, connecter, raw_server, my_id, max_len,
schedulefunc, keepalive_delay, download_id,
max_initiate = 40):
self.raw_server = raw_server
self.connecter = connecter
self.my_id = my_id
self.max_len = max_len
self.schedulefunc = schedulefunc
self.keepalive_delay = keepalive_delay
self.download_id = download_id
最大发起的连接数
self.max_initiate = max_initiate
self.everinc = False
self.connections = {}
self.spares = []
schedulefunc(self.send_keepalives, keepalive_delay)
为了保持连接不因为超时而被关闭,所以可能需要随机的发送一些空消息,它的目的纯粹是为了保证连接的“活力”
def send_keepalives(self):
self.schedulefunc(self.send_keepalives, self.keepalive_delay)
for c in self.connections.values():
if c.complete:
c.send_message('')
③主动向对方发起一个连接,这个函数什么时候调用?
请看 download.py 中 Rerequester 类的初始化函数,其中传递的一个参数是 encoder.start_connection。
再看 Rerequester.py 中,Rerequester::postrequest() 的最后,
for x in peers:
self.connect((x[0], x[1]), x[2])
这里调用的 connect() 就是初始化的时候传递进来的 encoder.start_connection,也就是下面这个函数了。
也就是说,当客户端从 tracker 服务器那里获取了 peers 列表之后,就逐一向这些 peers 主动发起连接。
def start_connection(self, dns, id):
if id:
跟自己不用建立连接。
if id == self.my_id:
return
如果已经与对方建立起连接,也不再建立连接
for v in self.connections.values():
if v.id == id:
return
如果当前连接数,已经超过设定的“最大发起连接数”,那么就暂时不建立连接。
if len(self.connections) >= self.max_initiate:
如果空闲连接数还小于 “最大发起连接数”,那么把对方的 ip 先放到spares中,等以后网络稍微空闲一点的时候,再从 spares 中取出来,实际去建立连接。
if len(self.spares) < self.max_initiate and dns not in self.spares:
self.spares.append(dns)
return
try:
调用 RawServer::start_connection 与对方建立TCP连接
c = self.raw_server.start_connection(dns)
创建 Connection 对象,加入到 connections 字典中,注意,最后一个参数是 True,表示是这个连接是由本地主动发起的。这样,在 Connection 的初始化函数中,会与对方进行 BT 对等协议的握手。
self.connections[c] = Connection(self, c, id, True)
except socketerror:
pass
这个内部函数好像没有用到
def _start_connection(self, dns, id):
def foo(self=self, dns=dns, id=id):
self.start_connection(dns, id)
self.schedulefunc(foo, 0)
def got_id(self, connection):
for v in self.connections.values():
if connection is not v and connection.id == v.id:
connection.close()
return
self.connecter.connection_made(connection)
def ever_got_incoming(self):
return self.everinc
①在 RawServer 中,当从外部发起的一个TCP成功建立后,调用此函数。
这里传递进来的参数 connection 是 SingleSocket 类型
def external_connection_made(self, connection):
创建一个 Connection 对象,加入到 connections 字典中。
self.connections[connection] = Connection(self, connection, None, False)
def connection_flushed(self, connection):
c = self.connections[connection]
if c.complete:
self.connecter.connection_flushed(c)
关闭连接的时候调用此函数
def connection_lost(self, connection):
self.connections[connection].sever()
关闭一个连接之后,连接数量可能就没有达到“最大连接数”,所以如果 spares 中有一些等待建立的 ip ,现在可以取出来,主动向对方发起连接。
while len(self.connections) < self.max_initiate and self.spares:
self.start_connection(self.spares.pop(), None)
④某个连接上(无论该连接上主动建立还是被动建立的)有数据可读的时候,调用此函数。在 RawServer 中被调用。转而去调 Connection::data_came_in()。
def data_came_in(self, connection, data):
self.connections[connection].data_came_in(data)
太极 2005-10-25 23:22
Tracker服务器源码分析之一:总述
tracker服务器是BT下载中必须的角色。一个BT client 在下载开始以及下载进行的过程中,要不停的与 tracker 服务器进行通信,以报告自己的信息,并获取其它下载client的信息。这种通信是通过 HTTP 协议进行的,又被称为 tracker HTTP 协议,它的过程是这样的:
client 向 tracker 发一个HTTP 的GET请求,并把它自己的信息放在GET的参数中;这个请求的大致意思是:我是xxx(一个唯一的id),我想下载yyy文件,我的ip是aaa,我用的端口是bbb。。。
tracker 对所有下载者的信息进行维护,当它收到一个请求后,首先把对方的信息记录下来(如果已经记录在案,那么就检查是否需要更新),然后将一部分(并非全部,根据设置的参数已经下载者的请求)参与下载同一个文件(一个tracker服务器可能同时维护多个文件的下载)的下载者的信息返回给对方。
Client在收到tracker的响应后,就能获取其它下载者的信息,那么它就可以根据这些信息,与其它下载者建立连接,从它们那里下载文件片断。
关于client和tracker之间通信协议的细节,在“BT协议规范”中已经给出,这里不再重复。下面我们具体分析 tracker服务器的实现细节。
从哪里开始?
要建立一个 tracker服务器,只要运行 bttrack.py 程序就行了,它最少需要一个参数,就是 –dfile,这个参数指定了保存下载信息的文件。Bttrack.py 调用 track.py 中的 track()函数。因此,我们跟踪到 track.py 中去看track() 函数。
Track.py:track()
这个函数首先对命令行的参数进行检查;然后将这些参数保存到 config 字典中。在BT中所有的工具程序,都有类似的处理方式。
接下来的代码:
r = RawServer(Event(), config['timeout_check_interval'], config['socket_timeout'])
t = Tracker(config, r)
r.bind(config['port'], config['bind'], True)
r.listen_forever(HTTPHandler(t.get, config['min_time_between_log_flushes']))
t.save_dfile()
首先是创建一个 RawServer 对象,这是一个服务器对象,它将实现一个网络服务器的一些细节封装起来。不仅tracker服务器用到了 RawServer,我们以后还可以看到,由于每个 client端也需要给其它 client 提供下载服务,因此也同时是一个服务器,client的实现中,也用到了RawServer,这样,RawServer的代码得到了重用。关于 RawServer的详细实现,在后面的小节中进行分析。
接着是创建一个 Tracker对象。
然后让RawServer绑定在指定的端口上(通过命令行传递进来)。
最后,调用 RawServer::listen_forever() 函数,使得服务器投入运行。
最后,在服务器因某些原因结束运行以后,调用 Tracker::save_dfile() 保存下载信息。这样,一旦服务器再次投入运行,可以恢复当前的状态。
其它信息:
1、 BT源码的分布:
把BT的源码展开之后,可以看到有一些python程序,还有一些说明文件等等,此外还有一个BitTorrent目录。这些 python程序,实际是一些小工具,比如制作 metafile的btmakemetafile.py、运行tracker服务器的bttrack.py、运行BT client端的 btdownloadheadless.py 等等。而这些程序中,用到的一些 python 类的实现,都放在子目录 BitTorrent 下面。我们的分析工作,通常是从工具程序入手,比如 bttrack.py,而随着分析的展开,则重点是看 BitTorrenet子目录下的代码。
BT作者 Bram Cohen 在谈到如何开发可维护的代码的一篇文章中([url][/url]),其中提到的一条就是开发一些小工具以简化工作,我想BT的这种源码结构,也正是作者思想的一种体现吧。
2、 我们看到,python和我们以前接触的 c/c++ 不一样的第一个地方就是它的函数在定义的时候,不用指定参数类型。既然这样,那么,在调用函数的时候,你可以传递任意类型的参数进来。例如这样的函数:
def foo(arg):
print type(arg)
你可以这样来调用:
a = 100
b = “hello world”
foo(a)
foo(b)
输出结果是:
这是因为,第一次调用 foo()的时候,传递的是一个整数类型,而第二次调用的时候,传递的是一个字符串类型。
这种参数具有动态类型的特性,是 c/c++等传统的语言是所不具备的。这也是 python 被称为动态语言的一个原因吧。C++的高级特性模板,虽然也使得参数类型可以动态化,但使用起来,远没有python这么简单方便。
太极 2005-10-25 23:22
RawServer类
这篇文章,我们来分析 RawServer 以及一些相关的类。RawServer 类的实现代码,在 BitTorrent 子目录的RawServer.py 中
RawServer 这个类的作用是实现一个网络服务器。关于网络编程的知识,《unix网络编程:卷1》是最经典的书籍,你如果对这块不了解,建议抽时间看看这本书。RawServer 实现的是一种事件多路复用、非阻塞的网络模型。它使用的是 poll() (而不是我们常见的select(),关于 poll和select的比较,也在《unix网络编程:卷1》中有介绍)函数,处理过程大致是这样的:
首先创建一个监听 socket,然后将这个 socket 加入 poll 的事件源;
随后进入服务处理循环,即:
调用 poll() 函数,这个函数会阻塞,直到网络上有某些事件发生或者超时才返回给调用者;
在 poll()返回之后,先检查一下是否有没有处理的任务,如果有,那么先完成这些任务。然后根据事件类型进行处理。
如果是连接请求(监听 socket上的POLLIN事件)到来,它 accept这个请求,如果 accept 成功,那么就和一个 client建立了连接,于是将 accept() 新创建的 socket 加入 poll 的事件源;
如果在已经建立的连接上(连接socket上的POLLIN事件),有数据可读,那么将数据从 client 端读过来,做进一步处理;
如果已经建立的连接已经准备好(连接socket上的POLLOUT事件),可以发送数据,则检查是否有数据需要发送,如果有,那么发送数据给 client 端。
(所以,tracker是一个单进程的服务器,并没有用到线程。)
Bram Cohen 认为软件的可维护性非常重要,使代码易于维护的重要一条就是设计可重用的类,RawServer 在设计的时候,充分考虑到了可重用性,集中表现在两个地方:
1、 将网络 I/O 和数据分析处理分离。
网络服务器的事件多路复用、网络I/O 部分通常是固定不变的,而数据在读取之后,进行分析处理的过程则是可变的。RawServer 将可变的数据处理工作,交给另外一个抽象的类 Handler (实际上并没有这么一个类)来处理。比如,在 tracker 服务器的实现中,具体使用的就是 HTTPHandler 类,而在 以后将要分析的 BT client 实现代码中,用到的具体的Handler 是 Encoder 类。
2、 采用任务队列来抽象出任务处理的过程。
RawServer维护了一个任务队列 unscheduled_tasks(实际是一个二元组的list,二元组的第一项是一个函数,第二项是超时时间)。在初始化的时候,首先向这个队列中加入一个任务:scan_for_timeouts(),这样,每隔一段时间,服务器就会去检查一下是否有连接超时。如果有其它
RawServer的成员函数中,对外暴露的有:
? __init__:(初始化函数)
? add_task():
在任务列表中增加一项任务(一个任务是一个函数以及一个指定的超时时间的组合)
? bind():
首先创建一个socket,然后设置socket的属性: SO_REUSEADDR和IP_TOS,,这两个属性的具体含义请参考《unix网络编程:卷1》,另外还将 socket 设置为非阻塞的。相对于阻塞的 socket来说,非阻塞的 socket 在网络 I/O 性能上要提高许多,但是与此同时,编程的复杂度也要提高一些。象 tracker这种可能同时要处理成千上万个并发连接的服务器,只能采用非阻塞的socket。
然后将该 socket和指定ip已经端口绑定;
最后把这个socket 加入 poll的事件源。
? start_connection():
对外主动建立一个连接,这个函数在处理NAT穿越的时候用到了,我们后面分析到 NAT穿越的时候,再具体讲解。
? listen_forever():
这个函数的功能就是实现了我在前面描述的网络服务器的处理过程。我们看到,它唯一的参数是handler,handler的作用就是封装了对数据的具体处理。
listen_forever()把对网络事件的处理过程,交给了 handle_events()。
其它函数,包括handle_events(),都是内部函数(也就是外部不会直接来调用这些函数)。Python没有c++那样 public、protected、private 这样的保护机制,python类的内部函数命名的惯例是以下划线开始,例如 RawServer 中的 _close_dead()等。
? handle_events():
事件处理过程,主要是根据三种不同的网络事件分别处理,一是连接事件,二是读事件、三是写事件。
if sock == self.server.fileno()
这段代码判断发生事件的socket是否是监听 socket,如果是,那么说明是连接事件。
连接事件的处理:
通过 accept 来接受连接,并将新建立的 socket 设置为非阻塞。
判断当前连接数是否已经达到了最大值(为了限制并发连接的数目,在初始化 RawServer的时候,需要指定最大连接数目),如果已经达到最大值,那么关闭这个新建的连接。
否则,根据新的 socket 创建一个 SingleSocket 对象,(SingleSocket 封装了对 socket的操作。)将这个对象加入内部的列表single_sockets中,以备后用。
将这个新 socket加入 poll 的事件源
最后,调用 Handler 的external_connection_made() 函数,关于这个函数,在后面分析 HTTPHandler 时再讨论。
if (event & POLLIN) != 0:
这段代码判断是否是读事件
读事件的处理:
首先刷新一下连接的最后更新时间 (last_hit)。
然后读取数据;
如果什么也没读到,那么说明连接被关闭了(在网络编程中,如果一个连接正常的被关闭,那么,也会触发读事件,只不过什么也读不到)
否则,调用 Handler的 data_came_in() 函数来处理读到的数据。
if (event & POLLOUT) != 0 and s.socket is not None and not s.is_flushed():
这段代码判断是否是写事件,而且确实有数据需要发送。在一个连接可以写的时候,就会发生写事件。
写事件的处理:
实际代码是在 SingleSocket的 try_write()函数中。
在一个非阻塞的连接上发送指定大小的数据,很可能在一次发送过程中,数据没有被完全发送出去(只发送了一部分)就返回了,所以,每次 write之后,必须判断是否完全发送了数据。如果没有发送完,那么下次有读事件的时候,还得回来继续发送未完得数据。这也是这个函数叫做 try_write 的原因吧。
try_write() 在最后,要重新设置 poll 的事件源。如果数据全部发送完毕了,那么只需要监听读事件(POLLIN)否则,既要监听读事件,也要监听写事件(POLLOUT),这样,一旦连接变的可写,可以继续将剩下的数据发送出去。
? scan_for_timeouts():
任务处理函数,它首先把自身加入未处理任务队列中,这样,经过一段时间,可以保证这个函数再次被调用,从而达到周期性调用的效果。
它检查每个连接是否超过指定时间没有被刷新,如果是,则该连接可能已经僵死,那么它关闭这个连接。
? pop_unscheduled():
从任务列表中弹出一个未处理的任务。
与 RawServer 配合使用的是 SingleSocket 类,这是一个辅助类,主要目的是封装对 socket的处理吧。包括数据的发送,都交给它来处理了。这个类比较简单,大家可以自己去看,我就不罗嗦了。
以上是对 RasServer 的具体实现的一个分析,可能读者看的还是晕晕糊糊,没办法,还是必须自己去看源代码,然后在遇到问题的时候,回头再来看这篇文章,才会有帮助。如果不亲自看源码,终究是纸上谈兵。
我们再来小结一下。
RawServer 封装了网络服务器的实现细节,它实现了一种事件多路处理、非阻塞的网络模型。它主要负责建立新的连接,从网络读取和发送数据,而对读到的数据的具体处理工作,交给 Handler 类来处理,从而把网络I/O和数据处理分离开来,使得 RawServer可以重用。Handler 类是在调用 listen_forever() 的时候,由调用者传递进来的,具体到 tracker服务器,就是HTTPHandler。有了 RawServer,tracker 就可以作为一个网络服务器运行了。
下一节,我们开始分析具体实现 tracker HTTP 协议处理的 HTTPHandler类和Tracker类。
太极 2005-10-25 23:23
本篇文章分析 Tracker 类,它在 track.py 文件中。
在分析之前,我们把前几篇文章的内容再回顾一下,以理清思路。
BT的源码,主要可以分为两个部分,一部分用来实现 tracker 服务器,另一部分用来实现BT的客户端。我们这个系列的文章围绕 tracker 服务器的实现来展开。
BT 客户端与 tracker 服务器之间,通过 track HTTP协议进行通信,而BT客户端之间以BT对等协议进行通信。
Tracker 服务器的职责是搜集客户端的信息,并帮助客户端相互发现对方,从而使得客户端之间能够相互建立连接,进而互相能下载所需的文件片断。
在实现 tracker 服务器的时候,首先是通过 RawServer 类来实现网络服务器的功能,然后由 HTTPHandler 类来完成对协议数据的第一层分析。因为 track HTTP 协议是以HTTP协议的形式交互的,所以 HTTPHandler 按照HTTP的协议对客户端的请求进行第一层处理(也就是取得URL和HTTP 消息头),然后把URL和 HTTP消息头进一步交给 Tracker类来进行第二层分析,并把分析的结果按照 HTTP协议的格式封装以后,发给客户端。
Tracker 类对 track HTTP协议做第二层分析,它根据第一层分析后的URL以及HTTP消息头,进一步得到客户端的信息(包括客户端的ip地址、端口、已下载完的数据以及剩余数据等等),然后综合当前所有下载者的情况,生成一个列表,这个列表记录了下载同一个文件的其它下载者的信息(但不是所有的下载者,只是选择一部分),并把这个列表交给 HTTPHandler,由它进一步返回给客户端。
如此,整个 tracker 服务器的实现,在层次上就比较清晰了。
为了分析 Tracker类,首先要理解“状态文件”。
l 状态文件:
在第一篇文章中,我们说到,要启动一个 tracker 服务器,至少要指定一个参数,就是状态文件。在 Tracker 的初始化函数中,主要就是读取指定的状态文件,并根据该文件做一些初始化的工作。所以必须弄清楚状态文件的作用:
1. 状态文件的作用:
tracker 服务器如果因为某些意外而停止,那么所有的下载者不仅不能继续下载,而且先前所做的努力都前功尽弃。这种情况是不能容忍的,因此,必须保证在 tracker 重新启动之后,所有的下载者还能继续工作。Tracker 服务器周期性的将当前系统中必要的下载状态信息保存到状态文件中,在它因故停止,而后又重新启动的时候,可以根据这些信息重新恢复“现场”,从而使得下载者可以继续下载。
2. 状态文件的格式:
状态文件的信息对应着一个比较复杂的4级嵌套的字典。
要详细分析这个字典类型,必须理解一点:一个 tracker 服务器,可以同时为下载不同文件的几批下载者提供服务。
我们知道,一批下载同一个文件的下载者,它们必然拥有同样的 torrent 文件,它们能根据 torrent 文件找到同一个 tracker 服务器。而下载另一个文件的一批下载者,必然拥有另外一个 torrent 文件,但是这两个不同的 torrent 文件,可能指向的是同一个 tracker 服务器。所以说“一个 tracker 服务器,可以同时为下载不同文件的几批下载者提供服务。”
实际上,那些专门提供 bt 下载的网站,都是架设了一些专门的 tracker 服务器,每个服务器可以同时为多个文件提供下载跟踪服务。
理解了这一点,我们继续分析状态文件的格式
第一级字典:
在 Tracker 的初始化函数中,有这样的代码
if exists(self.dfile):
h = open(self.dfile, 'rb')
ds = h.read()
h.close()
tempstate = bdecode(ds)
else:
tempstate = {}
这段代码是从从状态文件中读取信息,由于读到的是经过 Bencoding 编码后的数据,所以还需要经过解码,解码后就得到一个字典类型的数据,保存到 template 中,这就是第一级字典。它有两个关键字,peers 和 completed,分别用来记录参与下载的peer的信息和已经完成了下载的peer的信息(凡是出现在 completed的peer,也必然出现在 peers中)。这两个关键字对应的数据类型都是字典,我们重点分析 peers 关键字所对应的第二级字典。
第二级字典:
关键字:torrent文件中 info 部分的 SHA hash
数据:第三级字典
一个被下载的文件,唯一的被一个 torrent 文件标识,tracker通过计算torrent文件中 info 部分的 SHA hash,这是一个20字节的字符串,它可以唯一标识被下载文件的信息。第二级字典以此字符串作为关键字,保存下载此文件的下载者们的信息。
第三级字典:
关键字:下载者的 peer id
数据:第四级字典
解释:每个下载者,都创建一个唯一标识自己的20字节的字符串,称为 peer id。第三级字典以次为关键字,保存每个下载者的信息。
第四级字典:
关键字: ip、port、left等
数据:分别保存下载者的 ip地址、端口号和未下载完成的字节数
另外还有两个可选的关键字given ip 和nat,它们是用于 NAT 的,关于NAT的情况,后面会再提到。
理解了这个4级嵌套的字典,对 Tracker 的分析才好继续进行下去。
下面我们挨个看 Tracker 类的成员函数。
l 初始化函数 __init__():
开始是一些参数的初始化,其中比较难理解的有:
self.response_size = config['response_size']
self.max_give = config['max_give']
要理解这两个参数,必须看那份更详细的BT协议规范中对“numwant”关键字的解释:
? numwant: Optional. Number of peers that the client would like to receive from the tracker. This value is permitted to be zero. If omitted, typically defaults to 50 peers.
If a client wants a large peer list in the response, then it should specify the numwanted parameter.
意思就是说,默认情况下,tracker 服务器给下载者响应的 peers 个数是 response_size 个,但有时候,下载者可能希望获得更多的 peers 信息,那么它必须在请求中包含 numwant 关键字,并指定希望获得 peers 的个数。例如是 300,tracker 取 300和 max_give中较小的一个,作为返回给下载者的 peers 的个数。
self.natcheck = config['nat_check']
self.only_local_override_ip = config['only_local_override_ip']
这两个参数是和 NAT 相关的,我们终于必须要说到 NAT 了。
我们知道,如果一个 BT 客户端处在局域网中,通过 NAT 之后连到 tracker 服务器的话,那么 tracker 服务器从连接中获得的该客户端的 IP 地址是一个公网IP,如果其它客户端通过这个 IP 试图连接该客户端的话,肯定会被 NAT 拒绝的。
通过一些 NAT 穿越的技术,在某些情况下,可以让一些客户端穿过 NAT,与处在局域网中的客户端建立连接,具体的技术资料我已经贴在论坛上了,大家有兴趣可以去看一看。原来我以为 BT 也用到了一些 NAT 穿越技术,但现在发现并没有,可能是技术实现上比较复杂,而且不能保证在任何情况下都有效的原因吧。
我们来看那份比较详细的协议规范中,对“ip”关键字的解释:
? ip: Optional. The true IP address of the client machine, in dotted quad format. Notes: In general this parameter is not necessary as the address of the client can be determined from the IP address from which the HTTP request came. The parameter is only needed in the case where the IP address that the request came in on is not the IP address of the client. This happens if the client is communicating to the tracker through a proxy (or a transparent web proxy/cache.) It also is necessary when both the client and the tracker are on the same local side of a NAT gateway. The reason for this is that otherwise the tracker would give out the internal (RFC1918) address of the client, which is not routeable. Therefore the client must explicitly state its (external, routeable) IP address to be given out to external peers. Various trackers treat this parameter differently. Some only honor it only if the IP address that the request came in on is in RFC1918 space. Others honor it unconditionally, while others ignore it completely.
在客户端发给 tracker 服务器的请求中,可能包含“ip”,也就是指定自己的 IP 地址。你可能有疑问了,客户端为什么要通知 tracker服务器自己的 ip 地址了?tracker 服务器完全可以从连接中获得这个 ip 啊。嗯,实际的网络情况是非常复杂的,如果客户端是在局域网内通过 NAT 后上网,或者客户端是通过某个代理服务器之后,再与 tracker 服务器建立连接,那么 tracker 从连接中获得的 ip 地址并不是客户端真实的 ip 地址,为了获得真实的ip,必须让客户端主动在协议中通知tracker。因此,就出现了两个 ip 地址,一个是从连接中获得的 ip 地址,我把它叫做“连接ip”,另一个是客户端通过请求传递过来的 ip,我叫它“真实ip”。显然,tracker 应该把客户端的“真实ip”记录下来,并把这个“真实ip”通知给其它下载者。
这个“ip”参数又是可选的,也就是说,如果客户端拥有一个公网的ip,而且并没有通过NAT或者代理,那么,它并不需要传递这个参数,“连接ip”就是“真实ip”。
按协议规发的说法,“ip”这个参数在以下两种情况下有用:
1、客户端可能拥有一个公网IP,但它又是通过一个代理服务器与tracker服务器建立连接的,它需要传递“ip”。
2、客户端在某个局域网中,恰好tracker也在同一个局域网中,。。。(这种情况又会怎么样了?我还没有弄明白 :)
回过头来看 natcheck 和 only_local_override_ip,
natcheck :how many times to check if a downloader is behind a NAT (0 = don't check)
only_local_override_ip:如果从 GET 参数中传递过来的 ip,是一个公网 ip,是否忽略它?它的默认值是 1。
现在还不好理解它的意思,我们看后面代码的时候,再来理解它。
self.becache1 = {}
self.becache2 = {}
self.cache1 = {}
self.cache2 = {}
self.times = {}
这里出现5个字典,其中times 用来,而其它4个字典的作用是什么?
嗯,还是让我们先来看看在“BT移植邮件列表”中,Bram Cohen 发的一个帖子,
There are two new GET parameters for the tracker in the latest release. They are –
key=xxxx - this is like peer id, but it's only known to the client and the tracker. It allows clients to be behind dynamic IP. If a peer announced a key previously, then it's accepted if and only if it gives the same key again. If no key was given, then the fallback is checking that the IP hasn't changed. If the IP has changed, mainline currently will give a peer list but not change any data related to that peer, so that peers behind dynamic IP using old clients will continue to work okay. Currently mainline generates the value associated with key as eight random hex values, and the tracker accepts any string from clients.
compact=1 - when a client sends this, the 'peers' return value is a single string whose length is a multiple of 6 rather than a dict. To extract peer information from the string, chop it into substrings of length 6. For each substring, the first four bytes are the IP and the last two are the port, encoded big-endian. This results in huge bandwidth savings.
Everybody developing ports should implement these keys, they're very useful.
BT 在不停的向前发展,所以协议规范也在发展之中,新引入了两个关键字,其中一个是 compact,如果客户端请求中 compact=1,表示紧凑模式,也就是tracker给客户端响应的数据,采用一种比原来更紧凑的形式,这样可以有效的节约带宽。
Becache1 和 cache1用于普通模式,而 becache2和 cache2用于紧凑模式。我们马上能看到它们的初始化操作。
if exists(self.dfile):
h = open(self.dfile, 'rb')
ds = h.read()
h.close()
太极 2005-10-25 23:23
tempstate = bdecode(ds)
else:
tempstate = {}
if tempstate.has_key('peers'):
self.state = tempstate
else:
self.state = {}
self.state['peers'] = tempstate
self.downloads = self.state.setdefault('peers', {})
self.completed = self.state.setdefault('completed', {})
statefiletemplate(self.state)
这部分代码是读取状态文件,初始化 downloads和completed这两个字典,并检查读取的数据是否有效。
现在,downloads里面是保存了所有下载者的信息,而 completed保存了所有完成下载的下载者的信息。
for x, dl in self.downloads.items():
self.times[x] = {}
for y, dat in dl.items():
self.times[x][y] = 0
if not dat.get('nat',1):
ip = dat['ip']
gip = dat.get('given ip')
if gip and is_valid_ipv4(gip) and (not self.only_local_override_ip or is_local_ip(ip)):
ip = gip
self.becache1.setdefault(x,{})[y] = Bencached(bencode({'ip': ip, 'port': dat['port'], 'peer id': y}))
self.becache2.setdefault(x,{})[y] = compact_peer_info(ip, dat['port'])
这里,对 times、becache1、becache2初始化。它们都是2级嵌套的字典,第一级的关键字是 torrent 文件中的 info 部分的 hash,第二级关键字是下载者的 peer id,becache1保存的是一个 Bencached 对象,而 becache2 保存的是一个字符串,它是把 ip和port 组合成的一个字符串。
参数设置完之后,有:
rawserver.add_task(self.save_dfile, self.save_dfile_interval)
add_task() 我们已经见到过好多次了,这表示每隔一段时间,需要调用 save_dfile() 来保存状态文件。
再后面的代码,我没有仔细看了,象 allow_get 和 allowed_dir 等的意义,还需要看相关的代码才能明白,如果你仔细看了这些部分,希望能补充一下。
初始化以后,就是 Tracker 的最重要,也是代码最长的函数: get() 。
l get():
在第三篇文章中,我们已经看到,在由 HTTPHandler 对 track HTTP协议进行第一层分析之后,就是调用 Tracker::get() 来进行第二层分析的。它的参数是 URL 和 HTTP 消息头。
在这个函数中,首先调用 urlparse() 对 URL 进行解析,例如这样的 URL :
/announce?ip=192.168.112.1&port=9999&left=2000
解析之后,就获得了 path,是announce,还有参数,包括:
ip:192.168.112.1
port:9999
left:2000
然后,根据 path 的不同,分别处理。
一般来说,客户端发给 tracker 的请求中,path 都是 announce,但有时候,第三方可能也想查询一下 tracker 服务器的状态,那么它可以通过其它的 path 来向 tracker 服务器请求,例如 scrape。在一些专门提供 bt 下载的网站上,我们可以看到不停更新的下载者、种子个数等信息,就是用这种方式从 tracker 服务器处获得的。
我们只看 path 是 announce 的情况。
首先是对客户端传递来的参数的有效性进行检查,包括是不是有 info_hash 关键字?ip地址是否合法等等。
然后,
ip = connection.get_ip()
这样得到的 ip ,是根据客户端与 tracker 服务器建立的连接中获取的 ip,就是“连接ip”了。
接下来,
ip_override = 0
if params.has_key('ip') and is_valid_ipv4(params['ip']) and (not self.only_local_override_ip or is_local_ip(ip)):
ip_override = 1
这段代码的意图,是为了判断在随后保存客户端的 ip 地址的时候,是否要用“真实ip”来取代“连接ip”。如果 ip_override 为1,那么就保存“真实ip”,也就是“连接ip”被“真实ip”覆盖(override)了。
太极 2005-10-25 23:24
分析源码的过程其实就是揣测作者意图的过程,我的揣测是这样的:
如果客户端从请求中传递了“真实ip”,那么对 tracker来说,,既然客户端都已经报告了“真实ip”了,那么当然就保存“真实ip”就好了。可如果“真实 ip ”是个公网 ip,而且only_local_override_ip=1,也就是说,忽略“真实ip”为公网ip的情况,那么,保存的是“连接”ip。
说句实话,为什么要设置 only_local_override_ip 这么一个参数,我还是没有弄明白。
if peers.has_key(myid):
myinfo = peers[myid]
if myinfo.has_key('key'):
if params.get('key') != myinfo['key']:
return (200, 'OK', {'Content-Type': 'text/plain', 'Pragma': 'no-cache'},
bencode({'failure reason': 'key did not match key supplied earlier'}))
confirm = 1
elif myinfo['ip'] == ip:
confirm = 1
else:
confirm = 1
这段代码涉及到身份验证吧,我没有仔细看了,关于 “key”的解释,请看上面Bram Cohen的帖子。
接下来,如果验证通过,而且事件不是“stopped”,那么就把客户端的信息保存下来。如果已经存在该客户端的信息,那么就更新一下。注意这里 ip_override 派上了用场,也就是如果覆盖,那么保存的是“真实ip”,否则保存的是“连接ip”。
if port == 0:
peers[myid]['nat'] = 2**30
elif self.natcheck and not ip_override:
to_nat = peers[myid].get('nat', -1)
if to_nat and to_nat < self.natcheck:
NatCheck(self.connectback_result, infohash, myid, ip, port, self.rawserver)
else:
peers[myid]['nat'] = 0
第一个 port == 0 的情况,不知道是什么意思?
第二个表示要检查 NAT的情况。大概意思就是 tracker服务器主动用 BT对等协议与该客户端进行握手,如果握手成功,那么说明该客户端是可以被直接连接的。这一点很重要,如果 tracker 服务器无法和客户端直接建立连接的话,那么其它下载者也无法和该客户端建立连接。
这里用到的 NatChecker 类,也是一个 Handler 类,具体细节,大家自己分析吧。
data = {'interval': self.reannounce_interval}
从这到最后,就是根据紧凑模式和普通模式两种不同情况,分别从 becache1或者 becache2中,返回随机的 peers 的信息。
在这里,我们来总结一下 cache1、becache1、cache2、becache2的用处。我感觉 cache1和 cache2 好像没什么作用,因为从代码中没有看到它们两的意义。Becache1和 becache2则分别用于普通模式和紧凑模式情况下,对 peers 的信息进行缓存。它们从状态文件中初始化自己;如果有新的 peer 出现,被添加到这两个缓存中;如果是“stopped”事件,那么从缓存中删除对应的 peer。最后,tracker 根据情况,从其中一个缓存取得随机的 peers 的信息,返回给客户端。
l connectback_result()
这个函数,用于 NatCheck 类作为回调函数。它根据 tracker 服务器主动与客户端建立连接的结果做一些处理。其中的参数 result,是表示tracker 与客户端建立连接是否成功。如果建立成功,显然对方不在 NAT 后面,否则就是在 NAT 后面了。record['nat'] += 1 这没看懂,为什么不是直接 record['nat'] = 1 ?最后,如果建立连接成功,那么更新一下 becache1 和 becache2。