1
0
Fork 0
GPT4FREE/openai_rev/quora/api.py

546 lines
19 KiB
Python
Raw Normal View History

# This file was taken from the repository poe-api https://github.com/ading2210/poe-api and is unmodified
# This file is licensed under the GNU GPL v3 and written by @ading2210
2023-04-18 15:16:09 +00:00
# license:
# ading2210/poe-api: a reverse engineered Python API wrapepr for Quora's Poe
# Copyright (C) 2023 ading2210
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
2023-04-18 15:16:09 +00:00
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import hashlib
2023-03-29 20:10:42 +00:00
import json
import logging
import queue
import random
import re
2023-03-29 20:10:42 +00:00
import threading
import time
import traceback
2023-03-29 20:10:42 +00:00
from pathlib import Path
from urllib.parse import urlparse
import requests
import requests.adapters
import websocket
2023-04-16 16:10:37 +00:00
2023-03-29 20:10:42 +00:00
parent_path = Path(__file__).resolve().parent
2023-04-19 14:51:06 +00:00
queries_path = parent_path / "graphql"
2023-03-29 20:10:42 +00:00
queries = {}
logging.basicConfig()
logger = logging.getLogger()
user_agent = "Mozilla/5.0 (X11; Linux x86_64; rv:102.0) Gecko/20100101 Firefox/102.0"
def load_queries():
for path in queries_path.iterdir():
if path.suffix != ".graphql":
continue
with open(path) as f:
queries[path.stem] = f.read()
def generate_payload(query_name, variables):
return {"query": queries[query_name], "variables": variables}
2023-03-29 20:10:42 +00:00
def request_with_retries(method, *args, **kwargs):
attempts = kwargs.get("attempts") or 10
url = args[0]
for i in range(attempts):
r = method(*args, **kwargs)
if r.status_code == 200:
return r
logger.warn(
f"Server returned a status code of {r.status_code} while downloading {url}. Retrying ({i + 1}/{attempts})..."
)
2023-03-29 20:10:42 +00:00
raise RuntimeError(f"Failed to download {url} too many times.")
class Client:
gql_url = "https://poe.com/api/gql_POST"
gql_recv_url = "https://poe.com/api/receive_POST"
home_url = "https://poe.com"
settings_url = "https://poe.com/api/settings"
def __init__(self, token, proxy=None):
self.proxy = proxy
self.session = requests.Session()
self.adapter = requests.adapters.HTTPAdapter(pool_connections=100, pool_maxsize=100)
2023-04-18 15:16:09 +00:00
self.session.mount("http://", self.adapter)
self.session.mount("https://", self.adapter)
2023-03-29 20:10:42 +00:00
if proxy:
self.session.proxies = {"http": self.proxy, "https": self.proxy}
2023-03-29 20:10:42 +00:00
logger.info(f"Proxy enabled: {self.proxy}")
self.active_messages = {}
self.message_queues = {}
2023-03-29 20:10:42 +00:00
self.session.cookies.set("p-b", token, domain="poe.com")
self.headers = {
"User-Agent": user_agent,
"Referrer": "https://poe.com/",
"Origin": "https://poe.com",
}
self.session.headers.update(self.headers)
2023-04-16 16:10:37 +00:00
self.setup_connection()
self.connect_ws()
def setup_connection(self):
self.ws_domain = f"tch{random.randint(1, 1e6)}"
self.next_data = self.get_next_data(overwrite_vars=True)
2023-03-29 20:10:42 +00:00
self.channel = self.get_channel_data()
self.bots = self.get_bots(download_next_data=False)
2023-03-29 20:10:42 +00:00
self.bot_names = self.get_bot_names()
self.gql_headers = {
"poe-formkey": self.formkey,
"poe-tchannel": self.channel["channel"],
}
self.gql_headers = {**self.gql_headers, **self.headers}
self.subscribe()
2023-04-16 16:10:37 +00:00
def extract_formkey(self, html):
script_regex = r"<script>if\(.+\)throw new Error;(.+)</script>"
2023-04-16 16:10:37 +00:00
script_text = re.search(script_regex, html).group(1)
key_regex = r'var .="([0-9a-f]+)",'
key_text = re.search(key_regex, script_text).group(1)
cipher_regex = r".\[(\d+)\]=.\[(\d+)\]"
2023-04-16 16:10:37 +00:00
cipher_pairs = re.findall(cipher_regex, script_text)
formkey_list = [""] * len(cipher_pairs)
for pair in cipher_pairs:
formkey_index, key_index = map(int, pair)
formkey_list[formkey_index] = key_text[key_index]
formkey = "".join(formkey_list)
return formkey
def get_next_data(self, overwrite_vars=False):
2023-03-29 20:10:42 +00:00
logger.info("Downloading next_data...")
r = request_with_retries(self.session.get, self.home_url)
json_regex = r'<script id="__NEXT_DATA__" type="application\/json">(.+?)</script>'
2023-03-29 20:10:42 +00:00
json_text = re.search(json_regex, r.text).group(1)
next_data = json.loads(json_text)
if overwrite_vars:
2023-04-16 16:10:37 +00:00
self.formkey = self.extract_formkey(r.text)
self.viewer = next_data["props"]["pageProps"]["payload"]["viewer"]
2023-04-18 15:16:09 +00:00
self.next_data = next_data
2023-03-29 20:10:42 +00:00
return next_data
def get_bot(self, display_name):
url = f'https://poe.com/_next/data/{self.next_data["buildId"]}/{display_name}.json'
r = request_with_retries(self.session.get, url)
chat_data = r.json()["pageProps"]["payload"]["chatOfBotDisplayName"]
return chat_data
def get_bots(self, download_next_data=True):
2023-04-18 15:16:09 +00:00
logger.info("Downloading all bots...")
if download_next_data:
2023-04-18 15:16:09 +00:00
next_data = self.get_next_data(overwrite_vars=True)
else:
next_data = self.next_data
if not "availableBots" in self.viewer:
raise RuntimeError("Invalid token or no bots are available.")
bot_list = self.viewer["availableBots"]
2023-03-29 20:10:42 +00:00
2023-04-18 15:16:09 +00:00
threads = []
2023-03-29 20:10:42 +00:00
bots = {}
2023-04-18 15:16:09 +00:00
def get_bot_thread(bot):
chat_data = self.get_bot(bot["displayName"])
2023-03-29 20:10:42 +00:00
bots[chat_data["defaultBotObject"]["nickname"]] = chat_data
2023-04-18 15:16:09 +00:00
for bot in bot_list:
thread = threading.Thread(target=get_bot_thread, args=(bot,), daemon=True)
2023-04-18 15:16:09 +00:00
threads.append(thread)
for thread in threads:
thread.start()
for thread in threads:
thread.join()
self.bots = bots
self.bot_names = self.get_bot_names()
2023-03-29 20:10:42 +00:00
return bots
def get_bot_names(self):
bot_names = {}
for bot_nickname in self.bots:
bot_obj = self.bots[bot_nickname]["defaultBotObject"]
bot_names[bot_nickname] = bot_obj["displayName"]
return bot_names
2023-04-18 15:16:09 +00:00
def get_remaining_messages(self, chatbot):
chat_data = self.get_bot(self.bot_names[chatbot])
return chat_data["defaultBotObject"]["messageLimit"]["numMessagesRemaining"]
2023-03-29 20:10:42 +00:00
def get_channel_data(self, channel=None):
logger.info("Downloading channel data...")
r = request_with_retries(self.session.get, self.settings_url)
data = r.json()
return data["tchannelData"]
def get_websocket_url(self, channel=None):
if channel is None:
channel = self.channel
query = f'?min_seq={channel["minSeq"]}&channel={channel["channel"]}&hash={channel["channelHash"]}'
return f'wss://{self.ws_domain}.tch.{channel["baseHost"]}/up/{channel["boxName"]}/updates' + query
2023-03-29 20:10:42 +00:00
def send_query(self, query_name, variables):
for i in range(20):
json_data = generate_payload(query_name, variables)
2023-04-16 16:10:37 +00:00
payload = json.dumps(json_data, separators=(",", ":"))
base_string = payload + self.gql_headers["poe-formkey"] + "WpuLMiXEKKE98j56k"
2023-04-16 16:10:37 +00:00
headers = {
"content-type": "application/json",
"poe-tag-id": hashlib.md5(base_string.encode()).hexdigest(),
}
2023-04-16 16:10:37 +00:00
headers = {**self.gql_headers, **headers}
r = request_with_retries(self.session.post, self.gql_url, data=payload, headers=headers)
2023-04-16 16:10:37 +00:00
2023-03-29 20:10:42 +00:00
data = r.json()
if data["data"] == None:
logger.warn(f'{query_name} returned an error: {data["errors"][0]["message"]} | Retrying ({i + 1}/20)')
2023-03-29 20:10:42 +00:00
time.sleep(2)
continue
return r.json()
raise RuntimeError(f"{query_name} failed too many times.")
2023-03-29 20:10:42 +00:00
def subscribe(self):
logger.info("Subscribing to mutations")
result = self.send_query(
"SubscriptionsMutation",
{
"subscriptions": [
{
"subscriptionName": "messageAdded",
"query": queries["MessageAddedSubscription"],
},
{
"subscriptionName": "viewerStateUpdated",
"query": queries["ViewerStateUpdatedSubscription"],
},
]
},
)
2023-03-29 20:10:42 +00:00
def ws_run_thread(self):
kwargs = {}
if self.proxy:
proxy_parsed = urlparse(self.proxy)
kwargs = {
"proxy_type": proxy_parsed.scheme,
"http_proxy_host": proxy_parsed.hostname,
"http_proxy_port": proxy_parsed.port,
2023-03-29 20:10:42 +00:00
}
self.ws.run_forever(**kwargs)
def connect_ws(self):
self.ws_connected = False
2023-03-29 20:10:42 +00:00
self.ws = websocket.WebSocketApp(
self.get_websocket_url(),
header={"User-Agent": user_agent},
on_message=self.on_message,
on_open=self.on_ws_connect,
on_error=self.on_ws_error,
on_close=self.on_ws_close,
2023-03-29 20:10:42 +00:00
)
t = threading.Thread(target=self.ws_run_thread, daemon=True)
t.start()
while not self.ws_connected:
time.sleep(0.01)
def disconnect_ws(self):
if self.ws:
self.ws.close()
self.ws_connected = False
def on_ws_connect(self, ws):
self.ws_connected = True
def on_ws_close(self, ws, close_status_code, close_message):
self.ws_connected = False
logger.warn(f"Websocket closed with status {close_status_code}: {close_message}")
2023-03-29 20:10:42 +00:00
def on_ws_error(self, ws, error):
self.disconnect_ws()
self.connect_ws()
def on_message(self, ws, msg):
try:
data = json.loads(msg)
if not "messages" in data:
2023-03-29 20:10:42 +00:00
return
for message_str in data["messages"]:
message_data = json.loads(message_str)
if message_data["message_type"] != "subscriptionUpdate":
continue
message = message_data["payload"]["data"]["messageAdded"]
copied_dict = self.active_messages.copy()
for key, value in copied_dict.items():
# add the message to the appropriate queue
if value == message["messageId"] and key in self.message_queues:
self.message_queues[key].put(message)
return
# indicate that the response id is tied to the human message id
elif key != "pending" and value == None and message["state"] != "complete":
self.active_messages[key] = message["messageId"]
self.message_queues[key].put(message)
return
except Exception:
logger.error(traceback.format_exc())
self.disconnect_ws()
self.connect_ws()
2023-03-29 20:10:42 +00:00
def send_message(self, chatbot, message, with_chat_break=False, timeout=20):
# if there is another active message, wait until it has finished sending
while None in self.active_messages.values():
time.sleep(0.01)
# None indicates that a message is still in progress
self.active_messages["pending"] = None
logger.info(f"Sending message to {chatbot}: {message}")
2023-04-16 16:10:37 +00:00
# reconnect websocket
if not self.ws_connected:
self.disconnect_ws()
2023-04-16 16:10:37 +00:00
self.setup_connection()
self.connect_ws()
2023-04-16 16:10:37 +00:00
message_data = self.send_query(
"SendMessageMutation",
{
"bot": chatbot,
"query": message,
"chatId": self.bots[chatbot]["chatId"],
"source": None,
"withChatBreak": with_chat_break,
},
)
2023-03-29 20:10:42 +00:00
del self.active_messages["pending"]
if not message_data["data"]["messageEdgeCreate"]["message"]:
2023-03-29 20:10:42 +00:00
raise RuntimeError(f"Daily limit reached for {chatbot}.")
try:
human_message = message_data["data"]["messageEdgeCreate"]["message"]
human_message_id = human_message["node"]["messageId"]
2023-03-29 20:10:42 +00:00
except TypeError:
raise RuntimeError(f"An unknown error occurred. Raw response data: {message_data}")
2023-03-29 20:10:42 +00:00
# indicate that the current message is waiting for a response
self.active_messages[human_message_id] = None
self.message_queues[human_message_id] = queue.Queue()
last_text = ""
message_id = None
while True:
try:
message = self.message_queues[human_message_id].get(timeout=timeout)
2023-03-29 20:10:42 +00:00
except queue.Empty:
del self.active_messages[human_message_id]
del self.message_queues[human_message_id]
raise RuntimeError("Response timed out.")
# only break when the message is marked as complete
if message["state"] == "complete":
if last_text and message["messageId"] == message_id:
break
else:
continue
# update info about response
message["text_new"] = message["text"][len(last_text) :]
2023-03-29 20:10:42 +00:00
last_text = message["text"]
message_id = message["messageId"]
yield message
del self.active_messages[human_message_id]
del self.message_queues[human_message_id]
def send_chat_break(self, chatbot):
logger.info(f"Sending chat break to {chatbot}")
result = self.send_query("AddMessageBreakMutation", {"chatId": self.bots[chatbot]["chatId"]})
return result["data"]["messageBreakCreate"]["message"]
def get_message_history(self, chatbot, count=25, cursor=None):
logger.info(f"Downloading {count} messages from {chatbot}")
messages = []
if cursor == None:
chat_data = self.get_bot(self.bot_names[chatbot])
if not chat_data["messagesConnection"]["edges"]:
return []
messages = chat_data["messagesConnection"]["edges"][:count]
cursor = chat_data["messagesConnection"]["pageInfo"]["startCursor"]
count -= len(messages)
cursor = str(cursor)
if count > 50:
messages = self.get_message_history(chatbot, count=50, cursor=cursor) + messages
while count > 0:
count -= 50
new_cursor = messages[0]["cursor"]
new_messages = self.get_message_history(chatbot, min(50, count), cursor=new_cursor)
messages = new_messages + messages
return messages
elif count <= 0:
return messages
result = self.send_query(
"ChatListPaginationQuery",
{"count": count, "cursor": cursor, "id": self.bots[chatbot]["id"]},
)
query_messages = result["data"]["node"]["messagesConnection"]["edges"]
messages = query_messages + messages
return messages
def delete_message(self, message_ids):
logger.info(f"Deleting messages: {message_ids}")
if not type(message_ids) is list:
message_ids = [int(message_ids)]
result = self.send_query("DeleteMessageMutation", {"messageIds": message_ids})
def purge_conversation(self, chatbot, count=-1):
logger.info(f"Purging messages from {chatbot}")
last_messages = self.get_message_history(chatbot, count=50)[::-1]
while last_messages:
message_ids = []
for message in last_messages:
if count == 0:
break
count -= 1
message_ids.append(message["node"]["messageId"])
self.delete_message(message_ids)
if count == 0:
return
last_messages = self.get_message_history(chatbot, count=50)[::-1]
logger.info(f"No more messages left to delete.")
def create_bot(
self,
handle,
prompt="",
base_model="chinchilla",
description="",
intro_message="",
api_key=None,
api_bot=False,
api_url=None,
prompt_public=True,
pfp_url=None,
linkification=False,
markdown_rendering=True,
suggested_replies=False,
private=False,
):
result = self.send_query(
"PoeBotCreateMutation",
{
"model": base_model,
"handle": handle,
"prompt": prompt,
"isPromptPublic": prompt_public,
"introduction": intro_message,
"description": description,
"profilePictureUrl": pfp_url,
"apiUrl": api_url,
"apiKey": api_key,
"isApiBot": api_bot,
"hasLinkification": linkification,
"hasMarkdownRendering": markdown_rendering,
"hasSuggestedReplies": suggested_replies,
"isPrivateBot": private,
},
)
2023-04-18 15:16:09 +00:00
data = result["data"]["poeBotCreate"]
if data["status"] != "success":
raise RuntimeError(f"Poe returned an error while trying to create a bot: {data['status']}")
2023-04-18 15:16:09 +00:00
self.get_bots()
return data
def edit_bot(
self,
bot_id,
handle,
prompt="",
base_model="chinchilla",
description="",
intro_message="",
api_key=None,
api_url=None,
private=False,
prompt_public=True,
pfp_url=None,
linkification=False,
markdown_rendering=True,
suggested_replies=False,
):
result = self.send_query(
"PoeBotEditMutation",
{
"baseBot": base_model,
"botId": bot_id,
"handle": handle,
"prompt": prompt,
"isPromptPublic": prompt_public,
"introduction": intro_message,
"description": description,
"profilePictureUrl": pfp_url,
"apiUrl": api_url,
"apiKey": api_key,
"hasLinkification": linkification,
"hasMarkdownRendering": markdown_rendering,
"hasSuggestedReplies": suggested_replies,
"isPrivateBot": private,
},
)
2023-04-18 15:16:09 +00:00
data = result["data"]["poeBotEdit"]
if data["status"] != "success":
raise RuntimeError(f"Poe returned an error while trying to edit a bot: {data['status']}")
2023-04-18 15:16:09 +00:00
self.get_bots()
return data
load_queries()