版本比较

密钥

  • 该行被添加。
  • 该行被删除。
  • 格式已经改变。

...

  1. 发起请求前需要使用CipherAdapter方法来处理ssl问题

  2. 发起请求前需要指定userAgent值,创建验证码任务时需要使用这个值

  3. 如果验证码接口返回的userAgent与自己提交的值不同,则使用接口返回的userAgent

  4. 返回{"message": "Unknown Message", "code": 10008}错误的原因未知,可能与账号有关

View file
namedemo_discord.com.invite.py

代码块
import sys
sys.path.append(".")

import os
import requests
import time
from loguru import logger
from faker import Faker
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.ssl_ import create_urllib3_context

# Silence urllib3 warnings for the whole process. The YesCaptcha API calls in
# this file are made with verify=False, which would otherwise emit a warning
# on every request.
requests.packages.urllib3.disable_warnings()



class CipherAdapter(HTTPAdapter):
    """Transport adapter that installs a custom SSL context on its pools.

    Used for sites whose TLS setup needs special handling. Both the direct
    pool manager and the per-proxy managers receive an SSL context created
    with the cipher string in ``_CIPHERS``. Mount it on a session with
    ``session.mount(prefix, CipherAdapter())``.
    """

    # OpenSSL cipher string applied to every context this adapter creates.
    _CIPHERS = 'DEFAULT:@SECLEVEL=2'

    def _with_ssl_context(self, options):
        """Return *options* with a freshly built ``ssl_context`` entry set."""
        options['ssl_context'] = create_urllib3_context(ciphers=self._CIPHERS)
        return options

    def init_poolmanager(self, *args, **kwargs):
        """Create the connection pool manager using the custom SSL context."""
        return super().init_poolmanager(*args, **self._with_ssl_context(kwargs))

    def proxy_manager_for(self, *args, **kwargs):
        """Create the proxy manager using the custom SSL context."""
        return super().proxy_manager_for(*args, **self._with_ssl_context(kwargs))



class YesCaptchaSolver:
    """Small client for the YesCaptcha task API.

    Creates a solving task through ``/createTask`` and polls
    ``/getTaskResult`` until the task is ready, fails, or the polling budget
    is exhausted. All settings are plain public attributes so callers can
    reconfigure an instance after construction.

    Args:
        key: Site key of the captcha to solve (sent as ``websiteKey``).
        url: Page URL the captcha belongs to (sent as ``websiteURL``).
        type: YesCaptcha task type name. The name shadows the builtin but is
            kept because it is part of the public signature.
        proxy: ``requests``-style proxies mapping used for the API calls.
            ``None`` means "no proxy" and yields a fresh empty dict.
        args: Extra fields merged into the ``task`` object. ``None`` yields a
            fresh empty dict.
    """

    def __init__(self, key="", url="", type="", proxy=None, args=None):
        self.server = "https://api.yescaptcha.com"
        self.clientkey = "xxxxxxxxxxxxxxxxxxxxxxxxxxxx"   # fill in your client key
        self.key = key
        self.url = url
        self.type = type
        # Fresh dicts per instance: a `{}` default would be shared by every
        # solver created without these arguments. A dict passed in by the
        # caller is stored as-is (not copied), exactly as before.
        self.proxy = {} if proxy is None else proxy
        self.args = {} if args is None else args

    def create(self, url=None):
        """Submit a new task and return the decoded JSON response.

        Args:
            url: Page URL for this task. Falls back to ``self.url`` when
                omitted or empty.

        Returns:
            The parsed JSON body of the ``/createTask`` response.
        """
        data = {
            "clientKey": self.clientkey,
            "task": {
                "websiteURL": url or self.url,
                "websiteKey": self.key,
                "type": self.type,
            },
        }
        if self.args:
            data["task"].update(self.args)
        # Log only the task part so the client key does not end up in logs.
        logger.info(f"create: {data['task']}")
        response = requests.post(
            f"{self.server}/createTask",
            json=data,
            timeout=30,
            proxies=self.proxy,
            verify=False,
        )
        return response.json()

    def get(self, task_id):
        """Fetch the current state of *task_id* and return the decoded JSON."""
        data = {
            "clientKey": self.clientkey,
            "taskId": task_id,
        }
        response = requests.post(
            f"{self.server}/getTaskResult",
            json=data,
            timeout=30,
            proxies=self.proxy,
            verify=False,
        )
        return response.json()

    def solve(self, max_attempts=60, interval=2):
        """Create a task and poll until it is solved.

        Args:
            max_attempts: Maximum number of ``/getTaskResult`` polls.
            interval: Seconds to sleep between polls.

        Returns:
            The ``solution`` object of the finished task, or ``None`` when the
            task could not be created, the API reported an error, or no
            result arrived within the polling budget.
        """
        task = self.create(self.url)
        logger.info(f"Task created: {task}")
        task_id = task.get("taskId")
        if not task_id:
            return None

        for _ in range(max_attempts):
            result = self.get(task_id)
            if result.get("errorId") == 1:
                logger.info(f"Task fail: {result}")
                return None
            if result.get("status") == "ready":
                logger.info(f"Task result: {result}")
                return result.get("solution")
            time.sleep(interval)

        logger.info(f"Task timeout: {task_id}")
        return None






