具体参见:
使用的zkclient 模块是下面这个
安装zookeeper下的python模块
import zookeeper
1.由于python客户端依赖c的客户端所以要先安装c版本的客户端
-
cd zookeeper-3.4.5/src/c
-
./configure
-
make
-
make install
2.测试c版本客户端
# /usr/local/bin/cli_mt localhost:2181
Watcher SESSION_EVENT state = CONNECTED_STATE
Got a new session id: 0x14907459b0700a8
help
create [+[e|s]]
delete
set
get
ls
ls2
sync
exists
wexists
myid
verbose
addauth
quit
prefix the command with the character 'a' to run the command asynchronously.
run the 'verbose' command to toggle verbose logging.
i.e. 'aget /foo' to get /foo asynchronously
ls /
time = 0 msec
/: rc = 0
controller
Roles
brokers
zookeeper
controller_epoch
4.看到以上信息说明c版本的客户端已经安装好了,下面安装python版本客户端
5.下载python扩展包,并且解压:
或者直接安装 pip install zkpython
6.如果找不到Pyhon.h,可能由于没有安装python26-devel
-
32位直接运行
-
yum install python26-devel.i386
7.测试是否成功
-
import zookeeper
cat README
利用zookeeper 更新游戏配置文件
zk_confSrv.py 注册到zookeeper的/Roles/workers/worker000000000x,可以启动多个做热切,建议每个zookeeper机器
上启动一个. 同时会打开8877端口,监听trigger的指令来更新znode配置信息
zk_confTrigger.py 连接zk_confSrv.py的8877,发送配置文件内容的指令,只有整体重写配置的功能
exg: python zk_confTrigger.py 10.14.251.203 'serverlist conf content'
zk_confApp.py 在每个客户端启动,连接zookeeper,监听目录/Applications/GameConf/conf-000000000X,每次zk_confTrigger.py 更新指令数字+1,最大的就是最新的conf配置
发送指令后,zookeeper会通知zk_confApp.py目录更新了,读取最大的配置znode,获得内容。
~
-
# cat zk_confSrv.py
-
#!/usr/bin/env python2.7
-
# -*- coding: UTF-8 -*-
-
-
import logging
-
from os.path import basename, join
-
-
from zkclient import ZKClient, zookeeper, watchmethod
-
-
import os
-
import sys
-
import threading
-
import signal
-
import time
-
-
logging.basicConfig(
-
level = logging.DEBUG,
-
format = "[%(asctime)s] %(levelname)-8s %(message)s"
-
)
-
-
log = logging
-
-
class GJZookeeper(object):
-
-
# ZK_HOST = "localhost:2181"
-
ZK_HOST = "localhost:11470"
-
ROOT = "/Roles"
-
WORKERS_PATH = join(ROOT, "workers")
-
MASTERS_NUM = 1
-
TIMEOUT = 10000
-
-
def __init__(self, verbose = True):
-
self.VERBOSE = verbose
-
self.masters = []
-
self.is_master = False
-
self.path = None
-
-
self.APP_ROOT = "/Applications"
-
self.APP_CONF = join(self.APP_ROOT,"GameConf")
-
-
self.zk = ZKClient(self.ZK_HOST, timeout = self.TIMEOUT)
-
self.say("login zookeeper successfully!")
-
# init
-
self.create_roles_znode()
-
# register
-
self.register()
-
-
def create_roles_znode(self):
-
"""
-
create the zookeeper node if not exist
-
|-Roles
-
|-workers
-
"""
-
nodes = (self.ROOT, self.WORKERS_PATH)
-
for node in nodes:
-
if not self.zk.exists(node):
-
try:
-
self.zk.create(node, "")
-
except:
-
pass
-
-
@property
-
def is_slave(self):
-
return not self.is_master
-
-
def register(self):
-
"""
-
register a node for this worker,znode type : EPHEMERAL | SEQUENCE
-
|-Roles
-
|-workers
-
|-worker000000000x ==>>master
-
|-worker000000000x+1 ==>>worker
-
....
-
"""
-
self.path = self.zk.create(self.WORKERS_PATH + "/worker", "1", flags=zookeeper.EPHEMERAL | zookeeper.SEQUENCE)
-
# self.path = "/Roles/workers/worker"
-
self.path = basename(self.path)
-
self.say("I'm %s" % self.path)
-
# check who is the master
-
self.get_master()
-
-
def get_master(self):
-
"""
-
get children, and check who is the smallest child
-
"""
-
@watchmethod
-
def watcher(event):
-
self.say("child changed, try to get master again.")
-
self.get_master()
-
-
try:
-
children = self.zk.get_children(self.WORKERS_PATH, watcher)
-
except zookeeper.ConnectionLossException:
-
print "losing connection with zookeeper ..."
-
return False
-
except:
-
return False
-
-
children.sort()
-
self.say("%s's children: %s" % (self.WORKERS_PATH, children))
-
-
# check if I'm master
-
self.masters = children[:self.MASTERS_NUM]
-
# self.say("tell me who is master!: %s. path: %s" % (self.masters,self.path))
-
if self.path in self.masters:
-
self.is_master = True
-
self.say("I've become master!")
-
self.create_app_znode()
-
else:
-
self.say("%s is masters, I'm slave" % self.masters)
-
-
def create_app_znode(self):
-
"""
-
create the zookeeper node if not exist
-
|-Applications
-
|-GameConf
-
"""
-
nodes = (self.APP_ROOT, self.APP_CONF)
-
for node in nodes:
-
if not self.zk.exists(node):
-
try:
-
self.say("Create znode [%s] ..."%(node))
-
self.zk.create(node, "")
-
except:
-
pass
-
-
def create_conf_znode(self,data):
-
"""
-
create the zookeeper node's children if not exist,contents is conf data
-
|-Applications
-
|-GameConf
-
|-item-000000000x => data
-
"""
-
self.child_node = join(self.APP_CONF,"conf-")
-
# path = self.zk.create(self.child_node,data, flags=zookeeper.SEQUENCE|zookeeper.PERSISTENT)
-
path = self.zk.create(self.child_node,data, flags=zookeeper.SEQUENCE)
-
self.say("create znode %s"%path)
-
-
def say(self, msg):
-
"""
-
print messages to screen
-
"""
-
if self.VERBOSE:
-
if self.path:
-
log.info("[ %s(%s) ] %s" % (self.path, "master" if self.is_master else "slave", msg))
-
else:
-
log.info(msg)
-
-
class Watcher:
-
def __init__(self):
-
""" Creates a child thread, which returns. The parent
-
thread waits for a KeyboardInterrupt and then kills
-
the child thread.
-
"""
-
self.child = os.fork()
-
if self.child == 0:
-
return
-
else:
-
self.watch()
-
-
def watch(self):
-
try:
-
os.wait()
-
except KeyboardInterrupt:
-
print ' exit...'
-
self.kill()
-
sys.exit()
-
-
def kill(self):
-
try:
-
os.kill(self.child, signal.SIGKILL)
-
except OSError:
-
pass
-
-
-
def start_zk_worker():
-
"""
-
连接到zookeeper执行初始化
-
"""
-
gj_zookeeper = GJZookeeper()
-
th1 = threading.Thread(target = start_agent_worker, name = "thread_1", args = (gj_zookeeper,))
-
-
th1.start()
-
-
th1.join()
-
-
def start_agent_worker(gj_zookeeper):
-
"""
-
监听配置文件变更信息,解析trigger指令,增加到znode上
-
"""
-
import socket
-
-
address = ('', 8877)
-
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # s = socket.socket()
-
s.bind(address)
-
s.listen(5)
-
print "listen on port 8877, wait trigger send cmd ..."
-
-
while True:
-
ss, addr = s.accept()
-
print "receive connetcion from " ,addr
-
content = ""
-
while True:
-
try :
-
data = ss.recv(512)
-
if not data:
-
print "close connetcion " ,addr
-
ss.close()
-
break
-
content = content + data
-
print "receive message from %s : %s"%(addr[0],data)
-
except Exception,e:
-
print "receive error from %s : %s"%(addr[0],str(e))
-
ss.close()
-
break
-
parse_trigger_package(content,gj_zookeeper)
-
-
s.close()
-
-
def parse_trigger_package(data,gj_zookeeper):
-
try:
-
cmd = data.split('|')[0]
-
content = data.split('|')[1]
-
except Exception, e:
-
print "ERROR :",str(e)
-
return
-
if cmd == "ADD":
-
gj_zookeeper.create_conf_znode(content)
-
else:
-
pass
-
-
-
def main():
-
Watcher()
-
start_zk_worker()
-
-
if __name__ == "__main__
-
# cat zk_confTrigger.py
-
#!/usr/bin/python
-
import socket
-
import sys
-
-
if len(sys.argv) != 3:
-
print "Usage: %s ipaddr[10.14.2.72] confTextCentent" %sys.argv[0]
-
sys.exit(1)
-
-
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-
sock.connect((sys.argv[1], 8877))
-
str = 'ADD' + '|' + sys.argv[2]
-
sock.send(str)
-
sock.close()
-
# cat zk_confApp.py
-
#!/usr/bin/env python2.7
-
# -*- coding: UTF-8 -*-
-
-
# Note: exit after timeout. when have update yet only update once after exit
-
-
import logging
-
from os.path import basename, join
-
-
from zkclient import ZKClient, zookeeper, watchmethod
-
-
logging.basicConfig(
-
level = logging.DEBUG,
-
format = "[%(asctime)s] %(levelname)-8s %(message)s"
-
)
-
-
log = logging
-
-
class GJZookeeper(object):
-
-
# connect agent port, get conf status
-
# ZK_HOST = "localhost:2181"
-
ZK_HOST = "localhost:11470"
-
TIMEOUT = 10000
-
-
def __init__(self, verbose = True):
-
self.VERBOSE = verbose
-
self.masters = []
-
self.is_master = False
-
self.path = None
-
-
self.APP_ROOT = "/Applications"
-
self.APP_CONF = join(self.APP_ROOT,"GameConf")
-
self.cur_id = "conf-"
-
self.zk = ZKClient(self.ZK_HOST, timeout = self.TIMEOUT)
-
self.say("login ok!")
-
# register
-
self.add_children_watch_on()
-
-
def add_children_watch_on(self):
-
"""
-
get children, and check who is the smallest child
-
"""
-
@watchmethod
-
def watcher(event):
-
self.say("child changed, try to get master again.")
-
self.add_children_watch_on()
-
-
children = self.zk.get_children(self.APP_CONF, watcher)
-
if len(children) == 0:
-
self.say("watch num is none: %s exit!" %children)
-
else:
-
children.sort()
-
self.max_id = children[-1]
-
self.say("%s's children: %s" % (self.APP_CONF, children))
-
if cmp(self.max_id,self.cur_id) > 0:
-
self.say("max zid in zookeeper %s is %s"%(self.APP_CONF,self.max_id))
-
znode = join(self.APP_CONF,self.max_id)
-
value = self.zk.get(znode)
-
print value
-
-
def say(self, msg):
-
"""
-
print messages to screen
-
"""
-
if self.VERBOSE:
-
if self.path:
-
log.info("[ %s(%s) ] %s" % (self.path, "master" if self.is_master else "slave", msg))
-
else:
-
log.info(msg)
-
-
def main():
-
gj_zookeeper = GJZookeeper()
-
-
if __name__ == "__main__":
-
main()
-
import time
-
#time.sleep(20000)
-
time.sleep(3600*24*3650)
阅读(2670) | 评论(0) | 转发(0) |