Обновить utils/clients.py

This commit is contained in:
Alex55 2025-04-24 18:53:22 +03:00
parent bf84ee2820
commit 8d0cfdbe17
1 changed files with 460 additions and 389 deletions

View File

@ -1,389 +1,460 @@
import json import json
import ast import ast
import requests import requests
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select from sqlalchemy.future import select
from sqlalchemy.orm import joinedload from sqlalchemy.orm import joinedload
from sqlalchemy import delete from sqlalchemy import delete
from sqlalchemy import update from sqlalchemy import update
from sqlalchemy import or_ from sqlalchemy import or_
from sqlalchemy import select, and_ from sqlalchemy import select, and_
from sqlalchemy.exc import IntegrityError from sqlalchemy.exc import IntegrityError
from model.database import Client, AppliedJob, Job from model.database import Client, AppliedJob, Job
from dotenv import load_dotenv from sqlalchemy import func, or_
import os import urllib.parse
from dotenv import load_dotenv
# Загружаем переменные окружения import os
load_dotenv()
# Загружаем переменные окружения
load_dotenv()
url_up = os.getenv('UP_DOMEN')
url_up = os.getenv('UP_DOMEN')
async def get_filtered_jobs(db: AsyncSession, user_job_titles, minimum_annual_salary, salary_currency,
user_location_type, user_locations, user_levels, user_job_types): # async def get_filtered_jobs(db: AsyncSession, user_job_titles, minimum_annual_salary, salary_currency,
# Строим фильтры для каждого из параметров пользователя user_location_type, user_locations, user_levels, user_job_types):
filters = [] title_filters = []
location_filters = []
# Фильтрация по job_title (примерное совпадение)
if user_job_titles: # Очистка и нормализация входных данных
title_filters = [Job.job_title.ilike(f"%{title}%") for title in user_job_titles] def clean_text(text):
filters.append(or_(*title_filters)) return urllib.parse.unquote(text).strip().lower()
# Фильтрация по minimum_annual_salary (если указано) # 2. Фильтр по должностям (OR-группа)
if minimum_annual_salary is not None: if user_job_titles:
filters.append(Job.minimum_annual_salary >= minimum_annual_salary) seen_titles = set() # Для избежания дубликатов
for title in user_job_titles:
# Фильтрация по salary_currency (если указано) clean_title = clean_text(title)
if salary_currency is not None: if clean_title in seen_titles:
filters.append(Job.salary_currency == salary_currency) continue
seen_titles.add(clean_title)
# Фильтрация по location_type (если указано)
if user_location_type: words = clean_title.split()
filters.append(Job.location_type == user_location_type) if len(words) > 1:
# Для составных названий ищем все слова через AND
# Фильтрация по location (разделяем по каждому городу) word_filters = [func.lower(Job.job_title).contains(word) for word in words]
if user_locations: title_filters.append(and_(*word_filters))
location_filters = [Job.location.ilike(location) for location in user_locations] else:
filters.append(or_(*location_filters)) title_filters.append(func.lower(Job.job_title).contains(clean_title))
# Фильтрация по job_level (если указаны) # 3. Фильтр по локациям (OR-группа)
if user_levels: if user_locations:
filters.append(Job.job_level.in_(user_levels)) seen_locations = set()
for location in user_locations:
# Фильтрация по job_type (если указаны) clean_loc = clean_text(location)
if user_job_types: if clean_loc in seen_locations:
filters.append(Job.job_type.in_(user_job_types)) continue
seen_locations.add(clean_loc)
# Выполняем запрос с применением всех фильтров
query = select(Job).filter(*filters) location_filters.append(func.lower(Job.location).contains(clean_loc))
# Выполняем асинхронный запрос # Обработка составных локаций (City, Country)
result = await db.execute(query) if ',' in clean_loc:
main_part = clean_loc.split(',')[0].strip()
# Получаем все результаты location_filters.append(func.lower(Job.location).contains(main_part))
jobs = result.scalars().all()
# 4. Комбинирование фильтров
return jobs final_filters = []
if title_filters:
async def upsert_client(db: AsyncSession, user_id: str, user_login: str, user_nicename: str, user_email: str, phone: str, json_data: str): final_filters.append(or_(*title_filters))
# Проверяем, существует ли клиент с таким логином или email if location_filters:
async with db.begin(): final_filters.append(or_(*location_filters))
result = await db.execute(
select(Client).filter((Client.id == user_id))# | (Client.user_email == user_email)) if not final_filters:
) return []
client = result.scalars().first() # Получаем первый результат или None
# 5. Формирование и выполнение запроса
if client: query = select(Job).filter(*final_filters)
# Если клиент существует, обновляем его данные
client.user_nicename = user_login # Для отладки
client.phone = phone print("Final query:", str(query))
client.json_data = json_data
# Можно добавить другие поля для обновления result = await db.execute(query)
jobs = result.scalars().all()
try:
await db.commit() # Применяем изменения в базе данных print(f"Found {len(jobs)} matching jobs")
await db.refresh(client) # Обновляем объект в Python for job in jobs:
return client print(f"Match: {job.job_title} | {job.location}")
except IntegrityError:
await db.rollback() # В случае ошибки откатываем изменения return jobs
raise
else:
# Если клиента нет, создаем нового
new_client = Client( async def upsert_client(db: AsyncSession, user_id: str, user_login: str, user_nicename: str, user_email: str, phone: str, json_data: str):
user_login=user_login, # Проверяем, существует ли клиент с таким логином или email
user_nicename=user_nicename, async with db.begin():
user_email=user_email, result = await db.execute(
json_data=json_data, select(Client).filter((Client.id == user_id))# | (Client.user_email == user_email))
) )
db.add(new_client) client = result.scalars().first() # Получаем первый результат или None
try:
await db.commit() # Сохраняем нового клиента в базе данных if client:
await db.refresh(new_client) # Обновляем объект в Python # Если клиент существует, обновляем его данные
return new_client client.user_nicename = user_login
except IntegrityError: client.phone = phone
await db.rollback() # В случае ошибки откатываем изменения client.json_data = json_data
raise # Можно добавить другие поля для обновления
try:
await db.commit() # Применяем изменения в базе данных
await db.refresh(client) # Обновляем объект в Python
async def del_jobs(db: AsyncSession, user_id: str): return client
jobs = await db.execute( except IntegrityError:
select(AppliedJob).where( await db.rollback() # В случае ошибки откатываем изменения
AppliedJob.client_id == user_id, raise
or_( else:
AppliedJob.status.in_(["Scheduled", "Backlogged"]), # Если клиента нет, создаем нового
AppliedJob.status.is_(None) new_client = Client(
) user_login=user_login,
) user_nicename=user_nicename,
) user_email=user_email,
jobs = jobs.scalars().all() json_data=json_data,
)
for job in jobs: db.add(new_client)
await db.delete(job) try:
await db.commit() # Сохраняем нового клиента в базе данных
await db.commit() await db.refresh(new_client) # Обновляем объект в Python
return new_client
except IntegrityError:
# async def add_jobs(db: AsyncSession, user_id: str): await db.rollback() # В случае ошибки откатываем изменения
# # Фильтруем вакансии по переданным параметрам raise
# query = select(Job).filter(Job.active == 3)
# result = await db.execute(query)
# jobs = result.scalars().all() # Получаем список вакансий
# if not jobs: async def del_jobs(db: AsyncSession, user_id: str):
# return {"message": "Нет вакансий по данному фильтру"} jobs = await db.execute(
select(AppliedJob).where(
# # Создаём записи в AppliedJob AppliedJob.client_id == user_id,
# applied_jobs = [ or_(
# AppliedJob(client_id=user_id, job_id=job.job_id, status="Scheduled") AppliedJob.status.in_(["Scheduled", "Backlogged"]),
# for job in jobs AppliedJob.status.is_(None)
# ] )
)
# db.add_all(applied_jobs) # Добавляем в сессию )
# await db.commit() # Фиксируем изменения jobs = jobs.scalars().all()
for job in jobs:
await db.delete(job)
# return {"message": f"{len(applied_jobs)} вакансий добавлено в AppliedJob"}
await db.commit()
async def add_jobs(db: AsyncSession, user_id: str):
# Получаем все активные вакансии (предполагаю, что ты хотел использовать True, а не 3) # async def add_jobs(db: AsyncSession, user_id: str):
query = select(Job).where(Job.active == 3) # # Фильтруем вакансии по переданным параметрам
result = await db.execute(query) # query = select(Job).filter(Job.active == 3)
jobs = result.scalars().all() # result = await db.execute(query)
# jobs = result.scalars().all() # Получаем список вакансий
if not jobs:
return {"message": "Нет вакансий по данному фильтру"} # if not jobs:
# return {"message": "Нет вакансий по данному фильтру"}
applied_jobs = []
scheduled_count = 0 # # Создаём записи в AppliedJob
# applied_jobs = [
for job in jobs: # AppliedJob(client_id=user_id, job_id=job.job_id, status="Scheduled")
# Проверяем, существует ли уже такая пара client_id + job_id # for job in jobs
check_query = select(AppliedJob).where( # ]
and_(
AppliedJob.client_id == user_id, # db.add_all(applied_jobs) # Добавляем в сессию
AppliedJob.job_id == job.job_id # await db.commit() # Фиксируем изменения
)
)
check_result = await db.execute(check_query)
existing = check_result.scalar_one_or_none() # return {"message": f"{len(applied_jobs)} вакансий добавлено в AppliedJob"}
if not existing:
# Первые 5 добавим со статусом Scheduled, остальные Backlogged async def add_jobs(db: AsyncSession, user_id: str, up):
status = "Scheduled" if scheduled_count < 5 else "Backlogged" print(up)
applied_jobs.append(
AppliedJob(client_id=user_id, job_id=job.job_id, status=status) try:
) user_data = up
if status == "Scheduled": print(type(user_data))
scheduled_count += 1 job_preferences = user_data.get('job_preferences', {})
if applied_jobs: job_titles = job_preferences.get('job_titles', [])
db.add_all(applied_jobs) locations = job_preferences.get('locations', [])
await db.commit()
return {"message": f"{len(applied_jobs)} вакансий добавлено в AppliedJob (Scheduled: {scheduled_count})"} location_types = [k for k, v in job_preferences.get('location_types', {}).items() if v is True]
else: job_levels = [k for k, v in job_preferences.get('job_levels', {}).items() if v is True]
return {"message": "Все вакансии уже были добавлены ранее"} job_types = [k for k, v in job_preferences.get('job_types', {}).items() if v is True]
print("Job Titles:", job_titles)
async def get_applied_jobs(db, client_id: int): print("Locations:", locations)
query = ( print("Location Types (True only):", location_types)
select(AppliedJob) print("Job Levels (True only):", job_levels)
.options(joinedload(AppliedJob.job)) # Подгружаем данные о вакансии print("Job Types (True only):", job_types)
.where(AppliedJob.client_id == client_id)
) except json.JSONDecodeError as e:
print("Ошибка декодирования JSON:", e)
result = await db.execute(query)
applied_jobs = result.scalars().all() jobs = await get_filtered_jobs(
db,
if not applied_jobs: user_job_titles=job_titles,
return {"message": "Нет откликов на вакансии"} minimum_annual_salary=None,
salary_currency=None,
jobs_list = [] user_location_type=None,
user_locations=locations,
for applied in applied_jobs: user_levels=None,
job = applied.job # Получаем объект Job из отношения AppliedJob.job user_job_types=None
)
jobs_list.append({
"id": job.job_id, for job in jobs:
"title": job.job_title, print(job.job_title, job.location, job.job_level, job.job_type)
"company": job.job_company, # # Получаем все активные вакансии (предполагаю, что ты хотел использовать True, а не 3)
"postedDate": job.date_posted, # # query = select(Job).where(Job.active == 3)
"AppliedDate": applied.applied_on, # # result = await db.execute(query)
"location": job.location, # # jobs = result.scalars().all()
"jobType": job.job_type,
"salary": f"${job.minimum_annual_salary}K" if job.minimum_annual_salary else "Not specified", if not jobs:
"level": job.job_level, return {"message": "Нет вакансий по данному фильтру"}
"status": applied.status if applied.status else "Scheduled",
"requiredSkills": "", scheduled_count = 0
"aboutJob": job.about, added_count = 0
"jobLink": job.link
}) for job in jobs:
print(jobs_list) # Проверяем, существует ли уже такая пара client_id + job_id
return jobs_list check_query = select(AppliedJob).where(
and_(
AppliedJob.client_id == user_id,
AppliedJob.job_id == job.job_id
async def client_list(client_id: int, db): )
result = await db.execute( )
select(Client).filter((Client.id == client_id))# | (Client.user_email == user_email)) check_result = await db.execute(check_query)
) existing = check_result.scalar_one_or_none()
clients = result.scalars().first()
clients_str= clients.json_data if not existing:
# Если клиент найден status = "Scheduled" if scheduled_count < 5 else "Backlogged"
if clients: new_applied = AppliedJob(
client_dict = ast.literal_eval(clients_str) client_id=user_id,
# result = {client_id: client_dict} job_id=job.job_id,
print(f"===============================!!!!!{type(client_dict)}") status=status
)
return client_dict db.add(new_applied)
else: await db.commit() # коммитим каждую запись сразу
# Если клиент не найден, возвращаем None или обрабатываем ошибку scheduled_count += 1 if status == "Scheduled" else 0
# return None added_count += 1
print("NOTTTTTTTTTTTTTTTTTTTTTTTTTTTTT")
if added_count > 0:
return {
"message": f"{added_count} вакансий добавлено в AppliedJob (Scheduled: {scheduled_count}, Backlogged: {added_count - scheduled_count})"
async def get_avtopilot(db, data): }
try: else:
client_id = data.user_id # Используем client_id вместо user_id для согласованности return {"message": "Все вакансии уже были добавлены ранее"}
avtopilotss = data.avtopilot
if avtopilotss is True: async def get_applied_jobs(db, client_id: int):
# Для асинхронного обновления используем update().where().values() query = (
stmt = ( select(AppliedJob)
update(AppliedJob) .options(joinedload(AppliedJob.job)) # Подгружаем данные о вакансии
.where( .where(AppliedJob.client_id == client_id)
(AppliedJob.client_id == client_id) & )
(AppliedJob.status == "Paused")
) result = await db.execute(query)
.values(status="Scheduled") applied_jobs = result.scalars().all()
)
await db.execute(stmt) if not applied_jobs:
await db.commit() return {"message": "Нет откликов на вакансии"}
return True
jobs_list = []
elif avtopilotss is False:
stmt = ( for applied in applied_jobs:
update(AppliedJob) job = applied.job # Получаем объект Job из отношения AppliedJob.job
.where(
(AppliedJob.client_id == client_id) & jobs_list.append({
(AppliedJob.status == "Scheduled") "id": job.job_id,
) "title": job.job_title,
.values(status="Paused") "company": job.job_company,
) "postedDate": job.date_posted,
await db.execute(stmt) "AppliedDate": applied.applied_on,
await db.commit() "location": job.location,
return True "jobType": job.job_type,
"salary": f"${job.minimum_annual_salary}K" if job.minimum_annual_salary else "Not specified",
else: "level": job.job_level,
print("error") "status": applied.status if applied.status else "Scheduled",
return False "requiredSkills": "",
"aboutJob": job.about,
except Exception as e: "jobLink": job.link
print(f"Ошибка при обработке данных: {e}") })
await db.rollback() print(jobs_list)
raise HTTPException(status_code=400, detail="Error processing data") return jobs_list
async def get_delite(db, data):
try: async def client_list(client_id: int, db):
client_id = data.user_id # Обратите внимание на точку вместо квадратных скобок result = await db.execute(
job_id = data.job_id # Так как data - это объект Pydantic модели select(Client).filter((Client.id == client_id))# | (Client.user_email == user_email))
)
print(f"Полученные данные: user_id={client_id}, avtopilotss={job_id}") clients = result.scalars().first()
stmt = delete(AppliedJob).where( clients_str= clients.json_data
(AppliedJob.client_id == client_id) & # Если клиент найден
(AppliedJob.job_id == job_id) if clients:
) client_dict = ast.literal_eval(clients_str)
# result = {client_id: client_dict}
# Выполняем запрос с await print(f"===============================!!!!!{type(client_dict)}")
result = await db.execute(stmt)
await db.commit() return client_dict
else:
# Получаем количество удаленных строк # Если клиент не найден, возвращаем None или обрабатываем ошибку
deleted_count = result.rowcount # return None
print(f"Удалено записей: {deleted_count}") print("NOTTTTTTTTTTTTTTTTTTTTTTTTTTTTT")
except Exception as e:
print(f"Ошибка при обработке данных: {e}") async def get_avtopilot(db, data):
raise HTTPException(status_code=400, detail="Error processing data") try:
client_id = data.user_id # Используем client_id вместо user_id для согласованности
avtopilotss = data.avtopilot
if avtopilotss is True:
# Для асинхронного обновления используем update().where().values()
async def get_update(db: AsyncSession, data: update): stmt = (
user_id = data.user_id update(AppliedJob)
up_client = requests.get(f"{url_up}{user_id}") # Лучше заменить на httpx.AsyncClient .where(
(AppliedJob.client_id == client_id) &
if up_client.status_code == 200: (AppliedJob.status == "Paused")
datas = up_client.json() )
.values(status="Scheduled")
# Проверяем, что 'json_data' есть в ответе )
if 'json_data' not in datas: await db.execute(stmt)
raise HTTPException(status_code=400, detail="Missing 'json_data' in response") await db.commit()
return True
json_data = datas['json_data']
elif avtopilotss is False:
# Если json_data — строка, парсим её в словарь stmt = (
if isinstance(json_data, str): update(AppliedJob)
try: .where(
data_dict = json.loads(json_data) (AppliedJob.client_id == client_id) &
except json.JSONDecodeError: (AppliedJob.status == "Scheduled")
raise HTTPException(status_code=400, detail="Invalid JSON in 'json_data'") )
else: .values(status="Paused")
data_dict = json_data # Если уже словарь, используем как есть )
await db.execute(stmt)
# Проверяем, что data_dict — словарь await db.commit()
if not isinstance(data_dict, dict): return True
raise HTTPException(status_code=400, detail="'json_data' is not a valid dictionary")
else:
try: print("error")
first_name = data_dict['first_name'] return False
last_name = data_dict['last_name']
email_addr = data_dict['email_addr'] except Exception as e:
user_id = data_dict['user_id'] print(f"Ошибка при обработке данных: {e}")
phone_num = data_dict['phone_num'] await db.rollback()
print(data_dict) raise HTTPException(status_code=400, detail="Error processing data")
# Обновляем или создаём клиента
async with db.begin(): async def get_delite(db, data):
client = await db.get(Client, user_id) # Ищем клиента по ID try:
client_id = data.user_id # Обратите внимание на точку вместо квадратных скобок
if client: job_id = data.job_id # Так как data - это объект Pydantic модели
# Обновляем существующего клиента
client.user_nicename = first_name # Исправлено: было user_login print(f"Полученные данные: user_id={client_id}, avtopilotss={job_id}")
client.phone = phone_num stmt = delete(AppliedJob).where(
client.json_data = str(data_dict) (AppliedJob.client_id == client_id) &
else: (AppliedJob.job_id == job_id)
# Создаём нового клиента )
client = Client(
id=user_id, # Выполняем запрос с await
user_login=email_addr, # Или другой логин result = await db.execute(stmt)
user_nicename=f"{first_name} {last_name}", await db.commit()
user_email=email_addr,
phone=phone_num, # Получаем количество удаленных строк
json_data=str(data_dict), deleted_count = result.rowcount
) print(f"Удалено записей: {deleted_count}")
db.add(client)
except Exception as e:
await db.commit() print(f"Ошибка при обработке данных: {e}")
return client raise HTTPException(status_code=400, detail="Error processing data")
except KeyError as e:
raise HTTPException(status_code=400, detail=f"Missing required field: {e}")
else:
raise HTTPException(status_code=up_client.status_code, detail="Failed to fetch data")
async def get_update(db: AsyncSession, data: update):
user_id = data.user_id
print(data)
up_client = requests.get(f"{url_up}{user_id}") # Лучше заменить на httpx.AsyncClient
print("22222")
if up_client.status_code == 200:
datas = up_client.json()
# Проверяем, что 'json_data' есть в ответе
if 'json_data' not in datas:
raise HTTPException(status_code=400, detail="Missing 'json_data' in response")
json_data = datas['json_data']
# Если json_data — строка, парсим её в словарь
if isinstance(json_data, str):
try:
data_dict = json.loads(json_data)
except json.JSONDecodeError:
raise HTTPException(status_code=400, detail="Invalid JSON in 'json_data'")
else:
data_dict = json_data # Если уже словарь, используем как есть
# Проверяем, что data_dict — словарь
if not isinstance(data_dict, dict):
raise HTTPException(status_code=400, detail="'json_data' is not a valid dictionary")
try:
first_name = data_dict['first_name']
last_name = data_dict['last_name']
email_addr = data_dict['email_addr']
user_id = data_dict['user_id']
phone_num = data_dict['phone_num']
# print(data_dict)
# Обновляем или создаём клиента
async with db.begin():
client = await db.get(Client, user_id) # Ищем клиента по ID
if client:
# Обновляем существующего клиента
client.user_nicename = first_name # Исправлено: было user_login
client.phone = phone_num
client.json_data = str(data_dict)
else:
# Создаём нового клиента
client = Client(
id=user_id,
user_login=email_addr, # Или другой логин
user_nicename=f"{first_name} {last_name}",
user_email=email_addr,
phone=phone_num,
json_data=str(data_dict),
)
db.add(client)
await db.commit()
return client
except KeyError as e:
raise HTTPException(status_code=400, detail=f"Missing required field: {e}")
else:
raise HTTPException(status_code=up_client.status_code, detail="Failed to fetch data")
async def get_users(db: AsyncSession, data: update):
user_id = data.user_id
client = await db.get(Client, user_id)
return
print(client.json_data)