class YescaptchaTester:
    """End-to-end demo: accept a Discord invite that is gated by a captcha.

    Flow (see ``test``): ``load`` opens the invite page and triggers the
    captcha challenge, ``get_token`` has YesCaptcha solve it, and ``submit``
    posts the solution back to Discord.

    Args:
        proxy: ``requests``-style proxies mapping used for both the Discord
            requests and the YesCaptcha calls. ``None`` means "no proxy".
    """

    def __init__(self, proxy=None):
        # Fresh dict per instance; a `{}` default would be shared by all
        # testers created without an explicit proxy.
        proxy = {} if proxy is None else proxy
        self.solver = YesCaptchaSolver(proxy=proxy)
        self.token = ""
        self.proxy = proxy
        self.session = requests.Session()

        # A fixed User-Agent is used rather than a random one (e.g. from
        # Faker): Discord registration computes a fingerprint, so the
        # User-Agent has to match it.
        self.userAgent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36"
        self.session.headers.update({'User-Agent': self.userAgent})

        # Task type; see the YesCaptcha developer documentation for values.
        self.type = ""
        # Site key and page URL are filled in by load().
        self.key = ""
        self.url = ""
        self.args = {
            "userAgent": self.userAgent,
            # rqdata comes from the Discord response; load() fills it in.
            "rqdata": "",
        }

    def get_token(self):
        """Solve the captcha and return the solver's answer.

        Copies the current key/url/type/proxy/args onto the solver first.

        Returns:
            The full solution dict when it contains a ``userAgent`` entry,
            otherwise the bare ``gRecaptchaResponse`` string, or ``None``
            when no usable solution was produced.
        """
        self.solver.key = self.key
        self.solver.url = self.url
        self.solver.type = self.type
        self.solver.proxy = self.proxy
        self.solver.args = self.args
        result = self.solver.solve()
        if result and 'userAgent' in result:
            return result
        elif result and result.get("gRecaptchaResponse"):
            return result.get("gRecaptchaResponse")

    def load(self):
        """Open the invite and collect the captcha parameters.

        Returns:
            ``True`` when the captcha parameters were obtained, ``False``
            when Discord answered ``401: Unauthorized``.

        Raises:
            AssertionError: If the response lacks the site key, rqdata or
                rqtoken.
        """
        # Mount the adapter that installs the custom SSL context.
        self.session.mount('https://', CipherAdapter())
        self.session.mount('http://', CipherAdapter())

        # Account token obtained after logging in. The value is a placeholder
        # that must be replaced with a real token.
        # NOTE(review): this literal was split across lines in the extracted
        # source and has been rejoined - confirm nothing else was elided.
        self.session.headers.update({"authorization": "这里填自己的帐号Token"})

        # Visit the invite page.
        url = self.url = "https://discord.com/invite/fusionist"
        r = self.session.get(url, proxies=self.proxy)
        logger.info(f"load invite page: {r.status_code}")
        self.session.headers.update({"Referer": url})

        # "Click" the join button; the response carries the captcha parameters.
        url = "https://discord.com/api/v9/invites/fusionist"
        r = self.session.post(url, json={}, proxies=self.proxy)
        logger.info(f"post invite page: {r.status_code}")
        if "401: Unauthorized" in r.text:
            logger.info(f"请先登陆,获取登陆token -> {r.text}")
            return False

        payload = r.json()  # parse once instead of once per field
        self.sitekey = self.key = payload.get("captcha_sitekey")
        self.rqdata = payload.get("captcha_rqdata")
        self.rqtoken = payload.get("captcha_rqtoken")
        # Raised explicitly (not via `assert`) so the check survives
        # `python -O`; the exception type is unchanged for callers.
        if not (self.rqdata and self.rqtoken and self.sitekey):
            raise AssertionError(f"Args is None: {r.text}")
        self.args['rqdata'] = self.rqdata
        return True

    def submit(self):
        """Post the captcha solution to Discord to accept the invite.

        ``self.token`` may be the solution dict or a bare response string
        (both are possible results of ``get_token``).

        Returns:
            ``True`` when the response contains ``show_verification_form``,
            ``False`` otherwise.
        """
        token = self.token
        if isinstance(token, dict):
            captcha_key = token.get('gRecaptchaResponse')
            user_agent = token.get('userAgent')
        else:
            # Bare string token: there is no solver-provided User-Agent.
            captcha_key = token
            user_agent = None

        # Switch only when the solver actually returned a different UA;
        # setting the header to None would drop it from the request.
        if user_agent and user_agent != self.userAgent:
            logger.info(f"User-Agent不一致,使用接口返回的User-Agent: {user_agent}")
            self.session.headers.update({
                'User-Agent': user_agent,
            })
        url = "https://discord.com/api/v9/invites/fusionist"
        data = {"captcha_key": captcha_key, "captcha_rqtoken": self.rqtoken}
        response = self.session.post(url, json=data, proxies=self.proxy)

        if 'show_verification_form' in response.text:
            logger.info(f"验证成功! -> {response.text}")
            return True
        logger.info(f"验证失败! -> {response.text}")
        return False

    def test(self):
        """Run the whole flow: load the page, solve the captcha, submit.

        Raises:
            AssertionError: If no captcha token could be obtained.
        """
        if self.load():
            # Create the captcha task and wait for its result.
            self.token = self.get_token()
            if not self.token:
                raise AssertionError("获取token失败")
            # Submit the solution.
            self.submit()
    




def main():
    """Run the invite demo against the YesCaptcha dev server."""
    # Fill in your client key here, or leave it empty to read it from the
    # YESCAPTCHA_CLIENTKEY environment variable.
    client_key = ""

    tester = YescaptchaTester()
    # Alternative task type: "HCaptchaTaskProxylessM1"
    tester.type = "HCaptchaTaskProxyless"
    tester.solver.server = "https://dev.yescaptcha.com"
    tester.solver.clientkey = client_key or os.environ.get("YESCAPTCHA_CLIENTKEY")
    tester.test()


if __name__ == "__main__":
    main()
    

执行正确的结果:

...

再次提醒:协议进群有封号风险,请谨慎使用。

...