Chinaunix首页 | 论坛 | 博客
  • 博客访问: 520321
  • 博文数量: 102
  • 博客积分: 950
  • 博客等级: 准尉
  • 技术积分: 1094
  • 用 户 组: 普通用户
  • 注册时间: 2010-07-28 16:07
文章分类

全部博文(102)

文章存档

2020年(2)

2016年(5)

2015年(15)

2014年(25)

2013年(18)

2012年(19)

2011年(18)

我的朋友

分类: LINUX

2014-10-11 16:34:38

具体参见:

使用的zkclient 模块是下面这个


安装zookeeper下的python模块

import zookeeper

1.由于python客户端依赖c的客户端所以要先安装c版本的客户端

  1. cd zookeeper-3.4.5/src/c  
  2. ./configure  
  3. make   
  4. make install  

2.测试c版本客户端

# /usr/local/bin/cli_mt localhost:2181
Watcher SESSION_EVENT state = CONNECTED_STATE
Got a new session id: 0x14907459b0700a8

help  
    create [+[e|s]]
    delete
    set
    get
    ls
    ls2
    sync
    exists
    wexists
    myid
    verbose
    addauth
    quit

    prefix the command with the character 'a' to run the command asynchronously.
    run the 'verbose' command to toggle verbose logging.
    i.e. 'aget /foo' to get /foo asynchronously
ls /
time = 0 msec
/: rc = 0
        controller
        Roles
        brokers
        zookeeper
        controller_epoch

4.看到以上信息说明c版本的客户端已经安装好了,下面安装python版本客户端


5.下载python扩展包,并且解压:

  

或者直接安装 pip install zkpython

6.如果找不到Pyhon.h,可能由于没有安装python26-devel

  1. 32位直接运行  
  2. yum install python26-devel.i386 

7.测试是否成功

  1. import zookeeper  
cat README 
利用zookeeper 更新游戏配置文件
        zk_confSrv.py 注册到zookeeper的/Roles/workers/worker000000000x,可以启动多个做热切,建议每个zookeeper机器
上启动一个. 同时会打开8877端口,监听trigger的指令来更新znode配置信息
        zk_confTrigger.py 连接zk_confSrv.py的8877,发送配置文件内容的指令,只有整体重写配置的功能
                exg: python zk_confTrigger.py 10.14.251.203 'serverlist conf content'
        zk_confApp.py 在每个客户端启动,连接zookeeper,监听目录/Applications/GameConf/conf-000000000X,每次zk_confTrigger.py 更新指令数字+1,最大的就是最新的conf配置
                发送指令后,zookeeper会通知zk_confApp.py目录更新了,读取最大的配置znode,获得内容。
~


