From a37920b27b73627fd100e6cb7e3e692f8e4fd3fd Mon Sep 17 00:00:00 2001 From: Raju Komati Date: Mon, 24 Apr 2023 03:08:47 +0530 Subject: [PATCH] updated quora module, added selenium to get cookie --- .gitignore | 16 ++ quora/__init__.py | 584 +++++++++++++++++++++++++++------------------- quora/api.py | 258 +++++++++++--------- quora/mail.py | 86 +++---- requirements.txt | 4 +- 5 files changed, 562 insertions(+), 386 deletions(-) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..300ad22 --- /dev/null +++ b/.gitignore @@ -0,0 +1,16 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml + +.idea/ + +*/__pycache__/ + +*.log + +cookie.json diff --git a/quora/__init__.py b/quora/__init__.py index 72e4cc4..c02246f 100644 --- a/quora/__init__.py +++ b/quora/__init__.py @@ -1,28 +1,42 @@ -from quora.api import Client as PoeClient -from quora.mail import Emailnator -from requests import Session -from tls_client import Session as TLS -from re import search, findall -from json import loads -from time import sleep -from pathlib import Path -from random import choice, choices, randint -from string import ascii_letters, digits -from urllib import parse -from os import urandom -from hashlib import md5 -from json import dumps -from pypasser import reCaptchaV3 +import json +from datetime import datetime +from hashlib import md5 +from json import dumps +from pathlib import Path +from random import choice, choices, randint +from re import search, findall +from string import ascii_letters, digits +from urllib.parse import unquote + +import selenium.webdriver.support.expected_conditions as EC +from pypasser import reCaptchaV3 +from requests import Session +from selenium import webdriver +from selenium.webdriver.common.by import By +from selenium.webdriver.support.wait import WebDriverWait +from tls_client import Session as TLS + +from quora.api import Client as PoeClient +from quora.mail import Emailnator # from twocaptcha import TwoCaptcha # solver = TwoCaptcha('72747bf24a9d89b4dcc1b24875efd358') +MODELS = { + "sage": "capybara", + "gpt-4": "beaver", + "claude-v1.2": "a2_2", + "claude-instant-v1.0": "a2", + "gpt-3.5-turbo": "chinchilla", +} + + def extract_formkey(html): - script_regex = r'' - script_text = search(script_regex, html).group(1) - key_regex = r'var .="([0-9a-f]+)",' - key_text = search(key_regex, script_text).group(1) - cipher_regex = r'.\[(\d+)\]=.\[(\d+)\]' + script_regex = r"" + script_text = search(script_regex, html).group(1) + key_regex = r'var .="([0-9a-f]+)",' + key_text = search(key_regex, script_text).group(1) + cipher_regex = r".\[(\d+)\]=.\[(\d+)\]" cipher_pairs = findall(cipher_regex, script_text) formkey_list = [""] * len(cipher_pairs) @@ -33,42 +47,40 @@ def extract_formkey(html): return formkey + class PoeResponse: - class Completion: - class Choices: def __init__(self, choice: dict) -> None: - self.text = choice['text'] - self.content = self.text.encode() - self.index = choice['index'] - self.logprobs = choice['logprobs'] - self.finish_reason = choice['finish_reason'] - + self.text = choice["text"] + self.content = self.text.encode() + self.index = choice["index"] + self.logprobs = choice["logprobs"] + self.finish_reason = choice["finish_reason"] + def __repr__(self) -> str: - return f'''<__main__.APIResponse.Completion.Choices(\n text = {self.text.encode()},\n index = {self.index},\n logprobs = {self.logprobs},\n finish_reason = {self.finish_reason})object at 0x1337>''' + return f"""<__main__.APIResponse.Completion.Choices(\n text = {self.text.encode()},\n index = {self.index},\n logprobs = {self.logprobs},\n finish_reason = {self.finish_reason})object at 0x1337>""" def __init__(self, choices: dict) -> None: self.choices = [self.Choices(choice) for choice in choices] class Usage: def __init__(self, usage_dict: dict) -> None: - self.prompt_tokens = usage_dict['prompt_tokens'] - self.completion_tokens = usage_dict['completion_tokens'] - self.total_tokens = usage_dict['total_tokens'] + self.prompt_tokens = usage_dict["prompt_tokens"] + self.completion_tokens = usage_dict["completion_tokens"] + self.total_tokens = usage_dict["total_tokens"] def __repr__(self): - return f'''<__main__.APIResponse.Usage(\n prompt_tokens = {self.prompt_tokens},\n completion_tokens = {self.completion_tokens},\n total_tokens = {self.total_tokens})object at 0x1337>''' - + return f"""<__main__.APIResponse.Usage(\n prompt_tokens = {self.prompt_tokens},\n completion_tokens = {self.completion_tokens},\n total_tokens = {self.total_tokens})object at 0x1337>""" + def __init__(self, response_dict: dict) -> None: - - self.response_dict = response_dict - self.id = response_dict['id'] - self.object = response_dict['object'] - self.created = response_dict['created'] - self.model = response_dict['model'] - self.completion = self.Completion(response_dict['choices']) - self.usage = self.Usage(response_dict['usage']) + self.response_dict = response_dict + self.id = response_dict["id"] + self.object = response_dict["object"] + self.created = response_dict["created"] + self.model = response_dict["model"] + self.completion = self.Completion(response_dict["choices"]) + self.usage = self.Usage(response_dict["usage"]) def json(self) -> dict: return self.response_dict @@ -76,127 +88,140 @@ class PoeResponse: class ModelResponse: def __init__(self, json_response: dict) -> None: - self.id = json_response['data']['poeBotCreate']['bot']['id'] - self.name = json_response['data']['poeBotCreate']['bot']['displayName'] - self.limit = json_response['data']['poeBotCreate']['bot']['messageLimit']['dailyLimit'] - self.deleted = json_response['data']['poeBotCreate']['bot']['deletionState'] + self.id = json_response["data"]["poeBotCreate"]["bot"]["id"] + self.name = json_response["data"]["poeBotCreate"]["bot"]["displayName"] + self.limit = json_response["data"]["poeBotCreate"]["bot"]["messageLimit"][ + "dailyLimit" + ] + self.deleted = json_response["data"]["poeBotCreate"]["bot"]["deletionState"] + class Model: def create( token: str, - model: str = 'gpt-3.5-turbo', # claude-instant - system_prompt: str = 'You are ChatGPT a large language model developed by Openai. Answer as consisely as possible', - description: str = 'gpt-3.5 language model from openai, skidded by poe.com', - handle: str = None) -> ModelResponse: - + model: str = "gpt-3.5-turbo", # claude-instant + system_prompt: str = "You are ChatGPT a large language model developed by Openai. Answer as consisely as possible", + description: str = "gpt-3.5 language model from openai, skidded by poe.com", + handle: str = None, + ) -> ModelResponse: models = { - 'gpt-3.5-turbo' : 'chinchilla', - 'claude-instant-v1.0': 'a2', - 'gpt-4': 'beaver' + "gpt-3.5-turbo": "chinchilla", + "claude-instant-v1.0": "a2", + "gpt-4": "beaver", } - + if not handle: - handle = f'gptx{randint(1111111, 9999999)}' - + handle = f"gptx{randint(1111111, 9999999)}" + client = Session() - client.cookies['p-b'] = token - - formkey = extract_formkey(client.get('https://poe.com').text) - settings = client.get('https://poe.com/api/settings').json() + client.cookies["p-b"] = token + + formkey = extract_formkey(client.get("https://poe.com").text) + settings = client.get("https://poe.com/api/settings").json() client.headers = { - "host" : "poe.com", - "origin" : "https://poe.com", - "referer" : "https://poe.com/", - "content-type" : "application/json", - "poe-formkey" : formkey, - "poe-tchannel" : settings['tchannelData']['channel'], - "user-agent" : "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36", - "connection" : "keep-alive", - "sec-ch-ua" : "\"Chromium\";v=\"112\", \"Google Chrome\";v=\"112\", \"Not:A-Brand\";v=\"99\"", - "sec-ch-ua-mobile" : "?0", - "sec-ch-ua-platform": "\"macOS\"", - "content-type" : "application/json", - "sec-fetch-site" : "same-origin", - "sec-fetch-mode" : "cors", - "sec-fetch-dest" : "empty", - "accept" : "*/*", - "accept-encoding" : "gzip, deflate, br", - "accept-language" : "en-GB,en-US;q=0.9,en;q=0.8", + "host": "poe.com", + "origin": "https://poe.com", + "referer": "https://poe.com/", + "poe-formkey": formkey, + "poe-tchannel": settings["tchannelData"]["channel"], + "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36", + "connection": "keep-alive", + "sec-ch-ua": '"Chromium";v="112", "Google Chrome";v="112", "Not:A-Brand";v="99"', + "sec-ch-ua-mobile": "?0", + "sec-ch-ua-platform": '"macOS"', + "content-type": "application/json", + "sec-fetch-site": "same-origin", + "sec-fetch-mode": "cors", + "sec-fetch-dest": "empty", + "accept": "*/*", + "accept-encoding": "gzip, deflate, br", + "accept-language": "en-GB,en-US;q=0.9,en;q=0.8", } - - payload = dumps(separators=(',', ':'), obj = { - 'queryName': 'CreateBotMain_poeBotCreate_Mutation', - 'variables': { - 'model' : models[model], - 'handle' : handle, - 'prompt' : system_prompt, - 'isPromptPublic' : True, - 'introduction' : '', - 'description' : description, - 'profilePictureUrl' : 'https://qph.fs.quoracdn.net/main-qimg-24e0b480dcd946e1cc6728802c5128b6', - 'apiUrl' : None, - 'apiKey' : ''.join(choices(ascii_letters + digits, k = 32)), - 'isApiBot' : False, - 'hasLinkification' : False, - 'hasMarkdownRendering' : False, - 'hasSuggestedReplies' : False, - 'isPrivateBot' : False + + payload = dumps( + separators=(",", ":"), + obj={ + "queryName": "CreateBotMain_poeBotCreate_Mutation", + "variables": { + "model": models[model], + "handle": handle, + "prompt": system_prompt, + "isPromptPublic": True, + "introduction": "", + "description": description, + "profilePictureUrl": "https://qph.fs.quoracdn.net/main-qimg-24e0b480dcd946e1cc6728802c5128b6", + "apiUrl": None, + "apiKey": "".join(choices(ascii_letters + digits, k=32)), + "isApiBot": False, + "hasLinkification": False, + "hasMarkdownRendering": False, + "hasSuggestedReplies": False, + "isPrivateBot": False, + }, + "query": "mutation CreateBotMain_poeBotCreate_Mutation(\n $model: String!\n $handle: String!\n $prompt: String!\n $isPromptPublic: Boolean!\n $introduction: String!\n $description: String!\n $profilePictureUrl: String\n $apiUrl: String\n $apiKey: String\n $isApiBot: Boolean\n $hasLinkification: Boolean\n $hasMarkdownRendering: Boolean\n $hasSuggestedReplies: Boolean\n $isPrivateBot: Boolean\n) {\n poeBotCreate(model: $model, handle: $handle, promptPlaintext: $prompt, isPromptPublic: $isPromptPublic, introduction: $introduction, description: $description, profilePicture: $profilePictureUrl, apiUrl: $apiUrl, apiKey: $apiKey, isApiBot: $isApiBot, hasLinkification: $hasLinkification, hasMarkdownRendering: $hasMarkdownRendering, hasSuggestedReplies: $hasSuggestedReplies, isPrivateBot: $isPrivateBot) {\n status\n bot {\n id\n ...BotHeader_bot\n }\n }\n}\n\nfragment BotHeader_bot on Bot {\n displayName\n messageLimit {\n dailyLimit\n }\n ...BotImage_bot\n ...BotLink_bot\n ...IdAnnotation_node\n ...botHelpers_useViewerCanAccessPrivateBot\n ...botHelpers_useDeletion_bot\n}\n\nfragment BotImage_bot on Bot {\n displayName\n ...botHelpers_useDeletion_bot\n ...BotImage_useProfileImage_bot\n}\n\nfragment BotImage_useProfileImage_bot on Bot {\n image {\n __typename\n ... on LocalBotImage {\n localName\n }\n ... on UrlBotImage {\n url\n }\n }\n ...botHelpers_useDeletion_bot\n}\n\nfragment BotLink_bot on Bot {\n displayName\n}\n\nfragment IdAnnotation_node on Node {\n __isNode: __typename\n id\n}\n\nfragment botHelpers_useDeletion_bot on Bot {\n deletionState\n}\n\nfragment botHelpers_useViewerCanAccessPrivateBot on Bot {\n isPrivateBot\n viewerIsCreator\n}\n", }, - 'query': 'mutation CreateBotMain_poeBotCreate_Mutation(\n $model: String!\n $handle: String!\n $prompt: String!\n $isPromptPublic: Boolean!\n $introduction: String!\n $description: String!\n $profilePictureUrl: String\n $apiUrl: String\n $apiKey: String\n $isApiBot: Boolean\n $hasLinkification: Boolean\n $hasMarkdownRendering: Boolean\n $hasSuggestedReplies: Boolean\n $isPrivateBot: Boolean\n) {\n poeBotCreate(model: $model, handle: $handle, promptPlaintext: $prompt, isPromptPublic: $isPromptPublic, introduction: $introduction, description: $description, profilePicture: $profilePictureUrl, apiUrl: $apiUrl, apiKey: $apiKey, isApiBot: $isApiBot, hasLinkification: $hasLinkification, hasMarkdownRendering: $hasMarkdownRendering, hasSuggestedReplies: $hasSuggestedReplies, isPrivateBot: $isPrivateBot) {\n status\n bot {\n id\n ...BotHeader_bot\n }\n }\n}\n\nfragment BotHeader_bot on Bot {\n displayName\n messageLimit {\n dailyLimit\n }\n ...BotImage_bot\n ...BotLink_bot\n ...IdAnnotation_node\n ...botHelpers_useViewerCanAccessPrivateBot\n ...botHelpers_useDeletion_bot\n}\n\nfragment BotImage_bot on Bot {\n displayName\n ...botHelpers_useDeletion_bot\n ...BotImage_useProfileImage_bot\n}\n\nfragment BotImage_useProfileImage_bot on Bot {\n image {\n __typename\n ... on LocalBotImage {\n localName\n }\n ... on UrlBotImage {\n url\n }\n }\n ...botHelpers_useDeletion_bot\n}\n\nfragment BotLink_bot on Bot {\n displayName\n}\n\nfragment IdAnnotation_node on Node {\n __isNode: __typename\n id\n}\n\nfragment botHelpers_useDeletion_bot on Bot {\n deletionState\n}\n\nfragment botHelpers_useViewerCanAccessPrivateBot on Bot {\n isPrivateBot\n viewerIsCreator\n}\n', - }) + ) - base_string = payload + client.headers["poe-formkey"] + 'WpuLMiXEKKE98j56k' - client.headers["poe-tag-id"] = md5(base_string.encode()).hexdigest() - - response = client.post("https://poe.com/api/gql_POST", data = payload) + base_string = payload + client.headers["poe-formkey"] + "WpuLMiXEKKE98j56k" + client.headers["poe-tag-id"] = md5(base_string.encode()).hexdigest() - if not 'success' in response.text: - raise Exception(''' + response = client.post("https://poe.com/api/gql_POST", data=payload) + + if "success" not in response.text: + raise Exception( + """ Bot creation Failed !! Important !! Bot creation was not enabled on this account please use: quora.Account.create with enable_bot_creation set to True - ''') - + """ + ) + return ModelResponse(response.json()) class Account: - def create(proxy: None or str = None, logging: bool = False, enable_bot_creation: bool = False): - client = TLS(client_identifier='chrome110') - client.proxies = { - 'http': f'http://{proxy}', - 'https': f'http://{proxy}'} if proxy else None + def create( + proxy: str | None = None, + logging: bool = False, + enable_bot_creation: bool = False, + ): + client = TLS(client_identifier="chrome110") + client.proxies = ( + {"http": f"http://{proxy}", "https": f"http://{proxy}"} if proxy else None + ) - mail_client = Emailnator() - mail_address = mail_client.get_mail() + mail_client = Emailnator() + mail_address = mail_client.get_mail() - if logging: print('email', mail_address) + if logging: + print("email", mail_address) client.headers = { - 'authority' : 'poe.com', - 'accept' : '*/*', - 'accept-language': 'en,fr-FR;q=0.9,fr;q=0.8,es-ES;q=0.7,es;q=0.6,en-US;q=0.5,am;q=0.4,de;q=0.3', - 'content-type' : 'application/json', - 'origin' : 'https://poe.com', - 'poe-formkey' : 'null', - 'poe-tag-id' : 'null', - 'poe-tchannel' : 'null', - 'referer' : 'https://poe.com/login', - 'sec-ch-ua' : '"Chromium";v="112", "Google Chrome";v="112", "Not:A-Brand";v="99"', - 'sec-ch-ua-mobile' : '?0', - 'sec-ch-ua-platform': '"macOS"', - 'sec-fetch-dest': 'empty', - 'sec-fetch-mode': 'cors', - 'sec-fetch-site': 'same-origin', - 'user-agent' : 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36' + "authority": "poe.com", + "accept": "*/*", + "accept-language": "en,fr-FR;q=0.9,fr;q=0.8,es-ES;q=0.7,es;q=0.6,en-US;q=0.5,am;q=0.4,de;q=0.3", + "content-type": "application/json", + "origin": "https://poe.com", + "poe-tag-id": "null", + "referer": "https://poe.com/login", + "sec-ch-ua": '"Chromium";v="112", "Google Chrome";v="112", "Not:A-Brand";v="99"', + "sec-ch-ua-mobile": "?0", + "sec-ch-ua-platform": '"macOS"', + "sec-fetch-dest": "empty", + "sec-fetch-mode": "cors", + "sec-fetch-site": "same-origin", + "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36", + "poe-formkey": extract_formkey(client.get("https://poe.com/login").text), + "poe-tchannel": client.get("https://poe.com/api/settings").json()[ + "tchannelData" + ]["channel"], } - client.headers["poe-formkey"] = extract_formkey(client.get('https://poe.com/login').text) - client.headers["poe-tchannel"] = client.get('https://poe.com/api/settings').json()['tchannelData']['channel'] - - token = reCaptchaV3('https://www.recaptcha.net/recaptcha/enterprise/anchor?ar=1&k=6LflhEElAAAAAI_ewVwRWI9hsyV4mbZnYAslSvlG&co=aHR0cHM6Ly9wb2UuY29tOjQ0Mw..&hl=en&v=4PnKmGB9wRHh1i04o7YUICeI&size=invisible&cb=bi6ivxoskyal') + token = reCaptchaV3( + "https://www.recaptcha.net/recaptcha/enterprise/anchor?ar=1&k=6LflhEElAAAAAI_ewVwRWI9hsyV4mbZnYAslSvlG&co=aHR0cHM6Ly9wb2UuY29tOjQ0Mw..&hl=en&v=4PnKmGB9wRHh1i04o7YUICeI&size=invisible&cb=bi6ivxoskyal" + ) # token = solver.recaptcha(sitekey='6LflhEElAAAAAI_ewVwRWI9hsyV4mbZnYAslSvlG', # url = 'https://poe.com/login?redirect_url=%2F', # version = 'v3', @@ -204,132 +229,219 @@ class Account: # invisible = 1, # action = 'login',)['code'] - payload = dumps(separators = (',', ':'), obj = { - 'queryName': 'MainSignupLoginSection_sendVerificationCodeMutation_Mutation', - 'variables': { - 'emailAddress' : mail_address, - 'phoneNumber' : None, - 'recaptchaToken': token + payload = dumps( + separators=(",", ":"), + obj={ + "queryName": "MainSignupLoginSection_sendVerificationCodeMutation_Mutation", + "variables": { + "emailAddress": mail_address, + "phoneNumber": None, + "recaptchaToken": token, + }, + "query": "mutation MainSignupLoginSection_sendVerificationCodeMutation_Mutation(\n $emailAddress: String\n $phoneNumber: String\n $recaptchaToken: String\n) {\n sendVerificationCode(verificationReason: login, emailAddress: $emailAddress, phoneNumber: $phoneNumber, recaptchaToken: $recaptchaToken) {\n status\n errorMessage\n }\n}\n", }, - 'query': 'mutation MainSignupLoginSection_sendVerificationCodeMutation_Mutation(\n $emailAddress: String\n $phoneNumber: String\n $recaptchaToken: String\n) {\n sendVerificationCode(verificationReason: login, emailAddress: $emailAddress, phoneNumber: $phoneNumber, recaptchaToken: $recaptchaToken) {\n status\n errorMessage\n }\n}\n', - }) + ) + + base_string = payload + client.headers["poe-formkey"] + "WpuLMiXEKKE98j56k" + client.headers["poe-tag-id"] = md5(base_string.encode()).hexdigest() - base_string = payload + client.headers["poe-formkey"] + 'WpuLMiXEKKE98j56k' - client.headers["poe-tag-id"] = md5(base_string.encode()).hexdigest() - print(dumps(client.headers, indent=4)) - - response = client.post('https://poe.com/api/gql_POST', data=payload) - - if 'automated_request_detected' in response.text: - print('please try using a proxy / wait for fix') - - if 'Bad Request' in response.text: - if logging: print('bad request, retrying...' , response.json()) + + response = client.post("https://poe.com/api/gql_POST", data=payload) + + if "automated_request_detected" in response.text: + print("please try using a proxy / wait for fix") + + if "Bad Request" in response.text: + if logging: + print("bad request, retrying...", response.json()) quit() - if logging: print('send_code' ,response.json()) - + if logging: + print("send_code", response.json()) + mail_content = mail_client.get_message() - mail_token = findall(r';">(\d{6,7})', mail_content)[0] + mail_token = findall(r';">(\d{6,7})', mail_content)[0] - if logging: print('code', mail_token) + if logging: + print("code", mail_token) - payload = dumps(separators = (',', ':'), obj={ - "queryName": "SignupOrLoginWithCodeSection_signupWithVerificationCodeMutation_Mutation", - "variables": { - "verificationCode": str(mail_token), - "emailAddress": mail_address, - "phoneNumber": None + payload = dumps( + separators=(",", ":"), + obj={ + "queryName": "SignupOrLoginWithCodeSection_signupWithVerificationCodeMutation_Mutation", + "variables": { + "verificationCode": str(mail_token), + "emailAddress": mail_address, + "phoneNumber": None, + }, + "query": "mutation SignupOrLoginWithCodeSection_signupWithVerificationCodeMutation_Mutation(\n $verificationCode: String!\n $emailAddress: String\n $phoneNumber: String\n) {\n signupWithVerificationCode(verificationCode: $verificationCode, emailAddress: $emailAddress, phoneNumber: $phoneNumber) {\n status\n errorMessage\n }\n}\n", }, - "query": "mutation SignupOrLoginWithCodeSection_signupWithVerificationCodeMutation_Mutation(\n $verificationCode: String!\n $emailAddress: String\n $phoneNumber: String\n) {\n signupWithVerificationCode(verificationCode: $verificationCode, emailAddress: $emailAddress, phoneNumber: $phoneNumber) {\n status\n errorMessage\n }\n}\n" - }) + ) - base_string = payload + client.headers["poe-formkey"] + 'WpuLMiXEKKE98j56k' - client.headers["poe-tag-id"] = md5(base_string.encode()).hexdigest() + base_string = payload + client.headers["poe-formkey"] + "WpuLMiXEKKE98j56k" + client.headers["poe-tag-id"] = md5(base_string.encode()).hexdigest() - response = client.post('https://poe.com/api/gql_POST', data = payload) - if logging: print('verify_code', response.json()) - - def get(): - cookies = open(Path(__file__).resolve().parent / 'cookies.txt', 'r').read().splitlines() + response = client.post("https://poe.com/api/gql_POST", data=payload) + if logging: + print("verify_code", response.json()) + + def get(self): + cookies = ( + open(Path(__file__).resolve().parent / "cookies.txt", "r") + .read() + .splitlines() + ) return choice(cookies) + class StreamingCompletion: def create( - model : str = 'gpt-4', - custom_model : bool = None, - prompt: str = 'hello world', - token : str = ''): + model: str = "gpt-4", + custom_model: bool = None, + prompt: str = "hello world", + token: str = "", + ): + _model = MODELS[model] if not custom_model else custom_model - models = { - 'sage' : 'capybara', - 'gpt-4' : 'beaver', - 'claude-v1.2' : 'a2_2', - 'claude-instant-v1.0' : 'a2', - 'gpt-3.5-turbo' : 'chinchilla' - } - - _model = models[model] if not custom_model else custom_model - client = PoeClient(token) - + for chunk in client.send_message(_model, prompt): - - yield PoeResponse({ - 'id' : chunk["messageId"], - 'object' : 'text_completion', - 'created': chunk['creationTime'], - 'model' : _model, - 'choices': [{ - 'text' : chunk["text_new"], - 'index' : 0, - 'logprobs' : None, - 'finish_reason' : 'stop' - }], - 'usage': { - 'prompt_tokens' : len(prompt), - 'completion_tokens' : len(chunk["text_new"]), - 'total_tokens' : len(prompt) + len(chunk["text_new"]) + yield PoeResponse( + { + "id": chunk["messageId"], + "object": "text_completion", + "created": chunk["creationTime"], + "model": _model, + "choices": [ + { + "text": chunk["text_new"], + "index": 0, + "logprobs": None, + "finish_reason": "stop", + } + ], + "usage": { + "prompt_tokens": len(prompt), + "completion_tokens": len(chunk["text_new"]), + "total_tokens": len(prompt) + len(chunk["text_new"]), + }, } - }) + ) + class Completion: def create( - model : str = 'gpt-4', - custom_model : str = None, - prompt: str = 'hello world', - token : str = ''): - + model: str = "gpt-4", + custom_model: str = None, + prompt: str = "hello world", + token: str = "", + ): models = { - 'sage' : 'capybara', - 'gpt-4' : 'beaver', - 'claude-v1.2' : 'a2_2', - 'claude-instant-v1.0' : 'a2', - 'gpt-3.5-turbo' : 'chinchilla' + "sage": "capybara", + "gpt-4": "beaver", + "claude-v1.2": "a2_2", + "claude-instant-v1.0": "a2", + "gpt-3.5-turbo": "chinchilla", } - + _model = models[model] if not custom_model else custom_model - + client = PoeClient(token) - + for chunk in client.send_message(_model, prompt): pass - - return PoeResponse({ - 'id' : chunk["messageId"], - 'object' : 'text_completion', - 'created': chunk['creationTime'], - 'model' : _model, - 'choices': [{ - 'text' : chunk["text"], - 'index' : 0, - 'logprobs' : None, - 'finish_reason' : 'stop' - }], - 'usage': { - 'prompt_tokens' : len(prompt), - 'completion_tokens' : len(chunk["text"]), - 'total_tokens' : len(prompt) + len(chunk["text"]) - } - }) \ No newline at end of file + + return PoeResponse( + { + "id": chunk["messageId"], + "object": "text_completion", + "created": chunk["creationTime"], + "model": _model, + "choices": [ + { + "text": chunk["text"], + "index": 0, + "logprobs": None, + "finish_reason": "stop", + } + ], + "usage": { + "prompt_tokens": len(prompt), + "completion_tokens": len(chunk["text"]), + "total_tokens": len(prompt) + len(chunk["text"]), + }, + } + ) + + +class Poe: + def __init__(self, model: str = "sage"): + self.cookie = self.__load_cookie() + self.model = MODELS[model] + self.client = PoeClient(self.cookie) + + def __load_cookie(self) -> str: + if (cookie_file := Path("cookie.json")).exists(): + with cookie_file.open() as fp: + cookie = json.load(fp) + if datetime.fromtimestamp(cookie["expiry"]) < datetime.now(): + cookie = self.__register_and_get_cookie() + else: + print("Loading the cookie from file") + else: + cookie = self.__register_and_get_cookie() + + return unquote(cookie["value"]) + + @classmethod + def __register_and_get_cookie(cls) -> dict: + mail_client = Emailnator() + mail_address = mail_client.get_mail() + + print(mail_address) + options = webdriver.FirefoxOptions() + options.add_argument("-headless") + driver = webdriver.Firefox(options=options) + + driver.get("https://www.poe.com") + + # clicking use email button + driver.find_element(By.XPATH, '//button[contains(text(), "Use email")]').click() + + email = WebDriverWait(driver, 30).until( + EC.presence_of_element_located((By.XPATH, '//input[@type="email"]')) + ) + email.send_keys(mail_address) + driver.find_element(By.XPATH, '//button[text()="Go"]').click() + + code = findall(r';">(\d{6,7})', mail_client.get_message())[0] + print(code) + + verification_code = WebDriverWait(driver, 30).until( + EC.presence_of_element_located((By.XPATH, '//input[@placeholder="Code"]')) + ) + verification_code.send_keys(code) + verify_button = EC.presence_of_element_located( + (By.XPATH, '//button[text()="Verify"]') + ) + login_button = EC.presence_of_element_located( + (By.XPATH, '//button[text()="Log In"]') + ) + + WebDriverWait(driver, 30).until(EC.any_of(verify_button, login_button)).click() + + cookie = driver.get_cookie("p-b") + + with open("cookie.json", "w") as fw: + json.dump(cookie, fw) + + driver.close() + return cookie + + def chat(self, message: str): + response = None + for chunk in self.client.send_message(self.model, message): + response = chunk["text"] + + return response diff --git a/quora/api.py b/quora/api.py index 9436f86..b28c124 100644 --- a/quora/api.py +++ b/quora/api.py @@ -55,10 +55,7 @@ def load_queries(): def generate_payload(query_name, variables): - return { - "query": queries[query_name], - "variables": variables - } + return {"query": queries[query_name], "variables": variables} def request_with_retries(method, *args, **kwargs): @@ -69,7 +66,8 @@ def request_with_retries(method, *args, **kwargs): if r.status_code == 200: return r logger.warn( - f"Server returned a status code of {r.status_code} while downloading {url}. Retrying ({i+1}/{attempts})...") + f"Server returned a status code of {r.status_code} while downloading {url}. Retrying ({i+1}/{attempts})..." + ) raise RuntimeError(f"Failed to download {url} too many times.") @@ -84,15 +82,13 @@ class Client: self.proxy = proxy self.session = requests.Session() self.adapter = requests.adapters.HTTPAdapter( - pool_connections=100, pool_maxsize=100) + pool_connections=100, pool_maxsize=100 + ) self.session.mount("http://", self.adapter) self.session.mount("https://", self.adapter) if proxy: - self.session.proxies = { - "http": self.proxy, - "https": self.proxy - } + self.session.proxies = {"http": self.proxy, "https": self.proxy} logger.info(f"Proxy enabled: {self.proxy}") self.active_messages = {} @@ -124,11 +120,11 @@ class Client: self.subscribe() def extract_formkey(self, html): - script_regex = r'' + script_regex = r"" script_text = re.search(script_regex, html).group(1) key_regex = r'var .="([0-9a-f]+)",' key_text = re.search(key_regex, script_text).group(1) - cipher_regex = r'.\[(\d+)\]=.\[(\d+)\]' + cipher_regex = r".\[(\d+)\]=.\[(\d+)\]" cipher_pairs = re.findall(cipher_regex, script_text) formkey_list = [""] * len(cipher_pairs) @@ -143,7 +139,9 @@ class Client: logger.info("Downloading next_data...") r = request_with_retries(self.session.get, self.home_url) - json_regex = r'' + json_regex = ( + r'' + ) json_text = re.search(json_regex, r.text).group(1) next_data = json.loads(json_text) @@ -181,8 +179,7 @@ class Client: bots[chat_data["defaultBotObject"]["nickname"]] = chat_data for bot in bot_list: - thread = threading.Thread( - target=get_bot_thread, args=(bot,), daemon=True) + thread = threading.Thread(target=get_bot_thread, args=(bot,), daemon=True) threads.append(thread) for thread in threads: @@ -216,50 +213,59 @@ class Client: if channel is None: channel = self.channel query = f'?min_seq={channel["minSeq"]}&channel={channel["channel"]}&hash={channel["channelHash"]}' - return f'wss://{self.ws_domain}.tch.{channel["baseHost"]}/up/{channel["boxName"]}/updates'+query + return ( + f'wss://{self.ws_domain}.tch.{channel["baseHost"]}/up/{channel["boxName"]}/updates' + + query + ) def send_query(self, query_name, variables): for i in range(20): json_data = generate_payload(query_name, variables) payload = json.dumps(json_data, separators=(",", ":")) - base_string = payload + \ - self.gql_headers["poe-formkey"] + "WpuLMiXEKKE98j56k" + base_string = ( + payload + self.gql_headers["poe-formkey"] + "WpuLMiXEKKE98j56k" + ) headers = { "content-type": "application/json", - "poe-tag-id": hashlib.md5(base_string.encode()).hexdigest() + "poe-tag-id": hashlib.md5(base_string.encode()).hexdigest(), } headers = {**self.gql_headers, **headers} r = request_with_retries( - self.session.post, self.gql_url, data=payload, headers=headers) + self.session.post, self.gql_url, data=payload, headers=headers + ) data = r.json() if data["data"] == None: logger.warn( - f'{query_name} returned an error: {data["errors"][0]["message"]} | Retrying ({i+1}/20)') + f'{query_name} returned an error: {data["errors"][0]["message"]} | Retrying ({i+1}/20)' + ) time.sleep(2) continue return r.json() - raise RuntimeError(f'{query_name} failed too many times.') + raise RuntimeError(f"{query_name} failed too many times.") def subscribe(self): logger.info("Subscribing to mutations") - result = self.send_query("SubscriptionsMutation", { - "subscriptions": [ - { - "subscriptionName": "messageAdded", - "query": queries["MessageAddedSubscription"] - }, - { - "subscriptionName": "viewerStateUpdated", - "query": queries["ViewerStateUpdatedSubscription"] - } - ] - }) + result = self.send_query( + "SubscriptionsMutation", + { + "subscriptions": [ + { + "subscriptionName": "messageAdded", + "query": queries["MessageAddedSubscription"], + }, + { + "subscriptionName": "viewerStateUpdated", + "query": queries["ViewerStateUpdatedSubscription"], + }, + ] + }, + ) def ws_run_thread(self): kwargs = {} @@ -268,7 +274,7 @@ class Client: kwargs = { "proxy_type": proxy_parsed.scheme, "http_proxy_host": proxy_parsed.hostname, - "http_proxy_port": proxy_parsed.port + "http_proxy_port": proxy_parsed.port, } self.ws.run_forever(**kwargs) @@ -281,7 +287,7 @@ class Client: on_message=self.on_message, on_open=self.on_ws_connect, on_error=self.on_ws_error, - on_close=self.on_ws_close + on_close=self.on_ws_close, ) t = threading.Thread(target=self.ws_run_thread, daemon=True) t.start() @@ -299,7 +305,8 @@ class Client: def on_ws_close(self, ws, close_status_code, close_message): self.ws_connected = False logger.warn( - f"Websocket closed with status {close_status_code}: {close_message}") + f"Websocket closed with status {close_status_code}: {close_message}" + ) def on_ws_error(self, ws, error): self.disconnect_ws() @@ -326,7 +333,11 @@ class Client: return # indicate that the response id is tied to the human message id - elif key != "pending" and value == None and message["state"] != "complete": + elif ( + key != "pending" + and value == None + and message["state"] != "complete" + ): self.active_messages[key] = message["messageId"] self.message_queues[key].put(message) return @@ -352,13 +363,16 @@ class Client: self.setup_connection() self.connect_ws() - message_data = self.send_query("SendMessageMutation", { - "bot": chatbot, - "query": message, - "chatId": self.bots[chatbot]["chatId"], - "source": None, - "withChatBreak": with_chat_break - }) + message_data = self.send_query( + "SendMessageMutation", + { + "bot": chatbot, + "query": message, + "chatId": self.bots[chatbot]["chatId"], + "source": None, + "withChatBreak": with_chat_break, + }, + ) del self.active_messages["pending"] if not message_data["data"]["messageEdgeCreate"]["message"]: @@ -368,7 +382,8 @@ class Client: human_message_id = human_message["node"]["messageId"] except TypeError: raise RuntimeError( - f"An unknown error occurred. Raw response data: {message_data}") + f"An unknown error occurred. Raw response data: {message_data}" + ) # indicate that the current message is waiting for a response self.active_messages[human_message_id] = None @@ -378,8 +393,7 @@ class Client: message_id = None while True: try: - message = self.message_queues[human_message_id].get( - timeout=timeout) + message = self.message_queues[human_message_id].get(timeout=timeout) except queue.Empty: del self.active_messages[human_message_id] del self.message_queues[human_message_id] @@ -393,7 +407,7 @@ class Client: continue # update info about response - message["text_new"] = message["text"][len(last_text):] + message["text_new"] = message["text"][len(last_text) :] last_text = message["text"] message_id = message["messageId"] @@ -404,9 +418,9 @@ class Client: def send_chat_break(self, chatbot): logger.info(f"Sending chat break to {chatbot}") - result = self.send_query("AddMessageBreakMutation", { - "chatId": self.bots[chatbot]["chatId"] - }) + result = self.send_query( + "AddMessageBreakMutation", {"chatId": self.bots[chatbot]["chatId"]} + ) return result["data"]["messageBreakCreate"]["message"] def get_message_history(self, chatbot, count=25, cursor=None): @@ -423,23 +437,24 @@ class Client: cursor = str(cursor) if count > 50: - messages = self.get_message_history( - chatbot, count=50, cursor=cursor) + messages + messages = ( + self.get_message_history(chatbot, count=50, cursor=cursor) + messages + ) while count > 0: count -= 50 new_cursor = messages[0]["cursor"] new_messages = self.get_message_history( - chatbot, min(50, count), cursor=new_cursor) + chatbot, min(50, count), cursor=new_cursor + ) messages = new_messages + messages return messages elif count <= 0: return messages - result = self.send_query("ChatListPaginationQuery", { - "count": count, - "cursor": cursor, - "id": self.bots[chatbot]["id"] - }) + result = self.send_query( + "ChatListPaginationQuery", + {"count": count, "cursor": cursor, "id": self.bots[chatbot]["id"]}, + ) query_messages = result["data"]["node"]["messagesConnection"]["edges"] messages = query_messages + messages return messages @@ -449,9 +464,7 @@ class Client: if not type(message_ids) is list: message_ids = [int(message_ids)] - result = self.send_query("DeleteMessageMutation", { - "messageIds": message_ids - }) + result = self.send_query("DeleteMessageMutation", {"messageIds": message_ids}) def purge_conversation(self, chatbot, count=-1): logger.info(f"Purging messages from {chatbot}") @@ -471,60 +484,93 @@ class Client: last_messages = self.get_message_history(chatbot, count=50)[::-1] logger.info(f"No more messages left to delete.") - def create_bot(self, handle, prompt="", base_model="chinchilla", description="", - intro_message="", api_key=None, api_bot=False, api_url=None, - prompt_public=True, pfp_url=None, linkification=False, - markdown_rendering=True, suggested_replies=False, private=False): - result = self.send_query("PoeBotCreateMutation", { - "model": base_model, - "handle": handle, - "prompt": prompt, - "isPromptPublic": prompt_public, - "introduction": intro_message, - "description": description, - "profilePictureUrl": pfp_url, - "apiUrl": api_url, - "apiKey": api_key, - "isApiBot": api_bot, - "hasLinkification": linkification, - "hasMarkdownRendering": markdown_rendering, - "hasSuggestedReplies": suggested_replies, - "isPrivateBot": private - }) + def create_bot( + self, + handle, + prompt="", + base_model="chinchilla", + description="", + intro_message="", + api_key=None, + api_bot=False, + api_url=None, + prompt_public=True, + pfp_url=None, + linkification=False, + markdown_rendering=True, + suggested_replies=False, + private=False, + ): + result = self.send_query( + "PoeBotCreateMutation", + { + "model": base_model, + "handle": handle, + "prompt": prompt, + "isPromptPublic": prompt_public, + "introduction": intro_message, + "description": description, + "profilePictureUrl": pfp_url, + "apiUrl": api_url, + "apiKey": api_key, + "isApiBot": api_bot, + "hasLinkification": linkification, + "hasMarkdownRendering": markdown_rendering, + "hasSuggestedReplies": suggested_replies, + "isPrivateBot": private, + }, + ) data = result["data"]["poeBotCreate"] if data["status"] != "success": raise RuntimeError( - f"Poe returned an error while trying to create a bot: {data['status']}") + f"Poe returned an error while trying to create a bot: {data['status']}" + ) self.get_bots() return data - def edit_bot(self, bot_id, handle, prompt="", base_model="chinchilla", description="", - intro_message="", api_key=None, api_url=None, private=False, - prompt_public=True, pfp_url=None, linkification=False, - markdown_rendering=True, suggested_replies=False): - - result = self.send_query("PoeBotEditMutation", { - "baseBot": base_model, - "botId": bot_id, - "handle": handle, - "prompt": prompt, - "isPromptPublic": prompt_public, - "introduction": intro_message, - "description": description, - "profilePictureUrl": pfp_url, - "apiUrl": api_url, - "apiKey": api_key, - "hasLinkification": linkification, - "hasMarkdownRendering": markdown_rendering, - "hasSuggestedReplies": suggested_replies, - "isPrivateBot": private - }) + def edit_bot( + self, + bot_id, + handle, + prompt="", + base_model="chinchilla", + description="", + intro_message="", + api_key=None, + api_url=None, + private=False, + prompt_public=True, + pfp_url=None, + linkification=False, + markdown_rendering=True, + suggested_replies=False, + ): + result = self.send_query( + "PoeBotEditMutation", + { + "baseBot": base_model, + "botId": bot_id, + "handle": handle, + "prompt": prompt, + "isPromptPublic": prompt_public, + "introduction": intro_message, + "description": description, + "profilePictureUrl": pfp_url, + "apiUrl": api_url, + "apiKey": api_key, + "hasLinkification": linkification, + "hasMarkdownRendering": markdown_rendering, + "hasSuggestedReplies": suggested_replies, + "isPrivateBot": private, + }, + ) data = result["data"]["poeBotEdit"] if data["status"] != "success": raise RuntimeError( - f"Poe returned an error while trying to edit a bot: {data['status']}") + f"Poe returned an error while trying to edit a bot: {data['status']}" + ) self.get_bots() return data diff --git a/quora/mail.py b/quora/mail.py index a830502..c470786 100644 --- a/quora/mail.py +++ b/quora/mail.py @@ -1,66 +1,66 @@ +from json import loads +from re import findall +from time import sleep + +from fake_useragent import UserAgent from requests import Session -from time import sleep -from re import search, findall -from json import loads + class Emailnator: def __init__(self) -> None: self.client = Session() - self.client.get('https://www.emailnator.com/', timeout=6) + self.client.get("https://www.emailnator.com/", timeout=6) self.cookies = self.client.cookies.get_dict() self.client.headers = { - 'authority' : 'www.emailnator.com', - 'origin' : 'https://www.emailnator.com', - 'referer' : 'https://www.emailnator.com/', - 'user-agent' : 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.0.0 Safari/537.36 Edg/101.0.1722.39', - 'x-xsrf-token' : self.client.cookies.get("XSRF-TOKEN")[:-3]+"=", + "authority": "www.emailnator.com", + "origin": "https://www.emailnator.com", + "referer": "https://www.emailnator.com/", + "user-agent": UserAgent().random, + "x-xsrf-token": self.client.cookies.get("XSRF-TOKEN")[:-3] + "=", } - + self.email = None - + def get_mail(self): - response = self.client.post('https://www.emailnator.com/generate-email',json = { - 'email': [ - 'domain', - 'plusGmail', - 'dotGmail', - ] - }) - + response = self.client.post( + "https://www.emailnator.com/generate-email", + json={ + "email": [ + "domain", + "plusGmail", + "dotGmail", + ] + }, + ) + self.email = loads(response.text)["email"][0] return self.email - + def get_message(self): print("waiting for code...") - + while True: sleep(2) - mail_token = self.client.post('https://www.emailnator.com/message-list', - json = {'email': self.email}) - + mail_token = self.client.post( + "https://www.emailnator.com/message-list", json={"email": self.email} + ) + mail_token = loads(mail_token.text)["messageData"] - + if len(mail_token) == 2: print(mail_token[1]["messageID"]) break - - mail_context = self.client.post('https://www.emailnator.com/message-list', json = { - 'email' : self.email, - 'messageID': mail_token[1]["messageID"], - }) - + + mail_context = self.client.post( + "https://www.emailnator.com/message-list", + json={ + "email": self.email, + "messageID": mail_token[1]["messageID"], + }, + ) + return mail_context.text -# mail_client = Emailnator() -# mail_adress = mail_client.get_mail() - -# print(mail_adress) - -# mail_content = mail_client.get_message() - -# print(mail_content) - -# code = findall(r';">(\d{6,7})', mail_content)[0] -# print(code) - + def get_verification_code(self): + return findall(r';">(\d{6,7})', self.get_message())[0] diff --git a/requirements.txt b/requirements.txt index f9d6351..0f9413b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,4 +4,6 @@ tls-client pypasser names colorama -curl_cffi \ No newline at end of file +curl_cffi +selenium +fake-useragent \ No newline at end of file