...
发起请求前需要使用
CipherAdapter
方法来处理ssl问题发请请求前需要指定userAgent值,创建验证码需要使用这个值
如果验证码接口返回的userAgent与值自己提交的不同,使用接口返回的
返回{"message":"Unknown Message""code": 10008}错误的原因未知,可能与账号有关
View file | ||
---|---|---|
|
代码块 |
---|
import sys
sys.path.append(".")
import os
import requests
import time
from loguru import logger
from faker import Faker
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.ssl_ import create_urllib3_context
# disable ssl warning
requests.packages.urllib3.disable_warnings()
class CipherAdapter(HTTPAdapter):
# 用于处理部分网站需要验证ssl
# 在test中的load方法中使用
# 使用self.mount_adapter()加载
def init_poolmanager(self, *args, **kwargs):
context = create_urllib3_context(ciphers='DEFAULT:@SECLEVEL=2')
kwargs['ssl_context'] = context
return super(CipherAdapter, self).init_poolmanager(*args, **kwargs)
def proxy_manager_for(self, *args, **kwargs):
context = create_urllib3_context(ciphers='DEFAULT:@SECLEVEL=2')
kwargs['ssl_context'] = context
return super(CipherAdapter, self).proxy_manager_for(*args, **kwargs)
class YesCaptchaSolver:
# Yescaptcha 接口测试工具
# YesCaptchaSolver:创建任务请求,生成验证码结果
def __init__(self, key="", url="", type="", proxy={}, args={}):
self.server = "https://api.yescaptcha.com"
self.clientkey = "xxxxxxxxxxxxxxxxxxxxxxxxxxxx" # 填你的密钥
self.key = key
self.url = url
self.type = type
self.proxy = proxy
self.args = args
def create(self, url):
data = {
"clientKey": self.clientkey,
"task": {
"websiteURL": self.url,
"websiteKey": self.key,
"type": self.type
}
}
if self.args:
data["task"].update(self.args)
print("create: ", data)
response = requests.post(f"{self.server}/createTask", json=data, timeout=30, proxies=self.proxy, verify=False)
return response.json()
def get(self, task_id):
data = {
"clientKey": self.clientkey,
"taskId": task_id
}
response = requests.post(f"{self.server}/getTaskResult", json=data, timeout=30, proxies=self.proxy, verify=False)
return response.json()
def solve(self):
task = self.create(self.url)
logger.info(f"Task created: {task}")
task_id = task.get("taskId")
if not task_id:
return
for i in range(60):
result = self.get(task_id)
# logger.info(f"Task result: {result}")
if result.get("errorId") == 1:
logger.info(f"Task fail: {result}")
return
if result.get("status") == "ready":
logger.info(f"Task result: {result}")
return result.get("solution")
time.sleep(2)
class YescaptchaTester:
# Yescaptcha 接口测试实例
# YescaptchaTester: 提交验证码结果,获取结果
def __init__(self, proxy={}):
self.solver = YesCaptchaSolver(proxy=proxy)
self.token = ""
self.proxy = proxy
self.session = requests.Session()
# 生成随机user-agent
# fake = Faker(locale='zh_CN')
# self.session.headers = {'User-Agent': fake.user_agent()}
# discord注册需要计算fingerprint,因此需要配套的user-agent
self.userAgent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36"
self.session.headers.update({'User-Agent': self.userAgent})
# 以下参数可以阅读开发文档获取
self.type = ""
# key和url可以在创建任务时获取
self.key = ""
self.url = ""
self.args = {
"userAgent": self.userAgent,
# rqdata值是通过请求获取的,这里先留空
"rqdata": "",
}
def get_token(self):
# 获取验证码token
self.solver.key = self.key
self.solver.url = self.url
self.solver.type = self.type
self.solver.proxy = self.proxy
self.solver.args = self.args
result = self.solver.solve()
if result and 'userAgent' in result:
return result
elif result and result.get("gRecaptchaResponse"):
return result.get("gRecaptchaResponse")
def load(self):
# 在提交验证之前可能需要处理其他内容,可以在这里处理
# mount adapter for ssl warning
self.session.mount('https://', CipherAdapter())
self.session.mount('http://', CipherAdapter())
# 添加登陆成功后的帐号token
self.session.headers.update({"authorization": "MTA3MTc1MzEwNDYwNjkwMDI5NQ.GraPXr.Z4LLGbChB62wKU13XMza_hZ27eAwWeoTqfrJT4"})
# 访问邀请页
url = self.url = "https://discord.com/invite/ |
...
fusionist" r = self.session.get(url, proxies=self.proxy) logger.info(f"load invite page: {r.status_code}") self.session.headers.update({"Referer": url}) # 点击进群按钮,获取验证码参数 url = "https://discord.com/api/v9/invites/ |
...
fusionist" r = self.session.post(url, json={}, proxies=self.proxy) logger.info(f"post invite page: {r.status_code}") if "401: Unauthorized" in r.text: logger.info(f"请先登陆,获取登陆token -> {r.text}") return False self.sitekey = self.key = r.json().get("captcha_sitekey") self.rqdata = r.json().get("captcha_rqdata") self.rqtoken = r.json().get("captcha_rqtoken") assert self.rqdata and self.rqtoken and self.sitekey, f"Args is None: {r.text}" self.args['rqdata'] = self.rqdata return True def submit(self): # discord提交进群 captcha_key = self.token.get('gRecaptchaResponse') userAgent = self.token.get('userAgent') if self.userAgent != userAgent: logger.info(f"User-Agent不一致,使用接口返回的User-Agent: {userAgent}") self.session.headers.update({ 'User-Agent': userAgent, }) url = "https://discord.com/api/v9/invites/ |
...
fusionist" data = {"captcha_key": captcha_key, "captcha_rqtoken":self.rqtoken} response = self.session.post(url, json=data, proxies=self.proxy) if 'show_verification_form' in response.text: logger.info(f"验证成功! -> {response.text}") return True else: logger.info(f"验证失败! -> {response.text}") def test(self): # 完整测试流程 # 加载网页 if self.load(): # 创建验证码任务和获取结果 self.token = self.get_token() assert self.token, "获取token失败" # 提交验证 self.submit() if __name__ == "__main__": tester = YescaptchaTester() # tester.type = "HCaptchaTaskProxylessM1" tester.type = "HCaptchaTaskProxyless" tester.solver.server = "https://dev.yescaptcha.com" # 填你的密钥,或者从环境变量中获取 tester.solver.clientkey = "" or os.environ.get("YESCAPTCHA_CLIENTKEY") tester.test() |
执行正确的结果:
...
再次提醒:协议进群有封号风险,请谨慎使用。
...