from: http://www.cnblogs.com/niniwzw/archive/2010/01/18/1651082.html
PHP多线程编程(一)
虽然PHP 中,多线程用的比较的少。但是毕竟可能是会用到了。我最近就遇到这样一个问题,用户提交几百个url以后,要读出这个url 中的标题。
当然,你不希望用户等待的太久,10s 钟应该给出个答案。但是,本身,你要获取一个url 的标题,少的要 0.1s ,多的要好几秒。
显然,采用单个线程的方式是不行的。
我的第一个设计方案是这样的:
1. 用我前面提供的代码提供一个简单的服务器: http://www.cnblogs.com/niniwzw/archive/2009/09/27/1575002.html
这个服务器的作用是:提供一个url,然后,就读取标题。这里,可以每次读128个字节,看看有没有读到title,如果读到title了就停止读了。
这样可以省带宽。
2. 在客户端,同时打开1百个 socket ,访问这个服务器。如果提供的url数目超过100,那么就多次运行。
这个方案,基本上能够满足要求,读比较快的网页如:google.com 100次,也只要1s 左右。但是,通过测试,发现,有一定
的概率在打开链接的时候被阻塞。(有时候会阻塞个1s左右,然后继续往下open)可能打开了太多的链接了,会出很大的问题。
当然,这是一个很差的解决方案:建立tcp 链接本身的消耗非常的大。因为可靠有序传输的要求,要维持一个数据结构,而且,系统还要开辟一定的缓存给客户端和服务器端,
用户缓存数据。如果建立上百个链接,就可能占用很大的内存。作为一个系统的服务,应该尽量的简单,就是,我叫你做什么事情,你做好以后,结果给我就可以了。
一般来说,PHP要进行多线程编程,比较常见的是:
1. 要进行大量的网络耗时的操作
2. 要做大量的运算,并且,系统有多个cpu,为了让用户有更快的体验,把一个任务,分成几个小任务,最后合并。
所以,应该尽量不要在调用的地方有太多复杂的逻辑,把逻辑内置在服务中。
我的第二个设计方案是这样的:
同样用上面的服务器,只是,这个服务器功能变了,接收不超过100个的url,然后打开100个子线程,下载title。最后合并,返回给客户端。
具体怎么编写这个服务器,在下一个部分讲。
这个一测试,发现效率高了很多。而且也十分的稳定。下载一百下google 大概 0.7s。基本上不会超过1s,而原来的那个方案,经常超过5s(20%的可能性)
当然,如果这样的设计方案只是一个很简单的解决方案。如果有很多人使用你的服务的情况下,肯定不能这样做。
PHP做企业级别的开发,一个比较复杂的问题,就是多线程怎么处理。还有就是往往采用数组 会引起内存急剧膨胀。一般,数组处理10万条数据已经是极限,
在小网站开发很少会用到一次读取如此大的数据量,要是遇到了,最好通过C 扩展进行解决,否则,一次会损耗 几百M 的内存,10个人用就拖死你。
PHP多线程编程(二)管道通信
一个线程如果是个人英雄主义,那么多线程就是集体主义。(不严格区分多进程 和 多线程的差别)
你不再是一个独行侠,而是一个指挥家。
独来独往,非常自由自在,但是,很多时候,不如众人拾柴火焰高。
这就是我对多线程的理解。多线程编程的主要问题是:通信 和 同步问题。
更多PHP 多线程编程的背景知识见:
PHP多线程编程(一)
在PHP 中,如果光用pcntl ,实现比较简单的通信问题都是很困难的。
下面介绍管道通信:
1. 管道可以认为是一个队列,不同的线程都可以往里面写东西,也都可以从里面读东西。写就是
在队列末尾添加,读就是在队头删除。
2. 管道一般有大小,默认一般是4K,也就是内容超过4K了,你就只能读,不能往里面写了。
3. 默认情况下,管道写入以后,就会被阻止,直到读取他的程序读取把数据读完。而读取线程也会被阻止,
直到有进程向管道写入数据。当然,你可以改变这样的默认属性,用stream_set_block 函数,设置成非阻断模式。
下面是我分装的一个管道的类(这个类命名有问题,没有统一,没有时间改成统一的了,我一般先写测试代码,最后分装,所以命名上可能不统一):
- <?php
- class Pipe
- {
- public $fifoPath;
- private $w_pipe;
- private $r_pipe;
- /**
- * 自动创建一个管道
- *
- * @param string $name 管道名字
- * @param int $mode 管道的权限,默认任何用户组可以读写
- */
- function __construct($name = 'pipe', $mode = 0666)
- {
- $fifoPath = "/tmp/$name." . posix_getpid();
- if (!file_exists($fifoPath)) {
- if (!posix_mkfifo($fifoPath, $mode)) {
- error("create new pipe ($name) error.");
- return false;
- }
- } else {
- error( "pipe ($name) has exit.");
- return false;
- }
- $this->fifoPath = $fifoPath;
- }
-
- ///////////////////////////////////////////////////
- // 写管道函数开始
- ///////////////////////////////////////////////////
- function open_write()
- {
- $this->w_pipe = fopen($this->fifoPath, 'w');
- if ($this->w_pipe == NULL) {
- error("open pipe {$this->fifoPath} for write error.");
- return false;
- }
- return true;
- }
- function write($data)
- {
- return fwrite($this->w_pipe, $data);
- }
- function write_all($data)
- {
- $w_pipe = fopen($this->fifoPath, 'w');
- fwrite($w_pipe, $data);
- fclose($w_pipe);
- }
- function close_write()
- {
- return fclose($this->w_pipe);
- }
- /////////////////////////////////////////////////////////
- /// 读管道相关函数开始
- ////////////////////////////////////////////////////////
- function open_read()
- {
- $this->r_pipe = fopen($this->fifoPath, 'r');
- if ($this->r_pipe == NULL) {
- error("open pipe {$this->fifoPath} for read error.");
- return false;
- }
- return true;
- }
- function read($byte = 1024)
- {
- return fread($this->r_pipe, $byte);
- }
- function read_all()
- {
- $r_pipe = fopen($this->fifoPath, 'r');
- $data = '';
- while (!feof($r_pipe)) {
- //echo "read one K\n";
- $data .= fread($r_pipe, 1024);
- }
- fclose($r_pipe);
- return $data;
- }
- function close_read()
- {
- return fclose($this->r_pipe);
- }
- ////////////////////////////////////////////////////
- /**
- * 删除管道
- *
- * @return boolean is success
- */
- function rm_pipe()
- {
- return unlink($this->fifoPath);
- }
- }
- ?>
有了这个类,就可以实现简单的管道通信了,因为这个教程是多线程编程系列教程的一个部分。
这个管道类的应用部分,将放到第三部分。
PHP多线程编程(三)多线程抓取网页的演示
要理解这个部分的代码,请阅读:
用 Socket 和 Pcntl 实现一个多线程服务器(一)
PHP多线程编程(一)
PHP多线程编程(二)管道通信
我们知道,从父进程到子经常的数据传递相对比较容易一些,但是从子进程传递到父进程就比较的困难。
有很多办法实现进程交互,在php中比较方便的是 管道通信。当然,还可以通过 socket_pair 进行通信。
首先是服务器为了应对每一个请求要做的事情(发送一个url 序列,url序列用\t 分割。而结束标记是 \n)
- function clientHandle($msgsock, $obj)
- {
- $nbuf = '';
- socket_set_block($msgsock);
- do {
- if (false === ($buf = @socket_read($msgsock, 2048, PHP_NORMAL_READ))) {
- $obj->error("socket_read() failed: reason: " . socket_strerror(socket_last_error($msgsock)));
- break;
- }
- $nbuf .= $buf;
- if (substr($nbuf, -1) != "\n") {
- continue;
- }
- $nbuf = trim($nbuf);
- if ($nbuf == 'quit') {
- break;
- }
- if ($nbuf == 'shutdown') {
- break;
- }
- $url = explode("\t", $nbuf);
- $nbuf = '';
- $talkback = serialize(read_ntitle($url));
- socket_write($msgsock, $talkback, strlen($talkback));
- debug("write to the client\n");
- break;
- } while (true);
- }
上面代码比较关键的一个部分是 read_ntitle,这个函数实现多线程的读取标题。
代码如下:(为每一个url fork 一个线程,然后打开管道 ,读取到的标题写入到管道里面去,主线程一直的在读取管道数据,直到所有的数据读取完毕,最后删除管道)
- function read_ntitle($arr)
- {
- $pipe = new Pipe("multi-read");
- foreach ($arr as $k => $item)
- {
- $pids[$k] = pcntl_fork();
- if(!$pids[$k])
- {
- $pipe->open_write();
- $pid = posix_getpid();
- $content = base64_encode(read_title($item));
- $pipe->write("$k,$content\n");
- $pipe->close_write();
- debug("$k: write success!\n");
- exit;
- }
- }
- debug("read begin!\n");
- $data = $pipe->read_all();
- debug("read end!\n");
- $pipe->rm_pipe();
- return parse_data($data);
- }
- parse_data 代码如下,非常的简单,就不说了。
- function parse_data($data)
- {
- $data = explode("\n", $data);
- $new = array();
- foreach ($data as $value)
- {
- $value = explode(",", $value);
- if (count($value) == 2) {
- $value[1] = base64_decode($value[1]);
- $new[intval($value[0])] = $value[1];
- }
- }
- ksort($new, SORT_NUMERIC);
- return $new;
- }
上面代码中,还有一个函数read_title 比较有技巧。为了兼容性,我没有采用curl,而是直接采用socket 通信。
在下载到 title 标签后,就停止读取内容,以节省时间。代码如下:
- function read_title($url)
- {
- $url_info = parse_url($url);
- if (!isset($url_info['host']) || !isset($url_info['scheme'])) {
- return false;
- }
- $host = $url_info['host'];
-
- $port = isset($url_info['port']) ? $url_info['port'] : null;
- $path = isset($url_info['path']) ? $url_info['path'] : "/";
- if(isset($url_info['query'])) $path .= "?".$url_info['query'];
- if(empty($port)){
- $port = 80;
- }
- if ($url_info['scheme'] == 'https'){
- $port = 443;
- }
- if ($url_info['scheme'] == 'http') {
- $port = 80;
- }
- $out = "GET $path HTTP/1.1\r\n";
- $out .= "Host: $host\r\n";
- $out .= "User-Agent: Mozilla/5.0 (Windows; U; Windows NT 5.1; zh-CN; rv:1.9.1.7)\r\n";
- $out .= "Connection: Close\r\n\r\n";
- $fp = fsockopen($host, $port, $errno, $errstr, 5);
- if ($fp == NULL) {
- error("get title from $url, error. $errno: $errstr \n");
- return false;
- }
- fwrite($fp, $out);
- $content = '';
- while (!feof($fp)) {
- $content .= fgets($fp, 1024);
- if (preg_match("/(.*?)<\/title>/is", $content, $matches)) {
- fclose($fp);
- return encode_to_utf8($matches[1]);
- }
- }
- fclose($fp);
- return false;
- }
- function encode_to_utf8($string)
- {
- return mb_convert_encoding($string, "UTF-8", mb_detect_encoding($string, "UTF-8, GB2312, ISO-8859-1", true));
- }
这里,我只是检测了 三种最常见的编码。
其他的代码都很简单,这些代码都是测试用的,如果你要做这样一个服务器,一定要进行优化处理。特别是,要防止一次打开太多的线程,你要做更多的处理。
很多时候,我们抱怨php 不支持多线程,实际上,php是支持多线程的。当然,没有那么多的进程通信的选项,而多线程的核心就在于线程的通信与同步。
在web开发中,这样的多线程基本上是不会使用的,因为有很严重的性能问题。要实现比较简单的多线程,高负载,必须借助其扩展。
PHP多进程(四) 内部多进程
上面一个系列的教程:
用 Socket 和 Pcntl 实现一个多进程服务器(一)
PHP多进程编程(一)
PHP多进程编程(二)管道通信
PHP多进程编程(三)多进程抓取网页的演示
说的都是只兼容unix 服务器的多进程,下面来讲讲在window 和 unix 都兼容的多进程(这里是泛指,下面的curl实际上是通过IO复用实现的)。
通过扩展实现多线程的典型例子是CURL,CURL 支持多线程的抓取网页的功能。
这部分过于抽象,所以,我先给出一个CURL并行抓取多个网页内容的一个分装类。这个类实际上很实用,
详细分析这些函数的内部实现将在下一个教程里面描述。
你可能不能很好的理解这个类,而且,php curl 官方主页上都有很多错误的例子,在讲述了其内部机制
后,你就能够明白了。
先看代码:
- <?php
- class Http_MultiRequest
- {
- //要并行抓取的url 列表
- private $urls = array();
- //curl 的选项
- private $options;
-
- //构造函数
- function __construct($options = array())
- {
- $this->setOptions($options);
- }
- //设置url 列表
- function setUrls($urls)
- {
- $this->urls = $urls;
- return $this;
- }
- //设置选项
- function setOptions($options)
- {
- $options[CURLOPT_RETURNTRANSFER] = 1;
- if (isset($options['HTTP_POST']))
- {
- curl_setopt($ch, CURLOPT_POST, 1);
- curl_setopt($ch, CURLOPT_POSTFIELDS, $options['HTTP_POST']);
- unset($options['HTTP_POST']);
- }
- if (!isset($options[CURLOPT_USERAGENT]))
- {
- $options[CURLOPT_USERAGENT] = 'Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1;)';
- }
- if (!isset($options[CURLOPT_FOLLOWLOCATION]))
- {
- $options[CURLOPT_FOLLOWLOCATION] = 1;
- }
- if (!isset($options[CURLOPT_HEADER]))
- {
- $options[CURLOPT_HEADER] = 0;
- }
- $this->options = $options;
- }
- //并行抓取所有的内容
- function exec()
- {
- if(empty($this->urls) || !is_array($this->urls))
- {
- return false;
- }
- $curl = $data = array();
- $mh = curl_multi_init();
- foreach($this->urls as $k => $v)
- {
- $curl[$k] = $this->addHandle($mh, $v);
- }
- $this->execMulitHandle($mh);
- foreach($this->urls as $k => $v)
- {
- $data[$k] = curl_multi_getcontent($curl[$k]);
- curl_multi_remove_handle($mh, $curl[$k]);
- }
- curl_multi_close($mh);
- return $data;
- }
-
- //只抓取一个网页的内容。
- function execOne($url)
- {
- if (empty($url)) {
- return false;
- }
- $ch = curl_init($url);
- $this->setOneOption($ch);
- $content = curl_exec($ch);
- curl_close($ch);
- return $content;
- }
-
- //内部函数,设置某个handle 的选项
- private function setOneOption($ch)
- {
- curl_setopt_array($ch, $this->options);
- }
- //添加一个新的并行抓取 handle
- private function addHandle($mh, $url)
- {
- $ch = curl_init($url);
- $this->setOneOption($ch);
- curl_multi_add_handle($mh, $ch);
- return $ch;
- }
- //并行执行(这样的写法是一个常见的错误,我这里还是采用这样的写法,这个写法
- //下载一个小文件都可能导致cup占用100%, 并且,这个循环会运行10万次以上
- //这是一个典型的不懂原理产生的错误。这个错误在PHP官方的文档上都相当的常见。)
- private function execMulitHandle($mh)
- {
- $running = null;
- do {
- curl_multi_exec($mh, $running);
- } while ($running > 0);
- }
- }
看最后一个注释最多的函数,这个错误在平时调试的时候可能不太容易发现,因为程序完全正常,但是,在生产服务器下,马上会引起崩溃效果。
解释为什么不能这样,必须从C 语言内部实现的角度来分析。这个部分将放到下一个教程(PHP高级编程之--单线程实现并行抓取网页 )。不过不是通过C语言来表述原理,而是通过PHP
这个类,实际上也就很简单的实现了前面我们费了4个教程的篇幅,并且是九牛二虎之力才实现的多线程的抓取网页的功能。在纯PHP的实现下,我们只能用一个后台服务的方式来比较好的实现,但是当你使用 操作系统接口语言 C 语言时候,这个实现当然就更加的简单,灵活,高效。
就同时抓取几个网页这样一件简单的事情,实际上在底层涉及到了很多东西,对很多半路出家的PHP程序员,可能不喜欢谈多线程这个东西,深入了就涉及到操作系统,浅点说就是并行运行好几个“程序”。但是,很多时候,多线程必不可少,比如要写个快点的爬虫,往往就会浪费九牛二虎之力。不过,PHP的程序员现在应该感谢CURL 这个扩展,这样,你完全不需要用你不太精通的 python 去写爬虫了,对于一个中型大小的爬虫,有这个内部多线程,就已经足够了。
最后是上面的类的一个测试的例子:
- $urls = array("", "", "", "", "", "", "", "", );
- $m = new Http_MultiRequest();
- $t = microtime(true);
- $m->setUrls($urls);
- //parallel fetch(并行抓取):
- $data = $m->exec();
- $parallel_time = microtime(true) - $t;
- echo $parallel_time . "\n";
- $t = microtime(true);
- //serial fetch(串行抓取):
- foreach ($urls as $url)
- {
- $data[] = $m->execOne($url);
- }
- $serial_time = microtime(true) - $t;
- echo $serial_time . "\n";
阅读(1438) | 评论(0) | 转发(0) |