研究了一段时间OAuth,在SNS网站之间,OAuth用的很多,手机类应用也不少,但是在桌面程序中很少有使用到OAuth,至于CS架构的程序,至少我是没发现这样的应用了。
新浪的OAuth认证中,有提出一种桌面应用的OAuth认证实现,其原理是使用PIN码,即用户在登录成功之后记录下新浪反馈的一个PIN码,然后在客户端中输入这个PIN值,再用这个进行access token等后续的验证,这个方案相当的搓,试想,一个桌面应用程序,在登录完之后,再要输入一个PIN码,太不方便了,用户体验很差。
我研究了较长时间,最终设计出一个方案,可以实现CS架构模型下使用OAuth认证,这个架构有点一小复杂,也不是一般的程序能用的上的,但这个思路也许能给看本文的人一个思路。
在我的实现中,客户端程序发起登录请求,发起请求时产生一个唯一ID,请注意这个ID必须是绝对唯一的,我的DEMO中使用了UUID代替,我认为对UUID+特定字串做MD5处理,应该能达到要求。然后将这个唯一ID传给server端,同时打开一个IE界面,也将这个唯一ID传递给负责进行OAuth认证的我方网站程序。
server端在收到唯一ID后,保持和客户端的通讯,并可以不做反馈(在我的DEMO中为了简便,设计为客户端每1秒钟发起一次查询请求,server端收到后检查是否通过认证,并立即返回)
我方网站程序是核心,在接收到唯一ID后,构造request Token,向第三方网站发起Oauth认证,引导用户IE浏览器跳转至第三方网站,并最终令用户成功在第三方网站登录完成。
在我方网站最终登录完成之后作access token和唯一ID、网站用户名的绑定关系,并通过socket将唯一ID和网站用户名的对应关系发送给server端,server端在接收到这个信息后更新状态,并通知客户端此用户已经成功登录,即可以走后续业务流程了。
这个思路其实并不复杂,但走的路线比较曲折,附件是我用python写的DEMO,思路就是如上,client.py是模拟了一个客户端请求,其中也实现了模拟发成功登录请求的模块;server.py是模拟接入服务端;p.py是模拟网站程序,在成功OAuth认证完成之后向server.py发送成功登录的socket请求。
请注意p.py需要使用python -m CGIHTTPServer方式运行的,不懂的人自行看python的书吧。
本文中的代码如何运行其实并不重要,重要的是思路,程序我在python2.7,linux/windows平台下都测试通过了。
不熟悉本文的OAuth协议的,请自行google。
client.py
- #-*- coding:utf-8-*-
-
'''
-
Created on 2011-5-15
-
-
@author: HAWK.Li
-
'''
-
import uuid
-
import sys,os,time,subprocess
-
import socket
-
-
class Client():
-
"""封装客户端核心方法
-
"""
-
def login(self,tempid):
-
"""登录"""
-
-
-
try:
-
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-
self.sock.connect((host, port))
-
msg = 'login '+tempid
-
self.sock.sendall(msg)
-
self.response = self.sock.recv(sockSize)
-
#分割字串,result[0]为标识,result[1]为返回结果
-
result = self.response.split(' ')
-
if result[0] == 'success':
-
print "Login success! tempid(%s) is user(%s)!"%(tempid,result[1])
-
print "connect close."
-
global FLAG
-
FLAG = False
-
elif result[0] == 'again':
-
print "sleep 1 second and retry..."
-
elif result[0] == 'first':
-
print "First Login..."
-
else:
-
print "failed..."
-
#print self.response
-
return self.response
-
except:
-
print "登录出错!".decode('utf-8').encode("gbk")
-
print "Client.login Error:"+str(sys.exc_info()[0])+str(sys.exc_info()[1])
-
-
def finished(self,tempid):
-
"""模拟成功的socket通知"""
-
-
try:
-
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-
self.sock.connect((host, port))
-
username = "Andy"
-
msg = 'finished '+tempid+" "+username
-
self.sock.sendall(msg)
-
self.response = self.sock.recv(sockSize)
-
#分割字串,result[0]为标识,result[1]为返回结果
-
result = self.response.split(' ')
-
print result
-
except:
-
print "模拟通知出错!".decode('utf-8').encode("gbk")
-
print "finished Error:"+str(sys.exc_info()[0])+str(sys.exc_info()[1])
-
-
def listid(self):
-
"""列出服务器上的uuid和用户名的对应关系表"""
-
-
-
try:
-
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-
self.sock.connect((host, port))
-
msg = 'list '
-
self.sock.sendall(msg)
-
self.response = self.sock.recv(sockSize)
-
result = self.response
-
print result
-
except:
-
print "列表出错!".decode('utf-8').encode("gbk")
-
print "listid Error:"+str(sys.exc_info()[0])+str(sys.exc_info()[1])
-
-
def closeSock(self):
-
self.sock.close()
-
-
-
if __name__ == "__main__":
-
global host,port,client,sockSize
-
sockSize = 64000
-
uid = str(uuid.uuid1())
- host = '127.0.0.1'
-
port = int('1234')
-
-
client = Client()
- #这里的requestToken.py文件并不重要,我也没写
-
p = subprocess.Popen('"C:\Program Files\Internet Explorer\iexplore.exe" '+uid)
-
-
#退出标记
-
FLAG = True
-
-
if len(sys.argv) >= 2:
-
#登录成功的标记
-
if sys.argv[1] == 'finished':
-
existid = sys.argv[2]
-
r = client.finished(existid)
-
elif sys.argv[1] == 'list':
-
r = client.listid()
-
else:
-
print 'Send tempid('+uid+') to LoginServer...'
-
for i in range(120):
-
if FLAG:
-
r = client.login(uid)
-
time.sleep(1)
-
else:
-
break
server.py
- #-*- coding:utf-8-*-
-
'''
-
Created on 2011-5-15
-
-
@author: HAWK.Li
-
'''
-
import uuid
-
import sys,os,time,subprocess
-
import SocketServer,socket
-
import time,datetime
-
import threading
-
-
-
class MyTCPHandler(SocketServer.StreamRequestHandler):
-
def handle(self):
-
uid = None
-
-
while True:
-
self.data = self.request.recv(sockSize)
-
cur_thread = threading.currentThread()
-
cmd = self.data
-
if cmd == None or len(cmd) == 0:
-
break;
-
recvStr = cmd.split(' ')
-
#print recvStr
-
# try:
-
if cmd.startswith('login'):
-
firstflag = True
-
uid = recvStr[1]
-
print "RECV from ", self.client_address[0],uid
-
for key in ids.keys():
-
#检查登录的ID是否已经在列表中
-
if key == uid:
-
firstflag = False
-
#检查登录的ID是否已经通过用户名验证,是None则告诉客户端还没验证通过
-
if ids[key] == None:
-
result = "again %s"%(uid)
-
else:
-
result = "success %s"%(ids[key])
-
continue
-
if firstflag:
-
#说明这个客户端第一次登录
-
result = "first %s"%(uid)
-
username=None
-
ids[uid] = username
-
-
print result
-
print ids
-
self.request.sendall(result)
-
#收到用户名成功验证的通知
-
elif cmd.startswith('finished'):
-
uid = recvStr[1]
-
username = recvStr[2]
-
ids[uid] = username
-
self.request.sendall('OK')
-
elif cmd.startswith('list'):
-
self.request.sendall(str(ids))
-
else:
-
result = 'error'
-
# self.wfile.write(result)
-
# self.wfile.write('\n')
-
# except:
-
# print 'error'
-
break
-
try:
-
if uid != None:
-
ids.remove(uid)
-
except:
-
pass
-
#print uid, ' closed.'
-
-
class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
-
#socket.setdefaulttimeout(5)
-
pass
-
-
if __name__ == "__main__":
-
HOST = ''
-
PORT = int('1234')
-
sockSize = 64000
-
ids = {'id':None}
-
print 'login server start...'
-
server = ThreadedTCPServer((HOST, PORT), MyTCPHandler)
-
server_thread = threading.Thread(target=server.serve_forever)
-
server_thread.setDaemon(True)
-
server_thread.start()
-
server.serve_forever()
f.py
这个是一个CGI程序,需要使用python -m CGIHTTPServer方式运行,并放在cgi-bin目录下,给他执行权限
- #!/usr/bin/env python
-
-
import cgi
-
import uuid
-
import sys,os,time,subprocess
-
import socket
-
-
#CGIAuth DEMO
-
header = 'Content-Type: text/html\n\n'
-
-
formhtml = '''
- CGIAuth Demo
-
USER
-
-
Enter your uuid:
-
-
-
USERNAME:
-
%s
-
'''
-
-
fradio = ' %s\n'
-
-
def showForm():
-
friends = ''
-
for i in [0, 10, 25, 50, 100]:
-
checked = ''
-
if i == 0:
-
checked = 'CHECKED'
-
friends = friends + fradio % \
-
(str(i), checked, str(i))
-
-
print header + formhtml % (friends)
-
-
reshtml = '''
- CGIAuth Demo
-
Users : %s
-
Your uuid is: %s
-
Your username is: %s .
-
'''
-
-
def doResults(who, howmany):
-
print header + reshtml % (who, who, howmany)
-
global host,port,client,sockSize
-
sockSize = 64000
-
#uid = str(uuid.uuid1())
-
uid = who
-
host = '127.0.0.1'
-
port = int('1234')
-
client = Client()
-
users = howmany
-
r = client.finished(uid,users)
-
print r
-
-
def process():
-
form = cgi.FieldStorage()
-
if form.has_key('person'):
-
who = form['person'].value
-
else:
-
who = 'NEW USER'
-
-
if form.has_key('howmany'):
-
howmany = form['howmany'].value
-
else:
-
howmany = 0
-
-
if form.has_key('action'):
-
doResults(who, howmany)
-
else:
-
showForm()
-
-
class Client():
-
-
def finished(self,tempid,username):
-
try:
-
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-
self.sock.connect((host, port))
- msg = 'finished '+tempid+" "+username
-
self.sock.sendall(msg)
-
self.response = self.sock.recv(sockSize)
-
result = self.response.split(' ')
-
return result
-
except:
-
print "finished Error:"+str(sys.exc_info()[0])+str(sys.exc_info()[1])
-
-
-
def closeSock(self):
-
self.sock.close()
-
-
-
if __name__ == '__main__':
-
process()
阅读(3134) | 评论(0) | 转发(2) |