点击(此处)折叠或打开

  1. # cat zk_confSrv.py
  2. #!/usr/bin/env python2.7
  3. # -*- coding: UTF-8 -*-
  4.  
  5. import logging
  6. from os.path import basename, join

  7. from zkclient import ZKClient, zookeeper, watchmethod

  8. import os
  9. import sys
  10. import threading
  11. import signal
  12. import time

  13. logging.basicConfig(
  14.     level = logging.DEBUG,
  15.     format = "[%(asctime)s] %(levelname)-8s %(message)s"
  16. )

  17. log = logging

  18. class GJZookeeper(object):

  19. # ZK_HOST = "localhost:2181"
  20.     ZK_HOST = "localhost:11470"
  21.     ROOT = "/Roles"
  22.     WORKERS_PATH = join(ROOT, "workers")
  23.     MASTERS_NUM = 1
  24.     TIMEOUT = 10000

  25.     def __init__(self, verbose = True):
  26.         self.VERBOSE = verbose
  27.         self.masters = []
  28.         self.is_master = False
  29.         self.path = None

  30.         self.APP_ROOT = "/Applications"
  31.         self.APP_CONF = join(self.APP_ROOT,"GameConf")

  32.         self.zk = ZKClient(self.ZK_HOST, timeout = self.TIMEOUT)
  33.         self.say("login zookeeper successfully!")
  34.         # init
  35.         self.create_roles_znode()
  36.         # register
  37.         self.register()

  38.     def create_roles_znode(self):
  39.         """
  40.         create the zookeeper node if not exist
  41.         |-Roles
  42.              |-workers
  43.         """
  44.         nodes = (self.ROOT, self.WORKERS_PATH)
  45.         for node in nodes:
  46.             if not self.zk.exists(node):
  47.                 try:
  48.                     self.zk.create(node, "")
  49.                 except:
  50.                     pass

  51.     @property
  52.     def is_slave(self):
  53.         return not self.is_master

  54.     def register(self):
  55.         """
  56.         register a node for this worker,znode type : EPHEMERAL | SEQUENCE
  57.         |-Roles
  58.              |-workers
  59.                      |-worker000000000x ==>>master
  60.                      |-worker000000000x+1 ==>>worker
  61.                      ....
  62.         """
  63.         self.path = self.zk.create(self.WORKERS_PATH + "/worker", "1", flags=zookeeper.EPHEMERAL | zookeeper.SEQUENCE)
  64. # self.path = "/Roles/workers/worker"
  65.         self.path = basename(self.path)
  66.         self.say("I'm %s" % self.path)
  67.         # check who is the master
  68.         self.get_master()

  69.     def get_master(self):
  70.         """
  71.         get children, and check who is the smallest child
  72.         """
  73.         @watchmethod
  74.         def watcher(event):
  75.             self.say("child changed, try to get master again.")
  76.             self.get_master()

  77.         try:
  78.                 children = self.zk.get_children(self.WORKERS_PATH, watcher)
  79.         except zookeeper.ConnectionLossException:
  80.                 print "losing connection with zookeeper ..."
  81.                 return False
  82.         except:
  83.                 return False

  84.         children.sort()
  85.         self.say("%s's children: %s" % (self.WORKERS_PATH, children))

  86.         # check if I'm master
  87.         self.masters = children[:self.MASTERS_NUM]
  88. # self.say("tell me who is master!: %s. path: %s" % (self.masters,self.path))
  89.         if self.path in self.masters:
  90.             self.is_master = True
  91.             self.say("I've become master!")
  92.             self.create_app_znode()
  93.         else:
  94.             self.say("%s is masters, I'm slave" % self.masters)

  95.     def create_app_znode(self):
  96.         """
  97.         create the zookeeper node if not exist
  98.         |-Applications
  99.                     |-GameConf
  100.         """
  101.         nodes = (self.APP_ROOT, self.APP_CONF)
  102.         for node in nodes:
  103.             if not self.zk.exists(node):
  104.                 try:
  105.                     self.say("Create znode [%s] ..."%(node))
  106.                     self.zk.create(node, "")
  107.                 except:
  108.                     pass

  109.     def create_conf_znode(self,data):
  110.         """
  111.         create the zookeeper node's children if not exist,contents is conf data
  112.         |-Applications
  113.                     |-GameConf
  114.                             |-item-000000000x => data
  115.         """
  116.         self.child_node = join(self.APP_CONF,"conf-")
  117. # path = self.zk.create(self.child_node,data, flags=zookeeper.SEQUENCE|zookeeper.PERSISTENT)
  118.         path = self.zk.create(self.child_node,data, flags=zookeeper.SEQUENCE)
  119.         self.say("create znode %s"%path)

  120.     def say(self, msg):
  121.         """
  122.         print messages to screen
  123.         """
  124.         if self.VERBOSE:
  125.             if self.path:
  126.                 log.info("[ %s(%s) ] %s" % (self.path, "master" if self.is_master else "slave", msg))
  127.             else:
  128.                 log.info(msg)
  129.         
  130. class Watcher:
  131.     def __init__(self):
  132.         """ Creates a child thread, which returns. The parent
  133.             thread waits for a KeyboardInterrupt and then kills
  134.             the child thread.
  135.         """
  136.         self.child = os.fork()
  137.         if self.child == 0:
  138.             return
  139.         else:
  140.             self.watch()

  141.     def watch(self):
  142.         try:
  143.             os.wait()
  144.         except KeyboardInterrupt:
  145.             print ' exit...'
  146.             self.kill()
  147.         sys.exit()

  148.     def kill(self):
  149.         try:
  150.             os.kill(self.child, signal.SIGKILL)
  151.         except OSError:
  152.             pass


  153. def start_zk_worker():
  154.     """
  155.         连接到zookeeper执行初始化
  156.     """
  157.     gj_zookeeper = GJZookeeper()
  158.     th1 = threading.Thread(target = start_agent_worker, name = "thread_1", args = (gj_zookeeper,))

  159.     th1.start()

  160.     th1.join()

  161. def start_agent_worker(gj_zookeeper):
  162.     """
  163.         监听配置文件变更信息,解析trigger指令,增加到znode上
  164.     """
  165.     import socket

  166.     address = ('', 8877)
  167.     s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # s = socket.socket()
  168.     s.bind(address)
  169.     s.listen(5)
  170.     print "listen on port 8877, wait trigger send cmd ..."

  171.     while True:
  172.         ss, addr = s.accept()
  173.         print "receive connetcion from " ,addr
  174.         content = ""
  175.         while True:
  176.             try :
  177.                 data = ss.recv(512)
  178.                 if not data:
  179.                     print "close connetcion " ,addr
  180.                     ss.close()
  181.                     break
  182.                 content = content + data
  183.                 print "receive message from %s : %s"%(addr[0],data)
  184.             except Exception,e:
  185.                 print "receive error from %s : %s"%(addr[0],str(e))
  186.                 ss.close()
  187.                 break
  188.         parse_trigger_package(content,gj_zookeeper)

  189.     s.close()

  190. def parse_trigger_package(data,gj_zookeeper):
  191.     try:
  192.         cmd = data.split('|')[0]
  193.         content = data.split('|')[1]
  194.     except Exception, e:
  195.         print "ERROR :",str(e)
  196.         return
  197.     if cmd == "ADD":
  198.         gj_zookeeper.create_conf_znode(content)
  199.     else:
  200.         pass


  201. def main():
  202.     Watcher()
  203.     start_zk_worker()

  204. if __name__ == "__main__


