%pyspark
#用户消费查询
import sys
import MySQLdb
import pandas as pd
import datetime
import time
import urllib
import urllib2
# MySQL connection settings (UDUIMDB) read by sql_select() at call time.
# NOTE(review): credentials are hard-coded in the notebook; consider loading them from a secrets store.
optmap = dict(
    dbuser='game',
    dbpass='G4d',
    dbhost='172.17.10.104',
    dbport=3306,
    dbname='UDUIMDB',
)
# Alternative colour scheme: ff9900 00B2EE FF33FF EEEEEE
def json2html(title, data):
    """Render a header row and data rows as one HTML table for the mail body.

    NOTE(review): the original tag literals in this function were lost (they had
    been stripped to ''), so the markup and colours here are a reconstruction --
    confirm against the mail template actually expected by recipients.

    Args:
        title: sequence of header cell strings, or a falsy value for no header row.
        data: sequence of rows; each row is a sequence of cell strings (callers
            convert numbers with str() before calling).

    Returns:
        The complete table as a single HTML string.
    """
    parts = ['<table border="1" cellspacing="0" cellpadding="4">']
    if title:
        parts.append('<tr>')
        for cell in title:
            parts.append('<th bgcolor="#ff9900">' + cell + '</th>')
        parts.append('</tr>')
    for rowcount, row in enumerate(data, 1):
        # Alternate the background of even and odd data rows.
        bg = '#EEEEEE' if rowcount % 2 == 0 else '#FFFFFF'
        parts.append('<tr>')
        for cell in row:
            parts.append('<td bgcolor="' + bg + '">' + cell + '</td>')
        parts.append('</tr>')
    parts.append('</table>')
    # join() instead of repeated += on a local that shadowed the builtin str
    return ''.join(parts)
def sendmail(mailto, subject, body):
    """POST an HTML mail request to the mail gateway and print its reply.

    Args:
        mailto: comma-separated recipient addresses.
        subject: mail subject.
        body: HTML body.

    NOTE(review): post_url is blank in this copy (presumably redacted); it must be
    set before this can send anything.
    """
    # URL of the mail gateway
    post_url = ''
    # Form-encode every field: the HTML body and the subject contain characters
    # such as '&', '=', '#', '+' and non-ASCII text, which corrupted the
    # application/x-www-form-urlencoded payload when concatenated raw.
    postData = urllib.urlencode([
        ('mail_to', mailto),
        ('smtp_subject', subject),
        ('mailtype', 'HTML'),
        ('body', body),
    ])
    req = urllib2.Request(post_url)
    response = urllib2.urlopen(req, postData)
    try:
        # print the gateway's reply
        print(response.read())
    finally:
        response.close()
def sql_select(reqsql):
    """Run one SELECT against the MySQL database described by the global ``optmap``.

    Args:
        reqsql: complete SQL text to execute.

    Returns:
        The rows from ``cursor.fetchall()``, or ``''`` when a ``MySQLdb.Error`` is
        raised (the error is printed). Callers should treat a falsy result as
        "no rows".
    """
    db_conn = None
    try:
        db_conn = MySQLdb.connect(user=optmap['dbuser'], passwd=optmap['dbpass'],
                                  host=optmap['dbhost'], port=optmap['dbport'],
                                  db=optmap['dbname'])
        db_cursor = db_conn.cursor()
        try:
            # db= above already selects the schema; no separate "use" is needed.
            db_cursor.execute(reqsql)
            return db_cursor.fetchall()
        finally:
            db_cursor.close()
    except MySQLdb.Error as e:
        print("Mysql ERROR %d:%s" % (e.args[0], e.args[1]))
        return ''
    finally:
        # Always release the connection; `db_conn.close` without parentheses
        # never closed it, leaking one connection per query.
        if db_conn is not None:
            db_conn.close()
# User recharge daily report
def getcharge(startday, endday, mailto="haoyuanli@sjshgame.com,weipengfei@sjshgame.com"):
    """Total one day's recharges from the 20 USERCONSUMPTIONRECORD shards and mail an HTML report.

    Records with startday <= TIME < endday are counted; both bounds are converted to
    epoch seconds with time.mktime (local time).

    Args:
        startday: first day (datetime.date) of the window.
        endday: exclusive end of the window.
        mailto: comma-separated recipients (defaults to the original hard-coded list).
    """
    strdate = startday.strftime("%y%m%d")
    tsstart = time.mktime(startday.timetuple())
    tsend = time.mktime(endday.timetuple())
    # TYPE codes (from the original notes): 0 - point consumption, reported as
    # "user PC version"; 19 - Apple recharge; 22 - wifi recharge, reported as
    # "user app version"; 23 - direct recharge / gift pack.
    amount = {0: 0.0, 19: 0.0, 22: 0.0, 23: 0.0}
    users = {0: 0, 19: 0, 22: 0, 23: 0}
    daycomsume = 0.0  # amount over ALL types, including ones not broken out above
    dayuser = 0
    for n in range(0, 20):
        # Amount per TYPE
        reqsql = "select TYPE,SUM(CONSUME/100) AS ALLCOMSUME from `USERCONSUMPTIONRECORD%d` where TIME>=%d AND TIME < %d GROUP BY TYPE" % (n, tsstart, tsend)
        print(reqsql)
        ret = sql_select(reqsql)
        print(ret)
        # A GROUP BY over an empty window returns no rows and sql_select returns ''
        # on error; indexing ret[0] unconditionally crashed in both cases.
        for ctype, value in ret or ():
            if ctype is None or value is None:
                continue
            ctype = int(ctype)
            if ctype in amount:
                amount[ctype] += float(value)
            daycomsume += float(value)
        # Paying users per TYPE
        reqsql = "select TYPE, COUNT(DISTINCT USERID) from `USERCONSUMPTIONRECORD%d` where TIME>=%d AND TIME < %d GROUP BY TYPE" % (n, tsstart, tsend)
        for ctype, count in sql_select(reqsql) or ():
            if ctype is None or count is None:
                continue
            ctype = int(ctype)
            if ctype in users:
                users[ctype] += int(count)
        # Paying users over all types
        reqsql = "select COUNT(DISTINCT USERID) from `USERCONSUMPTIONRECORD%d` where TIME>=%d AND TIME < %d" % (n, tsstart, tsend)
        ret = sql_select(reqsql)
        if ret and ret[0][0] is not None:
            dayuser += int(ret[0][0])
    pc, apple, wifi, bag = amount[0], amount[19], amount[22], amount[23]
    pcuser, appleuser, wifiuser, baguser = users[0], users[19], users[22], users[23]
    print("strdate, dayuser, daycomsume, pc, wifi, bag, apple, pcuser, wifiuser, baguser, appleuser")
    print(" ".join(str(v) for v in (strdate, dayuser, daycomsume, pc, wifi, bag, apple,
                                   pcuser, wifiuser, baguser, appleuser)))
    title = ("项目", "合计", "用户号pc版", "用户号app版","用户号直充", "苹果充值", "平台核对数值")
    # "Platform check" column: PC + app + direct recharge (Apple excluded)
    pt = round(pc + wifi + bag)
    ptuser = int(pcuser + wifiuser + baguser)
    data = (("充值金额", str(daycomsume), str(pc), str(wifi), str(bag), str(apple), str(pt)),
            ("付费账号", str(dayuser), str(pcuser), str(wifiuser), str(baguser), str(appleuser), str(ptuser)))
    print(title)
    print(data)
    html = json2html(title, data)
    sendmail(mailto, strdate + "用户号充值日报", html)
def fromDayToDay(startdate, datelen, func):
    """Call func(day, next_day) for each of ``datelen`` consecutive days starting at startdate."""
    one_day = datetime.timedelta(days=1)
    for offset in range(datelen):
        day = startdate + one_day * offset
        func(day, day + one_day)
#测试
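#html = '<table>...</table>'  -- sample report body for testing sendmail by hand.
# The HTML markup was lost; the table it contained was:
#   项目     | 合计     | 用户号pc版 | 用户号app版 | 用户号直充 | 苹果充值
#   充值金额 | 167765.5 | 114150.5   | 3928.0      | 49582.0    | 105.0
#   付费账号 | 430      | 348        | 48          | 74         | 2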
#sendmail("haoyuanli@sjshgame.com", "人民币日报", html)
# Daily run: report on the day before the current local date.
# For a back-fill, set `today` to a fixed date, e.g. datetime.date(2017, 7, 24).
today = datetime.date.today()
yesterday = today + datetime.timedelta(days=-1)
fromDayToDay(yesterday, 1, getcharge)
======================================================================================================================
import sys
import MySQLdb
import pandas as pd
import datetime
import time
# Connection settings for JIESUANDB, read at call time by sql_select()
# (defined in an earlier paragraph).
# NOTE(review): hard-coded credentials.
optmap = dict(
    dbuser='sjshgame',
    dbpass='KKSLLuhsjgyHGFqDxhG4d',
    dbhost='172.17.1.12',
    dbport=3306,
    dbname='JIESUANDB',
)
def getusercharge(userid, startday):
    """Return one user's recharge total for a day, in yuan (SUM(DUBIOPNUM) / 100).

    Sums DUBIOPTYPE=1 rows with OPTYPE 1016, 1020 or 1021 in the per-day table
    DUBIJIESUANTONGJI_<yymmdd>. Uses sql_select() from an earlier paragraph.
    Returns 0 when nothing matches or the query fails.
    """
    strdate = startday.strftime("%y%m%d")
    reqsql = "select CONSUMERID,SUM(DUBIOPNUM) from `DUBIJIESUANTONGJI_%s` where CONSUMERID=%u AND DUBIOPTYPE=1 AND (OPTYPE=1016 OR OPTYPE=1020 OR OPTYPE=1021)" % (strdate, int(userid))
    print(reqsql)
    ret = sql_select(reqsql)
    print(ret)
    # sql_select returns '' on error (e.g. the day's table is missing); indexing
    # ret[0] then raised IndexError.
    if ret and ret[0][0] is not None and ret[0][1] is not None:
        return float(ret[0][1]) / 100.0
    return 0
# First day of the query range (not referenced elsewhere in this paragraph).
startdate = datetime.date(year=2017, month=4, day=1)
======================================================================================================================
#查询用户余额
import sys
import MySQLdb
import pandas as pd
# Connection settings for DUIMDB (coin balances), read by sql_select() at call time.
# NOTE(review): hard-coded credentials.
optmap = dict(
    dbuser='sjshgame',
    dbpass='KKSLLuhsjgyHGFqDxhG4d',
    dbhost='172.17.1.104',
    dbport=3306,
    dbname='DUIMDB',
)
def sql_select(reqsql):
    """Run one SELECT against the MySQL database described by the global ``optmap``.

    Args:
        reqsql: complete SQL text to execute.

    Returns:
        The rows from ``cursor.fetchall()`` (the result set), or ``''`` when a
        ``MySQLdb.Error`` is raised (the error is printed). Callers should treat
        a falsy result as "no rows".
    """
    db_conn = None
    try:
        db_conn = MySQLdb.connect(user=optmap['dbuser'], passwd=optmap['dbpass'],
                                  host=optmap['dbhost'], port=optmap['dbport'],
                                  db=optmap['dbname'])
        db_cursor = db_conn.cursor()
        try:
            # db= above already selects the schema; no separate "use" is needed.
            db_cursor.execute(reqsql)
            return db_cursor.fetchall()
        finally:
            db_cursor.close()
    except MySQLdb.Error as e:
        print("Mysql ERROR %d:%s" % (e.args[0], e.args[1]))
        return ''
    finally:
        # Always release the connection; `db_conn.close` without parentheses
        # never closed it.
        if db_conn is not None:
            db_conn.close()
def getusercoin(userid):
    """Fetch one user's (ID, COINREFER) row from the CHARCOIN table shard ``userid % 10``.

    Returns:
        The first matching row, or ``(None, None)`` when the user has no row or the
        query fails. getall() checks ``coins[0] is not None`` to detect this case;
        previously an empty result raised IndexError before getall could see it.
    """
    shard = int(userid) % 10
    reqsql = "select ID,COINREFER from CHARCOIN%u where ID=%u" % (shard, int(userid))
    ret = sql_select(reqsql)
    if not ret:
        return (None, None)
    return ret[0]
def getall(userlist):
    """Show every user's coin balance (COINREFER / 100) as a Spark DataFrame (first 50 rows).

    Users whose lookup returns a None ID are listed with balance 0.
    """
    userdata = pd.DataFrame(columns=('userid', 'coin'))
    for position, userid in enumerate(userlist):
        coins = getusercoin(userid)
        if coins[0] is None:
            row = (str(userid), 0)
        else:
            row = (str(coins[0]), coins[1] / 100.0)
        userdata.loc[position] = row
    sdf = spark.createDataFrame(userdata)
    sdf.show(50)
# User IDs to look up
userlist = [84163232, 50859780]
getall(userlist)
======================================================================================================================
import sys
import MySQLdb
import pandas as pd
import datetime
import time
# Connection settings for JIESUANDB, read by sql_select() at call time.
# NOTE(review): hard-coded credentials.
optmap = dict(
    dbuser='sjshgame',
    dbpass='KKSLLuhsjgyHGFqDxhG4d',
    dbhost='172.17.1.12',
    dbport=3306,
    dbname='JIESUANDB',
)
def sql_select(reqsql):
    """Run one SELECT against the MySQL database described by the global ``optmap``.

    Args:
        reqsql: complete SQL text to execute.

    Returns:
        The rows from ``cursor.fetchall()``, or ``''`` when a ``MySQLdb.Error`` is
        raised (the error is printed). Callers should treat a falsy result as
        "no rows".
    """
    db_conn = None
    try:
        db_conn = MySQLdb.connect(user=optmap['dbuser'], passwd=optmap['dbpass'],
                                  host=optmap['dbhost'], port=optmap['dbport'],
                                  db=optmap['dbname'])
        db_cursor = db_conn.cursor()
        try:
            # db= above already selects the schema; no separate "use" is needed.
            db_cursor.execute(reqsql)
            return db_cursor.fetchall()
        finally:
            db_cursor.close()
    except MySQLdb.Error as e:
        print("Mysql ERROR %d:%s" % (e.args[0], e.args[1]))
        return ''
    finally:
        # Always release the connection; `db_conn.close` without parentheses
        # never closed it.
        if db_conn is not None:
            db_conn.close()
# User consumption
def getuserconsume(userid, startday):
    """Return one user's spending for a day, in yuan (SUM(DUBIOPNUM) / 100).

    Sums DUBIOPTYPE=2 rows with OPTYPE 1, 4, 17 or 25 (per the original note:
    gifts + events + song requests + stickers) in DUBIJIESUANTONGJI_<yymmdd>.
    Returns 0 when nothing matches or the query fails.
    """
    strdate = startday.strftime("%y%m%d")
    reqsql = "select CONSUMERID,SUM(DUBIOPNUM) from `DUBIJIESUANTONGJI_%s` where CONSUMERID=%u AND DUBIOPTYPE=2 AND (OPTYPE=1 OR OPTYPE=4 OR OPTYPE=17 OR OPTYPE=25)" % (strdate, int(userid))
    print(reqsql)
    ret = sql_select(reqsql)
    # sql_select returns '' on error; indexing ret[0] then raised IndexError.
    if ret and ret[0][0] is not None and ret[0][1] is not None:
        return float(ret[0][1]) / 100.0
    return 0
# User recharge
def getusercharge(userid, startday):
    """Return one user's recharge total for a day, in yuan (SUM(DUBIOPNUM) / 100).

    Sums DUBIOPTYPE=1 rows with OPTYPE 1016, 1020 or 1021 in DUBIJIESUANTONGJI_<yymmdd>.
    Returns 0 when nothing matches or the query fails.
    """
    strdate = startday.strftime("%y%m%d")
    reqsql = "select CONSUMERID,SUM(DUBIOPNUM) from `DUBIJIESUANTONGJI_%s` where CONSUMERID=%u AND DUBIOPTYPE=1 AND (OPTYPE=1016 OR OPTYPE=1020 OR OPTYPE=1021)" % (strdate, int(userid))
    print(reqsql)
    ret = sql_select(reqsql)
    print(ret)
    # sql_select returns '' on error; indexing ret[0] then raised IndexError.
    if ret and ret[0][0] is not None and ret[0][1] is not None:
        return float(ret[0][1]) / 100.0
    return 0
# User's remaining balance at the end of the day
def getusercurcoin(userid, startday):
    """Return CURRENTNUM / 100 from the user's latest row (by OPTIME) in that day's table, or 0 if none."""
    query = "select CONSUMERID,CURRENTNUM from `DUBIJIESUANTONGJI_%s` where CONSUMERID=%u ORDER BY OPTIME DESC LIMIT 1" % (startday.strftime("%y%m%d"), int(userid))
    print(query)
    rows = sql_select(query)
    print(rows)
    if not rows:
        return 0
    return float(rows[0][1]) / 100.0
def getconsume(startdate=datetime.date(2017, 4, 1), enddate=datetime.date(2017, 4, 15), userid=61090932):
    """Print a per-day table of one user's recharge, spending and end-of-day balance.

    Args:
        startdate: first day (inclusive); defaults to the previously hard-coded 2017-04-01.
        enddate: last day (inclusive); defaults to 2017-04-15.
        userid: user to report on; defaults to the previously hard-coded 61090932.
    """
    userdata = pd.DataFrame(columns=('date','userid','charge', 'consume', 'dayleftcoin'))
    # Inclusive day count
    datelen = (enddate - startdate).days + 1
    delta = datetime.timedelta(days=1)
    for index in range(datelen):
        startday = startdate + delta * index
        consume_coin = getuserconsume(userid, startday)
        charge = getusercharge(userid, startday)
        dayleftcoin = getusercurcoin(userid, startday)
        userdata.loc[index] = (startday.strftime("%Y-%m-%d"), str(userid), charge, consume_coin, dayleftcoin)
    print(userdata.tail(100))
    return
# Run the per-day charge / consumption / balance report.
getconsume()
======================================================================================================================
import sys
import MySQLdb
import pandas as pd
import datetime
import time
# Connection settings for JIESUANDB, read by sql_select() at call time.
# NOTE(review): hard-coded credentials.
optmap = dict(
    dbuser='sjshgame',
    dbpass='KKSLLuhsjgyHGFqDxhG4d',
    dbhost='172.17.1.12',
    dbport=3306,
    dbname='JIESUANDB',
)
def sql_select(reqsql):
    """Run one SELECT against the MySQL database described by the global ``optmap``.

    Args:
        reqsql: complete SQL text to execute.

    Returns:
        The rows from ``cursor.fetchall()``, or ``''`` when a ``MySQLdb.Error`` is
        raised (the error is printed). Callers should treat a falsy result as
        "no rows".
    """
    db_conn = None
    try:
        db_conn = MySQLdb.connect(user=optmap['dbuser'], passwd=optmap['dbpass'],
                                  host=optmap['dbhost'], port=optmap['dbport'],
                                  db=optmap['dbname'])
        db_cursor = db_conn.cursor()
        try:
            # db= above already selects the schema; no separate "use" is needed.
            db_cursor.execute(reqsql)
            return db_cursor.fetchall()
        finally:
            db_cursor.close()
    except MySQLdb.Error as e:
        print("Mysql ERROR %d:%s" % (e.args[0], e.args[1]))
        return ''
    finally:
        # Always release the connection; `db_conn.close` without parentheses
        # never closed it.
        if db_conn is not None:
            db_conn.close()
# Buyer's remaining balance at the end of the day
def getusercurcoin(userid, startday):
    """Return CURRENTSINGERGOLD / 100 from the latest row (by OPTIME) in that day's JIESUANTONGJI table, or 0."""
    query = "select SINGERID,CURRENTSINGERGOLD from `JIESUANTONGJI_%s` where SINGERID=%u ORDER BY OPTIME DESC LIMIT 1" % (startday.strftime("%y%m%d"), int(userid))
    print(query)
    rows = sql_select(query)
    print(rows)
    if not rows:
        return 0
    return float(rows[0][1]) / 100.0
# Buyer's remaining balance at the end of the month
def getusermonthgold(userid, startday):
    """Return (userid, CURRENTSINGERGOLD / 100, OPTIME as 'YYYY-mm-dd HH:MM:SS') for the latest monthly row.

    Reads JIESUANTONGJI_ZONG_<YYYYmm> for startday's month; returns (userid, 0.0, '')
    when there is no row or the query fails.
    """
    month = startday.strftime("%Y%m")
    reqsql = "select SINGERID,CURRENTSINGERGOLD,FROM_UNIXTIME(OPTIME) from `JIESUANTONGJI_ZONG_%s` where SINGERID=%u ORDER BY OPTIME DESC LIMIT 1" % (month, int(userid))
    print(reqsql)
    ret = sql_select(reqsql)
    print("%s %s" % (ret, len(ret)))
    if not ret:
        return (str(userid), 0.0,'')
    latest = ret[0]
    return (str(userid), latest[1] / 100.0, latest[2].strftime("%Y-%m-%d %H:%M:%S"))
def getall(userlist):
    """Show each user's latest May-2017 monthly balance (via getusermonthgold) as a Spark DataFrame."""
    month_start = datetime.date(2017, 5, 1)
    userdata = pd.DataFrame(columns=('userid', 'glod', 'time'))
    for index, userid in enumerate(userlist):
        coins = getusermonthgold(userid, month_start)
        print(coins)
        userdata.loc[index] = coins
    sdf = spark.createDataFrame(userdata)
    sdf.show(50)
# Buyer user IDs to look up
userlist = [91730167, 91855869, 92099710]
getall(userlist)
======================================================================================================================
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql.functions import udf
import MySQLdb
import mysql_op
import datetime
import time
from mysql_op import MySQL
import pandas as pd
import numpy as np
from fastparquet import ParquetFile
from fastparquet import write
def fromDayToDay(startdate, datelen, func):
    """Call func(day, next_day) for each of ``datelen`` consecutive days starting at startdate."""
    one_day = datetime.timedelta(days=1)
    for offset in range(datelen):
        day = startdate + one_day * offset
        func(day, day + one_day)
def fromDayToEndDay(startdate, datelen, endday, func):
    """Call func(day, endday) for each of ``datelen`` consecutive days from startdate; endday is fixed."""
    step = datetime.timedelta(days=1)
    for day in [startdate + step * k for k in range(datelen)]:
        func(day, endday)
# Fetch RMB (billing) data
def saveDayBillData(startday, endday):
    """Snapshot one day's DUBIJIESUANTONGJI_<yymmdd> table (JIESUANDB) to a local parquet file.

    Skips the day when /home/sjshgame/logstatis/billdata<YYYYmmdd>.parq already
    exists and writes nothing when the table is empty. endday is accepted only to
    fit the fromDayToDay callback signature.
    """
    import os  # this paragraph's import block does not import os
    strday = startday.strftime("%Y%m%d")
    outpath = "/home/sjshgame/logstatis/billdata" + strday + ".parq"
    if os.path.exists(outpath):
        return
    # Database connection parameters
    # NOTE(review): 'charset' is defined here but never passed to connect().
    dbconfig = {'host':'172.17.1.12',
                'port': 3306,
                'user':'sjshgame',
                'passwd':'KKSLLuhsjgyHGFqDxhG4d',
                'db':'JIESUANDB',
                'charset':'utf8'}
    mysql_cn = MySQLdb.connect(host=dbconfig['host'], port=dbconfig['port'], user=dbconfig['user'], passwd=dbconfig['passwd'], db=dbconfig['db'])
    try:
        strdate = startday.strftime("%y%m%d")
        sql = "SELECT OPTIME,CONSUMERID,PRESERVENUM,CURRENTNUM,DUBIOPTYPE,DUBIOPNUM,OPTYPE,OPDETAIL,OPNUM FROM `DUBIJIESUANTONGJI_%s`" % (strdate)
        print(sql)
        pddf = pd.read_sql(sql, con=mysql_cn)
    finally:
        # close even when the query fails (e.g. the day's table does not exist)
        mysql_cn.close()
    print(pddf.head(5))
    if len(pddf.index) > 0:
        print(pddf.describe())
        write(outpath, pddf)
    return
def saveBillData():
    """Snapshot billing data for each day from 2017-02-28 through 2017-02-28 (inclusive)."""
    first = datetime.date(2017, 2, 28)
    last = datetime.date(2017, 2, 28)
    fromDayToDay(first, (last - first).days + 1, saveDayBillData)
# Fetch wifi-phone registration data
def saveDayWifiPhoneRegData(startday, endday):
    """Snapshot every TYPE=17 user in NEW_WEB_USER with TIME before endday to wifiphonereg<YYYYmmdd>.parq.

    The query has no lower bound, so each day's file is cumulative (all registrations
    up to endday). Writes nothing when the result is empty.
    """
    # Database connection parameters
    # NOTE(review): 'charset' is defined here but never passed to connect().
    dbconfig = {'host':'172.17.1.105',
                'port': 3306,
                'user':'sjshgame',
                'passwd':'KKSLLuhsjgyHGFqDxhG4d',
                'db':'OTHERDB',
                'charset':'utf8'}
    mysql_cn = MySQLdb.connect(host=dbconfig['host'], port=dbconfig['port'], user=dbconfig['user'], passwd=dbconfig['passwd'], db=dbconfig['db'])
    try:
        strday = startday.strftime("%Y%m%d")
        tsend = time.mktime(endday.timetuple())
        sql = "select USERID from NEW_WEB_USER where TIME< %d AND TYPE=17" % (tsend)
        print(sql)
        pddf = pd.read_sql(sql, con=mysql_cn)
    finally:
        # close even when the query fails
        mysql_cn.close()
    print(pddf.head(5))
    if len(pddf.index) > 0:
        print(pddf.describe())
        write("/home/sjshgame/logstatis/wifiphonereg"+strday+".parq", pddf)
    return
def saveWifiPhoneReg():
    """Snapshot wifi-phone registrations for each day from 2016-12-01 through 2016-12-01."""
    first = datetime.date(2016, 12, 1)
    last = datetime.date(2016, 12, 1)
    fromDayToDay(first, (last - first).days + 1, saveDayWifiPhoneRegData)
# Display names for OPTYPE codes in the billing table (UTF-8 byte strings under Python 2).
OPTypeName = dict([
    (0, "人民币充会员"),
    (1, "送道具"),
    (4, "充值活动"),
    (6, "喊话"),
])
# OPDETAIL names used when OPTYPE == 19
OpDetailName19 = dict([
    (1, "猜保存收益"),
    (2, "赚下注和返注"),
    (3, "发红包"),
    (4, "抢红包"),
])
# OPDETAIL names used when OPTYPE == 22
OpDetailName22 = dict([
    (1, "猜猜转存收益到总账号"),
    (2, "赚转转存收益到总账号"),
    (3, "赛车存收益到总账号"),
])
# OPDETAIL names used when OPTYPE == 23
OpDetailName23 = dict([
    (0, "购买会员"),
    (1, "购买道具"),
    (2, "扫雷"),
    (3, "视频直播间点播"),
    (4, "活动"),
])
def getOpTypeName(func):
    """Return the UTF-8-decoded display name for an OPTYPE code, or "" when unknown."""
    name = OPTypeName.get(func)
    return "" if name is None else name.decode('utf8')
def getOpDetailName(func, detail):
    """Return the UTF-8-decoded display name for an (OPTYPE, OPDETAIL) pair, or "".

    For OPTYPE 19, OPDETAIL values strictly inside 10000-20000, 20000-30000 and
    50000-60000 map to fixed labels before the per-type table is consulted.
    """
    if func == 19:
        ranges = ((10000, 20000, "包裹用物品扣人民币"),
                  (20000, 30000, "包裹回滚"),
                  (50000, 60000, "红包接龙"))
        for low, high, label in ranges:
            if low < detail < high:
                return label.decode('utf8')
        table = OpDetailName19
    elif func == 22:
        table = OpDetailName22
    elif func == 23:
        table = OpDetailName23
    else:
        return ""
    name = table.get(detail)
    return "" if name is None else name.decode('utf8')
def getDayBillData(startday, endday):
    """Load one day's billdata parquet with Spark and merge it into the global allBillData.

    The first day loaded initialises allBillData; later days are combined with
    unionAll. endday is unused.
    """
    global allBillData
    strday = startday.strftime("%Y%m%d")
    print(strday + '人民币数据')
    daydf = spark.read.load("/home/sjshgame/logstatis/billdata"+strday+".parq")
    daydf.show(10)
    allBillData = daydf if allBillData is None else allBillData.unionAll(daydf)
    return
    # Disabled per-day enrichment (was an unreachable string after the return):
    # df2 = df.withColumn('OPTYPENAME', udf(getOpTypeName)(df.OPTYPE))
    # df2.show(10)
    # df = df2.withColumn('DETAILNAME', udf(getOpDetailName)(df2.OPTYPE, df2.OPDETAIL))
    # df.show(10)
    # df.createOrReplaceTempView('billdata')
# Accumulator filled by getDayBillData(): the union of all loaded days as a Spark
# DataFrame, or None until the first day has been loaded.
allBillData=None
def getBillData():
    """Load billdata for 2017-02-01..2017-02-28 into allBillData and register it as temp table 'billdata'."""
    global allBillData
    first = datetime.date(2017, 2, 1)
    last = datetime.date(2017, 2, 28)
    fromDayToDay(first, (last - first).days + 1, getDayBillData)
    allBillData.registerTempTable('billdata')
    # Disabled: persist the merged month and build a per-OPTYPE summary view.
    # strmonth = startday.strftime("%Y%m")
    # allBillData.write.parquet("/home/sjshgame/logstatis/billdata"+strmonth+".parq")
    # allBillData = spark.read.load("/home/sjshgame/logstatis/billdata"+strmonth+".parq")
    # allBillData.registerTempTable("billdata")
    # df=spark.sql("select OPTYPE,DUBIOPTYPE,OPDETAIL, sum(DUBIOPNUM) as sumnum from billdata group by OPTYPE,DUBIOPTYPE,OPDETAIL order by sumnum desc")
    # df2 = df.withColumn('OPTYPENAME', udf(getOpTypeName)(df.OPTYPE))
    # df = df2.withColumn('DETAILNAME', udf(getOpDetailName)(df2.OPTYPE, df2.OPDETAIL))
    # df.registerTempTable("billdatastatis")
    print('getBillData finish')
# Fetch recharge data
def getChargeInfo(startday, endday):
    """Dump USERCONSUMPTIONRECORD0..19 rows with startday <= TIME < endday (DUIMDB) into one parquet file.

    NOTE(review): the output path is .../register<YYYYmmdd>.parq even though the rows
    are consumption/recharge records -- confirm the name before relying on it.
    """
    # Database connection parameters
    # NOTE(review): 'charset' is defined here but never passed to connect().
    dbconfig = {'host':'172.17.1.104',
                'port': 3306,
                'user':'sjshgame',
                'passwd':'KKSLLuhsjgyHGFqDxhG4d',
                'db':'DUIMDB',
                'charset':'utf8'}
    mysql_cn = MySQLdb.connect(host=dbconfig['host'], port=dbconfig['port'], user=dbconfig['user'], passwd=dbconfig['passwd'], db=dbconfig['db'])
    try:
        strday = startday.strftime("%Y%m%d")
        tsstart = time.mktime(startday.timetuple())
        tsend = time.mktime(endday.timetuple())
        frames = []
        for i in range(0, 20):
            # ">=" so records stamped exactly at midnight are included, matching the
            # half-open [start, end) window used by getcharge's queries.
            sql = "SELECT * FROM `USERCONSUMPTIONRECORD%d` where TIME >= %d AND TIME < %d" % (i, tsstart, tsend)
            print(sql)
            pddf = pd.read_sql(sql, con=mysql_cn)
            if len(pddf.index) > 0:
                frames.append(pddf)
        # Concatenate once instead of DataFrame.append per shard (quadratic copying;
        # append was removed in pandas 2.0).
        regdata = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
        print(regdata.tail(5))
        if len(regdata.index) > 0:
            print(regdata.describe())
            write("/home/sjshgame/logstatis/register"+strday+".parq", regdata)
    finally:
        mysql_cn.close()
    return
def pudf(x):
    """Row helper: display name of the row's OPTYPE via getOpTypeName."""
    code = x.OPTYPE
    return getOpTypeName(code)
def getMergeData(strday):
    """Join one day's billdata with its wifiphonereg snapshot and return it as a Spark DataFrame.

    Rows are matched on CONSUMERID == USERID (pandas inner merge). OPTYPENAME and
    DETAILNAME columns are added; the first 10 rows are shown.
    """
    bills = ParquetFile("/home/sjshgame/logstatis/billdata"+strday+".parq").to_pandas()
    regs = ParquetFile("/home/sjshgame/logstatis/wifiphonereg"+strday+".parq").to_pandas()
    merged = pd.merge(bills, regs, left_on='CONSUMERID', right_on='USERID')
    merged['OPTYPENAME'] = merged.apply(lambda r: getOpTypeName(r.OPTYPE), axis=1)
    merged['DETAILNAME'] = merged.apply(lambda r: getOpDetailName(r.OPTYPE, r.OPDETAIL), axis=1)
    result = spark.createDataFrame(merged)
    result.show(10)
    return result
def analyzeDayBillData(startday, endday):
    """Inner-join a day's billdata with its wifiphonereg snapshot (CONSUMERID == USERID) and save it.

    The result is written to analyze<YYYYmmdd>.parq. endday is unused.
    """
    strday = startday.strftime("%Y%m%d")
    print(strday + '人民币数据')
    bills = spark.read.load("/home/sjshgame/logstatis/billdata"+strday+".parq")
    regs = spark.read.load("/home/sjshgame/logstatis/wifiphonereg"+strday+".parq")
    joined = bills.join(regs, bills.CONSUMERID == regs.USERID)
    joined.show(10)
    joined.write.parquet("/home/sjshgame/logstatis/analyze"+strday+".parq")
    return
def analyzeDayBillData2(startday, endday):
    """Build and show the merged bill/registration data for one day via getMergeData.

    The Spark-UDF enrichment that used to follow was unreachable (after an early
    return) and is kept below only as comments. endday is unused.
    """
    strday = startday.strftime("%Y%m%d")
    print(strday + '人民币数据')
    getMergeData(strday)
    return
    # df2 = df.withColumn('OPTYPENAME', udf(getOpTypeName)(df.OPTYPE))
    # df2.show(10)
    # df = df2.withColumn('DETAILNAME', udf(getOpDetailName)(df2.OPTYPE, df2.OPDETAIL))
    # df.show(10)
    # df.createOrReplaceTempView('analyzebilldata')
def analyzeBillData():
    """Run analyzeDayBillData2 for each day from 2016-12-28 through 2016-12-28."""
    first = datetime.date(2016, 12, 28)
    last = datetime.date(2016, 12, 28)
    fromDayToDay(first, (last - first).days + 1, analyzeDayBillData2)
    print('analyzeBillData finish')
# Paragraph entry point: only getBillData() is active; uncomment a step to run it.
#saveBillData()
getBillData()
#saveWifiPhoneReg()
#analyzeBillData()
======================================================================================================================
%pyspark
#查询用户机器ID
import sys
import MySQLdb
import pandas as pd
import datetime
# Connection settings for GUIMDB (login history), read by sql_select() at call time.
# NOTE(review): hard-coded credentials.
optmap = dict(
    dbuser='sjshgame',
    dbpass='KKSLLuhsjgyHGFqDxhG4d',
    dbhost='172.17.1.105',
    dbport=3306,
    dbname='GUIMDB',
)
def sql_select(reqsql):
    """Run one SELECT against the MySQL database described by the global ``optmap``.

    Args:
        reqsql: complete SQL text to execute.

    Returns:
        The rows from ``cursor.fetchall()``, or ``''`` when a ``MySQLdb.Error`` is
        raised (the error is printed). Callers should treat a falsy result as
        "no rows".
    """
    db_conn = None
    try:
        db_conn = MySQLdb.connect(user=optmap['dbuser'], passwd=optmap['dbpass'],
                                  host=optmap['dbhost'], port=optmap['dbport'],
                                  db=optmap['dbname'])
        db_cursor = db_conn.cursor()
        try:
            # db= above already selects the schema; no separate "use" is needed.
            db_cursor.execute(reqsql)
            return db_cursor.fetchall()
        finally:
            db_cursor.close()
    except MySQLdb.Error as e:
        print("Mysql ERROR %d:%s" % (e.args[0], e.args[1]))
        return ''
    finally:
        # Always release the connection; `db_conn.close` without parentheses
        # never closed it.
        if db_conn is not None:
            db_conn.close()
def getusermid(userid, months):
    """Return (USERID, MACHINEID) rows, one per MACHINEID, from LOGINHISTORY<months><userid % 50>."""
    shard = int(userid) % 50
    reqsql = "select USERID,MACHINEID from LOGINHISTORY%s%u where USERID=%u group by MACHINEID" % (months, int(shard), int(userid))
    print(reqsql)
    return sql_select(reqsql)
def getall(userlist):
    """Show (USERID, MACHINEID) pairs from the current month's login history for each user (up to 1000 rows)."""
    months = datetime.date.today().strftime("%Y%m")
    userdata = pd.DataFrame(columns=('USERID', 'MACHINEID'))
    index = 0
    for userid in userlist:
        for row in getusermid(userid, months):
            userdata.loc[index] = (str(row[0]), str(row[1]))
            index += 1
    sdf = spark.createDataFrame(userdata)
    sdf.show(1000)
# User IDs to look up
userlist = [67164289, 62640564, 25806997]
getall(userlist)
======================================================================================================================
#查询用户IP地址
import sys
import MySQLdb
import pandas as pd
import datetime
# Connection settings for GUIMDB (login history), read by sql_select() at call time.
# NOTE(review): hard-coded credentials.
optmap = dict(
    dbuser='sjshgame',
    dbpass='KKSLLuhsjgyHGFqDxhG4d',
    dbhost='172.17.1.105',
    dbport=3306,
    dbname='GUIMDB',
)
def sql_select(reqsql):
    """Run one SELECT against the MySQL database described by the global ``optmap``.

    Args:
        reqsql: complete SQL text to execute.

    Returns:
        The rows from ``cursor.fetchall()``, or ``''`` when a ``MySQLdb.Error`` is
        raised (the error is printed). Callers should treat a falsy result as
        "no rows".
    """
    db_conn = None
    try:
        db_conn = MySQLdb.connect(user=optmap['dbuser'], passwd=optmap['dbpass'],
                                  host=optmap['dbhost'], port=optmap['dbport'],
                                  db=optmap['dbname'])
        db_cursor = db_conn.cursor()
        try:
            # db= above already selects the schema; no separate "use" is needed.
            db_cursor.execute(reqsql)
            return db_cursor.fetchall()
        finally:
            db_cursor.close()
    except MySQLdb.Error as e:
        print("Mysql ERROR %d:%s" % (e.args[0], e.args[1]))
        return ''
    finally:
        # Always release the connection; `db_conn.close` without parentheses
        # never closed it.
        if db_conn is not None:
            db_conn.close()
def getusermid(userid, months):
    """Return (USERID, IP) rows, one per IP, from LOGINHISTORY<months><userid % 50>."""
    shard = int(userid) % 50
    reqsql = "select USERID,IP from LOGINHISTORY%s%u where USERID=%u group by IP" % (months, int(shard), int(userid))
    print(reqsql)
    return sql_select(reqsql)
def ipnum2str(x):
    """Convert an integer IPv4 address (first octet most significant) to dotted-quad text.

    Floor division keeps each octet an integer; the previous `/` produced float
    octets (wrong text) under Python 3 true division.
    """
    x = int(x)
    return '.'.join(str(x // (256 ** i) % 256) for i in range(3, -1, -1))


def ipstr2num(x):
    """Convert dotted-quad IPv4 text to its integer form (inverse of ipnum2str)."""
    return sum(256 ** j * int(part) for j, part in enumerate(x.split('.')[::-1]))
def getall(userlist):
    """Show (USERID, dotted-quad IP) pairs from the current month's login history for each user (up to 1000 rows)."""
    months = datetime.date.today().strftime("%Y%m")
    userdata = pd.DataFrame(columns=('USERID', 'IP'))
    index = 0
    for userid in userlist:
        for row in getusermid(userid, months):
            userdata.loc[index] = (str(row[0]), ipnum2str(row[1]))
            index += 1
    sdf = spark.createDataFrame(userdata)
    sdf.show(1000)
# User IDs to look up
userlist = [87592013, 21319743]
getall(userlist)
=========================================================================================
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql.functions import udf
import MySQLdb
import mysql_op
import datetime
import time
from mysql_op import MySQL
import pandas as pd
import numpy as np
from fastparquet import ParquetFile
from fastparquet import write
import os
def fromDayToDay(startdate, datelen, func):
    """Call func(day, next_day) for each of ``datelen`` consecutive days starting at startdate."""
    one_day = datetime.timedelta(days=1)
    for offset in range(datelen):
        day = startdate + one_day * offset
        func(day, day + one_day)
def fromDayToEndDay(startdate, datelen, endday, func):
    """Call func(day, endday) for each of ``datelen`` consecutive days from startdate; endday is fixed."""
    step = datetime.timedelta(days=1)
    for day in [startdate + step * k for k in range(datelen)]:
        func(day, endday)
# Fetch package (inventory) data
def saveDayPackageData(startday, endday):
    """Snapshot one day's BAOGUOTONGJI_<yymmdd> table (JIESUANDB) to packagedata<YYYYmmdd>.parq.

    Skips the day when the parquet already exists and writes nothing when the table
    is empty. endday is accepted only to fit the fromDayToDay callback signature.
    """
    strday = startday.strftime("%Y%m%d")
    outpath = "/home/sjshgame/logstatis/packagedata" + strday + ".parq"
    if os.path.exists(outpath):
        return
    # Database connection parameters
    # NOTE(review): 'charset' is defined here but never passed to connect().
    dbconfig = {'host':'172.17.1.12',
                'port': 3306,
                'user':'sjshgame',
                'passwd':'KKSLLuhsjgyHGFqDxhG4d',
                'db':'JIESUANDB',
                'charset':'utf8'}
    mysql_cn = MySQLdb.connect(host=dbconfig['host'], port=dbconfig['port'], user=dbconfig['user'], passwd=dbconfig['passwd'], db=dbconfig['db'])
    try:
        strdate = startday.strftime("%y%m%d")
        sql = "SELECT FUNCTION,IDX,UID,OLD,OP,NUM,NEW,`ID` FROM `BAOGUOTONGJI_%s`" % (strdate)
        print(sql)
        pddf = pd.read_sql(sql, con=mysql_cn)
    finally:
        # close even when the query fails (e.g. the day's table does not exist)
        mysql_cn.close()
    print(pddf.head(5))
    if len(pddf.index) > 0:
        print(pddf.describe())
        write(outpath, pddf)
    return
# Display names for package FUNCTION codes (UTF-8 byte strings under Python 2).
PackageFuncName = dict([
    (0, "包裹过期"),
    (1, "赚券、赚票"),
    (2, "砸蛋"),
    (6, "镖"),
    (7, "激活星级(通用消费人民币)"),
])
# Display names for package item IDs
PackageItemName = dict([
    (1, "喊话卡"),
    (2, "免费糖"),
    (3, "个人积分卡"),
    (4, "赞"),
    (5, "真好听"),
    (6, "牛B"),
])
def getPackageFuncName(func):
    """Return the UTF-8-decoded name of a package FUNCTION code, or str(func) when unknown."""
    label = PackageFuncName.get(func)
    if label is not None:
        return label.decode('utf8')
    return str(func)
def getPackageItemName(func):
    """Return the UTF-8-decoded name of a package item ID, or str(func) when unknown."""
    label = PackageItemName.get(func)
    if label is not None:
        return label.decode('utf8')
    return str(func)
def savePackageData():
    """Snapshot package data for each day from 2017-05-17 through 2017-06-14 (inclusive)."""
    first = datetime.date(2017, 5, 17)
    last = datetime.date(2017, 6, 14)
    fromDayToDay(first, (last - first).days + 1, saveDayPackageData)
def getDayPackageData(startday, endday):
    """Load one day's package parquet, add FUNCNAME/ITEMNAME via Spark UDFs, and register view 'packagedata'.

    NOTE(review): every call replaces the 'packagedata' view, so after a multi-day
    loop only the last day is visible (getDayBillData, by contrast, unions days).
    """
    strday = startday.strftime("%Y%m%d")
    print(strday + '包裹数据')
    raw = spark.read.load("/home/sjshgame/logstatis/packagedata"+strday+".parq")
    with_func = raw.withColumn('FUNCNAME', udf(getPackageFuncName)(raw.FUNCTION))
    named = with_func.withColumn('ITEMNAME', udf(getPackageItemName)(with_func.ID))
    named.show(10)
    named.createOrReplaceTempView('packagedata')
    return
def getPackageData():
    """Load package data for each day from 2017-05-17 through 2017-06-14 into Spark."""
    first = datetime.date(2017, 5, 17)
    last = datetime.date(2017, 6, 14)
    fromDayToDay(first, (last - first).days + 1, getDayPackageData)
    print('getPackageData finish')
def todayPackageData():
    """Snapshot and then load the previous day's package data (relative to the local date)."""
    # For a back-fill use a fixed date, e.g. datetime.date(2017, 7, 30), instead of today.
    yesterday = datetime.date.today() - datetime.timedelta(days=1)
    for step in (saveDayPackageData, getDayPackageData):
        fromDayToDay(yesterday, 1, step)
# Paragraph entry point: only the daily package refresh is active.
#savePackageData()
#getPackageData()
todayPackageData()
======================================================================================================================
from pyspark.sql import Row
import MySQLdb
import mysql_op
import datetime
import time
import os
from mysql_op import MySQL
import pandas as pd
import numpy as np
from fastparquet import ParquetFile
from fastparquet import write
def fromDayToDay(startdate, datelen, func):
    """Call func(day, next_day) for each of ``datelen`` consecutive days starting at startdate."""
    one_day = datetime.timedelta(days=1)
    for offset in range(datelen):
        day = startdate + one_day * offset
        func(day, day + one_day)
def fromDayToEndDay(startdate, datelen, endday, func):
    """Call func(day, endday) for each of ``datelen`` consecutive days from startdate; endday is fixed."""
    step = datetime.timedelta(days=1)
    for day in [startdate + step * k for k in range(datelen)]:
        func(day, endday)
# Fetch registration data
def dayRegisterUser(startday, endday):
    """Snapshot TYPE=17 users registered in [startday, endday) (NEW_WEB_USER) to wifiregister<YYYYmmdd>.parq.

    Writes nothing when no users registered that day.
    """
    # Database connection parameters
    # NOTE(review): 'charset' is defined here but never passed to connect().
    dbconfig = {'host':'172.17.1.105',
                'port': 3306,
                'user':'sjshgame',
                'passwd':'KKSLLuhsjgyHGFqDxhG4d',
                'db':'OTHERDB',
                'charset':'utf8'}
    mysql_cn = MySQLdb.connect(host=dbconfig['host'], port=dbconfig['port'], user=dbconfig['user'], passwd=dbconfig['passwd'], db=dbconfig['db'])
    try:
        strday = startday.strftime("%Y%m%d")
        tsstart = time.mktime(startday.timetuple())
        tsend = time.mktime(endday.timetuple())
        sql = "select USERID from `NEW_WEB_USER` where TIME >= %d AND TIME < %d and TYPE=17" % (tsstart, tsend)
        print(sql)
        # A single query: no need to append into an empty DataFrame first.
        regdata = pd.read_sql(sql, con=mysql_cn)
        print(regdata.tail(5))
        if len(regdata.index) > 0:
            print(regdata.describe())
            write("/home/sjshgame/logstatis/wifiregister"+strday+".parq", regdata)
    finally:
        # close even when the query or the write fails
        mysql_cn.close()
    return
def allRegisterUser(endday):
    """Snapshot every TYPE=17 user registered before endday (NEW_WEB_USER) to wifiallregister.parq.

    Writes nothing when the result is empty.
    """
    # Database connection parameters
    # NOTE(review): 'charset' is defined here but never passed to connect().
    dbconfig = {'host':'172.17.1.105',
                'port': 3306,
                'user':'sjshgame',
                'passwd':'KKSLLuhsjgyHGFqDxhG4d',
                'db':'OTHERDB',
                'charset':'utf8'}
    mysql_cn = MySQLdb.connect(host=dbconfig['host'], port=dbconfig['port'], user=dbconfig['user'], passwd=dbconfig['passwd'], db=dbconfig['db'])
    try:
        tsend = time.mktime(endday.timetuple())
        sql = "select USERID from `NEW_WEB_USER` where TIME < %d and TYPE=17" % (tsend)
        print(sql)
        pddf = pd.read_sql(sql, con=mysql_cn)
        print(pddf.head(5))
        if len(pddf.index) > 0:
            print(pddf.describe())
            write("/home/sjshgame/logstatis/wifiallregister.parq", pddf)
    finally:
        # close even when the query or the write fails
        mysql_cn.close()
    return
# Fetch login data
def dayLoginUser(startday, endday):
    """Snapshot users who logged in during [startday, endday) to wifilogin<YYYYmmdd>.parq.

    Scans the 50 GUIMDB shards LOGINHISTORY<YYYYmm><0..49> for startday's month,
    grouping by USERID within each shard. Skips the day if the parquet already
    exists; writes nothing when no logins are found.
    """
    strday = startday.strftime("%Y%m%d")
    outpath = "/home/sjshgame/logstatis/wifilogin" + strday + ".parq"
    if os.path.exists(outpath):
        return
    # Database connection parameters
    # NOTE(review): 'charset' is defined here but never passed to connect().
    dbconfig = {'host':'172.17.1.105',
                'port': 3306,
                'user':'sjshgame',
                'passwd':'KKSLLuhsjgyHGFqDxhG4d',
                'db':'GUIMDB',
                'charset':'utf8'}
    mysql_cn = MySQLdb.connect(host=dbconfig['host'], port=dbconfig['port'], user=dbconfig['user'], passwd=dbconfig['passwd'], db=dbconfig['db'])
    try:
        strmonth = startday.strftime("%Y%m")
        tsstart = time.mktime(startday.timetuple())
        tsend = time.mktime(endday.timetuple())
        frames = []
        for i in range(0, 50):
            sql = "SELECT USERID,ONLINETIME FROM `LOGINHISTORY%s%d` where ONLINETIME >= %d AND ONLINETIME < %d group by USERID" % (strmonth,i, tsstart, tsend)
            print(sql)
            pddf = pd.read_sql(sql, con=mysql_cn)
            if len(pddf.index) > 0:
                frames.append(pddf)
        # Concatenate once instead of DataFrame.append per shard (quadratic copying;
        # append was removed in pandas 2.0).
        regdata = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
        if len(regdata.index) > 0:
            print(regdata.describe())
            write(outpath, regdata)
    finally:
        # close even when a shard query or the write fails
        mysql_cn.close()
    return
# Compute retention
def getLoginData(startday, endday):
    """Append one retention row for users registered on startday to the global ``leftdata``.

    The row has 'day' (YYYY-mm-dd), 'reg' (registrations read from
    wifiregister<startday>.parq) and, for each day i from startday to endday
    inclusive, column str(i) = percentage of those registrants found in that day's
    login parquet. The row is stored under the index startday as YYYYmmdd.

    NOTE(review): this reads login<YYYYmmdd>.parq and joins on regdata['ID'], but
    dayLoginUser writes wifilogin<YYYYmmdd>.parq and dayRegisterUser writes a USERID
    column -- confirm which inputs this is meant to use.
    """
    global leftdata
    strday = startday.strftime("%Y%m%d")
    print(strday + '留存')
    # Registrations on startday
    regdata = ParquetFile("/home/sjshgame/logstatis/wifiregister"+strday+".parq").to_pandas()
    regcount = len(regdata.index)
    tempdata = {}
    tempdata['day'] = startday.strftime("%Y-%m-%d")
    tempdata['reg'] = regcount
    delta = datetime.timedelta(days=1)
    datelen = (endday - startday).days + 1
    for i in range(0, datelen):
        if not regcount:
            # No registrants: retention is 0 and the old division raised ZeroDivisionError.
            tempdata[str(i)] = 0.0
            continue
        strdate = (startday + delta * i).strftime("%Y%m%d")
        logindata = ParquetFile("/home/sjshgame/logstatis/login"+strdate+".parq").to_pandas()
        tempdf = pd.merge(regdata, logindata, left_on='ID', right_on='USERID')
        logincount = len(tempdf)
        tempdata[str(i)] = logincount*100.0/regcount
    leftdata.loc[strday] = pd.Series(tempdata)
    return
def saveDayInfo(startday=datetime.date(2016, 12, 1), endday=None):
    """Fetch registered users and logged-in users for each day in a range.

    Calls ``fromDayToDay`` with ``dayRegisterUser`` and then with
    ``dayLoginUser`` over ``startday`` .. ``endday`` (inclusive).

    :param startday: first day (``datetime.date``). Defaults to 2016-12-01,
        the date that used to be hard-coded, so ``saveDayInfo()`` behaves as
        before.
    :param endday: last day, inclusive. Defaults to ``startday`` (a single day).
    :raises ValueError: if ``endday`` is earlier than ``startday``.
    """
    if endday is None:
        endday = startday
    if endday < startday:
        raise ValueError("endday %s is before startday %s" % (endday, startday))
    datelen = (endday - startday).days + 1
    # Registered users.
    fromDayToDay(startday, datelen, dayRegisterUser)
    # Logged-in users.
    fromDayToDay(startday, datelen, dayLoginUser)
# Retention table filled row by row by getLoginData().
# Columns:
#   day - the registration day
#   reg - the number of registrations
#   '0' .. '15' - the percentage of those users logging in N days later
leftdata = pd.DataFrame(columns=('day', 'reg') + tuple(str(n) for n in range(16)))
def getLeftData():
    """Fill the retention table for 2016-11-01 .. 2016-12-01 and expose it to Spark SQL.

    Hands ``getLoginData`` to ``fromDayToEndDay`` for the 31-day window
    ending 2016-12-01 (presumably one call per registration day, each
    measured up to that end day; confirm in fromDayToEndDay). It then
    registers the module-level pandas ``leftdata`` frame as the Spark temp
    view ``'leftdata'``.
    """
    first = datetime.date(2016, 11, 1)
    last = datetime.date(2016, 12, 1)
    fromDayToEndDay(first, (last - first).days + 1, last, getLoginData)
    spark.createDataFrame(leftdata).createOrReplaceTempView('leftdata')
def todayLeftData(yesterday=None):
    """Refresh the login snapshot, registrations and wifi-activity view for one day.

    Runs, in order:
      * ``fromDayToDay(day, 1, dayLoginUser)``
      * ``allRegisterUser(day)``
      * ``analyzeDayBillData(day)``

    :param yesterday: the day to process (``datetime.date``). Defaults to the
        pinned date 2016-12-13 that was hard-coded before, so existing calls
        behave the same. Pass
        ``datetime.date.today() - datetime.timedelta(days=1)`` for a real
        "yesterday" run.
    """
    # The previous body computed datetime.date.today() but never used it.
    if yesterday is None:
        # NOTE(review): pinned back-fill date; the disabled code suggests the
        # intended default is today - 1 day. Confirm before switching.
        yesterday = datetime.date(2016, 12, 13)
    fromDayToDay(yesterday, 1, dayLoginUser)
    allRegisterUser(yesterday)
    analyzeDayBillData(yesterday)
def analyzeDayBillData(startday):
    """Register temp view 'analyzedata': logins on ``startday`` by wifi-registered users.

    Loads that day's wifilogin parquet file. It keeps only rows whose USERID
    also appears in the all-time wifi registration file (Spark's default
    inner join) and projects USERID and ONLINETIME. It then shows a preview
    and registers the result for Spark SQL.
    """
    stamp = startday.strftime("%Y%m%d")
    print(stamp + 'wifi活跃数据')
    logins = spark.read.load("/home/sjshgame/logstatis/wifilogin" + stamp + ".parq")
    registered = spark.read.load("/home/sjshgame/logstatis/wifiallregister.parq")
    same_user = logins.USERID == registered.USERID
    active = logins.join(registered, same_user).select(logins.USERID, logins.ONLINETIME)
    active.show()
    active.createOrReplaceTempView('analyzedata')
# Notebook entry point for this paragraph. Statement order matters:
# saveDayInfo() runs first and presumably writes the per-day parquet files
# that analyzeDayBillData() loads (TODO confirm). The commented calls are
# alternative entry points kept disabled.
saveDayInfo()
#getLeftData()
#allRegisterUser(datetime.date(2016, 12, 1))
analyzeDayBillData(datetime.date(2016, 12, 1))
#todayLeftData()
=================================================================================================================
%pyspark
#频道签约用户信息
import sys
import MySQLdb
import pandas as pd
import datetime
import time
import urllib
import urllib2
# Connection settings used by sql_select().
# NOTE(review): credentials are committed in source; consider loading them
# from the environment or a config file instead.
optmap = dict(
    dbuser='game',
    dbpass='g6',
    dbhost='172.12.10.8',
    dbport=3306,
    dbname='DDIMDB',
)
def sql_select(reqsql):
    """Run one SQL statement against the database described by ``optmap``.

    :param reqsql: complete SQL text, executed as-is. Callers must only
        interpolate trusted or validated values into it.
    :returns: the rows from ``cursor.fetchall()``, or ``''`` if MySQL raised
        an error (callers test the result with ``len()``).
    """
    db_conn = None
    db_cursor = None
    try:
        db_conn = MySQLdb.connect(user=optmap['dbuser'], passwd=optmap['dbpass'],
                                  host=optmap['dbhost'], port=optmap['dbport'],
                                  db=optmap['dbname'])
        db_cursor = db_conn.cursor()
        db_conn.query("use %s" % optmap['dbname'])
        db_cursor.execute(reqsql)
        return db_cursor.fetchall()
    except MySQLdb.Error as e:
        print("Mysql ERROR %d:%s" % (e.args[0], e.args[1]))
        return ''
    finally:
        # The old code wrote `db_conn.close` without calling it, leaking one
        # connection per query, and closed nothing when an error occurred.
        if db_cursor is not None:
            db_cursor.close()
        if db_conn is not None:
            db_conn.close()
# Query the performers signed to one platform (channel).
def getuser(channelid):
    """Return all PERFORMERINFO rows whose CHANNELID equals ``channelid``.

    The id is coerced with ``int()`` before it is placed in the SQL. The
    query text is printed before it runs.

    :returns: whatever ``sql_select`` returns (rows, or ``''`` on error).
    """
    query = "select * from PERFORMERINFO where CHANNELID=%u" % (int(channelid),)
    print(query)
    return sql_select(query)
# Query a customer's current RMB balance.
def gettime(userid):
    """Return GOLDREFER/100 for ``userid``, or None if no row comes back.

    Balances live in table ``CHARCOIN<userid % 10>``. Despite its name, this
    function reads the balance, not show time.
    """
    uid = int(userid)
    shard = uid % 10
    rows = sql_select("select GOLDREFER/100 from CHARCOIN%u where ID=%u" % (shard, uid))
    # sql_select yields '' on error and an empty result when nothing matches.
    return rows[0][0] if len(rows) > 0 else None
# For every performer signed to channel 3830989, print the performer id, then
# the id together with the RMB balance returned by gettime().
ret = getuser(3830989)
for i in ret:
    userid = i[0]
    print(userid)
    ret1 = gettime(userid)
    print("%s %s" % (userid, ret1))
=====================================================================================================================
def getincome(startday, endday, userid):
    """Print per-day and total pre-split consumption (CONSUMECOIN/100) for a singer.

    Queries the daily ``JIESUANTONGJI_yymmdd`` table for every day from
    ``startday`` to ``endday`` inclusive.

    :param startday: first day, ``'YYYY-MM-DD'``.
    :param endday: last day (inclusive), ``'YYYY-MM-DD'``.
    :param userid: singer id; coerced with ``int()`` before use in the SQL.
    :returns: the summed amount. It was previously only printed; the printed
        output is unchanged.
    """
    begin = datetime.datetime.strptime(startday, '%Y-%m-%d')
    end = datetime.datetime.strptime(endday, '%Y-%m-%d')
    singer = int(userid)
    goldsum = 0
    for offset in range((end - begin).days + 1):
        day1 = (begin + datetime.timedelta(days=offset)).strftime('%y%m%d')
        # CONSUMECOIN = what users spent before the revenue split; use
        # SINGERRECVGOLD instead for the singer's income after the split.
        reqsql = "select sum(CONSUMECOIN)/100 from `JIESUANTONGJI_%s` where SINGERID=%u" % (day1, singer)
        ret = sql_select(reqsql)
        # sql_select returns '' on MySQL errors (e.g. a missing daily table);
        # ret[0][0] then raised IndexError and aborted the whole range.
        value = ret[0][0] if ret else None
        print("%s %s" % (day1, value))
        if value is not None:
            goldsum = goldsum + float(value)
    print(goldsum)
    return goldsum
getincome('2017-06-25','2017-07-24',92270525) # Print singer 92270525's daily and total pre-split consumption for 2017-06-25 .. 2017-07-24
=================================================================================================================
def getincome(startday, endday, userid):
    """Print a performer's show hours per day and return the total.

    Sums DURATION/3600 from ``PERFORMERSHOWTIMERECORD<userid % 10>`` grouped
    by the calendar day of STARTTIME, for ``startday`` .. ``endday`` inclusive.

    NOTE(review): the original query line was truncated in this file (at the
    ``<`` of ``STARTTIME <=``). This query is a reconstruction from the
    surviving fragment and the ``m[0]``/``m[1]`` usage below; confirm it
    against the source notebook.

    :param startday: first day, ``'YYYY-MM-DD'``.
    :param endday: last day (inclusive), ``'YYYY-MM-DD'``.
    :param userid: performer id; coerced with ``int()`` before use in the SQL.
    :returns: total hours as a number.
    """
    # Parsing validates the dates before they are placed in the SQL text.
    begin = datetime.datetime.strptime(startday, '%Y-%m-%d')
    end = datetime.datetime.strptime(endday, '%Y-%m-%d')
    performer = int(userid)
    shard = performer % 10
    # Exclusive upper bound at midnight after endday, so the last day is fully
    # counted, like the inclusive day loop of the daily-table version.
    stop = (end + datetime.timedelta(days=1)).strftime('%Y-%m-%d')
    reqsql = ("select from_unixtime(STARTTIME, '%%Y-%%m-%%d') as daytime ,sum(DURATION)/3600 "
              "from PERFORMERSHOWTIMERECORD%u where PERFORMERID=%u "
              "and STARTTIME>=unix_timestamp('%s') and STARTTIME<unix_timestamp('%s') "
              "group by daytime order by daytime") % (shard, performer, begin.strftime('%Y-%m-%d'), stop)
    ret = sql_select(reqsql)
    goldsum = 0
    for m in ret:
        print("%s %s" % (m[0], m[1]))
        if m[1] is not None:
            goldsum = goldsum + float(m[1])
    print(goldsum)
    return goldsum
getincome('2017-08-10','2017-08-15',23154709) # Print performer 23154709's per-day show hours (DURATION/3600) for 2017-08-10 .. 2017-08-15
# 连接数据库也可以这样写(改用 mysql.connector 代替 MySQLdb):
def sql_select(reqsql):
    """Run one SQL statement via mysql.connector using the settings in ``optmap``.

    :param reqsql: complete SQL text, executed as-is.
    :returns: the rows from ``cursor.fetchall()``, or ``''`` if a
        ``mysql.connector.Error`` occurred (the error is printed).
    """
    ret = ''
    # Pre-bind so the finally block works even when connect() itself fails;
    # before, that case raised UnboundLocalError and hid the real error.
    db_conn = None
    db_cursor = None
    try:
        db_conn = mysql.connector.connect(user=optmap['dbuser'], password=optmap['dbpass'],
                                          host=optmap['dbhost'], port=optmap['dbport'],
                                          database=optmap['dbname'])
        db_cursor = db_conn.cursor()
        db_cursor.execute(reqsql)
        ret = db_cursor.fetchall()
    except mysql.connector.Error as e:
        print('Error : {}'.format(e))
    finally:
        if db_cursor is not None:
            db_cursor.close()
        if db_conn is not None:
            db_conn.close()  # was `db_conn.close` (never called): leaked connections
    return ret
==============================================================================================
#!/usr/bin/python
#-*-coding:utf-8-*-
import sys
import MySQLdb
import datetime
import random
import fnmatch
import random
import os, sys, re,string
import time, tarfile,getopt
import calendar
import fnmatch
def get_files(dir, pattern):
    """Return the names of entries in ``dir`` whose name matches ``pattern``.

    :param dir: directory to list (not recursive).
    :param pattern: regular expression, applied with ``search`` to each name.
    :returns: list of matching entry names (no directory prefix, in
        ``os.listdir`` order); an empty list when ``dir`` does not exist.
    """
    # This used to return the string 'no' for a missing directory, which made
    # callers' `ret.sort()` fail with AttributeError; an empty list behaves
    # like a normal (empty) result.
    if not os.path.exists(dir):
        return []
    matcher = re.compile(pattern)
    return [name for name in os.listdir(dir) if matcher.search(name)]
# Collect the distinct user ids that received RMB from GiftRoll2, according to
# the gameappserver logs of every day from `begin` to `end` (inclusive).
begin = datetime.date(2017,8,10)
end = datetime.date(2017,8,11)
# Example matching line:
# 170703-00:00:26 GameAppServer[17400] INFO: [GiftRoll2] [10461539]用户(21441172)增加人民币(1000)成功,剩余人民币(16377)
giftroll_re = re.compile(r"^(\S+) GameAppServer\[\d+\] INFO: \[GiftRoll2\] \[\d+\]用户\((\d+)\)增加人民币\((\d+)\)成功,剩余人民币\((\d+)\)")
userlist = []  # distinct user ids, in first-seen order
seen = set()   # O(1) membership test; scanning the list made this O(n^2)
for i in range((end - begin).days + 1):
    day = begin + datetime.timedelta(days=i)
    day1 = day.strftime('%y%m%d')
    print("day1= " + day1)
    dirname = "/home/haoren/logdir/%s_34/" % day1
    if not os.path.isdir(dirname):
        # Previously crashed here: get_files() returned 'no', which has no sort().
        print("skip missing dir " + dirname)
        continue
    ret = sorted(get_files(dirname, 'gameappserver'))
    for file2 in ret:
        print(file2)
        with open(dirname + file2, 'r') as f:  # the file used to be left open
            for a_line in f:
                m = giftroll_re.search(a_line)
                if m:
                    userid = int(m.group(2))
                    if userid not in seen:
                        seen.add(userid)
                        userlist.append(userid)
print(userlist)
print(len(userlist))
=============================================================================================================
#!/usr/bin/python
#-*-coding:utf-8-*-
import sys
import MySQLdb
import datetime
import random
import fnmatch
import random
import os, sys, re,string
import time, tarfile,getopt
import calendar
import fnmatch
def get_files(dir, pattern):
    """Return the names of entries in ``dir`` whose name matches ``pattern``.

    :param dir: directory to list (not recursive).
    :param pattern: regular expression, applied with ``search`` to each name.
    :returns: list of matching entry names (no directory prefix, in
        ``os.listdir`` order); an empty list when ``dir`` does not exist.
    """
    # This used to return the string 'no' for a missing directory, which made
    # callers' `ret.sort()` fail with AttributeError; an empty list behaves
    # like a normal (empty) result.
    if not os.path.exists(dir):
        return []
    matcher = re.compile(pattern)
    return [name for name in os.listdir(dir) if matcher.search(name)]
# Sum, per singer, the RMB added by GiftRoll2 in the gameappserver logs of each
# day from `begin` to `end` (inclusive). Then print the totals, the sorted ids,
# and the top 100 singers by total.
begin = datetime.date(2017,8,10)
end = datetime.date(2017,8,10)
# Example matching line:
# 170703-00:00:26 GameAppServer[17400] INFO: [GiftRoll2] [10461539]用户(21441172)增加人民币(1000)成功,剩余人民币(16377)
giftroll_re = re.compile(r"^(\S+) GameAppServer\[\d+\] INFO: \[GiftRoll2\] \[\d+\]用户\((\d+)\)增加人民币\((\d+)\)成功,剩余人民币\((\d+)\)")
userlist = {}  # singer id -> total RMB added
for i in range((end - begin).days + 1):
    day = begin + datetime.timedelta(days=i)
    print("second : " + str(day))
    day1 = day.strftime('%y%m%d')
    print("day1= " + day1)
    dirname = "/home/haoren/logdir/%s_34/" % day1
    print(dirname)
    if not os.path.isdir(dirname):
        # Previously crashed here: get_files() returned 'no', which has no sort().
        print("skip missing dir " + dirname)
        continue
    for file2 in sorted(get_files(dirname, 'gameappserver')):
        print(file2)
        with open(dirname + file2, 'r') as f:  # the file used to be left open
            for a_line in f:
                m = giftroll_re.search(a_line)
                if m:
                    singerid = int(m.group(2))
                    gold = int(m.group(3))
                    userlist[singerid] = userlist.get(singerid, 0) + gold
# Other equivalent ways to walk a dict: `for k in d`, `d.items()`,
# `d.iteritems()` (Python 2 only), or `zip(d.iterkeys(), d.itervalues())`.
print("第六次打印字典\n")
for (k, v) in userlist.items():
    print("%s %s" % (k, v))
# Sorted by key.
print(sorted(userlist.keys()))
# Sorted by value, descending; print rank, singer id and total for the top 100.
t = sorted(userlist.items(), key=lambda item: item[1], reverse=True)
for n, m in enumerate(t[:100], 1):
    print("%d %s , %s" % (n, m[0], m[1]))
sorted(iterable, key=None, reverse=False):Python 3 中 sorted 有 iterable、key、reverse 这三个参数(Python 2 中还多一个 cmp 参数)。
其中 iterable 表示可迭代的对象,例如 dict.items()、dict.keys() 等;key 是一个函数,用来从每个元素中选取参与比较的值;reverse 用来指定排序方向:reverse=True 时为倒序(降序),reverse=False 时为顺序(升序),默认为 reverse=False。
直接使用 sorted(d.keys())(或 sorted(d))就能按 key 值对字典排序,这里是按顺序(升序)对 key 值排序的;如果想按倒序排序,只要将 reverse 置为 True 即可,例如 sorted(d.keys(), reverse=True)。
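要对字典的 value 排序则需要用到 key 参数,这里主要介绍一种使用 lambda 表达式的方法:sorted(d.items(), key=lambda item: item[1])。
以 d = {'lilee': 25, 'wangyan': 21, 'liqun': 32, 'lidaming': 19} 为例,d.items() 实际上是将 d 转换为可迭代对象,其元素为 ('lilee', 25)、('wangyan', 21)、('liqun', 32)、('lidaming', 19),即 items() 方法把字典的每个键值对转化为了一个元组,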
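而这里 key 参数对应的 lambda 表达式 lambda item: item[1] 的意思是选取元组中的第二个元素(即 value)作为比较对象(如果写作 key=lambda item: item[0],则选取第一个元素,也就是以 key 值作为比较对象。
lambda x: y 中 x 表示输入参数,y 表示 lambda 函数的返回值),所以采用这种方法可以对字典的 value 进行排序。例如 sorted(d.items(), key=lambda item: item[1]) 的结果为 [('lidaming', 19), ('wangyan', 21), ('lilee', 25), ('liqun', 32)]。注意排序后的返回值是一个 list,原字典中的键值对被转换为了 list 中的元组,原字典本身不变。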