最近有个需求要用Python做一个爬虫不间断运行,但是对方网站做了比较严格的反爬,然后就选择了用随机header和代理。
刚开始的时候使用的免费代理,后来发现免费才是最贵的,经常失效或者连接不上,于是改为使用付费代理,最后选择了芝麻代理
但是爬虫每秒请求可能为5QPS左右,芝麻代理默认请求为1QPS,所以只能采取维护一个代理池的方式,每次请求从中随机选取。
刚开始的时候使用的是购买套餐,后来发现并不划算,套餐每天有使用上限,IP存活时间长的,上限数量就低,IP上限高的,存活时间又比较低,有预算上限的长时间爬虫类项目可以选择IP存活时间长的套餐,把维护IP池数量稍微降低一点,勉强够用一天了。不过后期还是建议使用按次购买,控制好频率就不怕超限。
注意IP获取不计次,但是一旦使用就会计次
之前没经验选择5分钟套餐,不到两点IP用量就要到上限了,选套餐还是建议存活时间长的
这个维护IP池同时支持套餐和按次收费,所以也不用太过纠结
首先你要获取到你的AppKey
和Neek
参数,在官网提取IP
生成的API链接里可以获取到http://h.zhimaruanjian.com/getapi/#obtain_ip
如果是选择的用套餐提取,还会有对应的pack
参数,是你自己的对应套餐ID,在上面的链接同样可以获取到,然后实例化时把参数填进去即可。pack为0则按次提取IP
这里我们按照维护大小为50个的IP池为例,使用示例如下:
while True:
try:
begin = time.time() * 1000
zm = ZhiMaPool('You App Key', 'Your Neek') # 你的AppKey的Neek参数
zm.ip_sum = 50 # IP池总数量
# zm.pack = 121888 # 套餐pack参数,为0则使用按次提取,默认为0
# zm.ip_type = 'http' # http or https
# zm.ttl = 60 # 过期提前失效时间,默认提前60秒
# zm.pool_path = './zhima_pool.json' # IP池保存路径
zm.check_ip()
cost = time.time() * 1000 - begin
print('本次耗时'+ str(cost) + '毫秒')
time.sleep(30) # 间隔时间请少于ttl时长
except Exception as e:
print(e)
完整代码
# -*- coding: utf-8 -*-
# __author__ = "Yuuuu" gty0211@foxmail.com
# Date: 2020-10-22 Python: 3.8
import os
import time
import requests
import json
import re
class ZhiMaPool(object):
pool_path = './zhima_pool.json' # IP池保存地址
ttl = 60 # 过期间隔
pack = 0 # 套餐pack参数,为0则使用按次提取
ip_pool = []
ip_type = 'http' # 提取IP类别,http或者https
ip_sum = 100 # IP池总数
def __init__(self,key,neek,ip_sum = 100,ttl = 60):
self.key = key
self.neek = neek
self.ttl = ttl
self.ip_sum = ip_sum
self._init() # init the proxy
def _init(self):
print('初始化中...')
if os.path.exists(self.pool_path):
with open(self.pool_path,'r') as f:
self.ip_pool = json.loads(f.read())
response = requests.get('http://pv.sohu.com/cityjson?ie=utf-8')
address = re.search(r'"cip": "(.*?)", "cid', response.text).group(1)
# add ip_white
url = 'http://web.http.cnapi.cc/index/index/save_white?neek={neek}&appkey={key}&white={local}'.format(
neek=self.neek, key=self.key, local=address)
response = requests.get(url=url)
code = json.loads(response.text).get('code')
if code == 0 or code == 115:
print('初始化成功,启动中稍等..')
else:
print('初始化芝麻账号失败')
time.sleep(2)
def check_ip(self):
for index,node in enumerate(self.ip_pool):
ip = node[0]
port = node[1]
expire_time = node[2]
if expire_time - self.ttl < time.time():
del self.ip_pool[index]
print('IP即将超时,已删除',ip)
if not self.checkproxy(ip + ':' + port,ip):
del self.ip_pool[index] # 删除
print('使用IP代理请求出错,删除',ip)
while len(self.ip_pool) < self.ip_sum:
if self.pack == 0:
self.add_ip_count()
else:
self.add_ip()
time.sleep(2) # 不能请求太快
self.save_to_file() # 存到json文件
# 根据套餐提取
def add_ip(self, num=20):
port = '11' if self.ip_type == 'https' else '1' # http(default) & https
get_url = 'http://webapi.http.zhimacangku.com/getip?num={num}&type=2&pro=&city=0&yys=0&port={port}&pack={pack}&ts=1&ys=0&cs=0&lb=1&sb=0&pb=4&mr=1®ions='.format(port=port,num=num,pack=self.pack)
response = requests.get(get_url)
# code = json.loads(response.text).get('code')
self.parse(response.text)
# 按次提取IP num:每次提取数量,invalid:有效时长 TODO 这里可以修改为其他
def add_ip_count(self, num=20, invalid_time=2):
port = '11' if self.ip_type == 'https' else '1' # http(default) & https
get_url = 'http://webapi.http.zhimacangku.com/getip?num={num}&type=2&pro=&city=0&yys=0&port={port}&time={invalid_time}&ts=1&ys=0&cs=0&lb=1&sb=0&pb=4&mr=1®ions='.format(num=num,invalid_time=invalid_time,port=port)
response = requests.get(get_url)
self.parse(response.text)
# 解析提取的IP
def parse(self, json_data):
count = 0
ret_dict = json.loads(json_data)
if ret_dict.get('success'):
nodes = ret_dict.get('data')
for node in nodes:
expire_time = self.str_to_time(node.get('expire_time'))
if expire_time - self.ttl < time.time():
print('该IP存活时间过短,已弃用',node.get('ip'))
continue
tmp = [[str(node.get('ip')),str(node.get('port')),expire_time,self.ip_type]]
self.ip_pool.extend(tmp) # 添加到ip池
count += 1
print('本次获取' + str(count) + '个IP')
# 时间字符串转时间戳
def str_to_time(self, time_str):
# 先转换为时间数组
timeArray = time.strptime(time_str, "%Y-%m-%d %H:%M:%S")
# 转换为时间戳
timeStamp = int(time.mktime(timeArray))
return timeStamp
# 检查代理IP有效性
def checkproxy(self,proxy,ip):
return True
# response = requests.get('http://pv.sohu.com/cityjson?ie=utf-8',proxies={self.ip_type:proxy})
# address = re.search(r'"cip": "(.*?)", "cid', response.text).group(1)
# time.sleep(2)
# return str(address) == str(ip)
def save_to_file(self):
with open(self.pool_path, 'w') as f:
f.write(json.dumps(self.ip_pool))
# 使用方法
while True:
try:
begin = time.time() * 1000
zm = ZhiMaPool('You App Key', 'Your Neek')
zm.ip_sum = 50 # IP池总数量
# zm.pack = 121888 # 套餐pack参数,为0则使用按次提取,默认为0
# zm.ip_type = 'http' # http or https
# zm.ttl = 60 # 过期提前失效时间,默认提前60秒
# zm.pool_path = './zhima_pool.json' # IP池保存路径
zm.check_ip()
cost = time.time() * 1000 - begin
print('本次耗时'+ str(cost) + '毫秒')
time.sleep(30) # 间隔时间请少于ttl时长
except Exception as e:
print(e)
后台运行nohup python3 ZhiMaProxy.py >/dev/null 2>log &
即可在同目录下的zhima_pool.json
文件得到一个健康的IP池
重复一遍,获取IP不收费,使用才收费
评论 (0)