1
0
Fork 0

updated quora module, added selenium to get cookie

This commit is contained in:
Raju Komati 2023-04-24 03:08:47 +05:30
parent 1be1e440fd
commit a37920b27b
No known key found for this signature in database
GPG key ID: A581A5D67A8EB090
5 changed files with 562 additions and 386 deletions

16
.gitignore vendored Normal file
View file

@ -0,0 +1,16 @@
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml
.idea/
*/__pycache__/
*.log
cookie.json

View file

@ -1,28 +1,42 @@
from quora.api import Client as PoeClient
from quora.mail import Emailnator
from requests import Session
from tls_client import Session as TLS
from re import search, findall
from json import loads
from time import sleep
from pathlib import Path
from random import choice, choices, randint
from string import ascii_letters, digits
from urllib import parse
from os import urandom
from hashlib import md5
from json import dumps
from pypasser import reCaptchaV3
import json
from datetime import datetime
from hashlib import md5
from json import dumps
from pathlib import Path
from random import choice, choices, randint
from re import search, findall
from string import ascii_letters, digits
from urllib.parse import unquote
import selenium.webdriver.support.expected_conditions as EC
from pypasser import reCaptchaV3
from requests import Session
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from tls_client import Session as TLS
from quora.api import Client as PoeClient
from quora.mail import Emailnator
# from twocaptcha import TwoCaptcha
# solver = TwoCaptcha('72747bf24a9d89b4dcc1b24875efd358')
MODELS = {
"sage": "capybara",
"gpt-4": "beaver",
"claude-v1.2": "a2_2",
"claude-instant-v1.0": "a2",
"gpt-3.5-turbo": "chinchilla",
}
def extract_formkey(html):
script_regex = r'<script>if\(.+\)throw new Error;(.+)</script>'
script_text = search(script_regex, html).group(1)
key_regex = r'var .="([0-9a-f]+)",'
key_text = search(key_regex, script_text).group(1)
cipher_regex = r'.\[(\d+)\]=.\[(\d+)\]'
script_regex = r"<script>if\(.+\)throw new Error;(.+)</script>"
script_text = search(script_regex, html).group(1)
key_regex = r'var .="([0-9a-f]+)",'
key_text = search(key_regex, script_text).group(1)
cipher_regex = r".\[(\d+)\]=.\[(\d+)\]"
cipher_pairs = findall(cipher_regex, script_text)
formkey_list = [""] * len(cipher_pairs)
@ -33,42 +47,40 @@ def extract_formkey(html):
return formkey
class PoeResponse:
class Completion:
class Choices:
def __init__(self, choice: dict) -> None:
self.text = choice['text']
self.content = self.text.encode()
self.index = choice['index']
self.logprobs = choice['logprobs']
self.finish_reason = choice['finish_reason']
self.text = choice["text"]
self.content = self.text.encode()
self.index = choice["index"]
self.logprobs = choice["logprobs"]
self.finish_reason = choice["finish_reason"]
def __repr__(self) -> str:
return f'''<__main__.APIResponse.Completion.Choices(\n text = {self.text.encode()},\n index = {self.index},\n logprobs = {self.logprobs},\n finish_reason = {self.finish_reason})object at 0x1337>'''
return f"""<__main__.APIResponse.Completion.Choices(\n text = {self.text.encode()},\n index = {self.index},\n logprobs = {self.logprobs},\n finish_reason = {self.finish_reason})object at 0x1337>"""
def __init__(self, choices: dict) -> None:
self.choices = [self.Choices(choice) for choice in choices]
class Usage:
def __init__(self, usage_dict: dict) -> None:
self.prompt_tokens = usage_dict['prompt_tokens']
self.completion_tokens = usage_dict['completion_tokens']
self.total_tokens = usage_dict['total_tokens']
self.prompt_tokens = usage_dict["prompt_tokens"]
self.completion_tokens = usage_dict["completion_tokens"]
self.total_tokens = usage_dict["total_tokens"]
def __repr__(self):
return f'''<__main__.APIResponse.Usage(\n prompt_tokens = {self.prompt_tokens},\n completion_tokens = {self.completion_tokens},\n total_tokens = {self.total_tokens})object at 0x1337>'''
return f"""<__main__.APIResponse.Usage(\n prompt_tokens = {self.prompt_tokens},\n completion_tokens = {self.completion_tokens},\n total_tokens = {self.total_tokens})object at 0x1337>"""
def __init__(self, response_dict: dict) -> None:
self.response_dict = response_dict
self.id = response_dict['id']
self.object = response_dict['object']
self.created = response_dict['created']
self.model = response_dict['model']
self.completion = self.Completion(response_dict['choices'])
self.usage = self.Usage(response_dict['usage'])
self.response_dict = response_dict
self.id = response_dict["id"]
self.object = response_dict["object"]
self.created = response_dict["created"]
self.model = response_dict["model"]
self.completion = self.Completion(response_dict["choices"])
self.usage = self.Usage(response_dict["usage"])
def json(self) -> dict:
return self.response_dict
@ -76,127 +88,140 @@ class PoeResponse:
class ModelResponse:
def __init__(self, json_response: dict) -> None:
self.id = json_response['data']['poeBotCreate']['bot']['id']
self.name = json_response['data']['poeBotCreate']['bot']['displayName']
self.limit = json_response['data']['poeBotCreate']['bot']['messageLimit']['dailyLimit']
self.deleted = json_response['data']['poeBotCreate']['bot']['deletionState']
self.id = json_response["data"]["poeBotCreate"]["bot"]["id"]
self.name = json_response["data"]["poeBotCreate"]["bot"]["displayName"]
self.limit = json_response["data"]["poeBotCreate"]["bot"]["messageLimit"][
"dailyLimit"
]
self.deleted = json_response["data"]["poeBotCreate"]["bot"]["deletionState"]
class Model:
def create(
token: str,
model: str = 'gpt-3.5-turbo', # claude-instant
system_prompt: str = 'You are ChatGPT a large language model developed by Openai. Answer as consisely as possible',
description: str = 'gpt-3.5 language model from openai, skidded by poe.com',
handle: str = None) -> ModelResponse:
model: str = "gpt-3.5-turbo", # claude-instant
system_prompt: str = "You are ChatGPT a large language model developed by Openai. Answer as consisely as possible",
description: str = "gpt-3.5 language model from openai, skidded by poe.com",
handle: str = None,
) -> ModelResponse:
models = {
'gpt-3.5-turbo' : 'chinchilla',
'claude-instant-v1.0': 'a2',
'gpt-4': 'beaver'
"gpt-3.5-turbo": "chinchilla",
"claude-instant-v1.0": "a2",
"gpt-4": "beaver",
}
if not handle:
handle = f'gptx{randint(1111111, 9999999)}'
handle = f"gptx{randint(1111111, 9999999)}"
client = Session()
client.cookies['p-b'] = token
formkey = extract_formkey(client.get('https://poe.com').text)
settings = client.get('https://poe.com/api/settings').json()
client.cookies["p-b"] = token
formkey = extract_formkey(client.get("https://poe.com").text)
settings = client.get("https://poe.com/api/settings").json()
client.headers = {
"host" : "poe.com",
"origin" : "https://poe.com",
"referer" : "https://poe.com/",
"content-type" : "application/json",
"poe-formkey" : formkey,
"poe-tchannel" : settings['tchannelData']['channel'],
"user-agent" : "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36",
"connection" : "keep-alive",
"sec-ch-ua" : "\"Chromium\";v=\"112\", \"Google Chrome\";v=\"112\", \"Not:A-Brand\";v=\"99\"",
"sec-ch-ua-mobile" : "?0",
"sec-ch-ua-platform": "\"macOS\"",
"content-type" : "application/json",
"sec-fetch-site" : "same-origin",
"sec-fetch-mode" : "cors",
"sec-fetch-dest" : "empty",
"accept" : "*/*",
"accept-encoding" : "gzip, deflate, br",
"accept-language" : "en-GB,en-US;q=0.9,en;q=0.8",
"host": "poe.com",
"origin": "https://poe.com",
"referer": "https://poe.com/",
"poe-formkey": formkey,
"poe-tchannel": settings["tchannelData"]["channel"],
"user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36",
"connection": "keep-alive",
"sec-ch-ua": '"Chromium";v="112", "Google Chrome";v="112", "Not:A-Brand";v="99"',
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": '"macOS"',
"content-type": "application/json",
"sec-fetch-site": "same-origin",
"sec-fetch-mode": "cors",
"sec-fetch-dest": "empty",
"accept": "*/*",
"accept-encoding": "gzip, deflate, br",
"accept-language": "en-GB,en-US;q=0.9,en;q=0.8",
}
payload = dumps(separators=(',', ':'), obj = {
'queryName': 'CreateBotMain_poeBotCreate_Mutation',
'variables': {
'model' : models[model],
'handle' : handle,
'prompt' : system_prompt,
'isPromptPublic' : True,
'introduction' : '',
'description' : description,
'profilePictureUrl' : 'https://qph.fs.quoracdn.net/main-qimg-24e0b480dcd946e1cc6728802c5128b6',
'apiUrl' : None,
'apiKey' : ''.join(choices(ascii_letters + digits, k = 32)),
'isApiBot' : False,
'hasLinkification' : False,
'hasMarkdownRendering' : False,
'hasSuggestedReplies' : False,
'isPrivateBot' : False
payload = dumps(
separators=(",", ":"),
obj={
"queryName": "CreateBotMain_poeBotCreate_Mutation",
"variables": {
"model": models[model],
"handle": handle,
"prompt": system_prompt,
"isPromptPublic": True,
"introduction": "",
"description": description,
"profilePictureUrl": "https://qph.fs.quoracdn.net/main-qimg-24e0b480dcd946e1cc6728802c5128b6",
"apiUrl": None,
"apiKey": "".join(choices(ascii_letters + digits, k=32)),
"isApiBot": False,
"hasLinkification": False,
"hasMarkdownRendering": False,
"hasSuggestedReplies": False,
"isPrivateBot": False,
},
"query": "mutation CreateBotMain_poeBotCreate_Mutation(\n $model: String!\n $handle: String!\n $prompt: String!\n $isPromptPublic: Boolean!\n $introduction: String!\n $description: String!\n $profilePictureUrl: String\n $apiUrl: String\n $apiKey: String\n $isApiBot: Boolean\n $hasLinkification: Boolean\n $hasMarkdownRendering: Boolean\n $hasSuggestedReplies: Boolean\n $isPrivateBot: Boolean\n) {\n poeBotCreate(model: $model, handle: $handle, promptPlaintext: $prompt, isPromptPublic: $isPromptPublic, introduction: $introduction, description: $description, profilePicture: $profilePictureUrl, apiUrl: $apiUrl, apiKey: $apiKey, isApiBot: $isApiBot, hasLinkification: $hasLinkification, hasMarkdownRendering: $hasMarkdownRendering, hasSuggestedReplies: $hasSuggestedReplies, isPrivateBot: $isPrivateBot) {\n status\n bot {\n id\n ...BotHeader_bot\n }\n }\n}\n\nfragment BotHeader_bot on Bot {\n displayName\n messageLimit {\n dailyLimit\n }\n ...BotImage_bot\n ...BotLink_bot\n ...IdAnnotation_node\n ...botHelpers_useViewerCanAccessPrivateBot\n ...botHelpers_useDeletion_bot\n}\n\nfragment BotImage_bot on Bot {\n displayName\n ...botHelpers_useDeletion_bot\n ...BotImage_useProfileImage_bot\n}\n\nfragment BotImage_useProfileImage_bot on Bot {\n image {\n __typename\n ... on LocalBotImage {\n localName\n }\n ... on UrlBotImage {\n url\n }\n }\n ...botHelpers_useDeletion_bot\n}\n\nfragment BotLink_bot on Bot {\n displayName\n}\n\nfragment IdAnnotation_node on Node {\n __isNode: __typename\n id\n}\n\nfragment botHelpers_useDeletion_bot on Bot {\n deletionState\n}\n\nfragment botHelpers_useViewerCanAccessPrivateBot on Bot {\n isPrivateBot\n viewerIsCreator\n}\n",
},
'query': 'mutation CreateBotMain_poeBotCreate_Mutation(\n $model: String!\n $handle: String!\n $prompt: String!\n $isPromptPublic: Boolean!\n $introduction: String!\n $description: String!\n $profilePictureUrl: String\n $apiUrl: String\n $apiKey: String\n $isApiBot: Boolean\n $hasLinkification: Boolean\n $hasMarkdownRendering: Boolean\n $hasSuggestedReplies: Boolean\n $isPrivateBot: Boolean\n) {\n poeBotCreate(model: $model, handle: $handle, promptPlaintext: $prompt, isPromptPublic: $isPromptPublic, introduction: $introduction, description: $description, profilePicture: $profilePictureUrl, apiUrl: $apiUrl, apiKey: $apiKey, isApiBot: $isApiBot, hasLinkification: $hasLinkification, hasMarkdownRendering: $hasMarkdownRendering, hasSuggestedReplies: $hasSuggestedReplies, isPrivateBot: $isPrivateBot) {\n status\n bot {\n id\n ...BotHeader_bot\n }\n }\n}\n\nfragment BotHeader_bot on Bot {\n displayName\n messageLimit {\n dailyLimit\n }\n ...BotImage_bot\n ...BotLink_bot\n ...IdAnnotation_node\n ...botHelpers_useViewerCanAccessPrivateBot\n ...botHelpers_useDeletion_bot\n}\n\nfragment BotImage_bot on Bot {\n displayName\n ...botHelpers_useDeletion_bot\n ...BotImage_useProfileImage_bot\n}\n\nfragment BotImage_useProfileImage_bot on Bot {\n image {\n __typename\n ... on LocalBotImage {\n localName\n }\n ... on UrlBotImage {\n url\n }\n }\n ...botHelpers_useDeletion_bot\n}\n\nfragment BotLink_bot on Bot {\n displayName\n}\n\nfragment IdAnnotation_node on Node {\n __isNode: __typename\n id\n}\n\nfragment botHelpers_useDeletion_bot on Bot {\n deletionState\n}\n\nfragment botHelpers_useViewerCanAccessPrivateBot on Bot {\n isPrivateBot\n viewerIsCreator\n}\n',
})
)
base_string = payload + client.headers["poe-formkey"] + 'WpuLMiXEKKE98j56k'
client.headers["poe-tag-id"] = md5(base_string.encode()).hexdigest()
response = client.post("https://poe.com/api/gql_POST", data = payload)
base_string = payload + client.headers["poe-formkey"] + "WpuLMiXEKKE98j56k"
client.headers["poe-tag-id"] = md5(base_string.encode()).hexdigest()
if not 'success' in response.text:
raise Exception('''
response = client.post("https://poe.com/api/gql_POST", data=payload)
if "success" not in response.text:
raise Exception(
"""
Bot creation Failed
!! Important !!
Bot creation was not enabled on this account
please use: quora.Account.create with enable_bot_creation set to True
''')
"""
)
return ModelResponse(response.json())
class Account:
def create(proxy: None or str = None, logging: bool = False, enable_bot_creation: bool = False):
client = TLS(client_identifier='chrome110')
client.proxies = {
'http': f'http://{proxy}',
'https': f'http://{proxy}'} if proxy else None
def create(
proxy: str | None = None,
logging: bool = False,
enable_bot_creation: bool = False,
):
client = TLS(client_identifier="chrome110")
client.proxies = (
{"http": f"http://{proxy}", "https": f"http://{proxy}"} if proxy else None
)
mail_client = Emailnator()
mail_address = mail_client.get_mail()
mail_client = Emailnator()
mail_address = mail_client.get_mail()
if logging: print('email', mail_address)
if logging:
print("email", mail_address)
client.headers = {
'authority' : 'poe.com',
'accept' : '*/*',
'accept-language': 'en,fr-FR;q=0.9,fr;q=0.8,es-ES;q=0.7,es;q=0.6,en-US;q=0.5,am;q=0.4,de;q=0.3',
'content-type' : 'application/json',
'origin' : 'https://poe.com',
'poe-formkey' : 'null',
'poe-tag-id' : 'null',
'poe-tchannel' : 'null',
'referer' : 'https://poe.com/login',
'sec-ch-ua' : '"Chromium";v="112", "Google Chrome";v="112", "Not:A-Brand";v="99"',
'sec-ch-ua-mobile' : '?0',
'sec-ch-ua-platform': '"macOS"',
'sec-fetch-dest': 'empty',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'same-origin',
'user-agent' : 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36'
"authority": "poe.com",
"accept": "*/*",
"accept-language": "en,fr-FR;q=0.9,fr;q=0.8,es-ES;q=0.7,es;q=0.6,en-US;q=0.5,am;q=0.4,de;q=0.3",
"content-type": "application/json",
"origin": "https://poe.com",
"poe-tag-id": "null",
"referer": "https://poe.com/login",
"sec-ch-ua": '"Chromium";v="112", "Google Chrome";v="112", "Not:A-Brand";v="99"',
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": '"macOS"',
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "same-origin",
"user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36",
"poe-formkey": extract_formkey(client.get("https://poe.com/login").text),
"poe-tchannel": client.get("https://poe.com/api/settings").json()[
"tchannelData"
]["channel"],
}
client.headers["poe-formkey"] = extract_formkey(client.get('https://poe.com/login').text)
client.headers["poe-tchannel"] = client.get('https://poe.com/api/settings').json()['tchannelData']['channel']
token = reCaptchaV3('https://www.recaptcha.net/recaptcha/enterprise/anchor?ar=1&k=6LflhEElAAAAAI_ewVwRWI9hsyV4mbZnYAslSvlG&co=aHR0cHM6Ly9wb2UuY29tOjQ0Mw..&hl=en&v=4PnKmGB9wRHh1i04o7YUICeI&size=invisible&cb=bi6ivxoskyal')
token = reCaptchaV3(
"https://www.recaptcha.net/recaptcha/enterprise/anchor?ar=1&k=6LflhEElAAAAAI_ewVwRWI9hsyV4mbZnYAslSvlG&co=aHR0cHM6Ly9wb2UuY29tOjQ0Mw..&hl=en&v=4PnKmGB9wRHh1i04o7YUICeI&size=invisible&cb=bi6ivxoskyal"
)
# token = solver.recaptcha(sitekey='6LflhEElAAAAAI_ewVwRWI9hsyV4mbZnYAslSvlG',
# url = 'https://poe.com/login?redirect_url=%2F',
# version = 'v3',
@ -204,132 +229,219 @@ class Account:
# invisible = 1,
# action = 'login',)['code']
payload = dumps(separators = (',', ':'), obj = {
'queryName': 'MainSignupLoginSection_sendVerificationCodeMutation_Mutation',
'variables': {
'emailAddress' : mail_address,
'phoneNumber' : None,
'recaptchaToken': token
payload = dumps(
separators=(",", ":"),
obj={
"queryName": "MainSignupLoginSection_sendVerificationCodeMutation_Mutation",
"variables": {
"emailAddress": mail_address,
"phoneNumber": None,
"recaptchaToken": token,
},
"query": "mutation MainSignupLoginSection_sendVerificationCodeMutation_Mutation(\n $emailAddress: String\n $phoneNumber: String\n $recaptchaToken: String\n) {\n sendVerificationCode(verificationReason: login, emailAddress: $emailAddress, phoneNumber: $phoneNumber, recaptchaToken: $recaptchaToken) {\n status\n errorMessage\n }\n}\n",
},
'query': 'mutation MainSignupLoginSection_sendVerificationCodeMutation_Mutation(\n $emailAddress: String\n $phoneNumber: String\n $recaptchaToken: String\n) {\n sendVerificationCode(verificationReason: login, emailAddress: $emailAddress, phoneNumber: $phoneNumber, recaptchaToken: $recaptchaToken) {\n status\n errorMessage\n }\n}\n',
})
)
base_string = payload + client.headers["poe-formkey"] + "WpuLMiXEKKE98j56k"
client.headers["poe-tag-id"] = md5(base_string.encode()).hexdigest()
base_string = payload + client.headers["poe-formkey"] + 'WpuLMiXEKKE98j56k'
client.headers["poe-tag-id"] = md5(base_string.encode()).hexdigest()
print(dumps(client.headers, indent=4))
response = client.post('https://poe.com/api/gql_POST', data=payload)
if 'automated_request_detected' in response.text:
print('please try using a proxy / wait for fix')
if 'Bad Request' in response.text:
if logging: print('bad request, retrying...' , response.json())
response = client.post("https://poe.com/api/gql_POST", data=payload)
if "automated_request_detected" in response.text:
print("please try using a proxy / wait for fix")
if "Bad Request" in response.text:
if logging:
print("bad request, retrying...", response.json())
quit()
if logging: print('send_code' ,response.json())
if logging:
print("send_code", response.json())
mail_content = mail_client.get_message()
mail_token = findall(r';">(\d{6,7})</div>', mail_content)[0]
mail_token = findall(r';">(\d{6,7})</div>', mail_content)[0]
if logging: print('code', mail_token)
if logging:
print("code", mail_token)
payload = dumps(separators = (',', ':'), obj={
"queryName": "SignupOrLoginWithCodeSection_signupWithVerificationCodeMutation_Mutation",
"variables": {
"verificationCode": str(mail_token),
"emailAddress": mail_address,
"phoneNumber": None
payload = dumps(
separators=(",", ":"),
obj={
"queryName": "SignupOrLoginWithCodeSection_signupWithVerificationCodeMutation_Mutation",
"variables": {
"verificationCode": str(mail_token),
"emailAddress": mail_address,
"phoneNumber": None,
},
"query": "mutation SignupOrLoginWithCodeSection_signupWithVerificationCodeMutation_Mutation(\n $verificationCode: String!\n $emailAddress: String\n $phoneNumber: String\n) {\n signupWithVerificationCode(verificationCode: $verificationCode, emailAddress: $emailAddress, phoneNumber: $phoneNumber) {\n status\n errorMessage\n }\n}\n",
},
"query": "mutation SignupOrLoginWithCodeSection_signupWithVerificationCodeMutation_Mutation(\n $verificationCode: String!\n $emailAddress: String\n $phoneNumber: String\n) {\n signupWithVerificationCode(verificationCode: $verificationCode, emailAddress: $emailAddress, phoneNumber: $phoneNumber) {\n status\n errorMessage\n }\n}\n"
})
)
base_string = payload + client.headers["poe-formkey"] + 'WpuLMiXEKKE98j56k'
client.headers["poe-tag-id"] = md5(base_string.encode()).hexdigest()
base_string = payload + client.headers["poe-formkey"] + "WpuLMiXEKKE98j56k"
client.headers["poe-tag-id"] = md5(base_string.encode()).hexdigest()
response = client.post('https://poe.com/api/gql_POST', data = payload)
if logging: print('verify_code', response.json())
def get():
cookies = open(Path(__file__).resolve().parent / 'cookies.txt', 'r').read().splitlines()
response = client.post("https://poe.com/api/gql_POST", data=payload)
if logging:
print("verify_code", response.json())
def get(self):
cookies = (
open(Path(__file__).resolve().parent / "cookies.txt", "r")
.read()
.splitlines()
)
return choice(cookies)
class StreamingCompletion:
def create(
model : str = 'gpt-4',
custom_model : bool = None,
prompt: str = 'hello world',
token : str = ''):
model: str = "gpt-4",
custom_model: bool = None,
prompt: str = "hello world",
token: str = "",
):
_model = MODELS[model] if not custom_model else custom_model
models = {
'sage' : 'capybara',
'gpt-4' : 'beaver',
'claude-v1.2' : 'a2_2',
'claude-instant-v1.0' : 'a2',
'gpt-3.5-turbo' : 'chinchilla'
}
_model = models[model] if not custom_model else custom_model
client = PoeClient(token)
for chunk in client.send_message(_model, prompt):
yield PoeResponse({
'id' : chunk["messageId"],
'object' : 'text_completion',
'created': chunk['creationTime'],
'model' : _model,
'choices': [{
'text' : chunk["text_new"],
'index' : 0,
'logprobs' : None,
'finish_reason' : 'stop'
}],
'usage': {
'prompt_tokens' : len(prompt),
'completion_tokens' : len(chunk["text_new"]),
'total_tokens' : len(prompt) + len(chunk["text_new"])
yield PoeResponse(
{
"id": chunk["messageId"],
"object": "text_completion",
"created": chunk["creationTime"],
"model": _model,
"choices": [
{
"text": chunk["text_new"],
"index": 0,
"logprobs": None,
"finish_reason": "stop",
}
],
"usage": {
"prompt_tokens": len(prompt),
"completion_tokens": len(chunk["text_new"]),
"total_tokens": len(prompt) + len(chunk["text_new"]),
},
}
})
)
class Completion:
def create(
model : str = 'gpt-4',
custom_model : str = None,
prompt: str = 'hello world',
token : str = ''):
model: str = "gpt-4",
custom_model: str = None,
prompt: str = "hello world",
token: str = "",
):
models = {
'sage' : 'capybara',
'gpt-4' : 'beaver',
'claude-v1.2' : 'a2_2',
'claude-instant-v1.0' : 'a2',
'gpt-3.5-turbo' : 'chinchilla'
"sage": "capybara",
"gpt-4": "beaver",
"claude-v1.2": "a2_2",
"claude-instant-v1.0": "a2",
"gpt-3.5-turbo": "chinchilla",
}
_model = models[model] if not custom_model else custom_model
client = PoeClient(token)
for chunk in client.send_message(_model, prompt):
pass
return PoeResponse({
'id' : chunk["messageId"],
'object' : 'text_completion',
'created': chunk['creationTime'],
'model' : _model,
'choices': [{
'text' : chunk["text"],
'index' : 0,
'logprobs' : None,
'finish_reason' : 'stop'
}],
'usage': {
'prompt_tokens' : len(prompt),
'completion_tokens' : len(chunk["text"]),
'total_tokens' : len(prompt) + len(chunk["text"])
}
})
return PoeResponse(
{
"id": chunk["messageId"],
"object": "text_completion",
"created": chunk["creationTime"],
"model": _model,
"choices": [
{
"text": chunk["text"],
"index": 0,
"logprobs": None,
"finish_reason": "stop",
}
],
"usage": {
"prompt_tokens": len(prompt),
"completion_tokens": len(chunk["text"]),
"total_tokens": len(prompt) + len(chunk["text"]),
},
}
)
class Poe:
def __init__(self, model: str = "sage"):
self.cookie = self.__load_cookie()
self.model = MODELS[model]
self.client = PoeClient(self.cookie)
def __load_cookie(self) -> str:
if (cookie_file := Path("cookie.json")).exists():
with cookie_file.open() as fp:
cookie = json.load(fp)
if datetime.fromtimestamp(cookie["expiry"]) < datetime.now():
cookie = self.__register_and_get_cookie()
else:
print("Loading the cookie from file")
else:
cookie = self.__register_and_get_cookie()
return unquote(cookie["value"])
@classmethod
def __register_and_get_cookie(cls) -> dict:
mail_client = Emailnator()
mail_address = mail_client.get_mail()
print(mail_address)
options = webdriver.FirefoxOptions()
options.add_argument("-headless")
driver = webdriver.Firefox(options=options)
driver.get("https://www.poe.com")
# clicking use email button
driver.find_element(By.XPATH, '//button[contains(text(), "Use email")]').click()
email = WebDriverWait(driver, 30).until(
EC.presence_of_element_located((By.XPATH, '//input[@type="email"]'))
)
email.send_keys(mail_address)
driver.find_element(By.XPATH, '//button[text()="Go"]').click()
code = findall(r';">(\d{6,7})</div>', mail_client.get_message())[0]
print(code)
verification_code = WebDriverWait(driver, 30).until(
EC.presence_of_element_located((By.XPATH, '//input[@placeholder="Code"]'))
)
verification_code.send_keys(code)
verify_button = EC.presence_of_element_located(
(By.XPATH, '//button[text()="Verify"]')
)
login_button = EC.presence_of_element_located(
(By.XPATH, '//button[text()="Log In"]')
)
WebDriverWait(driver, 30).until(EC.any_of(verify_button, login_button)).click()
cookie = driver.get_cookie("p-b")
with open("cookie.json", "w") as fw:
json.dump(cookie, fw)
driver.close()
return cookie
def chat(self, message: str):
response = None
for chunk in self.client.send_message(self.model, message):
response = chunk["text"]
return response

View file

@ -55,10 +55,7 @@ def load_queries():
def generate_payload(query_name, variables):
return {
"query": queries[query_name],
"variables": variables
}
return {"query": queries[query_name], "variables": variables}
def request_with_retries(method, *args, **kwargs):
@ -69,7 +66,8 @@ def request_with_retries(method, *args, **kwargs):
if r.status_code == 200:
return r
logger.warn(
f"Server returned a status code of {r.status_code} while downloading {url}. Retrying ({i+1}/{attempts})...")
f"Server returned a status code of {r.status_code} while downloading {url}. Retrying ({i+1}/{attempts})..."
)
raise RuntimeError(f"Failed to download {url} too many times.")
@ -84,15 +82,13 @@ class Client:
self.proxy = proxy
self.session = requests.Session()
self.adapter = requests.adapters.HTTPAdapter(
pool_connections=100, pool_maxsize=100)
pool_connections=100, pool_maxsize=100
)
self.session.mount("http://", self.adapter)
self.session.mount("https://", self.adapter)
if proxy:
self.session.proxies = {
"http": self.proxy,
"https": self.proxy
}
self.session.proxies = {"http": self.proxy, "https": self.proxy}
logger.info(f"Proxy enabled: {self.proxy}")
self.active_messages = {}
@ -124,11 +120,11 @@ class Client:
self.subscribe()
def extract_formkey(self, html):
script_regex = r'<script>if\(.+\)throw new Error;(.+)</script>'
script_regex = r"<script>if\(.+\)throw new Error;(.+)</script>"
script_text = re.search(script_regex, html).group(1)
key_regex = r'var .="([0-9a-f]+)",'
key_text = re.search(key_regex, script_text).group(1)
cipher_regex = r'.\[(\d+)\]=.\[(\d+)\]'
cipher_regex = r".\[(\d+)\]=.\[(\d+)\]"
cipher_pairs = re.findall(cipher_regex, script_text)
formkey_list = [""] * len(cipher_pairs)
@ -143,7 +139,9 @@ class Client:
logger.info("Downloading next_data...")
r = request_with_retries(self.session.get, self.home_url)
json_regex = r'<script id="__NEXT_DATA__" type="application\/json">(.+?)</script>'
json_regex = (
r'<script id="__NEXT_DATA__" type="application\/json">(.+?)</script>'
)
json_text = re.search(json_regex, r.text).group(1)
next_data = json.loads(json_text)
@ -181,8 +179,7 @@ class Client:
bots[chat_data["defaultBotObject"]["nickname"]] = chat_data
for bot in bot_list:
thread = threading.Thread(
target=get_bot_thread, args=(bot,), daemon=True)
thread = threading.Thread(target=get_bot_thread, args=(bot,), daemon=True)
threads.append(thread)
for thread in threads:
@ -216,50 +213,59 @@ class Client:
if channel is None:
channel = self.channel
query = f'?min_seq={channel["minSeq"]}&channel={channel["channel"]}&hash={channel["channelHash"]}'
return f'wss://{self.ws_domain}.tch.{channel["baseHost"]}/up/{channel["boxName"]}/updates'+query
return (
f'wss://{self.ws_domain}.tch.{channel["baseHost"]}/up/{channel["boxName"]}/updates'
+ query
)
def send_query(self, query_name, variables):
for i in range(20):
json_data = generate_payload(query_name, variables)
payload = json.dumps(json_data, separators=(",", ":"))
base_string = payload + \
self.gql_headers["poe-formkey"] + "WpuLMiXEKKE98j56k"
base_string = (
payload + self.gql_headers["poe-formkey"] + "WpuLMiXEKKE98j56k"
)
headers = {
"content-type": "application/json",
"poe-tag-id": hashlib.md5(base_string.encode()).hexdigest()
"poe-tag-id": hashlib.md5(base_string.encode()).hexdigest(),
}
headers = {**self.gql_headers, **headers}
r = request_with_retries(
self.session.post, self.gql_url, data=payload, headers=headers)
self.session.post, self.gql_url, data=payload, headers=headers
)
data = r.json()
if data["data"] == None:
logger.warn(
f'{query_name} returned an error: {data["errors"][0]["message"]} | Retrying ({i+1}/20)')
f'{query_name} returned an error: {data["errors"][0]["message"]} | Retrying ({i+1}/20)'
)
time.sleep(2)
continue
return r.json()
raise RuntimeError(f'{query_name} failed too many times.')
raise RuntimeError(f"{query_name} failed too many times.")
def subscribe(self):
logger.info("Subscribing to mutations")
result = self.send_query("SubscriptionsMutation", {
"subscriptions": [
{
"subscriptionName": "messageAdded",
"query": queries["MessageAddedSubscription"]
},
{
"subscriptionName": "viewerStateUpdated",
"query": queries["ViewerStateUpdatedSubscription"]
}
]
})
result = self.send_query(
"SubscriptionsMutation",
{
"subscriptions": [
{
"subscriptionName": "messageAdded",
"query": queries["MessageAddedSubscription"],
},
{
"subscriptionName": "viewerStateUpdated",
"query": queries["ViewerStateUpdatedSubscription"],
},
]
},
)
def ws_run_thread(self):
kwargs = {}
@ -268,7 +274,7 @@ class Client:
kwargs = {
"proxy_type": proxy_parsed.scheme,
"http_proxy_host": proxy_parsed.hostname,
"http_proxy_port": proxy_parsed.port
"http_proxy_port": proxy_parsed.port,
}
self.ws.run_forever(**kwargs)
@ -281,7 +287,7 @@ class Client:
on_message=self.on_message,
on_open=self.on_ws_connect,
on_error=self.on_ws_error,
on_close=self.on_ws_close
on_close=self.on_ws_close,
)
t = threading.Thread(target=self.ws_run_thread, daemon=True)
t.start()
@ -299,7 +305,8 @@ class Client:
def on_ws_close(self, ws, close_status_code, close_message):
self.ws_connected = False
logger.warn(
f"Websocket closed with status {close_status_code}: {close_message}")
f"Websocket closed with status {close_status_code}: {close_message}"
)
def on_ws_error(self, ws, error):
self.disconnect_ws()
@ -326,7 +333,11 @@ class Client:
return
# indicate that the response id is tied to the human message id
elif key != "pending" and value == None and message["state"] != "complete":
elif (
key != "pending"
and value == None
and message["state"] != "complete"
):
self.active_messages[key] = message["messageId"]
self.message_queues[key].put(message)
return
@ -352,13 +363,16 @@ class Client:
self.setup_connection()
self.connect_ws()
message_data = self.send_query("SendMessageMutation", {
"bot": chatbot,
"query": message,
"chatId": self.bots[chatbot]["chatId"],
"source": None,
"withChatBreak": with_chat_break
})
message_data = self.send_query(
"SendMessageMutation",
{
"bot": chatbot,
"query": message,
"chatId": self.bots[chatbot]["chatId"],
"source": None,
"withChatBreak": with_chat_break,
},
)
del self.active_messages["pending"]
if not message_data["data"]["messageEdgeCreate"]["message"]:
@ -368,7 +382,8 @@ class Client:
human_message_id = human_message["node"]["messageId"]
except TypeError:
raise RuntimeError(
f"An unknown error occurred. Raw response data: {message_data}")
f"An unknown error occurred. Raw response data: {message_data}"
)
# indicate that the current message is waiting for a response
self.active_messages[human_message_id] = None
@ -378,8 +393,7 @@ class Client:
message_id = None
while True:
try:
message = self.message_queues[human_message_id].get(
timeout=timeout)
message = self.message_queues[human_message_id].get(timeout=timeout)
except queue.Empty:
del self.active_messages[human_message_id]
del self.message_queues[human_message_id]
@ -393,7 +407,7 @@ class Client:
continue
# update info about response
message["text_new"] = message["text"][len(last_text):]
message["text_new"] = message["text"][len(last_text) :]
last_text = message["text"]
message_id = message["messageId"]
@ -404,9 +418,9 @@ class Client:
def send_chat_break(self, chatbot):
logger.info(f"Sending chat break to {chatbot}")
result = self.send_query("AddMessageBreakMutation", {
"chatId": self.bots[chatbot]["chatId"]
})
result = self.send_query(
"AddMessageBreakMutation", {"chatId": self.bots[chatbot]["chatId"]}
)
return result["data"]["messageBreakCreate"]["message"]
def get_message_history(self, chatbot, count=25, cursor=None):
@ -423,23 +437,24 @@ class Client:
cursor = str(cursor)
if count > 50:
messages = self.get_message_history(
chatbot, count=50, cursor=cursor) + messages
messages = (
self.get_message_history(chatbot, count=50, cursor=cursor) + messages
)
while count > 0:
count -= 50
new_cursor = messages[0]["cursor"]
new_messages = self.get_message_history(
chatbot, min(50, count), cursor=new_cursor)
chatbot, min(50, count), cursor=new_cursor
)
messages = new_messages + messages
return messages
elif count <= 0:
return messages
result = self.send_query("ChatListPaginationQuery", {
"count": count,
"cursor": cursor,
"id": self.bots[chatbot]["id"]
})
result = self.send_query(
"ChatListPaginationQuery",
{"count": count, "cursor": cursor, "id": self.bots[chatbot]["id"]},
)
query_messages = result["data"]["node"]["messagesConnection"]["edges"]
messages = query_messages + messages
return messages
@ -449,9 +464,7 @@ class Client:
if not type(message_ids) is list:
message_ids = [int(message_ids)]
result = self.send_query("DeleteMessageMutation", {
"messageIds": message_ids
})
result = self.send_query("DeleteMessageMutation", {"messageIds": message_ids})
def purge_conversation(self, chatbot, count=-1):
logger.info(f"Purging messages from {chatbot}")
@ -471,60 +484,93 @@ class Client:
last_messages = self.get_message_history(chatbot, count=50)[::-1]
logger.info(f"No more messages left to delete.")
def create_bot(self, handle, prompt="", base_model="chinchilla", description="",
intro_message="", api_key=None, api_bot=False, api_url=None,
prompt_public=True, pfp_url=None, linkification=False,
markdown_rendering=True, suggested_replies=False, private=False):
result = self.send_query("PoeBotCreateMutation", {
"model": base_model,
"handle": handle,
"prompt": prompt,
"isPromptPublic": prompt_public,
"introduction": intro_message,
"description": description,
"profilePictureUrl": pfp_url,
"apiUrl": api_url,
"apiKey": api_key,
"isApiBot": api_bot,
"hasLinkification": linkification,
"hasMarkdownRendering": markdown_rendering,
"hasSuggestedReplies": suggested_replies,
"isPrivateBot": private
})
def create_bot(
self,
handle,
prompt="",
base_model="chinchilla",
description="",
intro_message="",
api_key=None,
api_bot=False,
api_url=None,
prompt_public=True,
pfp_url=None,
linkification=False,
markdown_rendering=True,
suggested_replies=False,
private=False,
):
result = self.send_query(
"PoeBotCreateMutation",
{
"model": base_model,
"handle": handle,
"prompt": prompt,
"isPromptPublic": prompt_public,
"introduction": intro_message,
"description": description,
"profilePictureUrl": pfp_url,
"apiUrl": api_url,
"apiKey": api_key,
"isApiBot": api_bot,
"hasLinkification": linkification,
"hasMarkdownRendering": markdown_rendering,
"hasSuggestedReplies": suggested_replies,
"isPrivateBot": private,
},
)
data = result["data"]["poeBotCreate"]
if data["status"] != "success":
raise RuntimeError(
f"Poe returned an error while trying to create a bot: {data['status']}")
f"Poe returned an error while trying to create a bot: {data['status']}"
)
self.get_bots()
return data
def edit_bot(self, bot_id, handle, prompt="", base_model="chinchilla", description="",
intro_message="", api_key=None, api_url=None, private=False,
prompt_public=True, pfp_url=None, linkification=False,
markdown_rendering=True, suggested_replies=False):
result = self.send_query("PoeBotEditMutation", {
"baseBot": base_model,
"botId": bot_id,
"handle": handle,
"prompt": prompt,
"isPromptPublic": prompt_public,
"introduction": intro_message,
"description": description,
"profilePictureUrl": pfp_url,
"apiUrl": api_url,
"apiKey": api_key,
"hasLinkification": linkification,
"hasMarkdownRendering": markdown_rendering,
"hasSuggestedReplies": suggested_replies,
"isPrivateBot": private
})
def edit_bot(
self,
bot_id,
handle,
prompt="",
base_model="chinchilla",
description="",
intro_message="",
api_key=None,
api_url=None,
private=False,
prompt_public=True,
pfp_url=None,
linkification=False,
markdown_rendering=True,
suggested_replies=False,
):
result = self.send_query(
"PoeBotEditMutation",
{
"baseBot": base_model,
"botId": bot_id,
"handle": handle,
"prompt": prompt,
"isPromptPublic": prompt_public,
"introduction": intro_message,
"description": description,
"profilePictureUrl": pfp_url,
"apiUrl": api_url,
"apiKey": api_key,
"hasLinkification": linkification,
"hasMarkdownRendering": markdown_rendering,
"hasSuggestedReplies": suggested_replies,
"isPrivateBot": private,
},
)
data = result["data"]["poeBotEdit"]
if data["status"] != "success":
raise RuntimeError(
f"Poe returned an error while trying to edit a bot: {data['status']}")
f"Poe returned an error while trying to edit a bot: {data['status']}"
)
self.get_bots()
return data

View file

@ -1,66 +1,66 @@
from json import loads
from re import findall
from time import sleep
from fake_useragent import UserAgent
from requests import Session
from time import sleep
from re import search, findall
from json import loads
class Emailnator:
def __init__(self) -> None:
self.client = Session()
self.client.get('https://www.emailnator.com/', timeout=6)
self.client.get("https://www.emailnator.com/", timeout=6)
self.cookies = self.client.cookies.get_dict()
self.client.headers = {
'authority' : 'www.emailnator.com',
'origin' : 'https://www.emailnator.com',
'referer' : 'https://www.emailnator.com/',
'user-agent' : 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.0.0 Safari/537.36 Edg/101.0.1722.39',
'x-xsrf-token' : self.client.cookies.get("XSRF-TOKEN")[:-3]+"=",
"authority": "www.emailnator.com",
"origin": "https://www.emailnator.com",
"referer": "https://www.emailnator.com/",
"user-agent": UserAgent().random,
"x-xsrf-token": self.client.cookies.get("XSRF-TOKEN")[:-3] + "=",
}
self.email = None
def get_mail(self):
response = self.client.post('https://www.emailnator.com/generate-email',json = {
'email': [
'domain',
'plusGmail',
'dotGmail',
]
})
response = self.client.post(
"https://www.emailnator.com/generate-email",
json={
"email": [
"domain",
"plusGmail",
"dotGmail",
]
},
)
self.email = loads(response.text)["email"][0]
return self.email
def get_message(self):
print("waiting for code...")
while True:
sleep(2)
mail_token = self.client.post('https://www.emailnator.com/message-list',
json = {'email': self.email})
mail_token = self.client.post(
"https://www.emailnator.com/message-list", json={"email": self.email}
)
mail_token = loads(mail_token.text)["messageData"]
if len(mail_token) == 2:
print(mail_token[1]["messageID"])
break
mail_context = self.client.post('https://www.emailnator.com/message-list', json = {
'email' : self.email,
'messageID': mail_token[1]["messageID"],
})
mail_context = self.client.post(
"https://www.emailnator.com/message-list",
json={
"email": self.email,
"messageID": mail_token[1]["messageID"],
},
)
return mail_context.text
# mail_client = Emailnator()
# mail_adress = mail_client.get_mail()
# print(mail_adress)
# mail_content = mail_client.get_message()
# print(mail_content)
# code = findall(r';">(\d{6,7})</div>', mail_content)[0]
# print(code)
def get_verification_code(self):
return findall(r';">(\d{6,7})</div>', self.get_message())[0]

View file

@ -4,4 +4,6 @@ tls-client
pypasser
names
colorama
curl_cffi
curl_cffi
selenium
fake-useragent