wstkeys/services/gg.py

421 lines
9.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import requests
import time
import json
import hashlib
from dotenv import load_dotenv
import os
# Load settings through python-dotenv before the os.getenv() lookups below.
load_dotenv()
# NOTE(review): TOKEN and API_KEY both read the same GGTOKEN variable, and
# TOKEN is not referenced anywhere in the visible code -- confirm whether it
# is still needed.
TOKEN = os.getenv('GGTOKEN')
SELLER_ID = os.getenv('SELLER_ID')
API_KEY = os.getenv('GGTOKEN')
# JSON file used by save_token()/load_token() to keep the session token.
TOKEN_FILE = "./files/ggsel_token.json"
def save_token(token):
    """Persist the API session token to ``TOKEN_FILE`` as JSON.

    The parent directory of ``TOKEN_FILE`` is created when it is missing, so
    the first run on a fresh checkout does not fail with
    ``FileNotFoundError``.

    Args:
        token: Token value to store under the ``"token"`` key.
    """
    directory = os.path.dirname(TOKEN_FILE)
    if directory:
        os.makedirs(directory, exist_ok=True)
    with open(TOKEN_FILE, "w", encoding="utf-8") as f:
        json.dump({"token": token}, f)
def load_token():
    """Return the token stored in ``TOKEN_FILE``, or ``None`` if unavailable.

    ``None`` is returned when the file does not exist, when its content is
    not valid JSON, or when the stored object has no ``"token"`` key.
    """
    if os.path.exists(TOKEN_FILE):
        with open(TOKEN_FILE) as cache:
            try:
                return json.load(cache).get("token")
            except json.JSONDecodeError:
                # A corrupt cache file is treated the same as a missing one.
                return None
    return None
def safe_json(response):
    """Decode the JSON body of an HTTP response.

    Args:
        response: Object exposing ``.text`` and ``.json()``
            (e.g. a ``requests.Response``).

    Returns:
        The decoded JSON payload.

    Raises:
        ValueError: If the body is empty or is not valid JSON. For an invalid
            body the raw text is printed before the original error is
            re-raised.
    """
    if not response.text:
        raise ValueError("Empty response from API")
    try:
        return response.json()
    except ValueError:
        # Catch ValueError rather than json.JSONDecodeError: the stdlib and
        # simplejson decode errors both derive from ValueError, so the raw
        # body is printed whichever JSON backend produced the failure.
        print("Invalid JSON response:")
        print(response.text)
        raise
def get_token():
    """Log in to the seller API, store the new token and return it.

    The login request carries ``SELLER_ID``, the current Unix timestamp and a
    signature computed as ``sha256(API_KEY + timestamp)``. The received token
    is written to disk through ``save_token``.

    Returns:
        The token taken from the ``"token"`` field of the login response.

    Raises:
        RuntimeError: If ``API_KEY`` or ``SELLER_ID`` is not configured.
        KeyError: If the login response contains no ``"token"`` field.
        requests.RequestException: On transport errors or timeout.
    """
    if not API_KEY or not SELLER_ID:
        # Fail early with a clear message instead of a TypeError on the
        # string concatenation below.
        raise RuntimeError("GGTOKEN and SELLER_ID must be set in the environment")
    timestamp = str(int(time.time()))
    sign = hashlib.sha256((API_KEY + timestamp).encode()).hexdigest()
    url = "https://seller.ggsel.com/api_sellers/api/apilogin"
    payload = {
        "seller_id": SELLER_ID,
        "timestamp": timestamp,
        "sign": sign,
    }
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    # A timeout keeps a stalled connection from blocking the caller forever.
    response = requests.post(url, json=payload, headers=headers, timeout=30)
    print(response.status_code)
    res = response.json()
    token = res['token']
    save_token(token)
    # The token is a credential: deliberately not echoed to stdout.
    return token
def balans():
    """Fetch the seller account balance information.

    The stored token is used; when none is stored a login is performed
    first. If the API answers 401 or 403, a fresh token is obtained and the
    request is repeated once.

    Returns:
        The decoded JSON body of the balance endpoint response.

    Raises:
        requests.RequestException: On transport errors or timeout.
    """
    token = load_token() or get_token()
    url = "https://seller.ggsel.com/api_sellers/api/sellers/account/balance/info"
    headers = {"Accept": "application/json"}
    params = {"token": token}
    # A timeout keeps a stalled connection from blocking the caller forever.
    r = requests.get(url, params=params, headers=headers, timeout=30)
    print(r.status_code)
    # The token is no longer accepted: log in again and retry once.
    if r.status_code in (401, 403):
        params["token"] = get_token()
        r = requests.get(url, params=params, headers=headers, timeout=30)
    return r.json()
