wstkeys/services/gg.py

258 lines
5.4 KiB
Python

import requests
import time
import json
import hashlib
from dotenv import load_dotenv
import os
load_dotenv()
TOKEN = os.getenv('GGTOKEN')
SELLER_ID = os.getenv('SELLER_ID')
API_KEY = os.getenv('GGTOKEN')
TOKEN_FILE = "./files/ggsel_token.json"
def save_token(token):
with open(TOKEN_FILE, "w") as f:
json.dump({"token": token}, f)
def load_token():
if not os.path.exists(TOKEN_FILE):
return None
with open(TOKEN_FILE) as f:
try:
data = json.load(f)
return data.get("token")
except json.JSONDecodeError:
return None
def safe_json(response):
if not response.text:
raise Exception("Empty response from API")
try:
return response.json()
except json.JSONDecodeError:
print("Invalid JSON response:")
print(response.text)
raise
def get_token():
timestamp = str(int(time.time()))
sign_string = API_KEY + timestamp
sign = hashlib.sha256(sign_string.encode()).hexdigest()
url = "https://seller.ggsel.com/api_sellers/api/apilogin"
payload = {
"seller_id": SELLER_ID,
"timestamp": timestamp,
"sign": sign
}
headers = {
"Content-Type": "application/json",
"Accept": "application/json"
}
response = requests.post(url, json=payload, headers=headers)
print(response.status_code)
res = response.json()
token = res['token']
save_token(token)
print(token)
return token
def balans():
token = load_token()
if not token:
token = get_token()
url = "https://seller.ggsel.com/api_sellers/api/sellers/account/balance/info"
headers = {"Accept": "application/json"}
params = {"token": token}
r = requests.get(url, params=params, headers=headers)
print(r.status_code)
# если токен умер
if r.status_code in (401, 403):
token = get_token()
params["token"] = token
r = requests.get(url, params=params, headers=headers)
return r.json()
def last_sales(top=10, group=False):
token = load_token()
if not token:
token = get_token()
url = "https://seller.ggsel.com/api_sellers/api/seller-last-sales"
headers = {
"Accept": "application/json",
"locale": "ru"
}
params = {
"token": token,
"seller_id": SELLER_ID,
"top": top,
"group": str(group).lower()
}
r = requests.get(url, params=params, headers=headers)
print(r.status_code)
# если токен умер
if r.status_code in (401, 403):
token = get_token()
params["token"] = token
r = requests.get(url, params=params, headers=headers)
return r.json()
# print(last_sales())
def get_messages(id_i, id_from=None, id_to=None, newer=None, count=50):
token = load_token()
if not token:
token = get_token()
url = "https://seller.ggsel.com/api_sellers/api/debates/v2"
headers = {
"Accept": "application/json"
}
params = {
"token": token,
"id_i": id_i,
"count": min(count, 100)
}
if id_from:
params["id_from"] = id_from
if id_to:
params["id_to"] = id_to
if newer:
params["newer"] = newer
r = requests.get(url, params=params, headers=headers)
print(r.status_code)
# если токен умер
if r.status_code in (401, 403):
token = get_token()
params["token"] = token
r = requests.get(url, params=params, headers=headers)
return r.json()
# пример
print(get_messages(id_i=18840912))
def get_chats(page=1, pagesize=20, filter_new=None, email=None, id_ds=None):
token = load_token()
if not token:
token = get_token()
url = "https://seller.ggsel.com/api_sellers/api/debates/v2/chats"
headers = {
"Accept": "application/json"
}
params = {
"token": token,
"page": page,
"pagesize": pagesize
}
if filter_new is not None:
params["filter_new"] = filter_new
if email:
params["email"] = email
if id_ds:
params["id_ds"] = id_ds
r = requests.get(url, params=params, headers=headers)
print(r.status_code)
# если токен умер
if r.status_code in (401, 403):
token = get_token()
params["token"] = token
r = requests.get(url, params=params, headers=headers)
return r.json()
# пример
# print(get_chats())
# def tokens():
# timestamp = str(int(time.time()))
# sign_string = API_KEY + timestamp
# sign = hashlib.sha256(sign_string.encode()).hexdigest()
# url = "https://seller.ggsel.com/api_sellers/api/apilogin"
# payload = {
# "seller_id": SELLER_ID,
# "timestamp": timestamp,
# "sign": sign
# }
# headers = {
# "Content-Type": "application/json",
# "Accept": "application/json"
# }
# response = requests.post(url, json=payload, headers=headers)
# print(response.status_code)
# res = response.json()
# token = res['token']
# return token
# def balans():
# url = "https://seller.ggsel.com/api_sellers/api/sellers/account/balance/info"
# params = {
# "token": tokens()
# }
# headers = {
# "Accept": "application/json"
# }
# response = requests.get(url, params=params, headers=headers)
# print(response.status_code)
# print(response.json())