Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1093874
  • 博文数量: 165
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 1352
  • 用 户 组: 普通用户
  • 注册时间: 2016-03-11 14:13
个人简介

狂甩酷拽吊炸天

文章分类

全部博文(165)

文章存档

2024年(1)

2023年(1)

2022年(3)

2021年(4)

2020年(17)

2019年(37)

2018年(17)

2017年(35)

2016年(50)

分类: Python/Ruby

2017-06-07 17:56:26

用python实现微信机器人,可以监控到微信群里的聊天信息:
# -*- coding:utf-8 -*- """ python wx_robot.py 注意事项: 1.机器人运行期间不能该账号不能在手机退出,这样机器人会被强制下线 2.机器人运行期间不能再登录桌面微信或者web网页微信,这样机器人也会被强制下线 3.以上操作都会导致本地的cookie和其他信息失效,如果是实验阶段可以清除cookie.txt内容,删除robot.json文件,然后启动 """ import copy import re import json import os import sys import subprocess import time import random import urllib import xml.dom.minidom import httplib2

ROBOT_INFO_FILE = os.path.join(os.getcwd(), 'robot.json')
ROBOT_COOKIE_FILE = os.path.join(os.getcwd(), 'cookie.txt')

TIMEOUT = 30 WX_URLS = { 'jslogin': '%s', 'qrcode': '%s', 'login': '%s&uuid=%s&_=%s',
}
DEFAULT_HEADERS = { 'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:18.0) Gecko/20100101 Firefox/18.0', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 'Accept-Language': 'en-us,en;q=0.5', 'Accept-Encoding': 'gzip', 'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7', 'Keep-Alive': '115', 'Connection': 'keep-alive', 'Cache-Control': 'max-age=0' } def update_cookie(response): global DEFAULT_HEADERS if response.get('set-cookie'):
        cookie = re.sub(r'Domain.*?GMT,?', '', response['set-cookie'])
        DEFAULT_HEADERS['Cookie'] = cookie with open(ROBOT_COOKIE_FILE, 'wb') as fp:
            fp.write(DEFAULT_HEADERS['Cookie']) def http_post(url, params): global DEFAULT_HEADERS
    headers = copy.deepcopy(DEFAULT_HEADERS)
    headers['ContentType'] = 'application/json; charset=UTF-8'  conn = httplib2.Http(timeout=TIMEOUT)
    response, content = conn.request(uri=url, method='POST', headers=headers, body=json.dumps(params))
    update_cookie(response) return content def http_get(url):
    conn = httplib2.Http(timeout=TIMEOUT)
    response, content = conn.request(uri=url, method='GET', body=None, headers=DEFAULT_HEADERS)
    update_cookie(response) return content def get_icon(self, user_id):
    url = '%s' % user_id
    content = http_get(url)
    tmp_fn = 'img_%s.jpg' % user_id with open(tmp_fn, 'wb') as fp:
        fp.write(content) return tmp_fn class WeiXinRobot(object): def __init__(self): self.deviceId = 'e' + repr(random.random())[2:17] self.uuid = ''  self.redirect_uri = ''  self.base_uri = ''   self.skey = ''  self.sid = ''  self.uin = ''  self.pass_ticket = ''  self.BaseRequest = {} self.User = {} self.SyncKey = [] self.sync_key = ''  self.sync_host = ''   self.group_list = [] self.group_member_dict = {} def __json__(self):
        json_data = {} for k, v in self.__dict__.items(): if isinstance(v, (int, bool, str, unicode, list, dict)):
                json_data[k] = v return json_data def _echo(self, message, ends=''):
        sys.stdout.write(message + ends)
        sys.stdout.flush() def _run(self, msg, func, *args): self._echo(msg) if func(*args): self._echo(u'成功', '\n') else: self._echo(u'失败\n[*] 退出程序') exit() def get_uuid(self):
        params = { 'appid': 'wx782c26e4c19acffb', 'fun': 'new', 'lang': 'zh_CN', '_': int(time.time()),
        }
        content = http_get(WX_URLS['jslogin'] % urllib.urlencode(params))
        regex = r'window.QRLogin.code = (\d+); window.QRLogin.uuid = "(\S+?)"'  matches = re.search(regex, content) if matches and matches.group(1) == '200': self.uuid = matches.group(2) return True  return False def gen_qr_code(self):
        qr_code_path = os.path.join(os.getcwd(), 'qrcode.jpg')
        content = http_get(WX_URLS['qrcode'] % self.uuid) with open(qr_code_path, 'wb') as f:
            f.write(content) # if sys.platform == 'darwin':  #     subprocess.call(['open', qr_code_path])  # else:  #     subprocess.call(['xdg-open', qr_code_path])  return True   def wait_for_login(self, tip=1):
        time.sleep(tip)
        content = http_get(WX_URLS['login'] % (tip, self.uuid, str(time.time())))
        matches = re.search(r'window.code=(\d+);', content)
        code = int(matches.group(1)) if code == 201: # 手机扫描识别成功  return True  if code == 200: # 确认按钮点击成功  matches = re.search(r'window.redirect_uri="(\S+?)";', content) self.redirect_uri = matches.group(1) + '&fun=new'  self.base_uri = self.redirect_uri[:self.redirect_uri.rfind('/')] return True  if code == 408: self._echo('[登陆超时] ') return False self._echo('[登陆异常] ') return False def login(self):
        content = http_get(self.redirect_uri)
        doc = xml.dom.minidom.parseString(content)
        root = doc.documentElement for node in root.childNodes: if node.nodeName == 'skey': self.skey = node.childNodes[0].data elif node.nodeName == 'wxsid': self.sid = node.childNodes[0].data elif node.nodeName == 'wxuin': self.uin = node.childNodes[0].data elif node.nodeName == 'pass_ticket': self.pass_ticket = node.childNodes[0].data if all((self.skey, self.sid, self.uin, self.pass_ticket)): self.BaseRequest = {'Skey': self.skey, 'Sid': self.sid, 'Uin': int(self.uin), 'DeviceID': self.deviceId} return True  return False def wx_init(self):
        url = self.base_uri + '/webwxinit?pass_ticket=%s&skey=%s&r=%s' % (self.pass_ticket, self.skey, int(time.time()))
        content = http_post(url, {'BaseRequest': self.BaseRequest})
        json_data = json.loads(content) self.User = json_data['User'] #   self.SyncKey = json_data['SyncKey'] self.sync_key = '|'.join([str(item['Key']) + '_' + str(item['Val']) for item in self.SyncKey['List']]) return json_data['BaseResponse']['Ret'] == 0   def wx_notify(self):
        url = self.base_uri + '/webwxstatusnotify?lang=zh_CN&pass_ticket=%s' % self.pass_ticket
        params = { "Code": 3, 'BaseRequest': self.BaseRequest, "FromUserName": self.User['UserName'], "ToUserName": self.User['UserName'], "ClientMsgId": int(time.time())
        }
        content = http_post(url, params)
        json_data = json.loads(content) return json_data['BaseResponse']['Ret'] == 0   def wx_group_list(self):
        url = '%s/webwxgetcontact?pass_ticket=%s&skey=%s&r=%s' % ( self.base_uri, self.pass_ticket, self.skey, int(time.time()))
        content = http_get(url)
        json_data = json.loads(content) self.group_list = [member for member in json_data['MemberList'] if member['UserName'].startswith('@@')] return True   def wx_group_members(self, group_list=None):
        group_list = group_list or [{'UserName': g['UserName'], 'ChatRoomId': ''} for g in self.group_list]
        url = '%s/webwxbatchgetcontact?type=ex&r=%s&pass_ticket=%s' % ( self.base_uri, int(time.time()), self.pass_ticket)
        params = { 'BaseRequest': self.BaseRequest, 'Count': len(group_list), 'List': group_list
        }
        content = http_post(url, params)
        json_data = json.loads(content)
        old_group_list = [g['UserName'] for g in self.group_list] for group in json_data['ContactList']: self.group_member_dict[group['UserName']] = group['MemberList'] if group['UserName'] not in old_group_list: del group['MemberList'] self.group_list.append(group) if group_list: # TODO: 同步粉丝信息到数据库  with open(ROBOT_INFO_FILE, 'wb') as fp:
                fp.write(json.dumps(self.__json__())) return json_data['BaseResponse']['Ret'] == 0   def sync_check(self, host):
        params = { 'r': int(time.time()), 'sid': self.sid, 'uin': self.uin, 'skey': self.skey, 'deviceid': self.deviceId, 'synckey': self.sync_key, '_': int(time.time()),
        }
        url = 'https://%s/cgi-bin/mmwebwx-bin/synccheck?%s' % (host, urllib.urlencode(params))
        content = http_get(url)
        matches = re.search(r'window.synccheck=\{retcode:"(\d+)",selector:"(\d+)"\}', content) return matches.group(1), matches.group(2) def sync(self):
        sync_host_list = [ 'webpush.weixin.qq.com', 'webpush2.weixin.qq.com', 'webpush.wechat.com', 'webpush1.wechat.com', 'webpush2.wechat.com', 'webpush1.wechatapp.com', 'webpush.wechatapp.com'  ] for host in sync_host_list:
            retcode, selector = self.sync_check(host) if retcode == '0': self.sync_host = host return True   def wx_message_sync(self):
        url = self.base_uri + '/webwxsync?sid=%s&skey=%s&pass_ticket=%s' % (self.sid, self.skey, self.pass_ticket)
        params = { 'BaseRequest': self.BaseRequest, 'SyncKey': self.SyncKey, 'rr': ~int(time.time())
        }
        content = http_post(url, params)
        json_data = json.loads(content) if json_data['BaseResponse']['Ret'] == 0: # 更新同步键  self.SyncKey = json_data['SyncKey'] self.sync_key = '|'.join([str(item['Key']) + '_' + str(item['Val']) for item in self.SyncKey['List']]) return json_data def get_nickname(self, group_id, user_id=None, retry=3): while retry:
            retry -= 1  if user_id:
                target = user_id
                check_list = self.group_member_dict.get(group_id, []) else:
                target = group_id
                check_list = self.group_list for item in check_list: if item['UserName'] == target: return item['NickName'] # 没有记录就去服务器查一遍  self.wx_group_members([{'UserName': group_id, 'ChatRoomId': ''}]) def handle_message(self, json_data): if not json_data: return  for message in json_data['AddMsgList']:
            msg_type = message['MsgType']
            content = message['Content'].replace('<', '<').replace('>', '>')
            group_id = message['FromUserName'] if group_id.startswith('@@'):
                fans_id, content = content.split(':
')
                group_name = self.get_nickname(group_id)
                fans_name = self.get_nickname(group_id, fans_id) # group_id, fans_id, fans_name, current_time  if msg_type == 1: # 消息  self._echo(u'%s@%s: %s' % (fans_name, group_name, content), '\n') elif msg_type == 3: # 图片  self._echo(u'%s@%s: %s' % (fans_name, group_name, u'图片'), '\n') elif msg_type == 34: # 语音  self._echo(u'%s@%s: %s' % (fans_name, group_name, u'语音'), '\n') elif msg_type == 42: # 名片  self._echo(u'%s@%s: %s' % (fans_name, group_name, u'名片'), '\n') elif msg_type == 47: # 动画表情  self._echo(u'%s@%s: %s' % (fans_name, group_name, u'表情'), '\n') elif msg_type == 51: # 退出群  # TODO: 退出群聊,将该粉丝设置为已经退出  self._echo(u'%s@%s: %s' % (fans_name, group_name, u'退出群聊'), '\n') elif msg_type == 62: # 视频  self._echo(u'%s@%s: %s' % (fans_name, group_name, u'视频'), '\n') elif msg_type == 10000: # 进入群  self._echo(u'%s@%s: %s' % (fans_name, group_name, u'进入群聊'), '\n') self.wx_group_members([{'UserName': group_id, 'ChatRoomId': ''}]) elif msg_type == 10002: # 撤回消息  self._echo(u'%s@%s: %s' % (fans_name, group_name, u'撤回消息'), '\n') else: self._echo(u'%s@%s: 消息类型<%s>' % (fans_name, group_name, str(msg_type)), '\n') def wx_sync_loop(self): self._run(u'[*] 进入消息监听模式 ...', self.sync) while True:
            retcode, selector = self.sync_check(self.sync_host) if retcode == '0': if selector == '0': # 没有新消息, 也没有异常  time.sleep(1) elif selector == '2': # 有新消息  self.handle_message(self.wx_message_sync()) def main(self): global ROBOT_COOKIE_FILE, ROBOT_INFO_FILE, DEFAULT_HEADERS self._echo(u'微信机器人启动:\n') if os.path.exists(ROBOT_INFO_FILE): # 从文件加载初始化信息  with open(ROBOT_INFO_FILE) as fp:
                json_data = json.loads(fp.read()) for field, value in json_data.items(): self.__dict__[field] = value with open(ROBOT_COOKIE_FILE) as fp:
                DEFAULT_HEADERS['Cookie'] = fp.read() else: self._run(u'[*] 正在获取 uuid ... ', self.get_uuid) self._run(u'[*] 正在获取二维码 ... ', self.gen_qr_code) self._run(u'[*] 扫描二维码登录 ... ', self.wait_for_login) self._run(u'[*] 确认登录 ... ', self.wait_for_login, 0) self._run(u'[*] 正在登录 ... ', self.login) self._run(u'[*] 正在初始化 ... ', self.wx_init) self._run(u'[*] 正在开启通知 ... ', self.wx_notify) self._run(u'[*] 正在获取群聊列表 ... ', self.wx_group_list) self._run(u'[*] 正在获取各群成员 ... ', self.wx_group_members) with open(ROBOT_INFO_FILE, 'wb') as fp:
                fp.write(json.dumps(self.__json__())) self._echo(u'微信机器人成功启动!', '\n') try: self.wx_sync_loop() except KeyboardInterrupt: self._echo(u'微信机器人成功退出!', '\n') exit() if __name__ == '__main__':
    WeiXinRobot().main()


下面这部分是另一段代码,可以实现消息自动回复:
# -*- coding:utf-8 -*-  import requests import itchat # 这是一个用于微信回复的库  KEY = '8edce3ce905a4c1dbb965e6b35c3834d' # 这个key可以直接拿来用   # api发送请求 def get_response(msg):
    apiUrl = ''  data = { 'key': KEY, 'info': msg, 'userid': 'pth-robot',
    } try:
        r = requests.post(apiUrl, data=data).json() return r.get('text') except: return  # 注册方法 @itchat.msg_register(itchat.content.TEXT) def tuling_reply(msg): # 为了保证在图灵Key出现问题的时候仍旧可以回复,这里设置一个默认回复  defaultReply = 'I received: ' + msg['Text'] # 如果图灵Key出现问题,那么reply将会是None  reply = get_response(msg['Text']) # a or b的意思是,如果a有内容,那么返回a,否则返回b  return reply or defaultReply # 为了让修改程序不用多次扫码,使用热启动 itchat.auto_login(hotReload=True)
itchat.run()
  运行如图:

阅读(3460) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~