python编辑大文思路。
先通过f.open对象readline找到需要编辑开始和结束的字节数。
再通过mmap加偏移量映射这部分文件然后写回。
ucloud的登录图片比较简单,还是能实现自动登录的。
由于ucloud的数据库是备份全库的,所以需要编辑ucloud的数据库文件,将mysql库删除再倒入本地数据库,这就需要编辑大文件并修改,于是就有了上面的python编辑大文件。
导入的时候,可以用sed把所有imnndb替换为MyISAM加速导入
下面上代码(部分参数和url与ucloud机房有关,比如1001是ucloud A机房) 需要安装pyocr
-
#!/usr/bin/python
-
# -*- coding: UTF-8 -*-
-
-
import urllib
-
import urllib2
-
import cookielib
-
import shutil
-
import os
-
import sys
-
import re
-
import subprocess
-
import pyocr
-
import json
-
import Image
-
import time
-
#import pdb
-
import mmap
-
import pwd
-
-
def open_mainpage():
-
#打开登录页面,初始化opener
-
url_main_page = ''
-
cj = cookielib.LWPCookieJar()
-
opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))
-
urllib2.install_opener(opener)
-
res = urllib2.Request(url_main_page)
-
req = opener.open(res)
-
return opener, req.read()
-
-
-
def get_lt_code(main_pages):
-
#从登录页面获取LT code,这是登录时需要post的一个value
-
filter_lt = re.compile('LT-[A-Za-z0-9]{20,40}')
-
page_text = main_pages.strip()
-
if filter_lt.search(page_text,re.X) is not None:
-
return filter_lt.search(page_text).group(0)
-
else:
-
#print main_pages
-
return ''
-
-
-
def get_verification_code(opener, verification_code_file = 'D:\\Downloads\\1.png'):
-
#获取验证码文件,并用pyocr解析验证码
-
url_verification_code = '/verification_code'
-
res = urllib2.Request(url_verification_code)
-
req = opener.open(res)
-
-
try:
-
if os.path.isfile(verification_code_file):os.remove(verification_code_file)
-
with open(verification_code_file, 'wb') as fp:
-
shutil.copyfileobj(req, fp)
-
except:
-
return None
-
tools = pyocr.get_available_tools()[:]
-
if len(tools) == 0:
-
print("No OCR tool found")
-
return None
-
print("Using [%s] to get key from image" % (tools[0].get_name()))
-
verification_code = tools[1].image_to_string(Image.open(verification_code_file))
-
return str(verification_code)
-
-
-
def post_login(opener, post_data):
-
url_login = ''
-
for i in range(5):
-
post_data['verify_code'] = get_verification_code(opener, '/tmp/1.png')
-
if post_data['verify_code'] is None:
-
continue
-
res = urllib2.Request(url_login, urllib.urlencode(post_data))
-
req = opener.open(res)
-
json_response = json.loads(req.read())
-
if json_response['ret_code'] == 0:
-
return True
-
else:
-
print json_response['ret_code']
-
#print json_response['error_message']
-
if i == 4:
-
return False
-
continue
-
-
-
def page_db(opener):
-
#切换到dbbackup页面
-
url_db = ''
-
headers = {'User-Agent' : 'Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; Win64; x64; Trident/6.0)'}
-
res = urllib2.Request(url_db, headers = headers)
-
req = opener.open(res)
-
#print req.read()
-
-
-
def get_db_list(opener=None, db_name_list=None):
-
#获取db列表
-
if db_name_list is None:
-
return None
-
db_list = None
-
#先查询本地db列表文件,如果存在不再去ucloud获取db列表
-
try:
-
f = open('./db.list')
-
db_list_text = f.readlines()
-
f.close()
-
db_list = json.loads(db_list_text[0].strip())['data']
-
except Exception,e:
-
print 'error',e
-
return None
-
-
#不存在本地db列表,去ucloud获取
-
if db_list is None or type(db_list) != type([]):
-
list_url = '%d' % int(time.time()*1000)
-
print 'list not ok! get url [%s]' % list_url
-
try:
-
#pdb.set_trace()
-
res = urllib2.Request(list_url)
-
req = opener.open(res)
-
#print 'req is ',req.read().strip()
-
json_text = req.read()
-
db_list = json.loads(json_text.strip())['data']
-
try:
-
f = open('./db.list','w')
-
f.writelines(json_text)
-
f.close()
-
except Exception, e:
-
print 'wtfffff',e
-
pass
-
except Exception,e:
-
print 'error open db_list url', e
-
-
if db_list is None or type(db_list) != type([]):
-
return None
-
-
db_dict = {}
-
for db in db_list:
-
if db['instance_name'] in db_name_list:
-
db_dict[db['instance_name']] = {}
-
db_dict[db['instance_name']]['db_id'] = db['db_id']
-
db_dict[db['instance_name']]['ip'] = db['virtual_ip']
-
db_dict[db['instance_name']]['port'] = db['port']
-
-
return db_dict
-
-
-
def get_down_url(opener, db_dict):
-
#获取db文件下载地址
-
for instance_name in db_dict.keys():
-
api_get_download_id = '%s&use_session=yes&format=json®ion_id=1001&zone_id=1&_=%d' % (db_dict[instance_name]['db_id'],int(time.time()*1000))
-
api_get_download = ''
-
#print api_get_download_id
-
back_list = None
-
try:
-
#pdb.set_trace()
-
res = urllib2.Request(api_get_download_id)
-
req = opener.open(res)
-
#print 'req is ',req.read().strip()
-
json_text = req.read()
-
back_list = json.loads(json_text.strip())['data']
-
except Exception, e:
-
print 'error open get download api',e
-
-
if back_list is None:
-
return None
-
-
post_data = {'backup_id':0, 'use_session':'yes', 'format':'json', 'region_id':1001, 'zone_id':1}
-
#print back_list[0],back_list[0].keys()
-
if back_list[0]['dbid'] == db_dict[instance_name]['db_id']:
-
post_data['backup_id'] = back_list[0]['id']
-
#pdb.set_trace()
-
headers = {'User-Agent' : 'Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; Win64; x64; Trident/6.0)',
-
'X-Requested-With': 'XMLHttpRequest',
-
'Referer': '',
-
'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8'}
-
res = urllib2.Request(api_get_download, urllib.urlencode(post_data), headers = headers)
-
req = opener.open(res)
-
down_md5 = req.read()
-
#print len(down_md5)
-
if len(down_md5) == 40:
-
down_url = '%s' % down_md5
-
db_dict[instance_name]['down_url'] = down_url
-
-
-
def down_file_by_wget(db_dict):
-
#使用wget下载db文件
-
try:
-
shutil.rmtree('./tmp')
-
except:
-
pass
-
time.sleep(5)
-
for instance_name in db_dict.keys():
-
if 'down_url' in db_dict[instance_name].keys():
-
command = '/usr/bin/wget -q %s -O ./%s.tgz' % (db_dict[instance_name]['down_url'], instance_name)
-
down_process = subprocess.Popen(command,shell=True)
-
down_process.wait()
-
#print 'finsh download file %s,start untar' % instance_name
-
command = '/bin/tar -xf %s.tgz -C ./' % (instance_name)
-
#print command
-
untar_process = subprocess.Popen(command,shell=True)
-
untar_process.wait()
-
#print 'untar finsh'
-
-
-
def import_db(db_dict, max_serach_num=800,max_mysqldb_len=30):
-
#将sql文件编辑修改后倒入本地数据库
-
page_size = 4096
-
return_value = {'code':0, 'reason':'import database success'}
-
sql_files = os.listdir('./tmp/')
-
if len(sql_files) != len(db_dict.keys()):
-
print 'error'
-
for file_name in sql_files:
-
full_file_path = './tmp/' + file_name
-
# delte system database mysql from sql file
-
f = open(full_file_path,'r+')
-
count_len = 0
-
start_len = 0
-
end_len = 0
-
for i in range(max_serach_num):
-
line = f.readline()
-
count_len += len(line)
-
#获取创建mysql数据库的起始位置
-
if line[0:28] == "-- Current Database: `mysql`":
-
print 'fine mysql len is ' + str(count_len)
-
start_len = count_len
-
#获取创建mysql数据库的结束位置
-
if line[0:40] == "-- Dumping routines for database 'mysql'":
-
end_len = count_len
-
break
-
if i == max_serach_num - 1:
-
return_value['code'] = 1
-
return_value['reason'] = 'find mysql databse start or end line fail over max'
-
return return_value
-
if not end_len > start_len:
-
return_value['code'] = 1
-
return_value['reason'] = 'find mysql databse start or end line fail'
-
return return_value
-
-
#偏移量必须是pagesize的整数倍,补全下偏移量
-
completion_pagesize = start_len % page_size
-
print 'cp size is %d ' % completion_pagesize
-
start_len = start_len - completion_pagesize
-
mmap_len = end_len - start_len
-
#映射数据库文件 创建mysql的文件断
-
map = mmap.mmap(f.fileno(), length=mmap_len, access = mmap.ACCESS_WRITE, offset=start_len)
-
-
mark_start = 0
-
mark_end = 0
-
for i in range(max_mysqldb_len):
-
mark = map.tell()
-
line = map.readline()
-
print line
-
if line[0:28] == "-- Current Database: `mysql`":
-
#创建位置标记
-
mark_start = mark
-
if line[0:40] == "-- Dumping routines for database 'mysql'":
-
#结束位置标记
-
mark_end = map.tell()
-
break
-
if i == max_mysqldb_len - 1 :
-
return_value['code'] = 1
-
return_value['reason'] = 'mysql database too large!'
-
-
#头3字节转为注释符号
-
map[mark_start:mark_start+3] = '/*\n'
-
#用@填充
-
for k in range(mark_start+3, mark_end):
-
map[k] = '@'
-
#尾3字节添加注释符号
-
map[-3:mark_end] = '*/\n'
-
map.flush()
-
f.close()
-
-
print 'delete databae mysql.mysql finsh!!'
-
# change innodb to myisam
-
command = "/bin/sed -i s/^\)\ ENGINE=InnoDB/\)\ ENGINE=MyISAM/ %s" % full_file_path
-
print command
-
sed_process = subprocess.Popen(command,shell=True)
-
sed_process.wait()
-
# import sql file
-
command = '/usr/bin/mysql -uroot -S /var/lib/mysql/mysql_tmp.sock < %s' % full_file_path
-
import_process = subprocess.Popen(command,shell=True)
-
import_process.wait()
-
return return_value
-
-
-
-
def main():
-
curuid = os.getuid()
-
user_info = pwd.getpwuid(curuid)
-
home_dir = user_info[5]
-
user_name = user_info[0]
-
crontab_dir = home_dir + '/crontab_shell/'
-
try:
-
os.chdir(crontab_dir)
-
# print os.getcwd()
-
except:
-
return None
-
#return None
-
post_data = {}
-
post_data['username'] = '帐号'
-
post_data['password'] = '密码'
-
post_data['verify_code'] = ''
-
post_data['lt'] = ''
-
post_data['service'] = ''
-
-
global_openr = None
-
global_openr, main_pages = open_mainpage()
-
post_data['lt'] = get_lt_code(main_pages)
-
if not post_login(global_openr, post_data):
-
print 'get login faile'
-
sys.exit(1)
-
page_db(global_openr)
-
db_name_list = ('your db 1','your db 2')
-
db_dict = get_db_list(global_openr, db_name_list)
-
get_down_url(global_openr, db_dict)
-
print 'down load db file from ucloud'
-
down_file_by_wget(db_dict)
-
print 'download finsh start import db file'
-
print import_db(db_dict, 5000, 5000)
-
-
-
if __name__ == '__main__':
-
main()
pyorc需要安装下列东西,自己打包编译
阅读(2737) | 评论(0) | 转发(0) |