//----------- 服务器端 -----------//
#!/usr/bin/python
class Server(FORK, TCP):
pass
class MyRequestHandler(SRH):
def handle(self):
print '...connected from :', self.client_address
cmd = self.get_cmd()
if not cmd :
print '... not receive parameters.'
else:
self.my_send(cmd)
def get_cmd(self):
self.cmd = self.rfile.readline().strip()
if self.cmd and 1 == int(self.cmd[0]):
self.cmd = self.cmd[2:]
self.cmd = self.cmd.split(',')
self.cmd = 'php /u01/python/testing/spider.php '+self.cmd[0]+' ' +self.cmd[1]+' '+self.cmd[2]
os.system(self.cmd)
return self.cmd
else :
return 'finished' ;
def my_send(self, cmd):
if 'finished' == cmd :
self.wfile.write('Finished at %s' % ctime())
else :
self.wfile.write('[%s] %s' % (ctime(), 'command executed.'))
if __name__ == '__main__' :
import os
from SocketServer import TCPServer as TCP, StreamRequestHandler as SRH, ForkingMixIn as FORK
from time import ctime
HOST = '192.168.1.182'
PORT = 31608
ADDR = (HOST, PORT)
#tcpServ = TCP(ADDR, MyRequestHandler)
tcpServ = Server(ADDR, MyRequestHandler)
print 'waiting for connection...'
tcpServ.serve_forever()
//----------- 客户端 -----------//
#!d:\python25/python
def get_task(db):
'''get spider tasks.'''
if not db:
db = OCI.connect('spidertk/n0sm0king@spider')
cursor = db.cursor()
thread = []
sql = '''
select *
from (select t7.*,
row_number() over(partition by t7.st_id order by dbms_random.value) rn2
from (select t6.*
from (select t5.st_id,
t5.sp_id,
t5.run_param,
t5.sn2,
t5.ss_ip,
t5.create_time,
row_number() over(partition by t5.sp_id, t5.sn2 order by dbms_random.value) rn1
from (select t4.*
from (select t1.st_id,
t1.sp_id,
t1.run_param,
t1.ss_name sn1,
t1.status,
t1.create_time,
t2.ss_name sn2,
t2.ss_ip,
t2.ss_thread
from test_task t1, test_server t2
where t2.ss_thread < 5) t4
where t4.status = 0
and not EXISTS
(select t3.sp_id, t3.sn2
from (select t1.sp_id,
t1.ss_name sn1,
t2.ss_name sn2
from test_task t1,
test_server t2
where t2.ss_thread < 5) t3
where lower(t3.sn2) = lower(t3.sn1)
and t3.sn2 = t4.sn2
and t4.sp_id = t3.sp_id)) t5) t6
where t6.rn1 = 1) t7) t8
where t8.rn2 = 1
order by t8.create_time
'''
cursor.execute(sql)
res = cursor.fetchall()
print 'Total tasks is : %d' % cursor.rowcount
return res
def send_task(tasks, db):
HOST = '192.168.1.182'
PORT = 31608
ADDR = (HOST, PORT)
BUFSIZ = 1024
if not db:
db = OCI.connect('spidertk/n0sm0king@spider')
cursor = db.cursor()
cmd = '1:'
for i in tasks:
p = cmd+str(i[0])+';'+str(i[1])+';'+i[2]
sql = "update test_task set ss_name = '"+i[3]+"', status = 1, deliver_time = sysdate where st_id="+str(i[0])
cursor.execute(sql)
db.commit()
HOST = str(i[2])
tcpClntSock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
#tcpClntSock.connect((HOST, PORT))
tcpClntSock.connect(ADDR)
tcpClntSock.send('%s \r\n' % p)
data = tcpClntSock.recv(BUFSIZ)
if not data :
break
print "buffer: "+data.strip()+" from python buff"
#close socket connection
tcpClntSock.close()
if __name__ == '__main__' :
import socket
import cx_Oracle as OCI
# create new global database connection
db = OCI.connect('spidertk/n0sm0king@spider')
res = get_task(db)
send_task(res, db)
#close global database connection
db.close()
阅读(3717) | 评论(0) | 转发(0) |