# wstkeys/API/rok.py
#
# 179 lines
# 4.7 KiB
# Python

from dotenv import load_dotenv
import os
load_dotenv()
import requests
# Root of every partner API endpoint built in this module.
BASE_URL = "https://partner.rokky.com/api/v1"

# Credentials are read from the process environment (load_dotenv() is called
# earlier in this file). A variable that is not set yields None.
# NOTE(review): on Windows environment variable names are case-insensitive,
# so "userName" may resolve to the OS-provided USERNAME -- verify.
partner = os.environ.get("partner")
user_name = os.environ.get("userName")
password = os.environ.get("password")

# Headers sent with the token request made by tokens().
headers = {"Content-Type": "application/json"}

# JSON body of the token request made by tokens().
json_data = {
    "partner": partner,
    "userName": user_name,
    "password": password,
    "scope": "WebApi",
}
def tokens():
    """Request an access token from the partner API.

    Posts the module-level ``json_data`` credentials (with the module-level
    ``headers``) to ``{BASE_URL}/tokens`` and returns the ``accessToken``
    field of the JSON response.

    Returns:
        The value stored under ``accessToken`` in the response body.

    Raises:
        requests.exceptions.HTTPError: the server answered with a 4xx/5xx
            status (for example, rejected credentials).
        requests.exceptions.RequestException: the request could not be
            completed (connection failure, timeout, undecodable body).
        KeyError: the response JSON has no ``accessToken`` field.
    """
    response = requests.post(
        f"{BASE_URL}/tokens",
        headers=headers,
        json=json_data,
        # Without a timeout requests waits indefinitely; this runs at import
        # time, so a stalled server would otherwise hang the whole module.
        timeout=30,
    )
    # Surface the HTTP status rather than failing later with an opaque
    # KeyError / JSON decoding error on an error payload.
    response.raise_for_status()
    return response.json()["accessToken"]
# Fetch a token once, at import time. create_order, get_orders(page, page_size)
# and get_order_content read this module-level value for their Authorization
# header.
# NOTE(review): nothing in this file refreshes the token -- confirm its
# lifetime is long enough for the callers.
access_token = tokens()
def get_orders(access_token, page=1, page_size=25):
    """Fetch one page of orders and print the decoded JSON response.

    NOTE: a later ``get_orders(page=1, page_size=25)`` definition in this
    module rebinds the name, so this variant is only reachable before that
    definition executes.

    Args:
        access_token: Bearer token placed in the Authorization header.
        page: Value sent as ``page``.
        page_size: Value sent as ``pageSize``.

    Returns:
        None. The decoded response is printed, not returned.

    Raises:
        requests.exceptions.HTTPError: the server answered with a 4xx/5xx status.
    """
    auth_headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
    }
    paging = {"page": page, "pageSize": page_size}

    # The paging values travel as a JSON body on the GET request.
    reply = requests.get(f"{BASE_URL}/orders", headers=auth_headers, json=paging)
    reply.raise_for_status()
    print(reply.json())
def download_products():
    """Download the encrypted product catalog to ``./files/products.json.gz.enc``.

    The catalog file name is derived from the module-level ``user_name``.
    The ``./files`` directory is created when missing, so a completed
    download is not lost to a ``FileNotFoundError`` on write.

    Raises:
        requests.exceptions.HTTPError: the CDN answered with a 4xx/5xx status.
        requests.exceptions.RequestException: connection failure or timeout.
        OSError: the target directory or file could not be written.
    """
    # An earlier, commented-out variant of this URL used the host
    # cdn-partners.rokky.com with the same path.
    url = f"https://rokky-cdn.azureedge.net/partner-catalogs/{user_name}_products.json.gz.enc"
    # Without a timeout requests would wait indefinitely on a stalled connection.
    response = requests.get(url, timeout=30)
    response.raise_for_status()

    os.makedirs("./files", exist_ok=True)
    with open("./files/products.json.gz.enc", "wb") as f:
        f.write(response.content)
    print("Каталог продуктов скачан.")
# get_orders(access_token)
def download_prices():
    """Download the encrypted price catalog to ``./files/prices.json.gz.enc``.

    The catalog file name is derived from the module-level ``user_name``.
    The ``./files`` directory is created when missing, so a completed
    download is not lost to a ``FileNotFoundError`` on write.

    Raises:
        requests.exceptions.HTTPError: the CDN answered with a 4xx/5xx status.
        requests.exceptions.RequestException: connection failure or timeout.
        OSError: the target directory or file could not be written.
    """
    # An earlier, commented-out variant of this URL used the host
    # cdn-partners.rokky.com and the `partner` value in the file name.
    url = f"https://rokky-cdn.azureedge.net/partner-catalogs/{user_name}_prices.json.gz.enc"
    # Without a timeout requests would wait indefinitely on a stalled connection.
    response = requests.get(url, timeout=30)
    response.raise_for_status()

    os.makedirs("./files", exist_ok=True)
    with open("./files/prices.json.gz.enc", "wb") as f:
        f.write(response.content)
    print("Каталог цен скачан.")
