分类: Python/Ruby
2021-12-29 17:21:17
#!/usr/bin/python3
# -*- coding=utf-8 -*-
# @Author : lhys
# @FileName: proxy_tool.py
import requests
import threading
timeout = 300
lock = threading.Lock()
# 请求头用自己的
headers = {
'': ''
}
class MyProxy:
def __init__(self, proxy_api='', proxy_server='', max_use=5000, try_count=5):
if not (proxy_api or proxy_server):
raise TypeError('Proxy_api and proxy_server cannot be empty at the same time.')
self.proxies = None if not proxy_server else {
'http': proxy_server,
'https': proxy_server
}
# 代理API
self.proxy_api = proxy_api
# 代理 IP 最大使用次数
self.max_use = max_use
# 测试代理 IP 次数,超过次数即认为代理 IP 不可用
self.try_count = try_count
# 是否爬虫请求出错,如果出错,直接更换 IP
self.flag = 0
# 代理 IP 剩余生存时间
self.proxy_ttl = 0
# 各种锁
self.lock = threading.Lock()
self.ttl_lock = threading.Lock()
self.flag_lock = threading.Lock()
def set_flag(self):
self.flag_lock.acquire()
self.flag = 1
self.flag_lock.release()
def get_flag(self):
self.flag_lock.acquire()
flag = self.flag
self.flag_lock.release()
return flag
def decrease_ttl(self):
self.ttl_lock.acquire()
self.proxy_ttl -= 1
self.ttl_lock.release()
def get_ttl(self):
self.ttl_lock.acquire()
ttl = self.proxy_ttl
self.ttl_lock.release()
return ttl
def set_ttl(self):
self.ttl_lock.acquire()
self.proxy_ttl = self.max_use
self.ttl_lock.release()
def get_proxy(self):
self.lock.acquire()
proxy = self.proxies
self.lock.release()
return proxy
def set_proxy(self):
if self.proxy_ttl > 0 and self.flag == 0:
return
old = self.proxies
if self.flag == 1:
for try_count in range(self.try_count):
try:
requests.get('', headers=headers, proxies=old, timeout=timeout)
print(f'Test proxy {old} successfully.')
return
except requests.exceptions.ProxyError or requests.exceptions.ConnectionError or requests.exceptions.ConnectTimeout:
print(f'Test proxy {old} failed.')
break
except Exception as e:
print(e)
if not self.proxy_api:
raise ValueError('代理 IP 不可用,且代理 IP API未设置。')
while True:
res = 外汇跟单gendan5.comrequests.get(self.proxy_api)
# 这一部分按照自己的代理 IP 文档来,仅供参考
try:
if res.json()["ERRORCODE"] == "0":
ip, port = res.json()["RESULT"][0]['ip'], res.json()["RESULT"][0]['port']
self.lock.acquire()
self.proxies = {
'http': 'http://%s:%s' % (ip, port),
'https': 'http://%s:%s' % (ip, port)
}
print(f'Set proxy: {ip}:{port}.')
self.flag = 0
self.lock.release()
self.set_ttl()
return
else:
print(f'Set proxy failed.')
except Exception as e:
print(e)
Proxy = MyProxy()
def request_by_proxy(url, use_proxy=True):
while True:
try:
# 使用代理
if use_proxy:
proxy_ttl = Proxy.get_ttl()
print(proxy_ttl)
# 如果 超过最大使用次数 或者 请求出现错误,重新设置 IP
if proxy_ttl <= 0 or Proxy.get_flag():
Proxy.set_proxy()
print(Proxy.get_ttl())
proxy = Proxy.get_proxy()
lock.acquire()
res = requests.get(url, headers=headers, proxies=proxy, timeout=timeout)
lock.release()
Proxy.decrease_ttl()
return res
else:
res = requests.get(url, headers=headers, timeout=timeout)
return res
except requests.exceptions.ProxyError as pe:
if use_proxy:
lock.release()
print(f'Proxy {Proxy.proxies} is not available, reason: {pe}.')
Proxy.set_flag()
except requests.exceptions.Timeout as t:
if use_proxy:
lock.release()
print(f'Time out, reason: {t}.')
Proxy.set_flag()
except Exception as e:
if use_proxy:
lock.release()
print(e)
#!/usr/bin/python3
# -*- coding=utf-8 -*-
# @Author : lhys
# @FileName: spider.py
import time
import threading
from multiprocessing import Queue
from proxy_tool import request_by_proxy
threshold = 30
queue = Queue()
class Spider(threading.Thread):
def __init__(self, use_proxy=True):
super(Spider, self).__init__()
self.use_proxy = use_proxy
def get_data(self, url):
try:
res = request_by_proxy(url, self.use_proxy)
# 响应处理
pass
except Exception as e:
print(e)
return
def run(self):
while True:
# 如果队列空了,等待一会儿。
# 过了指定的时间后,如果队列出现数据,就继续爬
# 如果队列还是空的,停止线程
if queue.empty():
time.sleep(threshold)
if not queue.empty():
url = queue.get()
self.get_data(url)
time.sleep(threshold)
else:
print('Queue is empty.')
return