from dotenv import load_dotenv import os load_dotenv() import requests partner = os.getenv('partner') user_name = os.getenv('userName') password = os.getenv('password') headers = { 'Content-Type': 'application/json', } BASE_URL = "https://partner.rokky.com/api/v1" json_data = { 'partner': partner, 'userName': user_name, 'password': password, 'scope': 'WebApi', } def tokens(): response = requests.post('https://partner.rokky.com/api/v1/tokens', headers=headers, json=json_data) # print(response.json()) data = response.json() access_token = data["accessToken"] return access_token access_token = tokens() print(partner) def get_orders(access_token, page=1, page_size=25): url = f"{BASE_URL}/orders" headers = { "Authorization": f"Bearer {access_token}", "Content-Type": "application/json" } payload = { "page": page, "pageSize": page_size } response = requests.get(url, headers=headers, json=payload) response.raise_for_status() print(response.json()) # return response.json() def download_products(): # url = f"https://cdn-partners.rokky.com/partner-catalogs/{user_name}_products.json.gz.enc" url = f"https://rokky-cdn.azureedge.net/partner-catalogs/{user_name}_products.json.gz.enc" response = requests.get(url) response.raise_for_status() with open("./files/products.json.gz.enc", "wb") as f: f.write(response.content) print("Каталог продуктов скачан.") # get_orders(access_token) def download_prices(): # url = f"https://cdn-partners.rokky.com/partner-catalogs/{partner}_prices.json.gz.enc" url = f"https://rokky-cdn.azureedge.net/partner-catalogs/{user_name}_prices.json.gz.enc" response = requests.get(url) response.raise_for_status() with open("./files/prices.json.gz.enc", "wb") as f: f.write(response.content) print("Каталог цен скачан.") def create_order(access_token, sku, quantity, unit_price, partner_order_id): url = f"{BASE_URL}/orders" headers = { "Authorization": f"Bearer {access_token}", "Content-Type": "application/json" } payload = { "items": [ { "sku": sku, "quantity": quantity, "unitPrice": unit_price } ], "partnerOrderId": partner_order_id } response = requests.post(url, headers=headers, json=payload) response.raise_for_status() return response.json() def get_orders(access_token, page=1, page_size=25): url = f"{BASE_URL}/orders" headers = { "Authorization": f"Bearer {access_token}", "Content-Type": "application/json" } payload = { "page": page, "pageSize": page_size } response = requests.get(url, headers=headers, json=payload) response.raise_for_status() return response.json() def get_order_content(order_id): url = f"{BASE_URL}/orders/{order_id}/content" headers = { "Authorization": f"Bearer {access_token}", "Content-Type": "application/json" } response = requests.get(url, headers=headers) response.raise_for_status() data = response.json() con = data['orderContentItems'] for c in con: content = c['content'] return content # content = get_order_content(5379484) # print(content) # orders = get_orders(access_token) # print(orders) # order = create_order(access_token, 22774, 1, 21.17, "my-test-order-123") # print(order) # download_prices(partner) # download_products(user_name)