%pyspark
#查询认证用户
import sys
#import MySQLdb
import mysql.connector
import pandas as pd
import datetime
import time
# Connection settings read by sql_select().
# NOTE(review): credentials are hard-coded here - consider loading them from
# a protected configuration source instead.
optmap = dict(
    dbuser='haoren',
    dbpass='G4d',
    dbhost='172.12.1.8',
    dbport=3306,
    dbname='HUIMDB'
)

# Connection settings read by sql_select1().
optmap1 = dict(
    dbuser='haoren',
    dbpass='G4d',
    dbhost='172.12.1.5',
    dbport=3306,
    dbname='PKGDMDB'
)
def sql_select(reqsql):
    """Run a query against the MySQL database configured in ``optmap``.

    Args:
        reqsql: complete SQL statement to execute.

    Returns:
        The list of rows from ``cursor.fetchall()``; an empty string if a
        ``mysql.connector.Error`` occurred (the error is printed).
    """
    ret = ''
    # Pre-bind both handles so the cleanup below is safe even when
    # connect() or cursor() fails before they are assigned.
    db_conn = None
    db_cursor = None
    try:
        db_conn = mysql.connector.connect(
            user=optmap['dbuser'],
            password=optmap['dbpass'],
            host=optmap['dbhost'],
            port=optmap['dbport'],
            database=optmap['dbname'])
        db_cursor = db_conn.cursor()
        db_cursor.execute(reqsql)
        ret = db_cursor.fetchall()
    except mysql.connector.Error as e:
        print('Error : {}'.format(e))
    finally:
        if db_cursor is not None:
            db_cursor.close()
        if db_conn is not None:
            # close must actually be called, otherwise every query leaks
            # a server connection.
            db_conn.close()
    return ret
def sql_select1(reqsql1):
    """Run a query against the MySQL database configured in ``optmap1``.

    Args:
        reqsql1: complete SQL statement to execute.

    Returns:
        The list of rows from ``cursor.fetchall()``; an empty string if a
        ``mysql.connector.Error`` occurred (the error is printed).
    """
    # The fallback value and the returned value must be the same variable,
    # otherwise a database error surfaces as UnboundLocalError.
    ret1 = ''
    # Pre-bind both handles so the cleanup below is safe even when
    # connect() or cursor() fails before they are assigned.
    db_conn1 = None
    db_cursor1 = None
    try:
        db_conn1 = mysql.connector.connect(
            user=optmap1['dbuser'],
            password=optmap1['dbpass'],
            host=optmap1['dbhost'],
            port=optmap1['dbport'],
            database=optmap1['dbname'])
        db_cursor1 = db_conn1.cursor()
        db_cursor1.execute(reqsql1)
        ret1 = db_cursor1.fetchall()
    except mysql.connector.Error as e:
        print('Error : {}'.format(e))
    finally:
        if db_cursor1 is not None:
            db_cursor1.close()
        if db_conn1 is not None:
            # close must actually be called, otherwise every query leaks
            # a server connection.
            db_conn1.close()
    return ret1
#定义查询认证用户函数
def renzhengsingger(startday, endday):
    """Show performers added in a time window together with their show time.

    Selects PERFORMERID and ADDTIME from PERFORMERINFO for
    ``startday <= ADDTIME < endday`` (via sql_select), then for each
    performer queries ``sum(DURATION)/3600`` from the sharded table
    PERFORMERSHOWTIMERECORD<id % 10> over the same window (via sql_select1).
    The result is displayed as a Spark DataFrame with the columns
    ``id``, ``dtime`` and ``shichang``.

    Args:
        startday: window start, formatted '%Y-%m-%d %H:%M:%S' (inclusive).
        endday: window end, formatted '%Y-%m-%d %H:%M:%S' (exclusive).
    """
    time_format = '%Y-%m-%d %H:%M:%S'
    # time.mktime interprets the parsed struct as local time.
    t1 = int(time.mktime(time.strptime(startday, time_format)))
    t2 = int(time.mktime(time.strptime(endday, time_format)))
    reqsql = "select PERFORMERID,from_unixtime(ADDTIME) from PERFORMERINFO where ADDTIME >=%s and ADDTIME < %s" % (t1, t2)
    performers = sql_select(reqsql)
    userdata = pd.DataFrame(columns=('id', 'dtime', 'shichang'))
    for index, (performer_id, added_at) in enumerate(performers):
        # Show-time records are split over ten tables keyed by id % 10.
        shard = int(performer_id) % 10
        reqsql1 = "select sum(DURATION)/3600 from PERFORMERSHOWTIMERECORD%d where STARTTIME >=%s and STARTTIME <%s and PERFORMERID=%d" % (shard, t1, t2, performer_id)
        durations = sql_select1(reqsql1)
        # A failed query yields no rows; report it like an empty SUM (None)
        # instead of crashing on the index lookup.
        hours = durations[0][0] if durations else None
        userdata.loc[index] = (performer_id, str(added_at), str(hours))
    df = spark.createDataFrame(userdata)
    df.show()
# Run the performer report for 2017-12-01 00:00:00 up to (not including) 2017-12-28 23:00:00.
renzhengsingger('2017-12-01 00:00:00','2017-12-28 23:00:00')
%pyspark
#encoding=gbk
#-*-coding:gbk-*-
#用户消费查询
import sys
#import MySQLdb
import mysql.connector
import pandas as pd
import datetime
import time
import urllib
import urllib2
# Connection settings read by sql_select().
# NOTE(review): credentials are hard-coded here - consider loading them from
# a protected configuration source instead.
optmap = dict(
    dbuser='haoren',
    dbpass='hG4d',
    dbhost='172.12.12.4',
    dbport=3306,
    dbname='PHIMDB'
)
def sql_select(reqsql):
    """Run a query against the MySQL database configured in ``optmap``.

    Args:
        reqsql: complete SQL statement to execute.

    Returns:
        The list of rows from ``cursor.fetchall()``; an empty string if a
        ``mysql.connector.Error`` occurred (the error is printed).
    """
    ret = ''
    # Pre-bind both handles so the cleanup below is safe even when
    # connect() or cursor() fails before they are assigned.
    db_conn = None
    db_cursor = None
    try:
        db_conn = mysql.connector.connect(
            user=optmap['dbuser'],
            password=optmap['dbpass'],
            host=optmap['dbhost'],
            port=optmap['dbport'],
            database=optmap['dbname'])
        db_cursor = db_conn.cursor()
        db_cursor.execute(reqsql)
        ret = db_cursor.fetchall()
    except mysql.connector.Error as e:
        print('Error : {}'.format(e))
    finally:
        if db_cursor is not None:
            db_cursor.close()
        if db_conn is not None:
            # close must actually be called, otherwise every query leaks
            # a server connection.
            db_conn.close()
    return ret
#用户充值
def getcharge(startday, endday):
    """Show consumption totals per charge channel for one time window.

    For each of the 20 sharded tables USERCONSUMPTIONRECORD0..19 this sums
    CONSUME/100 for rows with ``startday <= TIME < endday`` grouped by TYPE,
    and counts distinct USERID values the same way.  The amounts are shown
    as a one-row Spark DataFrame, preceded by a timestamp line.

    TYPE codes handled separately: 0 (QQ PC client), 19 (Apple),
    22 (QQ app / wifi), 23 (QQ direct charge), 24 (WeChat official
    account).  Every other TYPE is accumulated under "other".

    Args:
        startday: date or datetime, inclusive start of the window.
        endday: date or datetime, exclusive end of the window.
    """
    strdate = startday.strftime("%y%m%d")
    tsstart = time.mktime(startday.timetuple())
    tsend = time.mktime(endday.timetuple())

    # Maps a TYPE code to the report column it is accumulated into.
    channel_of = {0: 'pc', 19: 'apple', 22: 'wifi', 23: 'bag', 24: 'weixin'}
    channels = ('pc', 'wifi', 'bag', 'apple', 'weixin', 'other')
    amount = dict.fromkeys(channels, 0.0)
    # NOTE(review): payer counts are tallied but are not part of the
    # displayed frame - confirm whether they should be shown.
    payers = dict.fromkeys(channels, 0)
    daycomsume = 0.0
    dayuser = 0

    # -1 means "all types"; any other value restricts the sum to that TYPE.
    charge_type = -1
    for n in range(20):
        if charge_type == -1:
            # Totals per TYPE.
            reqsql = "select TYPE,SUM(CONSUME/100) AS ALLCOMSUME from `USERCONSUMPTIONRECORD%d` where TIME>=%d AND TIME < %d GROUP BY TYPE" % (n, tsstart, tsend)
            ret = sql_select(reqsql)
            # A shard without rows in the window (or a failed query) yields
            # an empty result, so check before indexing.
            if ret and ret[0][0] is not None:
                for row_type, row_sum in ret:
                    value = float(row_sum)
                    amount[channel_of.get(int(row_type), 'other')] += value
                    daycomsume += value
        else:
            # Total for a single TYPE.
            reqsql = "select SUM(CONSUME/100) AS ALLCOMSUME from `USERCONSUMPTIONRECORD%d` where TYPE=%d AND TIME>=%d AND TIME < %d" % (n, charge_type, tsstart, tsend)
            ret = sql_select(reqsql)
            if ret and ret[0][0] is not None:
                daycomsume += float(ret[0][0])

        # Payer counts.
        if charge_type == -1:
            reqsql = "select TYPE, COUNT(DISTINCT USERID) from `USERCONSUMPTIONRECORD%d` where TIME>=%d AND TIME < %d GROUP BY TYPE" % (n, tsstart, tsend)
            ret = sql_select(reqsql)
            if ret and ret[0][0] is not None:
                for row_type, row_count in ret:
                    payers[channel_of.get(int(row_type), 'other')] += int(row_count)
        reqsql = "select COUNT(DISTINCT USERID) from `USERCONSUMPTIONRECORD%d` where TIME>=%d AND TIME < %d" % (n, tsstart, tsend)
        ret = sql_select(reqsql)
        if ret and ret[0][0] is not None:
            dayuser += int(ret[0][0])

    # Cross-check value: the three QQ channels combined, rounded.
    pt = round(amount['pc'] + amount['wifi'] + amount['bag'])

    userdata = pd.DataFrame(columns=('strdate', 'sum', 'pc', 'wifi', 'bag', 'apple', 'weixin', 'other', 'pt'))
    userdata.loc[0] = (strdate, str(daycomsume), str(amount['pc']), str(amount['wifi']), str(amount['bag']),
                       str(amount['apple']), str(amount['weixin']), str(amount['other']), str(pt))
    df = spark.createDataFrame(userdata)
    print('截止时间:' + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())))
    df.show()
    # Sending the report by e-mail as HTML is currently disabled.
def fromDayToDay(startdate, datelen, func):
    """Call ``func`` once for each of ``datelen`` consecutive one-day windows.

    The i-th call is ``func(startdate + i days, startdate + (i + 1) days)``,
    for i from 0 to ``datelen - 1``.

    Args:
        startdate: date or datetime of the first window's start.
        datelen: number of one-day windows; no call is made when it is 0
            or negative.
        func: callable taking ``(startday, endday)``.

    Returns:
        None.
    """
    one_day = datetime.timedelta(days=1)
    for offset in range(datelen):
        startday = startdate + one_day * offset
        func(startday, startday + one_day)
#测试
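# Sample report body (disabled). Presumably this was an HTML table whose
# markup has been lost; only the cell texts remain, listed row by row:
#html = '
#   项目 | 合计 | QQpc版 | QQapp版 | QQ直充 | 苹果充值 |
#   充值金额 | 167765.5 | 114150.5 | 3928.0 | 49582.0 | 105.0 |
#   付费账号 | 430 | 348 | 48 | 74 | 2 |
# '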
#sendmail("hao@haoren.com", "人民币日报", html)
# Report window: yesterday's full day, then today so far.
today = datetime.date.today()
# To re-run a historical day, pin the date instead,
# e.g. today = datetime.date(2017, 7, 24).
# yesterday must be defined: it is the start of the first getcharge window.
yesterday = today - datetime.timedelta(days=1)
getcharge(yesterday, today)
fromDayToDay(today, 1, getcharge)
%pyspark
#encoding=gbk
#-*-coding:gbk-*-
import sys
#import MySQLdb
import mysql.connector
import pandas as pd
import datetime
import time
# Connection settings read by sql_select().
# NOTE(review): credentials are hard-coded here - consider loading them from
# a protected configuration source instead.
optmap = dict(
    dbuser='haoren',
    dbpass='G4d',
    dbhost='172.12.12.8',
    dbport=3306,
    dbname='PHIMDB'
)

# Connection settings read by sql_select1().
optmap1 = dict(
    dbuser='haoren',
    dbpass='G4d',
    dbhost='172.12.12.5',
    dbport=3306,
    dbname='GKMDB'
)
def sql_select(reqsql):
    """Run a query against the MySQL database configured in ``optmap``.

    Args:
        reqsql: complete SQL statement to execute.

    Returns:
        The list of rows from ``cursor.fetchall()``; an empty string if a
        ``mysql.connector.Error`` occurred (the error is printed).
    """
    ret = ''
    # Pre-bind both handles so the cleanup below is safe even when
    # connect() or cursor() fails before they are assigned.
    db_conn = None
    db_cursor = None
    try:
        db_conn = mysql.connector.connect(
            user=optmap['dbuser'],
            password=optmap['dbpass'],
            host=optmap['dbhost'],
            port=optmap['dbport'],
            database=optmap['dbname'])
        db_cursor = db_conn.cursor()
        db_cursor.execute(reqsql)
        ret = db_cursor.fetchall()
    except mysql.connector.Error as e:
        print('Error : {}'.format(e))
    finally:
        if db_cursor is not None:
            db_cursor.close()
        if db_conn is not None:
            # close must actually be called, otherwise every query leaks
            # a server connection.
            db_conn.close()
    return ret
def sql_select1(reqsql1):
    """Run a query against the MySQL database configured in ``optmap1``.

    Args:
        reqsql1: complete SQL statement to execute.

    Returns:
        The list of rows from ``cursor.fetchall()``; an empty string if a
        ``mysql.connector.Error`` occurred (the error is printed).
    """
    # The fallback value and the returned value must be the same variable,
    # otherwise a database error surfaces as UnboundLocalError.
    ret1 = ''
    # Pre-bind both handles so the cleanup below is safe even when
    # connect() or cursor() fails before they are assigned.
    db_conn1 = None
    db_cursor1 = None
    try:
        db_conn1 = mysql.connector.connect(
            user=optmap1['dbuser'],
            password=optmap1['dbpass'],
            host=optmap1['dbhost'],
            port=optmap1['dbport'],
            database=optmap1['dbname'])
        db_cursor1 = db_conn1.cursor()
        db_cursor1.execute(reqsql1)
        ret1 = db_cursor1.fetchall()
    except mysql.connector.Error as e:
        print('Error : {}'.format(e))
    finally:
        if db_cursor1 is not None:
            db_cursor1.close()
        if db_conn1 is not None:
            # close must actually be called, otherwise every query leaks
            # a server connection.
            db_conn1.close()
    return ret1
#批量查询用户的昵称
def getnickname(uid):
    """Query CHANNELNICKNAME for one user from its CHARBASE shard table.

    The table is chosen as CHARBASE<uid % 10>.

    Args:
        uid: numeric user ID (an int, or anything ``int()`` accepts).

    Returns:
        Whatever ``sql_select`` returns for the query: the fetched rows,
        each a one-element tuple holding the nickname.
    """
    # Coerce once so the shard and the WHERE clause use the same integer
    # and the %d placeholders can never receive non-numeric text.
    user_id = int(uid)
    shard = user_id % 10
    # NICKNAME is an alternative column that can be selected here instead.
    reqsql = "select CHANNELNICKNAME from CHARBASE%d where ID=%d" % (shard, user_id)
    return sql_select(reqsql)
# Put the internal user IDs to look up inside the brackets, comma-separated.
userlist = [67110207,76689594,90528820,90489527]
for i in userlist:
    # (Feature 1) look up the user's nickname.
    ret1 = getnickname(i)
    if not ret1:
        # No row for this ID (or the query failed): report it and move on
        # instead of crashing on the index lookup.
        print("%s , %s" % (i, "<not found>"))
        continue
    ss = ret1[0][0]
    # Turn the returned text back into raw bytes, then decode them as GBK.
    # NOTE(review): 'string_escape' exists only in Python 2 - confirm the
    # interpreter version before porting.
    ss = ss.encode('unicode-escape').decode('string_escape')
    nick_name = ss.decode('gbk')
    print("%s , %s" % (i, nick_name))