http://blog.csdn.net/zbyufei/article/details/6092406
http://blog.daxuxu.info/2010/11/pycurl_use_examples.html
最近需要对节点到源站自己做个监控,简单的ping可以检测到一些东西,但是http请求的检查也要进行,于是就研究了下pycurl
pycurl是个用c语言实现的python 库,虽然据说不是那么pythonic,但是却很高效,它支持的协议居多:
supporting FTP, FTPS, HTTP, HTTPS, GOPHER, TELNET, DICT, FILE and LDAP. libcurl supports HTTPS certificates, HTTP POST, HTTP PUT, FTP uploading, kerberos, HTTP form based upload, proxies, cookies, user+password authentication, file transfer resume, http proxy tunneling and more!
这一堆协议已经很多了,我需要就是http一个,相对urlib来说,这个库可能更快些。
以下这个脚本是对某一个给定的url进行检查,并打印出http相应码,响应大小,建立连接时间,准备传输时间,传输第一个字节时间,完成时间
#!/usr/bin/python
# coding: UTF-8
import StringIO
import pycurl
import sys
import os
class Test:
def __init__(self):
self.contents = ''
def body_callback(self,buf):
self.contents = self.contents + buf
def test_gzip(input_url):
t = Test()
#gzip_test = file("gzip_test.txt", 'w')
c = pycurl.Curl()
c.setopt(pycurl.WRITEFUNCTION,t.body_callback)
c.setopt(pycurl.ENCODING, 'gzip')
c.setopt(pycurl.URL,input_url)
c.perform()
http_code = c.getinfo(pycurl.HTTP_CODE)
http_conn_time = c.getinfo(pycurl.CONNECT_TIME)
http_pre_tran = c.getinfo(pycurl.PRETRANSFER_TIME)
http_start_tran = c.getinfo(pycurl.STARTTRANSFER_TIME)
http_total_time = c.getinfo(pycurl.TOTAL_TIME)
http_size = c.getinfo(pycurl.SIZE_DOWNLOAD)
print 'http_code http_size conn_time pre_tran start_tran total_time'
print "%d %d %f %f %f %f"%(http_code,http_size,http_conn_time,http_pre_tran,http_start_tran,http_total_time)
if __name__ == '__main__':
input_url = sys.argv[1]
test_gzip(input_url)
|
脚本运行效果
xu:~/curl$ python pycurl_test.py
http_code http_size conn_time pre_tran start_tran total_time
200 8703 0.748147 0.748170 1.632642 1.636552
|
pycurl 的一些响应信息:
(参考: http://curl.haxx.se/libcurl/c/curl_easy_getinfo.html )
pycurl.NAMELOOKUP_TIME 域名解析时间
pycurl.CONNECT_TIME 远程服务器连接时间
pycurl.PRETRANSFER_TIME 连接上后到开始传输时的时间
pycurl.STARTTRANSFER_TIME 接收到第一个字节的时间
pycurl.TOTAL_TIME 上一请求总的时间
pycurl.REDIRECT_TIME 如果存在转向的话,花费的时间
pycurl.EFFECTIVE_URL
pycurl.HTTP_CODE HTTP 响应代码
pycurl.REDIRECT_COUNT 重定向的次数
pycurl.SIZE_UPLOAD 上传的数据大小
pycurl.SIZE_DOWNLOAD 下载的数据大小
pycurl.SPEED_UPLOAD 上传速度
pycurl.HEADER_SIZE 头部大小
pycurl.REQUEST_SIZE 请求大小
pycurl.CONTENT_LENGTH_DOWNLOAD 下载内容长度
pycurl.CONTENT_LENGTH_UPLOAD 上传内容长度
pycurl.CONTENT_TYPE 内容的类型
pycurl.RESPONSE_CODE 响应代码
pycurl.SPEED_DOWNLOAD 下载速度
pycurl.SSL_VERIFYRESULT
pycurl.INFO_FILETIME 文件的时间信息
pycurl.HTTP_CONNECTCODE HTTP 连接代码
pycurl.HTTPAUTH_AVAIL
pycurl.PROXYAUTH_AVAIL
pycurl.OS_ERRNO
pycurl.NUM_CONNECTS
pycurl.SSL_ENGINES
pycurl.INFO_COOKIELIST
pycurl.LASTSOCKET
pycurl.FTP_ENTRY_PATH
########################################################
前几天看完《》,预想练手,想起同事的一个 ruby 代码,尝试改写成 python,顺便看看两个语言的简练程度。下面是原始的 ruby 代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
#!/usr/bin/env ruby require 'rubygems' require 'net/http' urls = [""] 50.times do urls.each do |url| start_at = Time.now Net::HTTP.get URI.parse(url) end_at = Time.now diff = end_at - start_at if diff < 0.3 then color_code = 32 elsif diff > 0.8 then color_code = 31 else color_code = 33 end puts "#{url}\n time: \033[#{color_code}m#{diff}\033[0m seconds" end end
|
改写 python 的同时,考虑脚本的灵活性准备增加两个参数,第一个是请求测试次数,第二个是请求测试的 URL,而 python 默认提供了argparse 库,可以很方便的生成 --help 的帮助和解析传递的参数:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
#!/usr/bin/env python import urllib2 import time import sys import argparse def benchmark(url, count): for i in range(count): s = time.time() r = urllib2.urlopen(urllib2.Request(url)) e = time.time() diff = e - s if diff < 0.3: color_code = 32 elif diff > 0.8: color_code = 31 else: color_code = 33 print '# %d' % (i + 1) print '\tStauts: %s' % r.getcode() print '\tTime: \033[%dm%f\033[0m second(s)' % (color_code, diff) def main(argv): parser = argparse.ArgumentParser(description='url request time test') parser.add_argument('URL', help='request url') parser.add_argument('-t', '--time', action='store', dest='count', type=int, default=10, help='request times') args = parser.parse_args(argv) benchmark(args.URL, args.count) if __name__ == '__main__': main(sys.argv[1:])
|
当然,我主要是为了练手 python 才去写的,ruby 本身也有 库用于解析参数,但是需要自己手写生成 --help 的输出,而且需要对每个参数做相应的 callback。
效果如下:
Posted by on 2012-08-02
#########################################################
http://blog.raphaelzhang.com/2012/03/issues-in-python-crawler/
用Python抓网页的注意事项
用Python编一个抓网页的程序是非常快的,下面就是一个例子:
|
importurllib2
html=urllib2.urlopen('http://blog.raphaelzhang.com').read()
|
但是在实际工作中,这种写法是远远不够的,至少会遇到下面几个问题:
-
网络会出错,任何错误都可能。例如机器宕了,网线断了,域名出错了,网络超时了,页面没有了,网站跳转了,服务被禁了,主机负载不够了…
-
服务器加上了限制,只让常见浏览器访问
-
服务器加上了防盗链的限制
-
某些2B网站不管你HTTP请求里有没有Accept-Encoding头部,也不管你头部具体内容是什么,反正总给你发gzip后的内容
-
URL链接千奇百怪,带汉字的也罢了,有的甚至还有回车换行
-
某些网站HTTP头部里有一个Content-Type,网页里有好几个Content-Type,更过分的是,各个Content-Type还不一样,最过分的是,这些Content-Type可能都不是正文里使用的Content-Type,从而导致乱码
-
网络链接很慢,乘分析几千个页面的时间,建议你可以好好吃顿饭去了
-
Python本身的接口有点糙
好吧,这么一大箩筐问题,我们来一个个搞定。
错误处理和服务器限制
首先是错误处理。由于urlopen本身将大部分错误,甚至包括4XX和5XX的HTTP响应,也搞成了异常,因此我们只需要捕捉异常就好了。同时,我们也可以获取urlopen返回的响应对象,读取它的HTTP状态码。除此之外,我们在urlopen的时候,也需要设置timeout参数,以保证处理好超时。下面是代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
importurllib2
importsocket
try:
f=urllib2.urlopen('http://blog.raphaelzhang.com',timeout=10)
code=f.getcode()
ifcode<200orcode>=300:
#你自己的HTTP错误处理
exceptException,e:
ifisinstance(e,urllib2.HTTPError):
print'http error: {0}'.format(e.code)
elifisinstance(e,urllib2.URLError)andisinstance(e.reason,socket.timeout):
print'url error: socket timeout {0}'.format(e.__str__())
else:
print'misc error: '+e.__str__()
|
如果是服务器的限制,一般来说我们都可以通过查看真实浏览器的请求来设置对应的HTTP头部,例如针对浏览器的限制我们可以设置User-Agent头部,针对防盗链限制,我们可以设置Referer头部,下面是示例代码:
|
importurllib2
req=urllib2.Request('http://blog.raphaelzhang.com',
headers={"Referer":"",
"User-Agent":"Mozilla/5.0 (Windows NT 5.1) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.68 Safari/534.24"
})
html=urllib2.urlopen(url=req,timeout=10).read()
|
有的网站用了Cookie来限制,主要是涉及到登录和限流,这时候没有什么通用的方法,只能看能否做自动登录或者分析Cookie的问题了。
URL与内容处理
URL里奇形怪状的格式只能个别分析,个别处理,逮到一个算一个。例如针对URL里可能有汉字,相对路径,以及回车换行之类的问题,我们可以先用urlparse模块的urljoin函数处理相对路径的问题,然后去掉乱七八糟的回车换行之类的玩意,最后用urllib2的quote函数处理特殊字符编码和转义的问题,生成真正的URL。
当然,在使用的过程中你会发现Python的urljoin和urlopen都有些问题,因此具体的代码我们在后面再给。
对于那些不管三七二十一就把gzip内容丢过来的,我们直接分析它的内容格式,不要管HTTP头部就好了。代码是这样的:
|
importurllib2
importgzip,cStringIO
html=urllib2.urlopen('http://blog.raphaelzhang.com').read()
ifhtml[:6]=='\x1f\x8b\x08\x00\x00\x00':
html=gzip.GzipFile(fileobj=cStringIO.StringIO(html)).read()
|
好了,现在又到了编码处理的痛苦时间了。由于我们是中国人,用的是汉字,因此必须要搞清楚一段文本使用的到底是什么编码(f*ck),不然就会出现传说中的乱码。
按照一般浏览器的处理流程,判断网页的编码首先是根据HTTP服务器发过来的HTTP响应头部中Content-Type字段,例如text/html; charset=utf-8就表示这是一个HTML网页,使用的是utf8编码。如果HTTP头部中没有,或者网页内容的head区中有charset属性或者其http-equiv属性为Content-Type的meta元素,如果有charset属性,则直接读取这个属性,如果是后者,则读取这个元素的content属性,格式与上述格式类似。
按理来说,如果大家都按规矩出牌,编码的问题是很好搞定的。但是,问题就在于大家可能不按规矩办事,走自己的捷径…(OMG)。首先,HTTP响应里不一定有Content-Type头部,其次某些网页里可能没有Content-Type或者有多个Content-Type(例如百度搜索的缓存页面)。这个时候,我们就必须冒险猜测了,因此处理编码的一般流程是:
-
读取urlopen返回的HTTP相应对象的headers.dict['content-type']属性,将charset读取出来
-
利用正则表达式解析HTML里head区对应的meta元素中Content-Type/charset属性的值,并读取出来
-
如果上面两步中获得的编码只有gb2312或者gbk,则可以认为网页的编码是gbk(反正gbk兼容gb2312)
-
如果发现有多个编码,而且这些编码互不兼容,例如又有utf8又有ios8859-1的,那我们只好使用chardet模块了,调用chardet的detect函数,读取encoding属性即可
-
获得了编码后,可以将网页用decode方法转码为Python中的unicode串以方便后续的统一处理
几个需要注意的地方,我的做法是只要网页的编码是gb2312,我就将其假定为gbk。因为不少网站虽然写的是gb2312,实际上是gbk编码的,毕竟gb2312覆盖的字符数太少,很多字,例如朱棣、陶喆,朱镕基这些词中都有字不在gb2312的覆盖范围内。
其次,chardet不是万能的,如果有现成的Content-Type/charset,就仍然使用Content-Type/charset。因为chardet使用的是一种猜测方式来做的,主要是参考的老版本的火狐代码中的猜测算法,就是根据某些编码的特征来猜测的。例如gb2312编码中“的”这个字的对应的编码的出现频率可能比较高等。在我的实际使用中,它出错的概率也有10%左右吧。另外,因为它要分析和猜测,所以时间也比较长。
检测编码的代码有点长,具体可以参看里的getencoding函数的实现。
提高性能
网页抓取程序的主要性能瓶颈都在网络处理上。关于这一点,可以使用cProfile.run和pstats.Stats来测试一下每个函数的调用时间就可以得到验证了。一般来说,可以通过下面几种方法来解决:
-
使用threading或者multiprocess来并行处理,同时抓取多个页面,使用都非常简单的,文档在
-
在HTTP请求里面加上Accept-Encoding头部,将其设为gzip即可,表示可以接受gzip压缩的数据,绝大多数网站都支持gzip压缩,可以节省70 ~ 80%的流量
-
如果不需要读取所有内容,可以在HTTP请求里面加上Range头部(HTTP断点续传也需要这个头部的)。例如把Range头部的值设为bytes=0-1023就是相当于请求开头1024个字节的内容,这样可以大大节省带宽。不过少数网站不支持这个头部,这个需要注意下
-
调用urlopen的时候一定要设置timeout参数,不然可能程序会永远等待在那里的
并行处理具体使用多线程还是多进程,就我现在测试来看区别不大,毕竟主要瓶颈是在网络链接上。
其实除了上述方法外,在Python里还有一些可能更好的提高性能的方法。例如使用,,等支持更好多线程多进程的工具或python,还可以使用异步IO,例如或者PycURL。不过个人对greenlet不熟,觉得twisted实在太twisted,PycURL不够pythonic,而stackless和pypy怕破坏了其他python程序,因此仍然使用urllib2 + threading方案。当然,因为GIL的问题,python多线程仍然不够快,可是对单线程情况来说,已经有数倍时间的节省了。
Python的小问题
在抓网页的时候,Python暴露出一些小问题。主要是urlopen和urljoin函数的问题。
urlparse模块里的urljoin函数,其功能是将一个相对url,例如../img/mypic.png,加上当前页面的绝对url,例如http://blog.raphaelzhang.com/apk/index.html,转化为一个绝对URL,例如在这个例子里就是http://blog.raphaelzhang.com/img/mypic.png。但是urljoin处理的结果还需要进一步处理,就是去掉多余的..路径,去掉回车换行等特殊符号。代码是这样的:
|
fromurlparseimporturljoin
relurl='../../img/\nmypic.png'
absurl='http://blog.raphaelzhang.com/2012/'
url=urljoin(absurl,relurl)
#url为http://blog.raphaelzhang.com/../img/\nmypic.png
url=reduce(lambdar,x:r.replace(x[0],x[1]),[('/../','/'),('\n',''),('\r','')],url)
#url为正常的http://blog.raphaelzhang.com/img/mypic.png
|
urlopen函数也有一些问题,它其实对url字符串有自己的要求。首先,你交给urlopen的函数需要自己做汉字等特殊字符的编码工作,使用urllib2的quote函数即可,就像这样:
|
importurllib2
#此处的url当然是经过了上述urljoin和reduce处理过后的良好的绝对URL了
url=urllib2.quote(url.split('#')[0].encode('utf8'),safe="%/:=&?~#+!$,;'@()*[]")
|
其次,url里面不能有#。因为从理论上来说,#后面的是fragment,是在获取整个文档以后定位用的,而不是用来获取文档用的,但是实际上urlopen应该可以自己做这个事的,却把这个任务留给了开发人员。上面我已经用url.split(‘#’)[0]的方式来处理这个问题了。
最后,Python 2.6和之前的版本不能处理类似http://blog.raphaelzhang.com?id=123这样的url,会导致运行时错误,我们必须手工处理一下,将这种url转化为正常的http://blog.raphaelzhang.com/?id=123这样的url才行,不过这个bug在Python 2.7里面已经解决了。
好了,Python网页抓取的上述问题告一段落了,下面我们还要处理分析网页的问题,留待下文分解。
httplib实现了HTTP和HTTPS的客户端协议,一般不直接使用,在python更高层的封装模块中(urllib,urllib2)使用了它的http实现。
Code highlighting produced by Actipro CodeHighlighter (freeware)
-->
httplib.HTTPConnection ( host [ , port [ , strict [ , timeout ]]] )
HTTPConnection类的构造函数,表示一次与服务器之间的交互,即请求/响应。参数host表示服务器主机,如:;port为端口号,默认值为80; 参数strict的 默认值为false, 表示在无法解析服务器返回的状态行时( status line) (比较典型的状态行如: HTTP/1.0 200 OK ),是否抛BadStatusLine 异常;可选参数timeout 表示超时时间。
HTTPConnection提供的方法:
Code highlighting produced by Actipro CodeHighlighter (freeware)
-->HTTPConnection.request ( method , url [ , body [ , headers ]] )
调用request 方法会向服务器发送一次请求,method 表示请求的方法,常用有方法有get 和post ;url 表示请求的资源的url ;body 表示提交到服务器的数据,必须是字符串(如果method 是”post” ,则可以把body 理解为html 表单中的数据);headers 表示请求的http 头。
HTTPConnection.getresponse ()
获取Http 响应。返回的对象是HTTPResponse 的实例,关于HTTPResponse 在下面 会讲解。
HTTPConnection.connect ()
连接到Http 服务器。
HTTPConnection.close ()
关闭与服务器的连接。
HTTPConnection.set_debuglevel ( level )
设置高度的级别。参数level 的默认值为0 ,表示不输出任何调试信息。
httplib.HTTPResponse
HTTPResponse表示服务器对客户端请求的响应。往往通过调用HTTPConnection.getresponse()来创建,它有如下方法和属性:
HTTPResponse.read([amt])
获取响应的消息体。如果请求的是一个普通的网页,那么该方法返回的是页面的html。可选参数amt表示从响应流中读取指定字节的数据。
HTTPResponse.getheader(name[, default])
获取响应头。Name表示头域(header field)名,可选参数default在头域名不存在的情况下作为默认值返回。
HTTPResponse.getheaders()
以列表的形式返回所有的头信息。
HTTPResponse.msg
获取所有的响应头信息。
HTTPResponse.version
获取服务器所使用的http协议版本。11表示http/1.1;10表示http/1.0。
HTTPResponse.status
获取响应的状态码。如:200表示请求成功。
HTTPResponse.reason
返回服务器处理请求的结果说明。一般为”OK”
下面通过一个例子来熟悉HTTPResponse中的方法:
Code highlighting produced by Actipro CodeHighlighter (freeware)
-->import httplib
conn = httplib.HTTPConnection("", 80, False)
conn.request('get', '/', headers = {"Host": "",
"User-Agent": "Mozilla/5.0 (Windows; U; Windows NT 5.1; zh-CN; rv:1.9.1) Gecko/20090624 Firefox/3.5",
"Accept": "text/plain"})
res = conn.getresponse()
print 'version:', res.version
print 'reason:', res.reason
print 'status:', res.status
print 'msg:', res.msg
print 'headers:', res.getheaders()
#html
#print '\n' + '-' * 50 + '\n'
#print res.read()
conn.close()
Httplib模块中还定义了许多常量,如:
Httplib. HTTP_PORT 的值为80,表示默认的端口号为80;
Httplib.OK 的值为200,表示请求成功返回;
Httplib. NOT_FOUND 的值为404,表示请求的资源不存在;
可以通过httplib.responses 查询相关变量的含义,如:
Print httplib.responses[httplib.NOT_FOUND] #not found
更多关于httplib的信息,请参考Python手册httplib 模块。
urllib 和urllib2实现的功能大同小异,但urllib2要比urllib功能等各方面更高一个层次。目前的HTTP访问大部分都使用urllib2.
urllib2:
Code highlighting produced by Actipro CodeHighlighter (freeware)
-->req = urllib2.Request('')
response = urllib2.urlopen(req)
the_page = response.read()
FTP同样:
Code highlighting produced by Actipro CodeHighlighter (freeware)
-->req = urllib2.Request('ftp://pythoneye.com')
urlopen返回的应答对象response有两个很有用的方法info()和geturl()
geturl — 这个返回获取的真实的URL,这个很有用,因为urlopen(或者opener对象使用的)或许
会有重定向。获取的URL或许跟请求URL不同。
Data数据
有时候你希望发送一些数据到URL
前言
最近在编写一个简单的WEB服务器,一个日常的工作就是测试服务器的性能,试用了MS的Web Application Stress,发现它居然不支持除80以外端口的测试,其他的如Loadrunner 太贵而且太大,试用版只支持10个并发用户,我Google到了100个并发用户的许可想试用一下,不过没有安装成功。想想这种压力测试实际上没啥技术含 量,就自己用Python来编写了小段测试代码。
使用Python的原因
毫无疑问,编写这样的代码使用Python最合适,使用C/C++编写有点小题大做,使用C#编写编译又很麻烦,我是使用Editplus来写代码 的,因为要考虑做不同的测试,只能边写边调整,使用Python,下载一个Python的加亮文件,设置User Tool 1 到 Python,运行的时候只需要按Ctrl+1,着实方便的很。
压力测试的实现
压力测试是通过模拟对WEB服务器的访问,进行记录响应时间,计算每秒处理数,记录上传下载的字节数。一般来说,一台测试机器上视机器的性能,发起 50~200的连接,基本就差不多了。考虑到测试机器的负载,一般使用多线程来完成多个WEB请求,幸运的是,Python对所有的这些支持的相当完善。 以下是测试的代码
# code by 李嘉
# 禁止任何商业目的的转载
# 不对因使用代码产生任何后果负任何责任
# 转载请保留所有声明
import threading, time, httplib, random
# 需要测试的 url 列表,每一次的访问,我们随机取一个
urls = [
"/test?page=",
"/test2?orderby=a&page=",
"/test2?orderby=d&page=",
]
MAX_PAGE = 10000
SERVER_NAME = ""
TEST_COUNT = 10000
# 创建一个 threading.Thread 的派生类
class RequestThread(threading.Thread):
# 构造函数
def __init__(self, thread_name):
threading.Thread.__init__(self)
self.test_count = 0
# 线程运行的入口函数
def run(self):
# 不直接把代码写在run里面是因为也许我们还要做其他形式的测试
i = 0
while i < TEST_COUNT:
self.test_performace()
i += 1
#self.test_other_things()
def test_performace(self):
conn = httplib.HTTPConnection(SERVER_NAME)
# 模拟 Keep-Alive 的访问, HTTP 1.1
for i in range(0, random.randint(0, 100)):
# 构造一个 url,提供随机参数的能力
url = urls[random.randint(0, len(urls) - 1)];
url += str(random.randint(0, MAX_PAGE))
# 这就连接到服务器上去
#print url
try:
conn.request("GET", url)
rsps = conn.getresponse()
if rsps.status == 200:
# 读取返回的数据
data = rsps.read()
self.test_count += 1
except:
continue
conn.close()
# main 代码开始
# 开始的时间
start_time = time.time()
threads = []
# 并发的线程数
thread_count = 100
i = 0
while i < thread_count:
t = RequestThread("thread" + str(i))
threads.append(t)
t.start()
i += 1
# 接受统计的命令
word = ""
while True:
word = raw_input("cmd:")
if word == "s":
time_span = time.time() - start_time
all_count = 0
for t in threads:
all_count += t.test_count
print "%s Request/Second" % str(all_count / time_span)
elif word == "e":
# 准备退出 其实 X 掉 窗口更加容易,没什么浪费的资源
TEST_COUNT = 0
for t in threads:
t.join(0)
break ################################################################
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""通过 ping 命令找出响应时间最短的 ip 或域名
支持 windows,unix,linux
"""
import re
import os
import subprocess
import sys
def ping(host):
"""返回 ping 结果
host 参数应为字符串类型的 ip 或域名
'192.168.1.1' or ''
返回 host, ip, time, lost
host:域名或 ip,字符串类型
ip:ip 地址,字符串类型,默认值为'0.0.0.0'
time:平均响应时间(ms),int 类型,默认值为0
lost:平均丢包率(%),int 类型,默认值为0
返回值示例:
('baidu.com', '123.125.114.144', 70, 0)
"""
os_name = os.name
# 根据系统平台设置 ping 命令
if os_name == 'nt': # windows
cmd = 'ping ' + host
else: # unix/linux
cmd = 'ping -c 4 ' + host
# 执行 ping 命令
sub = subprocess.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, shell=True)
out = sub.communicate()[0]
if not out:
return host, '0.0.0.0', 0, 100
# 替换换行符,因为在正则表达式中
# 'a$' 匹配 'a\r\n' 中的 'a\r'
text = out.replace('\r\n', '\n').replace('\r', '\n')
# 使用正则匹配 ip 地址: [192.168.1.1] (192.168.1.1)
ip = re.findall(r'(?<=\(|\[)\d+\.\d+\.\d+\.\d+(?=\)|\])', text)
# 获取时间信息
if os_name == 'nt':
time = re.findall(r'\d+(?=ms$)', text)
else:
time = re.findall(r'(?<=\d/)[\d\.]+(?=/)', text)
# 丢包率
lost = re.findall(r'\d+(?=%)', text)
ip = ip[0] if ip else '0.0.0.0'
# 小数点四舍五入
time = int(round(float(time[0]))) if time else 0
lost = int(round(float(lost[0]))) if lost else 0
return host, ip, time, lost
def get_hosts(filename):
"""从文件中读取 ip/域名
返回 ip/域名 列表,默认值为空
"""
hosts = []
with open(filename) as f:
for line in f:
line = line.strip().strip('.,/')
if line:
hosts.append(line)
return hosts
if __name__ == '__main__':
# 处理命令行参数
argvs = sys.argv
leng = len(argvs)
hosts = []
filename = 'hosts.txt'
add = False
ips = []
# 如果包含命令参数
if leng >= 2:
name = argvs[1]
# 第一个参数是个文件
if os.path.isfile(name):
filename = name
if leng > 2:
# 第二个参数是 +
if argvs[2] == '+':
add = True
ips = argvs[3:]
else:
ips = argvs[2:]
else:
if name == '+':
add = True
ips = argvs[2:]
else:
ips = argvs[1:]
# 处理 ip/域名参数
if ips:
for s in ips:
name = s.strip('.,/')
name = re.sub(r'https?://', '', name)
hosts.append(name)
# 既没有 ip/域名参数,也没有文件参数同时默认文件也不存在
if not hosts and not os.path.isfile(filename):
sys.exit('No ip/host or the file("%s") not existed!' % (filename))
if not hosts: # 如果没有参数,使用默认文件
# 移除重复项
hosts = list(set(get_hosts(filename)))
else:
hosts = list(set(hosts))
# 如果包含 + 参数,合并 ip/域名信息
if add:
hosts = list(set(get_hosts(filename) + hosts))
if not hosts: # 如果最终都没获取到 ip/域名 信息
sys.exit('Not find ip/host')
result_time = {}
print '#' * 55
# 固定字符串长度
print 'host(ip)'.rjust(33), 'time'.rjust(8), 'lost'.rjust(8)
for x in hosts:
host, ip, time, lost = ping(x)
result_time.update({host: time})
if time == 0:
lost = 100
print ('%s(%s): ' % (host, ip)).rjust(35),\
('% 3sms' % (time)).rjust(6),\
('% 2s%%' % (lost)).rjust(8)
times = sorted(result_time.itervalues())
# 去除 time 为0的
times = [i for i in times[:] if i]
print '#' * 55
if times:
for k, v in result_time.iteritems():
if v == times[0]:
print '%s has min ping time: %s ms' % (k, v)
转自:http://www.cnblogs.com/taosim/articles/3134405.html