wstkeys/services/rokky.py

156 lines
3.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import gzip
import json
import shutil
import sqlite3
from base64 import b64decode
from Crypto.Cipher import AES
from Crypto.Protocol.KDF import PBKDF2
from Crypto.Hash import SHA256
from Crypto.Util.Padding import unpad
from dotenv import load_dotenv
import os
# Load variables from a local .env file (if any) into the process environment.
load_dotenv()
# Passphrase later handed to decrypt_file(); os.getenv returns None when the
# 'password' variable is not set.
password = os.getenv('password')
# Module-wide SQLite connection used by add_product() and passed to
# update_prices() at the bottom of the file.
conn = sqlite3.connect("./files/rokky.db")
def decrypt_file(input_file, output_file, password):
    """Decrypt an OpenSSL-style salted AES-CBC file and write the plaintext.

    The input is expected to start with the 8-byte magic ``Salted__``
    followed by an 8-byte salt; everything after that is ciphertext. A
    32-byte AES key and a 16-byte IV are derived from *password* and the
    salt with PBKDF2-HMAC-SHA256 (10000 iterations), and the PKCS#7-style
    padding is stripped with ``unpad`` after decryption.

    Args:
        input_file: Path of the encrypted file.
        output_file: Path the decrypted bytes are written to.
        password: Passphrase fed to PBKDF2.

    Raises:
        ValueError: If the input is too short or does not start with the
            ``Salted__`` magic. Decryption/unpadding errors raised by the
            crypto library propagate unchanged.
    """
    magic = b"Salted__"
    header_len = len(magic) + 8  # magic + 8-byte salt

    with open(input_file, "rb") as f:
        encrypted = f.read()

    # Fail early with a clear message instead of deriving a key from bytes
    # that are not a salt and tripping over a padding error later.
    if len(encrypted) < header_len or not encrypted.startswith(magic):
        raise ValueError(
            f"{input_file!r} is not in OpenSSL salted format "
            "(missing 'Salted__' header)"
        )

    salt = encrypted[len(magic):header_len]
    ciphertext = encrypted[header_len:]

    key_iv = PBKDF2(
        password,
        salt,
        dkLen=32 + 16,
        count=10000,
        hmac_hash_module=SHA256,
    )
    key = key_iv[:32]
    iv = key_iv[32:]

    cipher = AES.new(key, AES.MODE_CBC, iv)
    decrypted = unpad(cipher.decrypt(ciphertext), AES.block_size)

    with open(output_file, "wb") as f:
        f.write(decrypted)
    print("Расшифровано:", output_file)
# Decrypt the encrypted price archive next to it, using the passphrase
# read from the environment above.
decrypt_file("./files/prices.json.gz.enc", "./files/prices.json.gz", password)
def archive(src="./files/prices.json.gz", dst="./files/prices.json"):
    """Decompress the gzip file *src* into *dst*.

    Despite the name, this unpacks an archive rather than creating one.
    The data is streamed, so the whole file is never held in memory.

    Args:
        src: Path of the gzip-compressed input. Defaults to the decrypted
            price archive.
        dst: Path the decompressed bytes are written to (overwritten if it
            exists). Defaults to the JSON file read by the rest of the
            module.
    """
    with gzip.open(src, "rb") as f_in, open(dst, "wb") as f_out:
        shutil.copyfileobj(f_in, f_out)
# Unpack the decrypted archive, then parse the resulting JSON file into the
# module-level `products` value used by add_product().
archive()
with open("./files/prices.json", mode="r", encoding="utf-8") as f:
    products = json.loads(f.read())
def get_title(product, lang="RU"):
    """Return the product title for *lang*, falling back to ``product["name"]``.

    ``product["titles"]`` is scanned in order and the ``value`` of the first
    entry whose ``language`` equals *lang* wins.
    """
    localized = (
        entry["value"]
        for entry in product["titles"]
        if entry["language"] == lang
    )
    try:
        return next(localized)
    except StopIteration:
        # No title in the requested language.
        return product["name"]
def get_genres(product):
    """Collect the ``name`` of every entry in ``product["genres"]``, in order."""
    names = []
    for genre in product["genres"]:
        names.append(genre["name"])
    return names
# Upsert keyed on sku: insert a new row, or overwrite every other column of
# the existing row with the incoming values.
_UPSERT_PRODUCT_SQL = """
INSERT INTO products (
sku,
product_id,
name,
developer,
publisher,
cover,
release_date,
genres
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(sku) DO UPDATE SET
product_id=excluded.product_id,
name=excluded.name,
developer=excluded.developer,
publisher=excluded.publisher,
cover=excluded.cover,
release_date=excluded.release_date,
genres=excluded.genres
"""


def save_product(conn, product):
    """Insert or update one row of the ``products`` table and commit.

    Args:
        conn: An open ``sqlite3`` connection.
        product: Mapping with the keys ``sku``, ``product_id``, ``name``,
            ``developer``, ``publisher``, ``cover``, ``release_date`` and
            ``genres`` (an iterable of strings, stored comma-joined).

    Raises:
        KeyError: If *product* lacks one of the keys above.
        sqlite3.Error: If the statement fails; the pending transaction is
            rolled back before the error propagates.
    """
    row = (
        product["sku"],
        product["product_id"],
        product["name"],
        product["developer"],
        product["publisher"],
        product["cover"],
        product["release_date"],
        ",".join(product["genres"]),
    )
    # The connection context manager commits on success and rolls back on
    # failure, so a failed statement never leaves a transaction open.
    with conn:
        conn.execute(_UPSERT_PRODUCT_SQL, row)
def add_product():
    """Map every entry of the module-level ``products`` to a row and save it.

    All rows are built first (so a malformed entry raises before anything is
    written), then each one is passed to ``save_product`` with the
    module-level connection. Finally the number of source products is
    printed.
    """
    rows = [
        {
            "sku": item["sku"],
            "product_id": item["product_id"],
            "name": get_title(item, "RU"),
            "developer": item["developer"],
            "publisher": item["publisher"],
            "genres": get_genres(item),
            "cover": item["cover"],
            "release_date": item["releaseDate"],
        }
        for item in products
    ]

    for row in rows:
        save_product(conn, row)

    print("Всего продуктов:", len(products))
_UPDATE_PRICE_SQL = """
UPDATE products
SET price = ?
WHERE sku = ?
"""


def update_prices(conn, prices_path="./files/prices.json"):
    """Set ``products.price`` for every SKU listed in the JSON price file.

    The file is expected to hold a JSON array of objects, each with at
    least ``sku`` and ``price`` keys. SKUs that have no row in ``products``
    are silently skipped by the UPDATE.

    Args:
        conn: An open ``sqlite3`` connection.
        prices_path: Path of the JSON file to read. Defaults to the file
            produced by the decrypt/unpack steps of this module.

    Raises:
        KeyError: If an entry lacks ``sku`` or ``price``.
        sqlite3.Error: If the update fails; the transaction is rolled back.
    """
    # Explicit UTF-8 keeps this consistent with the other reader of the same
    # file and independent of the platform's locale encoding.
    with open(prices_path, "r", encoding="utf-8") as f:
        prices = json.load(f)

    data = [(item["price"], item["sku"]) for item in prices]

    # Commit on success, roll back on failure.
    with conn:
        conn.executemany(_UPDATE_PRICE_SQL, data)
# Catalogue import is currently disabled; uncomment to (re)load product rows.
# add_product()
# Refresh prices in the database using the module-level connection.
update_prices(conn)