点击(此处)折叠或打开

  1. # cat zk_confTrigger.py
  2. #!/usr/bin/python
  3. import socket
  4. import sys

  5. if len(sys.argv) != 3:
  6.         print "Usage: %s ipaddr[10.14.2.72] confTextCentent" %sys.argv[0]
  7.         sys.exit(1)

  8. sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  9. sock.connect((sys.argv[1], 8877))
  10. str = 'ADD' + '|' + sys.argv[2]
  11. sock.send(str)
  12. sock.close()


点击(此处)折叠或打开

  1. # cat zk_confApp.py
  2. #!/usr/bin/env python2.7
  3. # -*- coding: UTF-8 -*-

  4. # Note: exit after timeout. when have update yet only update once after exit

  5. import logging
  6. from os.path import basename, join

  7. from zkclient import ZKClient, zookeeper, watchmethod

  8. logging.basicConfig(
  9.         level = logging.DEBUG,
  10.         format = "[%(asctime)s] %(levelname)-8s %(message)s"
  11. )

  12. log = logging

  13. class GJZookeeper(object):

  14.         # connect agent port, get conf status
  15. # ZK_HOST = "localhost:2181"
  16.         ZK_HOST = "localhost:11470"
  17.         TIMEOUT = 10000

  18.         def __init__(self, verbose = True):
  19.                 self.VERBOSE = verbose
  20.                 self.masters = []
  21.                 self.is_master = False
  22.                 self.path = None

  23.                 self.APP_ROOT = "/Applications"
  24.                 self.APP_CONF = join(self.APP_ROOT,"GameConf")
  25.                 self.cur_id = "conf-"
  26.                 self.zk = ZKClient(self.ZK_HOST, timeout = self.TIMEOUT)
  27.                 self.say("login ok!")
  28.                 # register
  29.                 self.add_children_watch_on()

  30.         def add_children_watch_on(self):
  31.                 """
  32.                 get children, and check who is the smallest child
  33.                 """
  34.                 @watchmethod
  35.                 def watcher(event):
  36.                         self.say("child changed, try to get master again.")
  37.                         self.add_children_watch_on()

  38.                 children = self.zk.get_children(self.APP_CONF, watcher)
  39.                 if len(children) == 0:
  40.                         self.say("watch num is none: %s exit!" %children)
  41.                 else:
  42.                         children.sort()
  43.                         self.max_id = children[-1]
  44.                         self.say("%s's children: %s" % (self.APP_CONF, children))
  45.                         if cmp(self.max_id,self.cur_id) > 0:
  46.                                 self.say("max zid in zookeeper %s is %s"%(self.APP_CONF,self.max_id))
  47.                                 znode = join(self.APP_CONF,self.max_id)
  48.                                 value = self.zk.get(znode)
  49.                                 print value

  50.         def say(self, msg):
  51.                 """
  52.                 print messages to screen
  53.                 """
  54.                 if self.VERBOSE:
  55.                         if self.path:
  56.                                 log.info("[ %s(%s) ] %s" % (self.path, "master" if self.is_master else "slave", msg))
  57.                         else:
  58.                                 log.info(msg)

  59. def main():
  60.         gj_zookeeper = GJZookeeper()

  61. if __name__ == "__main__":
  62.         main()
  63.         import time
  64.         #time.sleep(20000)
  65.         time.sleep(3600*24*3650)



阅读(2625) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~