def last_sales(top=10, group=False):
    """Query the seller's last-sales endpoint.

    The stored token is used; when none is stored a login is performed
    first. If the API answers 401 or 403, a fresh token is obtained and the
    request is repeated once.

    Args:
        top: Sent as the ``top`` query parameter (presumably the number of
            sales to return -- confirm against the API documentation).
        group: Sent as the ``group`` query parameter, serialised as the
            lowercase string ``"true"`` or ``"false"``.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.RequestException: On transport errors or timeout.
    """
    token = load_token() or get_token()
    url = "https://seller.ggsel.com/api_sellers/api/seller-last-sales"
    headers = {
        "Accept": "application/json",
        "locale": "ru",
    }
    params = {
        "token": token,
        "seller_id": SELLER_ID,
        "top": top,
        "group": str(group).lower(),
    }
    # A timeout keeps a stalled connection from blocking the caller forever.
    r = requests.get(url, params=params, headers=headers, timeout=30)
    print(r.status_code)
    # The token is no longer accepted: log in again and retry once.
    if r.status_code in (401, 403):
        params["token"] = get_token()
        r = requests.get(url, params=params, headers=headers, timeout=30)
    return r.json()
# print(last_sales())
def get_messages(id_i, id_from=None, id_to=None, newer=None, count=50):
    """Fetch messages from the debates endpoint for one ``id_i``.

    The stored token is used; when none is stored a login is performed
    first. If the API answers 401 or 403, a fresh token is obtained and the
    request is repeated once.

    Args:
        id_i: Sent as the ``id_i`` query parameter (presumably the
            order/invoice id -- confirm against the API documentation).
        id_from: Optional ``id_from`` filter; sent only when truthy.
        id_to: Optional ``id_to`` filter; sent only when truthy.
        newer: Optional ``newer`` filter; sent only when truthy.
        count: Requested number of messages; values above 100 are capped
            at 100.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.RequestException: On transport errors or timeout.
    """
    token = load_token() or get_token()
    url = "https://seller.ggsel.com/api_sellers/api/debates/v2"
    headers = {"Accept": "application/json"}
    params = {
        "token": token,
        "id_i": id_i,
        "count": min(count, 100),
    }
    if id_from:
        params["id_from"] = id_from
    if id_to:
        params["id_to"] = id_to
    if newer:
        params["newer"] = newer
    # A timeout keeps a stalled connection from blocking the caller forever.
    r = requests.get(url, params=params, headers=headers, timeout=30)
    print(r.status_code)
    # The token is no longer accepted: log in again and retry once.
    if r.status_code in (401, 403):
        params["token"] = get_token()
        r = requests.get(url, params=params, headers=headers, timeout=30)
    return r.json()
# Example:
# print(get_messages(id_i=18840912))
def get_chats(page=1, pagesize=20, filter_new=None, email=None, id_ds=None):
    """Fetch one page of chats from the debates endpoint.

    The stored token is used; when none is stored a login is performed
    first. If the API answers 401 or 403, a fresh token is obtained and the
    request is repeated once.

    Args:
        page: Sent as the ``page`` query parameter.
        pagesize: Sent as the ``pagesize`` query parameter.
        filter_new: Optional ``filter_new`` value; sent whenever it is not
            ``None`` (so falsy values such as ``0`` are still sent).
        email: Optional ``email`` filter; sent only when truthy.
        id_ds: Optional ``id_ds`` filter; sent only when truthy.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.RequestException: On transport errors or timeout.
    """
    token = load_token() or get_token()
    url = "https://seller.ggsel.com/api_sellers/api/debates/v2/chats"
    headers = {"Accept": "application/json"}
    params = {
        "token": token,
        "page": page,
        "pagesize": pagesize,
    }
    if filter_new is not None:
        params["filter_new"] = filter_new
    if email:
        params["email"] = email
    if id_ds:
        params["id_ds"] = id_ds
    # A timeout keeps a stalled connection from blocking the caller forever.
    r = requests.get(url, params=params, headers=headers, timeout=30)
    print(r.status_code)
    # The token is no longer accepted: log in again and retry once.
    if r.status_code in (401, 403):
        params["token"] = get_token()
        r = requests.get(url, params=params, headers=headers, timeout=30)
    return r.json()
