Preface
For project reasons Gearman was chosen as the task-delegation middleware, but the native Python extension package only supports a single process. While turning it into a self-adapting multi-process worker I took a few wrong turns in the implementation, so this post records those pitfalls and the best solution found so far.
Implementation
Two candidate approaches:
- The main process receives tasks and child processes handle them: a single main process acts as the receiver for delegated tasks; whenever a task arrives it dispatches the task to a child process, and when processing finishes that child returns the result to Gearman directly.
- Multiple processes each receive and handle tasks: fork a batch of child processes that all register the task with Gearman; the children do not affect one another, and each one runs the whole receive-and-process cycle by itself.
Let's start with the pros and cons of the first approach.
Pros:
- A worker spends most of its time waiting for incoming requests, so having the main process do nothing but poll for new tasks raises the utilization of a single Gearman request channel.
- Having the child return the result directly separates the main process's job from the children's: the main process never needs to care about task results and can focus purely on receiving tasks.
Cons:
- After receiving a task request, the main process forwards it to a child for processing. Since the child has to return the result to Gearman when it finishes, the main process must hand the Gearman socket behind that request over to the child, and this is far too complicated to implement. (An object that holds a socket normally cannot be pickled and sent to a child process; Unix sendmsg can pass the socket itself, but rebuilding a GearmanWorker around the received socket is another painful exercise.) See the sketch after this list.
- Even if the child manages to construct a GearmanWorker from the passed socket, the parent process still holds a handle to the original socket, so the client waiting for the result never receives the response the child sends back.
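To make the first drawback concrete, here is a minimal sketch of the Unix file-descriptor handoff. It is not taken from any project code and assumes Python 3.9+ (for socket.send_fds / socket.recv_fds), a Unix platform, and the gearmand address used in the examples below; even after the descriptor arrives, the child would still have to rebuild a GearmanWorker connection around the raw socket by hand:

import os
import socket

# The parent and child exchange the descriptor over a Unix domain socket pair.
uds_parent, uds_child = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)

if os.fork() == 0:
    # --- child process ---
    uds_parent.close()
    msg, fds, _flags, _addr = socket.recv_fds(uds_child, 1024, 1)
    job_sock = socket.socket(fileno=fds[0])
    # The painful part starts here: job_sock is a bare socket, and a
    # GearmanWorker connection would have to be rebuilt around it by hand.
    print('child received fd for %s' % (job_sock.getpeername(),))
    os._exit(0)

# --- parent process ---
uds_child.close()
# Stand-in for the Gearman connection on which a job request just arrived.
gearman_sock = socket.create_connection(('10.10.13.5', 4730))
socket.send_fds(uds_parent, [b'job'], [gearman_sock.fileno()])
# Note: the parent still holds its own handle to gearman_sock, which is
# exactly the second drawback listed above.
os.wait()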
Now for the pros and cons of the second approach.
Pros:
- It is equivalent to forking several copies of the original process; neither the logic nor the way the work is carried out changes.
- Shared resources can be loaded once before the children are forked, instead of being loaded separately by every GearmanWorker.
Cons:
- When a child process exits abnormally, the main process cannot detect it properly. The pool does keep the number of children constant, but a child restarted after an abnormal exit is not correctly registered with Gearman and therefore never receives tasks.
- When the main process exits abnormally, the children cannot detect it, so they are left running as orphaned (zombie) worker processes with nothing to clean them up.
We went with the second approach; its drawbacks are handled as follows:
- Record the PID of every child process in a PID file, so that the children can still be terminated via the PID file even after the main process has exited.
- Use Redis publish/subscribe to let each GearmanWorker exit gracefully; the publisher side is sketched right after this list.
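The publisher side of that Redis signal does not appear in the worker code below, so here is a minimal sketch of it; it assumes the same Redis host and port as the examples, and uses the 'hot' channel and 'exit' message that the worker's monitor thread listens for:

#!/usr/bin/env python
# Ask every worker child process to exit gracefully after its current poll loop.
import redis

redis.StrictRedis('10.10.13.8', 6379).publish('hot', 'exit')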
Client code: test_multi_gearman_worker.py
#!/usr/bin/env python
from gearman import GearmanClient

gearman_client = GearmanClient(['10.10.13.5:4730'])

new_jobs = [
    dict(task='test_multi_gearman_worker', data='job-0.mp4'),
    dict(task='test_multi_gearman_worker', data='job-1.mp4'),
    dict(task='test_multi_gearman_worker', data='job-2.mp4'),
    dict(task='test_multi_gearman_worker', data='job-3.mp4'),
    dict(task='test_multi_gearman_worker', data='job-4.mp4'),
    dict(task='test_multi_gearman_worker', data='job-5.mp4'),
    dict(task='test_multi_gearman_worker', data='job-6.mp4'),
    dict(task='test_multi_gearman_worker', data='job-7.mp4'),
    dict(task='test_multi_gearman_worker', data='job-8.mp4'),
    dict(task='test_multi_gearman_worker', data='job-9.mp4'),
    dict(task='test_multi_gearman_worker', data='job-10.mp4'),
]

completed_requests = gearman_client.submit_multiple_jobs(new_jobs)
for current_request in completed_requests:
    print(current_request.result)

print("Game over!")
Worker code: multi-gearman-worker.py
#!/usr/bin/env python

import os
import string
import signal
import threading
import subprocess
import multiprocessing

import redis
from gearman.worker import GearmanWorker, POLL_TIMEOUT_IN_SECONDS

WORKER_PROCESS_PID = '/tmp/multi_gearman_worker.pid'


class MultiGearmanWorker(GearmanWorker):
    """ Multi-process gearman worker """

    def __init__(self, host_list=None, redis_host=None, redis_port=None, pid=WORKER_PROCESS_PID):
        super(MultiGearmanWorker, self).__init__(host_list=host_list)
        self.redis_host = redis_host
        self.redis_port = redis_port
        self.pid = pid

    def work(self, poll_timeout=POLL_TIMEOUT_IN_SECONDS, process=multiprocessing.cpu_count()):
        """
        Start working; blocks the calling process.
        :param poll_timeout: int, gearman connection poll timeout in seconds
        :param process: int, number of worker processes, defaults to the number of CPU cores
        :return:
        """
        print('Clear last process.')
        self.gearman_worker_exit()
        print('Ready to start %d processes for work.' % process)
        gm_pool = multiprocessing.Pool(process)
        for x in range(0, process):
            print('Start child process %d.' % x)
            gm_pool.apply_async(gearman_work, (self, poll_timeout, self.pid))

        gm_pool.close()
        gm_pool.join()

        # Delete the PID file once all child processes have exited normally.
        if os.path.isfile(self.pid):
            os.remove(self.pid)

        print('Multi gearman worker exit.')

    def gearman_worker_exit(self):
        """ Terminate child processes left over from the previous run. """
        if not os.path.isfile(self.pid):
            return True

        with open(self.pid, 'r+') as f:
            for pid in f.readlines():
                pid = int(pid)
                try:
                    os.kill(pid, signal.SIGKILL)
                    print('Kill process %d.' % pid)
                except OSError:
                    print('Process %d does not exist.' % pid)
                    continue

        os.remove(self.pid)
        print('Remove process pid file.')
        return True


# Switch that tells a child process whether its gearman loop should keep running.
# Each child gets its own copy after fork; the monitor thread in that child flips it.
GEARMAN_CONTINUE_WORK = True


def gearman_work(gm_worker, poll_timeout=POLL_TIMEOUT_IN_SECONDS, pid=WORKER_PROCESS_PID):
    """ Run the gearman worker loop inside a child process. """
    try:
        # Record the child process pid so that, when the main process is
        # restarted (e.g. by a supervisor), children that did not exit
        # during the previous run can be cleaned up.
        with open(pid, 'a+') as f:
            f.write("%d%s" % (os.getpid(), os.linesep))

        print('Child process start for work.')
        continue_working = True
        worker_connections = []
        d = threading.Thread(name='monitor', target=gearman_monitor,
                             args=(gm_worker.redis_host, gm_worker.redis_port))
        d.start()

        def continue_while_connections_alive(any_activity):
            return gm_worker.after_poll(any_activity)

        # Shuffle our connections after the poll timeout.
        while continue_working and GEARMAN_CONTINUE_WORK:
            worker_connections = gm_worker.establish_worker_connections()
            continue_working = gm_worker.poll_connections_until_stopped(
                worker_connections, continue_while_connections_alive, timeout=poll_timeout)

        # If we were kicked out of the worker loop, shut down all our connections.
        for current_connection in worker_connections:
            current_connection.close()

        print('Gearman worker closed')
        return None
    except Exception as e:
        print(e)


def gearman_monitor(redis_host, redis_port):
    """ Listen for dynamic control instructions over Redis pub/sub. """
    global GEARMAN_CONTINUE_WORK
    print('Start gearman monitor.')
    while GEARMAN_CONTINUE_WORK:
        # If the monitor thread died, the worker would stop reacting to Redis
        # messages, so exceptions are caught here and we re-subscribe afterwards.
        try:
            sub = redis.StrictRedis(redis_host, redis_port).pubsub()
            sub.subscribe('hot')
            for i in sub.listen():
                if isinstance(i.get('data'), str):
                    if i.get('data') == 'exit':
                        # 'exit' tells this worker to stop after its current poll loop.
                        print('Gearman monitor receive restart signal.')
                        GEARMAN_CONTINUE_WORK = False
                        sub.unsubscribe('hot')
                        break
                    # Other messages on this channel could be handled here,
                    # e.g. further dynamic updates to the gearman worker.

        except Exception as e:
            print(e)
            try:
                sub.unsubscribe('hot')
            except Exception:
                pass

    print('Gearman monitor closed')


if __name__ == '__main__':
    def test_multi_gearman_worker(worker, job):
        print('who ', worker)
        print('do what ', job.data)
        print('Game over! ', job.data)
        return job.data

    gearman_worker = MultiGearmanWorker(('10.10.13.5:4730', ), '10.10.13.8', 6379)
    gearman_worker.register_task('test_multi_gearman_worker', test_multi_gearman_worker)
    gearman_worker.work(POLL_TIMEOUT_IN_SECONDS, 5)
Notes:
1. A running Redis server is required;
2. A running Gearman job server (gearmand) is required.
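3. The examples also assume the python-gearman client and redis-py packages (the gearman and redis packages on PyPI); the addresses 10.10.13.5:4730 (gearmand) and 10.10.13.8:6379 (Redis) are from this setup and should be replaced with your own hosts.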