1
0
Fork 0
GPT4FREE/quora/api.py
2023-04-15 15:42:07 +01:00

407 lines
14 KiB
Python

import requests
import re
import json
import random
import logging
import time
import queue
import threading
import traceback
import websocket
from pathlib import Path
from urllib.parse import urlparse
parent_path = Path(__file__).resolve().parent
queries_path = parent_path / "graphql"
queries = {}
logging.basicConfig()
logger = logging.getLogger()
user_agent = "Mozilla/5.0 (X11; Linux x86_64; rv:102.0) Gecko/20100101 Firefox/102.0"
def load_queries():
for path in queries_path.iterdir():
if path.suffix != ".graphql":
continue
with open(path) as f:
queries[path.stem] = f.read()
def generate_payload(query_name, variables):
return {
"query": queries[query_name],
"variables": variables
}
def request_with_retries(method, *args, **kwargs):
attempts = kwargs.get("attempts") or 10
url = args[0]
for i in range(attempts):
r = method(*args, **kwargs)
if r.status_code == 200:
return r
logger.warn(
f"Server returned a status code of {r.status_code} while downloading {url}. Retrying ({i+1}/{attempts})...")
raise RuntimeError(f"Failed to download {url} too many times.")
class Client:
gql_url = "https://poe.com/api/gql_POST"
gql_recv_url = "https://poe.com/api/receive_POST"
home_url = "https://poe.com"
settings_url = "https://poe.com/api/settings"
def __init__(self, token, proxy=None):
self.proxy = proxy
self.session = requests.Session()
if proxy:
self.session.proxies = {
"http": self.proxy,
"https": self.proxy
}
logger.info(f"Proxy enabled: {self.proxy}")
self.active_messages = {}
self.message_queues = {}
self.session.cookies.set("p-b", token, domain="poe.com")
self.headers = {
"User-Agent": user_agent,
"Referrer": "https://poe.com/",
"Origin": "https://poe.com",
}
self.ws_domain = f"tch{random.randint(1, 1e6)}"
self.session.headers.update(self.headers)
self.next_data = self.get_next_data(overwrite_vars=True)
self.channel = self.get_channel_data()
self.connect_ws()
self.bots = self.get_bots(download_next_data=False)
self.bot_names = self.get_bot_names()
self.gql_headers = {
"poe-formkey": self.formkey,
"poe-tchannel": self.channel["channel"],
}
self.gql_headers = {**self.gql_headers, **self.headers}
self.subscribe()
def get_next_data(self, overwrite_vars=False):
logger.info("Downloading next_data...")
r = request_with_retries(self.session.get, self.home_url)
json_regex = r'<script id="__NEXT_DATA__" type="application\/json">(.+?)</script>'
json_text = re.search(json_regex, r.text).group(1)
next_data = json.loads(json_text)
if overwrite_vars:
self.formkey = next_data["props"]["formkey"]
self.viewer = next_data["props"]["pageProps"]["payload"]["viewer"]
return next_data
def get_bot(self, display_name):
url = f'https://poe.com/_next/data/{self.next_data["buildId"]}/{display_name}.json'
logger.info("Downloading "+url)
r = request_with_retries(self.session.get, url)
chat_data = r.json()["pageProps"]["payload"]["chatOfBotDisplayName"]
return chat_data
def get_bots(self, download_next_data=True):
if download_next_data:
next_data = self.get_next_data()
else:
next_data = self.next_data
if not "availableBots" in self.viewer:
raise RuntimeError("Invalid token or no bots are available.")
bot_list = self.viewer["availableBots"]
bots = {}
for bot in bot_list:
chat_data = self.get_bot(bot["displayName"])
bots[chat_data["defaultBotObject"]["nickname"]] = chat_data
self.bots = bots
self.bot_names = self.get_bot_names()
return bots
def get_bot_names(self):
bot_names = {}
for bot_nickname in self.bots:
bot_obj = self.bots[bot_nickname]["defaultBotObject"]
bot_names[bot_nickname] = bot_obj["displayName"]
return bot_names
def get_channel_data(self, channel=None):
logger.info("Downloading channel data...")
r = request_with_retries(self.session.get, self.settings_url)
data = r.json()
self.formkey = data["formkey"]
return data["tchannelData"]
def get_websocket_url(self, channel=None):
if channel is None:
channel = self.channel
query = f'?min_seq={channel["minSeq"]}&channel={channel["channel"]}&hash={channel["channelHash"]}'
return f'wss://{self.ws_domain}.tch.{channel["baseHost"]}/up/{channel["boxName"]}/updates'+query
def send_query(self, query_name, variables):
for i in range(20):
json_data = generate_payload(query_name, variables)
payload = json.dumps(json_data, separators=(',', ':'))
base_string = payload + self.gql_headers['poe-formkey'] + 'WpuLMiXEKKE98j56k'
from hashlib import md5
headers = self.gql_headers |{
"content-type": "application/json",
"poe-tag-id": md5(base_string.encode()).hexdigest()
}
r = request_with_retries(
self.session.post, self.gql_url, data=payload, headers=headers)
data = r.json()
if data["data"] == None:
logger.warn(
f'{query_name} returned an error: {data["errors"][0]["message"]} | Retrying ({i+1}/20)')
time.sleep(2)
continue
return r.json()
raise RuntimeError(f'{query_name} failed too many times.')
def subscribe(self):
logger.info("Subscribing to mutations")
result = self.send_query("SubscriptionsMutation", {
"subscriptions": [
{
"subscriptionName": "messageAdded",
"query": queries["MessageAddedSubscription"]
},
{
"subscriptionName": "viewerStateUpdated",
"query": queries["ViewerStateUpdatedSubscription"]
}
]
})
def ws_run_thread(self):
kwargs = {}
if self.proxy:
proxy_parsed = urlparse(self.proxy)
kwargs = {
"proxy_type": proxy_parsed.scheme,
"http_proxy_host": proxy_parsed.hostname,
"http_proxy_port": proxy_parsed.port
}
self.ws.run_forever(**kwargs)
def connect_ws(self):
self.ws_connected = False
self.ws = websocket.WebSocketApp(
self.get_websocket_url(),
header={"User-Agent": user_agent},
on_message=self.on_message,
on_open=self.on_ws_connect,
on_error=self.on_ws_error,
on_close=self.on_ws_close
)
t = threading.Thread(target=self.ws_run_thread, daemon=True)
t.start()
while not self.ws_connected:
time.sleep(0.01)
def disconnect_ws(self):
if self.ws:
self.ws.close()
self.ws_connected = False
def on_ws_connect(self, ws):
self.ws_connected = True
def on_ws_close(self, ws, close_status_code, close_message):
self.ws_connected = False
logger.warn(
f"Websocket closed with status {close_status_code}: {close_message}")
def on_ws_error(self, ws, error):
self.disconnect_ws()
self.connect_ws()
def on_message(self, ws, msg):
try:
data = json.loads(msg)
if not "messages" in data:
return
for message_str in data["messages"]:
message_data = json.loads(message_str)
if message_data["message_type"] != "subscriptionUpdate":
continue
message = message_data["payload"]["data"]["messageAdded"]
copied_dict = self.active_messages.copy()
for key, value in copied_dict.items():
# add the message to the appropriate queue
if value == message["messageId"] and key in self.message_queues:
self.message_queues[key].put(message)
return
# indicate that the response id is tied to the human message id
elif key != "pending" and value == None and message["state"] != "complete":
self.active_messages[key] = message["messageId"]
self.message_queues[key].put(message)
return
except Exception:
logger.error(traceback.format_exc())
self.disconnect_ws()
self.connect_ws()
def send_message(self, chatbot, message, with_chat_break=False, timeout=20):
# if there is another active message, wait until it has finished sending
while None in self.active_messages.values():
time.sleep(0.01)
# None indicates that a message is still in progress
self.active_messages["pending"] = None
logger.info(f"Sending message to {chatbot}: {message}")
# reconnect websocket
if not self.ws_connected:
self.disconnect_ws()
self.connect_ws()
message_data = self.send_query("SendMessageMutation", {
"bot": chatbot,
"query": message,
"chatId": self.bots[chatbot]["chatId"],
"source": None,
"withChatBreak": with_chat_break
})
del self.active_messages["pending"]
if not message_data["data"]["messageEdgeCreate"]["message"]:
raise RuntimeError(f"Daily limit reached for {chatbot}.")
try:
human_message = message_data["data"]["messageEdgeCreate"]["message"]
human_message_id = human_message["node"]["messageId"]
except TypeError:
raise RuntimeError(
f"An unknown error occured. Raw response data: {message_data}")
# indicate that the current message is waiting for a response
self.active_messages[human_message_id] = None
self.message_queues[human_message_id] = queue.Queue()
last_text = ""
message_id = None
while True:
try:
message = self.message_queues[human_message_id].get(
timeout=timeout)
except queue.Empty:
del self.active_messages[human_message_id]
del self.message_queues[human_message_id]
raise RuntimeError("Response timed out.")
# only break when the message is marked as complete
if message["state"] == "complete":
if last_text and message["messageId"] == message_id:
break
else:
continue
# update info about response
message["text_new"] = message["text"][len(last_text):]
last_text = message["text"]
message_id = message["messageId"]
yield message
del self.active_messages[human_message_id]
del self.message_queues[human_message_id]
def send_chat_break(self, chatbot):
logger.info(f"Sending chat break to {chatbot}")
result = self.send_query("AddMessageBreakMutation", {
"chatId": self.bots[chatbot]["chatId"]
})
return result["data"]["messageBreakCreate"]["message"]
def get_message_history(self, chatbot, count=25, cursor=None):
logger.info(f"Downloading {count} messages from {chatbot}")
messages = []
if cursor == None:
chat_data = self.get_bot(self.bot_names[chatbot])
if not chat_data["messagesConnection"]["edges"]:
return []
messages = chat_data["messagesConnection"]["edges"][:count]
cursor = chat_data["messagesConnection"]["pageInfo"]["startCursor"]
count -= len(messages)
cursor = str(cursor)
if count > 50:
messages = self.get_message_history(
chatbot, count=50, cursor=cursor) + messages
while count > 0:
count -= 50
new_cursor = messages[0]["cursor"]
new_messages = self.get_message_history(
chatbot, min(50, count), cursor=new_cursor)
messages = new_messages + messages
return messages
elif count <= 0:
return messages
result = self.send_query("ChatListPaginationQuery", {
"count": count,
"cursor": cursor,
"id": self.bots[chatbot]["id"]
})
query_messages = result["data"]["node"]["messagesConnection"]["edges"]
messages = query_messages + messages
return messages
def delete_message(self, message_ids):
logger.info(f"Deleting messages: {message_ids}")
if not type(message_ids) is list:
message_ids = [int(message_ids)]
result = self.send_query("DeleteMessageMutation", {
"messageIds": message_ids
})
def purge_conversation(self, chatbot, count=-1):
logger.info(f"Purging messages from {chatbot}")
last_messages = self.get_message_history(chatbot, count=50)[::-1]
while last_messages:
message_ids = []
for message in last_messages:
if count == 0:
break
count -= 1
message_ids.append(message["node"]["messageId"])
self.delete_message(message_ids)
if count == 0:
return
last_messages = self.get_message_history(chatbot, count=50)[::-1]
logger.info(f"No more messages left to delete.")
load_queries()