def get_product(product_id):
    """Return the name and first preview image URL of a product.

    Like the other API helpers in this module, the stored token is used
    (logging in first when none is stored) and the request is repeated once
    with a fresh token if the API answers 401 or 403.

    Args:
        product_id: Identifier interpolated into the product data URL.

    Returns:
        A ``(name, image_url)`` tuple. ``image_url`` is ``None`` when the
        product has no preview images.

    Raises:
        KeyError: If the response lacks the ``"product"`` or ``"name"`` field.
        requests.RequestException: On transport errors or timeout.
    """
    token = load_token() or get_token()
    url = f"https://seller.ggsel.com/api_sellers/api/products/{product_id}/data"
    params = {"token": token}
    headers = {"Accept": "application/json"}
    # A timeout keeps a stalled connection from blocking the caller forever.
    r = requests.get(url, headers=headers, params=params, timeout=30)
    print(r.status_code)
    # The token is no longer accepted: log in again and retry once.
    if r.status_code in (401, 403):
        params["token"] = get_token()
        r = requests.get(url, headers=headers, params=params, timeout=30)
    product = r.json()["product"]
    name = product["name"]
    # A missing or empty preview list must not crash the lookup.
    previews = product.get("preview_imgs") or []
    image_url = previews[0]["url"] if previews else None
    return name, image_url
# Manual smoke test: query the API only when this file is run directly, so
# that importing the module does not trigger a network request.
if __name__ == "__main__":
    print(get_product(102184895))
def create_message(id_i: int, key: str = "22222"):
    """Post the standard thank-you message containing a product key.

    Like the other API helpers in this module, the stored token is used
    (logging in first when none is stored) and the request is repeated once
    with a fresh token if the API answers 401 or 403.

    Args:
        id_i: Sent as the ``id_i`` query parameter of the debates endpoint.
        key: Key text inserted into the message. The default keeps the
            placeholder value that was previously hard-coded.

    Returns:
        ``{"success": True}`` when the API answers 200; otherwise a dict with
        ``"success": False`` plus the HTTP ``"status"`` and raw ``"response"``
        text.

    Raises:
        requests.RequestException: On transport errors or timeout.
    """
    token = load_token() or get_token()
    message = (
        "🔹Спасибо за покупку в WST Keys (West Store Trusted Keys)\n\n"
        f"🔑 Ваш ключ: {key}\n\n"
        "📩 Копия ключа отправлена на вашу электронную почту.\n"
        "💬 Если у вас возникнут вопросы — напишите нам.\n"
        "⭐️ Нам очень важно ваше мнение, пожалуйста, оставьте отзыв."
    )
    url = "https://seller.ggsel.com/api_sellers/api/debates/v2"
    params = {
        "token": token,
        "id_i": id_i,
    }
    payload = {"message": message}
    # A timeout keeps a stalled connection from blocking the caller forever.
    r = requests.post(url, params=params, json=payload, timeout=30)
    # The token is no longer accepted: log in again and retry once.
    if r.status_code in (401, 403):
        params["token"] = get_token()
        r = requests.post(url, params=params, json=payload, timeout=30)
    if r.status_code == 200:
        return {"success": True}
    return {
        "success": False,
        "status": r.status_code,
        "response": r.text,
    }
# Example call:
# create_message(18094706)
def create_messagea(id_i: int, message):
    """Post an arbitrary message to the debates endpoint.

    Like the other API helpers in this module, the stored token is used
    (logging in first when none is stored) and the request is repeated once
    with a fresh token if the API answers 401 or 403.

    Args:
        id_i: Sent as the ``id_i`` query parameter.
        message: Text sent as the ``"message"`` field of the JSON body.

    Returns:
        The raw response body text.

    Raises:
        requests.RequestException: On transport errors or timeout.
    """
    print("SEND MESSAGE CALLED", id_i)
    token = load_token() or get_token()
    url = "https://seller.ggsel.com/api_sellers/api/debates/v2"
    params = {
        "token": token,
        "id_i": id_i,
    }
    payload = {"message": message}
    # A timeout keeps a stalled connection from blocking the caller forever.
    r = requests.post(url, params=params, json=payload, timeout=30)
    # The token is no longer accepted: log in again and retry once.
    if r.status_code in (401, 403):
        params["token"] = get_token()
        r = requests.post(url, params=params, json=payload, timeout=30)
    print("STATUS:", r.status_code)
    return r.text