def _dump_failed_response(resp, payload):
    """Print diagnostics for an HTTP error reply: status, URL, request body, reply body."""
    print(f"\n❌ Ошибка {resp.status_code}")
    print(f"🔍 URL: {resp.url}")
    print(f"📦 Тело запроса: {payload}")
    print(f"📄 Ответ сервера (raw): {resp.text}")
    # Also show the parsed form when the server returned a structured error.
    try:
        parsed = resp.json()
    except ValueError:
        # Not JSON; the raw text printed above is all there is.
        return
    print(f"🧩 Ответ сервера (json): {parsed}")


def create_order(sku, quantity, unit_price, partner_order_id):
    """Create a single-item order through ``POST {BASE_URL}/orders``.

    Args:
        sku: Product SKU; converted with ``int()``.
        quantity: Number of units; converted with ``int()``.
        unit_price: Price per unit; converted with ``float()``.
        partner_order_id: Sent unchanged as ``partnerOrderId``.

    Returns:
        The decoded JSON body of a successful response.

    Raises:
        ValueError: ``sku``, ``quantity`` or ``unit_price`` cannot be converted.
        requests.exceptions.RequestException: transport failure, timeout or
            an HTTP error status. The exception is printed and re-raised.
    """
    endpoint = f"{BASE_URL}/orders"
    auth_headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
    }
    payload = {
        "items": [
            {
                "sku": int(sku),
                "quantity": int(quantity),
                "unitPrice": float(unit_price),
            },
        ],
        "partnerOrderId": partner_order_id,
    }

    try:
        resp = requests.post(endpoint, json=payload, headers=auth_headers, timeout=30)
        # Print the error details before the exception is raised below.
        if resp.status_code >= 400:
            _dump_failed_response(resp, payload)
        resp.raise_for_status()
        return resp.json()
    except requests.exceptions.RequestException as exc:
        print(f"🌐 Сетевая ошибка: {exc}")
        raise
def get_orders(page=1, page_size=25):
    """Fetch one page of orders from ``{BASE_URL}/orders``.

    Authenticates with the module-level ``access_token``.

    Args:
        page: Value sent as ``page``.
        page_size: Value sent as ``pageSize``.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.exceptions.HTTPError: the server answered with a 4xx/5xx status.
        requests.exceptions.RequestException: connection failure or timeout.
    """
    url = f"{BASE_URL}/orders"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
    }
    payload = {
        "page": page,
        "pageSize": page_size,
    }
    # NOTE(review): paging is sent as a JSON body on a GET request; confirm
    # the API reads it from the body rather than from query parameters.
    # timeout matches create_order: without it requests waits indefinitely.
    response = requests.get(url, headers=headers, json=payload, timeout=30)
    response.raise_for_status()
    return response.json()
def get_order_content(order_id):
    """Fetch the content delivered for an order.

    Requests ``{BASE_URL}/orders/{order_id}/content`` with the module-level
    ``access_token`` and reads the ``orderContentItems`` list from the JSON
    response.

    Args:
        order_id: Identifier interpolated into the request path.

    Returns:
        The ``content`` value of the last entry in ``orderContentItems``,
        or ``None`` when the list is empty.

    Raises:
        requests.exceptions.HTTPError: the server answered with a 4xx/5xx status.
        requests.exceptions.RequestException: connection failure or timeout.
        KeyError: the response lacks ``orderContentItems`` or the selected
            entry lacks ``content``.
    """
    url = f"{BASE_URL}/orders/{order_id}/content"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
    }
    # timeout matches create_order: without it requests waits indefinitely.
    response = requests.get(url, headers=headers, timeout=30)
    response.raise_for_status()

    items = response.json()['orderContentItems']
    # An empty list previously left the result variable unassigned; report
    # "no content" explicitly instead.
    if not items:
        return None
    # NOTE(review): only one entry's content is returned, matching a loop
    # that overwrote its result on every iteration. The original indentation
    # was lost, so confirm that "last entry" (not "first") is intended, and
    # whether multi-item orders should return every entry's content.
    return items[-1]['content']
# content = get_order_content(5379499)
# print(content)
# orders = get_orders()
# print(orders)
# order = create_order(22774, 1, 21.3, "my-test-order-12w3")
# print(order)
# download_prices()
# download_products()