Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1178172
  • 博文数量: 252
  • 博客积分: 5421
  • 博客等级: 大校
  • 技术积分: 2418
  • 用 户 组: 普通用户
  • 注册时间: 2007-06-17 12:59
文章分类

全部博文(252)

文章存档

2017年(3)

2016年(18)

2015年(31)

2014年(18)

2013年(7)

2012年(8)

2011年(12)

2010年(30)

2009年(32)

2008年(57)

2007年(36)

分类: PHP

2015-08-21 17:59:07


  1. <?php
  2. /**
  3.  * 多进程形式的server.
  4.  * @package thrift.server
  5.  * @author flynetcn
  6.  */
  7. namespace Thrift\Server;

  8. use Thrift\Server\TServer;
  9. use Thrift\Transport\TTransport;
  10. use Thrift\Exception\TException;
  11. use Thrift\Exception\TTransportException;

  12. class TMultiProcessServer extends TServer
  13. {
  14.     /**
  15.      * 捕获的信号编号
  16.      */
  17.     static $catchQuitSignal = 0;

  18.     /**
  19.      * worker进程数量
  20.      */
  21.     private $workProcessNum = 4;

  22.     /**
  23.      * 每个worker进程处理的最大请求数
  24.      */
  25.     private $maxWorkRequestNum = 2000;

  26.     /**
  27.      * 当前worker进程已处理的请求数
  28.      */
  29.     private $currentWorkRequestNum = 0;
  30.     
  31.     /**
  32.      * 当前连接调用次数
  33.      */
  34.     private $currentConnectCallNum = 0;

  35.     /**
  36.      * 发送超时
  37.      */
  38.     private $sendTimeoutSec = 1;

  39.     /**
  40.      * 接收超时
  41.      */
  42.     private $recvTimeoutSec = 1;

  43.     /**
  44.      * 当前进程pid
  45.      */
  46.     private $pid = 0;

  47.     /**
  48.      * Flag for the main serving loop
  49.      */
  50.     private $stop_ = false;

  51.     /**
  52.      * List of children.
  53.      */
  54.     protected $childrens = array();

  55.     /**
  56.      * 服务器日志文件
  57.      */
  58.     protected static $logFiles;
  59.     protected static $pidFile;

  60.     /**
  61.      * run
  62.      */
  63.     public function serve($daemon=false, array $config=array())
  64.     {
  65.         if (isset($config['workProcessNum'])) {
  66.             $this->workProcessNum = intval($config['workProcessNum']);
  67.         }
  68.         if ($this->workProcessNum < 1) {
  69.             self::log(1, "child workProcessNum can not be less than 1");
  70.             throw new TException('child workProcessNum can not be less than 1');
  71.         }
  72.         if (isset($config['maxWorkRequestNum'])) {
  73.             $this->maxWorkRequestNum = intval($config['maxWorkRequestNum']);
  74.         }
  75.         if ($this->maxWorkRequestNum < 1) {
  76.             self::log(1, "child maxWorkRequestNum can not be less than 1");
  77.             throw new TException('child maxWorkRequestNum can not be less than 1');
  78.         }
  79.         if (isset($config['sendTimeoutSec'])) {
  80.             $this->sendTimeoutSec = intval($config['sendTimeoutSec']);
  81.         }
  82.         if (isset($config['recvTimeoutSec'])) {
  83.             $this->recvTimeoutSec = intval($config['recvTimeoutSec']);
  84.         }
  85.         if ($daemon) {
  86.             $this->daemon();
  87.             $this->registerSignalHandler();
  88.             self::$logFiles = isset($config['logFiles']) && is_array($config['logFiles']) ? $config['logFiles'] : array();
  89.             self::$pidFile = isset($config['pidFile']) ? $config['pidFile'] : '';
  90.             declare(ticks=3);
  91.         }
  92.         $this->pid = posix_getpid();
  93.         self::createPidFile($this->pid);
  94.         self::log(0, "manage process({$this->pid}) has started");
  95.         $this->transport_->listen();
  96.         while (!$this->stop_) {
  97.             while ($this->workProcessNum > 0) {
  98.                 try {
  99.                     $pid = pcntl_fork();
  100.                     if ($pid > 0) {
  101.                         $this->handleParent($pid, $this->workProcessNum);
  102.                     } else if ($pid === 0) {
  103.                         $this->pid = posix_getpid();
  104.                         $this->handleChild($this->workProcessNum);
  105.                     } else {
  106.                         self::log(1, "Failed to fork");
  107.                         throw new TException('Failed to fork');
  108.                     }
  109.                     $this->workProcessNum--;
  110.                 } catch (Exception $e) {
  111.                 }
  112.             }
  113.             $this->collectChildren();
  114.             sleep(2);
  115.             if (\Thrift\Server\TMultiProcessServer::$catchQuitSignal) {
  116.                 $this->stop();
  117.             }
  118.         }
  119.     }
  120.     
  121.     public function getCurrentWorkRequestNum()
  122.     {
  123.         return $this->currentWorkRequestNum;
  124.     }
  125.     
  126.     public function getCurrentConnectCallNum()
  127.     {
  128.         return $this->currentConnectCallNum;
  129.     }

  130.     /**
  131.      * Code run by the parent
  132.      *
  133.      * @param int $pid
  134.      * @param int $num 进程编号
  135.      * @return void
  136.      */
  137.     private function handleParent($pid, $num)
  138.     {
  139.         $this->childrens[$pid] = $num;
  140.     }

  141.     /**
  142.      * Code run by the child.
  143.      *
  144.      * @param int $num 进程编号
  145.      * @return void
  146.      */
  147.     private function handleChild($num)
  148.     {
  149.         self::log(0, "child process($this->pid) has started");
  150.         $this->childrens = array();
  151.         while (!$this->stop_) {
  152.             try {
  153.                 $transport = $this->transport_->accept();
  154.                 if ($transport != null) {
  155.                     $transport->setSendTimeout($this->sendTimeoutSec * 1000);
  156.                     $transport->setRecvTimeout($this->recvTimeoutSec * 1000);
  157.                     $this->currentWorkRequestNum++;
  158.                     $this->currentConnectCallNum = 0;
  159.                     $inputTransport = $this->inputTransportFactory_->getTransport($transport);
  160.                     $outputTransport = $this->outputTransportFactory_->getTransport($transport);
  161.                     $inputProtocol = $this->inputProtocolFactory_->getProtocol($inputTransport);
  162.                     $outputProtocol = $this->outputProtocolFactory_->getProtocol($outputTransport);
  163.                     while ($this->processor_->process($inputProtocol, $outputProtocol)) {
  164.                         $this->currentConnectCallNum++;
  165.                     }
  166.                     @$transport->close();
  167.                 }
  168.             } catch (TTransportException $e) {
  169.             } catch (Exception $e) {
  170.                 self::log(1, $e->getMessage().'('.$e->getCode().')');
  171.             }
  172.             if (\Thrift\Server\TMultiProcessServer::$catchQuitSignal) {
  173.                 $this->stop();
  174.             }
  175.             if ($this->currentWorkRequestNum >= $this->maxWorkRequestNum) {
  176.                 self::log(0, "child process($this->pid) has processe {$this->currentWorkRequestNum} requests will be exit");
  177.                 $this->stop();
  178.                 break;
  179.             }
  180.         }
  181.         exit(0);
  182.     }

  183.     /**
  184.      * Collects any children we may have
  185.      *
  186.      * @return void
  187.      */
  188.     private function collectChildren()
  189.     {
  190.         foreach ($this->childrens as $pid => $num) {
  191.             if (pcntl_waitpid($pid, $status, WNOHANG) > 0) {
  192.                 unset($this->childrens[$pid]);
  193.                 $this->workProcessNum++;
  194.             }
  195.         }
  196.     }

  197.     /**
  198.      * @return void
  199.      */
  200.     public function stop()
  201.     {
  202.         $this->transport_->close();
  203.         $this->stop_ = true;
  204.         foreach ($this->childrens as $pid => $num) {
  205.             if (!posix_kill($pid, SIGTERM)) {
  206.             }
  207.         }
  208.     }

  209.     /**
  210.      * 附加信号处理
  211.      */
  212.     public static function sig_handler($signo)
  213.     {
  214.         switch ($signo) {
  215.             case SIGTERM:
  216.             case SIGHUP:
  217.             case SIGQUIT:
  218.             case SIGTSTP:
  219.                 $pid = posix_getpid();
  220.                 self::log(0, "process($pid) catch signo: $signo");
  221.                 \Thrift\Server\TMultiProcessServer::$catchQuitSignal = $signo;
  222.                 break;
  223.             default:
  224.         }
  225.     }

  226.     /**
  227.      * 附加信号处理
  228.      */
  229.     private function registerSignalHandler()
  230.     {
  231.         pcntl_signal(SIGTERM, '\Thrift\Server\TMultiProcessServer::sig_handler');
  232.         pcntl_signal(SIGHUP, '\Thrift\Server\TMultiProcessServer::sig_handler');
  233.         pcntl_signal(SIGQUIT, '\Thrift\Server\TMultiProcessServer::sig_handler');
  234.         pcntl_signal(SIGTSTP, '\Thrift\Server\TMultiProcessServer::sig_handler');
  235.         declare(ticks=3);
  236.     }

  237.     /**
  238.      * 附加守护进程方式
  239.      */
  240.     private function daemon()
  241.     {
  242.         if (!function_exists('posix_setsid')) {
  243.             return;
  244.         }
  245.         if (($pid1 = pcntl_fork()) != 0) {
  246.             exit;
  247.         }
  248.         posix_setsid();
  249.         if (($pid2 = pcntl_fork()) != 0) {
  250.             exit;
  251.         }
  252.     }

  253.     public static function log($type, $msg)
  254.     {
  255.         static $fds;
  256.         $msg = date('Y-m-d H:i:s')." $type {$msg}\n";
  257.         if (isset(self::$logFiles[$type]) && self::$logFiles[$type]) {
  258.             if (file_exists(self::$logFiles[$type])) {
  259.                 if (empty($fds[$type])) {
  260.                     $fds[$type] = fopen(self::$logFiles[$type], 'a');
  261.                 }
  262.                 if (!$fds[$type]) {
  263.                     $fds[$type] = fopen('php://stdout', 'w');
  264.                     fwrite($fds[$type], date('Y-m-d H:i:s')." WARNING fopen(".self::$logFiles[$type].") failed\n");
  265.                 }
  266.             } else {
  267.                 if (!is_dir(dirname(self::$logFiles[$type])) && !mkdir(dirname(self::$logFiles[$type]), 0755, true)) {
  268.                     $fds[$type] = fopen('php://stdout', 'w');
  269.                     fwrite($fds[$type], date('Y-m-d H:i:s')." WARNING mkdir(".self::$logFiles[$type].") failed\n");
  270.                 } elseif (!($fds[$type] = fopen(self::$logFiles[$type], 'a'))) {
  271.                     $fds[$type] = fopen('php://stdout', 'w');
  272.                     fwrite($fds[$type], date('Y-m-d H:i:s')." WARNING fopen(".self::$logFiles[$type].") failed\n");
  273.                 }
  274.             }
  275.         } else {
  276.             $fds[$type] = fopen('php://stdout', 'w');
  277.         }
  278.         $ret = fwrite($fds[$type], $msg);
  279.         if (!$ret && self::$logFiles[$type]) {
  280.             fclose($fds[$type]);
  281.             $fds[$type] = fopen(self::$logFiles[$type], 'a');
  282.             $ret = fwrite($fds[$type], $msg);
  283.         }
  284.         return true;
  285.     }

  286.     public static function createPidFile($pid=0)
  287.     {
  288.         if (!$pid) {
  289.             $pid = posix_getpid();
  290.         }
  291.         if (file_exists(self::$pidFile)) {
  292.             $fd = fopen(self::$pidFile, 'w');
  293.             if (!$fd) {
  294.                 self::log(1, "fopen(".self::$pidFile.") failed");
  295.                 return false;
  296.             }
  297.         } else {
  298.             if (!is_dir(dirname(self::$pidFile)) && !mkdir(dirname(self::$pidFile), 0755, true)) {
  299.                 self::log(1, "mkdir(".self::$pidFile.") failed");
  300.                 return false;
  301.             } elseif (!($fd = fopen(self::$pidFile, 'w'))) {
  302.                 self::log(1, "fopen(".self::$pidFile.") failed");
  303.                 return false;
  304.             }
  305.         }
  306.         if (!fwrite($fd, "$pid")) {
  307.             self::log(1, "fwrite(".self::$pidFile.",$pid) failed");
  308.             return false;
  309.         }
  310.         fclose($fd);
  311.         return true;
  312.     }
  313. }


阅读(727) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~