message = "🔹Спасибо за покупку в WST Keys (West Store Trusted Keys)\n\n"
# Sending a chat message is a real side effect: do it only when this file is
# run directly, never as a consequence of importing the module.
if __name__ == "__main__":
    create_messagea(22289522, message)
# Example
def get_chats_by_email(email):
    """Request the first 100 chats matching an e-mail and report success.

    The raw response body is printed; it is not part of the return value on
    success. Like the other API helpers in this module, the stored token is
    used (logging in first when none is stored) and the request is repeated
    once with a fresh token if the API answers 401 or 403.

    Args:
        email: Sent as the ``email`` query parameter.

    Returns:
        ``{"success": True}`` when the API answers 200; otherwise a dict with
        ``"success": False`` plus the HTTP ``"status"`` and raw ``"response"``
        text.

    Raises:
        requests.RequestException: On transport errors or timeout.
    """
    token = load_token() or get_token()
    url = "https://seller.ggsel.com/api_sellers/api/debates/v2/chats"
    params = {
        "token": token,
        'email': email,
        'pagesize': 100,
        'page': 1,
    }
    # A timeout keeps a stalled connection from blocking the caller forever.
    r = requests.get(url, params=params, timeout=30)
    # The token is no longer accepted: log in again and retry once.
    if r.status_code in (401, 403):
        params["token"] = get_token()
        r = requests.get(url, params=params, timeout=30)
    print(r.text)
    if r.status_code == 200:
        return {"success": True}
    return {
        "success": False,
        "status": r.status_code,
        "response": r.text,
    }
# get_chats_by_email('fenomenplayerok@gmail.com')
def get_order_info(invoice_id: int, locale: str = "ru"):
    """Fetch purchase information for one invoice.

    Like the other API helpers in this module, the stored token is used
    (logging in first when none is stored) and the request is repeated once
    with a fresh token if the API answers 401 or 403.

    Args:
        invoice_id: Identifier interpolated into the purchase-info URL.
        locale: Value sent in the ``locale`` request header.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.RequestException: On transport errors or timeout.
    """
    token = load_token() or get_token()
    url = f"https://seller.ggsel.com/api_sellers/api/purchase/info/{invoice_id}"
    headers = {
        "Accept": "application/json",
        "locale": locale,
    }
    params = {"token": token}
    # A timeout keeps a stalled connection from blocking the caller forever.
    r = requests.get(url, headers=headers, params=params, timeout=30)
    # The token is no longer accepted: log in again and retry once.
    if r.status_code in (401, 403):
        params["token"] = get_token()
        r = requests.get(url, headers=headers, params=params, timeout=30)
    return r.json()  # use r.text instead if the raw response is needed
# # Example usage
# invoice_id = 22228752
# info = get_order_info(invoice_id)
# print(info)
# print(get_chats(email='fenomenplayerok@gmail.com'))
# def tokens():
# timestamp = str(int(time.time()))
# sign_string = API_KEY + timestamp
# sign = hashlib.sha256(sign_string.encode()).hexdigest()
# url = "https://seller.ggsel.com/api_sellers/api/apilogin"
# payload = {
# "seller_id": SELLER_ID,
# "timestamp": timestamp,
# "sign": sign
# }
# headers = {
# "Content-Type": "application/json",
# "Accept": "application/json"
# }
# response = requests.post(url, json=payload, headers=headers)
# print(response.status_code)
# res = response.json()
# token = res['token']
# return token
# def balans():
# url = "https://seller.ggsel.com/api_sellers/api/sellers/account/balance/info"
# params = {
# "token": tokens()
# }
# headers = {
# "Accept": "application/json"
# }
# response = requests.get(url, params=params, headers=headers)
# print(response.status_code)
# print(response.json())
# def create_debate(order_id):
# token = load_token()
# url = "https://seller.ggsel.com/api_sellers/api/debates/v2"
# params = {
# "token": token,
# "id_i": order_id
# }
# json = {
# "message": "Спасибо за покупку в WST Keys"
# }
# r = requests.post(url, params=params, json=json)
# return r.json()
# print(create_debate("22257737"))