参考 : mysql doc
# 创建存储空间
$> python mysql_by_log.py "
create table test (
id int(11) unsigned NOT NULL AUTO_INCREMENT,
at datetime DEFAULT NULL ,
ab varchar(20) DEFAULT NULL ,
auid varchar(20) DEFAULT NULL ,
pv int DEFAULT NULL ,
PRIMARY KEY (id),
UNIQUE aaa (at,ab,auid)
);
"
# 插入数据
# 利用 MySQL 的 INSERT ... ON DUPLICATE KEY UPDATE:唯一键 (at,ab,auid) 重复时不新增行,而是执行 pv = pv + 1
$> python ./mysql_by_log.py '
insert into test values ( NULL,DATE_FORMAT( "#at#" , "%Y-%m-%d" ),"#ab#","#auid#",1) on duplicate key update pv = pv + 1 ;
' 'cat /data/tongji/iphone_app/imusic/app*/2010/09/20/*'
> insert_num = 65809 , insert_err_num = 2666
real 0m14.397s
user 0m1.928s
sys 0m0.980s
# 查询展现 用户去重复
$> time ./mysql_by_log.py ' select count( distinct auid ) from test '
> 2683
$> time cat /data/tongji/iphone_app/imusic/app*/2010/09/20/* |perl -nle ' print $1 if /auid=(.*?)(?:<\|>|$)/ ' |sort -u |wc -l
> 2684
import sys,os,re
import traceback
import MySQLdb
# MySQL connection settings, read by the __main__ block below.
# NOTE(review): credentials are hard-coded in source; consider loading them
# from the environment or a config file outside version control.
host = "10.27.5.137"   # database server address
user = "root"          # login name
passwd = "mysql"       # login password
db = "ad"              # schema selected after connecting
port = 3306            # TCP port passed to MySQLdb.connect
def parse_log_line(line):
    """Split one log line into a ``{key: value}`` dict.

    Only the text before the first newline is considered.  Fields are
    separated by ``<|>`` and each field is expected to look like
    ``key=value``; a field that does not split into exactly two parts on
    ``=`` is ignored.
    """
    fields = {}
    for item in line.split('\n')[0].split('<|>'):
        pair = item.split('=')
        if len(pair) == 2:
            fields[pair[0]] = pair[1]
    return fields


def render_sql(template, fields, escape):
    """Fill every ``#key#`` placeholder in *template* from *fields*.

    Each value is passed through *escape* before being inserted, so quotes
    or backslashes in the data cannot terminate a quoted SQL literal.  The
    substitution is a single pass over the template: text that comes from
    a value is never scanned for placeholders again.

    Escaping only protects placeholders that the template wraps in quotes
    (e.g. ``"#auid#"``); an unquoted placeholder is still inserted as-is.

    Raises:
        KeyError: if the template names a key that *fields* does not have.
    """
    def substitute(match):
        return escape(fields[match.group(1)])

    # A callable replacement is used so that backslashes added by *escape*
    # are not interpreted as regex replacement escapes.
    return re.sub("#(.*?)#", substitute, template)


def main(argv):
    """Run the command-line tool and return a process exit status.

    ``argv[1]`` alone: execute that SQL statement, print each result row
    as tab-terminated columns, then commit.

    ``argv[1]`` and ``argv[2]``: run ``argv[2]`` as a shell command, treat
    every output line as a log record, render ``argv[1]`` (an SQL template
    with ``#key#`` placeholders) for each record and execute it.  Rows that
    fail for any reason are counted, not fatal.  Work is committed every
    100 rows and once more at the end.

    Written to run under Python 2 like the rest of this file; ``print`` is
    only ever called with a single argument so the output is unchanged.
    """
    if len(argv) not in (2, 3):
        prog = argv[0] if argv else "mysql_by_log.py"
        sys.stderr.write("usage: %s SQL [DATA_COMMAND]\n" % prog)
        return 2

    conn = MySQLdb.connect(host=host, user=user, passwd=passwd, db=db,
                           port=port, charset="utf8")
    try:
        c = conn.cursor()
        try:
            if len(argv) == 2:
                # sql = create table ; select
                c.execute(argv[1])
                for row in c:
                    line = "".join(["%s\t" % col for col in row])
                    print(line.encode("utf-8"))
                # Save (commit) the changes
                conn.commit()
            else:
                # sql = insert by data_file
                insert_sql, comm_popen = argv[1], argv[2]
                insert_num = insert_err_num = row_num = 0
                pipe = os.popen(comm_popen)
                try:
                    for row in pipe:
                        row_num += 1
                        try:
                            c.execute(render_sql(insert_sql,
                                                 parse_log_line(row),
                                                 conn.escape_string))
                            insert_num += 1
                        except Exception:
                            # Missing keys and database errors are expected
                            # for malformed rows; count them and carry on.
                            # KeyboardInterrupt/SystemExit still propagate.
                            insert_err_num += 1
                        if row_num % 100 == 0:
                            conn.commit()
                finally:
                    pipe.close()
                # Commit whatever is left after the last full batch.
                conn.commit()
                print(" insert_num = %d , insert_err_num = %d "
                      % (insert_num, insert_err_num))
        finally:
            c.close()
    finally:
        conn.close()
    return 0


if __name__ == "__main__":
    sys.exit(main(sys.argv))
|
阅读(815) | 评论(1) | 转发(0) |