linkedin2/utils/clients.py

381 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import ast
import requests
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from sqlalchemy.orm import joinedload
from sqlalchemy import delete
from sqlalchemy import update
from sqlalchemy import or_
from sqlalchemy import select, and_
from sqlalchemy.exc import IntegrityError
from model.database import Client, AppliedJob, Job
from dotenv import load_dotenv
import os
# Загружаем переменные окружения
load_dotenv()
url_up = os.getenv('UP_DOMEN')
async def get_filtered_jobs(db: AsyncSession, user_job_titles, minimum_annual_salary, salary_currency,
user_location_type, user_locations, user_levels, user_job_types): #
# Строим фильтры для каждого из параметров пользователя
filters = []
# Фильтрация по job_title (примерное совпадение)
if user_job_titles:
title_filters = [Job.job_title.ilike(f"%{title}%") for title in user_job_titles]
filters.append(or_(*title_filters))
# Фильтрация по minimum_annual_salary (если указано)
if minimum_annual_salary is not None:
filters.append(Job.minimum_annual_salary >= minimum_annual_salary)
# Фильтрация по salary_currency (если указано)
if salary_currency is not None:
filters.append(Job.salary_currency == salary_currency)
# Фильтрация по location_type (если указано)
if user_location_type:
filters.append(Job.location_type == user_location_type)
# Фильтрация по location (разделяем по каждому городу)
if user_locations:
location_filters = [Job.location.ilike(location) for location in user_locations]
filters.append(or_(*location_filters))
# Фильтрация по job_level (если указаны)
if user_levels:
filters.append(Job.job_level.in_(user_levels))
# Фильтрация по job_type (если указаны)
if user_job_types:
filters.append(Job.job_type.in_(user_job_types))
# Выполняем запрос с применением всех фильтров
query = select(Job).filter(*filters)
# Выполняем асинхронный запрос
result = await db.execute(query)
# Получаем все результаты
jobs = result.scalars().all()
return jobs
async def upsert_client(db: AsyncSession, user_id: str, user_login: str, user_nicename: str, user_email: str, phone: str, json_data: str):
# Проверяем, существует ли клиент с таким логином или email
async with db.begin():
result = await db.execute(
select(Client).filter((Client.id == user_id))# | (Client.user_email == user_email))
)
client = result.scalars().first() # Получаем первый результат или None
if client:
# Если клиент существует, обновляем его данные
client.user_nicename = user_login
client.phone = phone
client.json_data = json_data
# Можно добавить другие поля для обновления
try:
await db.commit() # Применяем изменения в базе данных
await db.refresh(client) # Обновляем объект в Python
return client
except IntegrityError:
await db.rollback() # В случае ошибки откатываем изменения
raise
else:
# Если клиента нет, создаем нового
new_client = Client(
user_login=user_login,
user_nicename=user_nicename,
user_email=user_email,
json_data=json_data,
)
db.add(new_client)
try:
await db.commit() # Сохраняем нового клиента в базе данных
await db.refresh(new_client) # Обновляем объект в Python
return new_client
except IntegrityError:
await db.rollback() # В случае ошибки откатываем изменения
raise
async def del_jobs(db: AsyncSession, user_id: str):
jobs = await db.execute(
select(AppliedJob).where(
AppliedJob.client_id == user_id,
or_(AppliedJob.status == "Scheduled", AppliedJob.status.is_(None))
)
)
jobs = jobs.scalars().all()
for job in jobs:
await db.delete(job)
await db.commit()
# async def add_jobs(db: AsyncSession, user_id: str):
# # Фильтруем вакансии по переданным параметрам
# query = select(Job).filter(Job.active == 3)
# result = await db.execute(query)
# jobs = result.scalars().all() # Получаем список вакансий
# if not jobs:
# return {"message": "Нет вакансий по данному фильтру"}
# # Создаём записи в AppliedJob
# applied_jobs = [
# AppliedJob(client_id=user_id, job_id=job.job_id, status="Scheduled")
# for job in jobs
# ]
# db.add_all(applied_jobs) # Добавляем в сессию
# await db.commit() # Фиксируем изменения
# return {"message": f"{len(applied_jobs)} вакансий добавлено в AppliedJob"}
async def add_jobs(db: AsyncSession, user_id: str):
# Получаем все активные вакансии (предполагаю, что ты хотел использовать True, а не 3)
query = select(Job).filter(Job.active == 3)
result = await db.execute(query)
jobs = result.scalars().all()
if not jobs:
return {"message": "Нет вакансий по данному фильтру"}
applied_jobs = []
for job in jobs:
# Проверка, существует ли уже AppliedJob с таким client_id и job_id
check_query = select(AppliedJob).where(
and_(
AppliedJob.client_id == user_id,
AppliedJob.job_id == job.job_id
)
)
check_result = await db.execute(check_query)
existing = check_result.scalar_one_or_none()
if not existing:
applied_jobs.append(
AppliedJob(client_id=user_id, job_id=job.job_id, status="Scheduled")
)
if applied_jobs:
db.add_all(applied_jobs)
await db.commit()
print (f'{"message": f"{len(applied_jobs)} вакансий добавлено в AppliedJob"}')
else:
print (f'{"message": "Все вакансии уже были добавлены ранее"}')
async def get_applied_jobs(db, client_id: int):
query = (
select(AppliedJob)
.options(joinedload(AppliedJob.job)) # Подгружаем данные о вакансии
.where(AppliedJob.client_id == client_id)
)
result = await db.execute(query)
applied_jobs = result.scalars().all()
if not applied_jobs:
return {"message": "Нет откликов на вакансии"}
jobs_list = []
for applied in applied_jobs:
job = applied.job # Получаем объект Job из отношения AppliedJob.job
jobs_list.append({
"id": job.job_id,
"title": job.job_title,
"company": job.job_company,
"postedDate": job.date_posted,
"location": job.location,
"jobType": job.job_type,
"salary": f"${job.minimum_annual_salary}K" if job.minimum_annual_salary else "Not specified",
"level": job.job_level,
"status": applied.status if applied.status else "Scheduled",
"requiredSkills": "",
"aboutJob": job.about,
"jobLink": job.link
})
print(jobs_list)
return jobs_list
async def client_list(client_id: int, db):
result = await db.execute(
select(Client).filter((Client.id == client_id))# | (Client.user_email == user_email))
)
clients = result.scalars().first()
clients_str= clients.json_data
# Если клиент найден
if clients:
client_dict = ast.literal_eval(clients_str)
# result = {client_id: client_dict}
print(f"===============================!!!!!{type(client_dict)}")
return client_dict
else:
# Если клиент не найден, возвращаем None или обрабатываем ошибку
# return None
print("NOTTTTTTTTTTTTTTTTTTTTTTTTTTTTT")
async def get_avtopilot(db, data):
try:
client_id = data.user_id # Используем client_id вместо user_id для согласованности
avtopilotss = data.avtopilot
if avtopilotss is True:
# Для асинхронного обновления используем update().where().values()
stmt = (
update(AppliedJob)
.where(
(AppliedJob.client_id == client_id) &
(AppliedJob.status == "Paused")
)
.values(status="Scheduled")
)
await db.execute(stmt)
await db.commit()
return True
elif avtopilotss is False:
stmt = (
update(AppliedJob)
.where(
(AppliedJob.client_id == client_id) &
(AppliedJob.status == "Scheduled")
)
.values(status="Paused")
)
await db.execute(stmt)
await db.commit()
return True
else:
print("error")
return False
except Exception as e:
print(f"Ошибка при обработке данных: {e}")
await db.rollback()
raise HTTPException(status_code=400, detail="Error processing data")
async def get_delite(db, data):
try:
client_id = data.user_id # Обратите внимание на точку вместо квадратных скобок
job_id = data.job_id # Так как data - это объект Pydantic модели
print(f"Полученные данные: user_id={client_id}, avtopilotss={job_id}")
stmt = delete(AppliedJob).where(
(AppliedJob.client_id == client_id) &
(AppliedJob.job_id == job_id)
)
# Выполняем запрос с await
result = await db.execute(stmt)
await db.commit()
# Получаем количество удаленных строк
deleted_count = result.rowcount
print(f"Удалено записей: {deleted_count}")
except Exception as e:
print(f"Ошибка при обработке данных: {e}")
raise HTTPException(status_code=400, detail="Error processing data")
async def get_update(db: AsyncSession, data: update):
user_id = data.user_id
up_client = requests.get(f"{url_up}{user_id}") # Лучше заменить на httpx.AsyncClient
if up_client.status_code == 200:
datas = up_client.json()
# Проверяем, что 'json_data' есть в ответе
if 'json_data' not in datas:
raise HTTPException(status_code=400, detail="Missing 'json_data' in response")
json_data = datas['json_data']
# Если json_data — строка, парсим её в словарь
if isinstance(json_data, str):
try:
data_dict = json.loads(json_data)
except json.JSONDecodeError:
raise HTTPException(status_code=400, detail="Invalid JSON in 'json_data'")
else:
data_dict = json_data # Если уже словарь, используем как есть
# Проверяем, что data_dict — словарь
if not isinstance(data_dict, dict):
raise HTTPException(status_code=400, detail="'json_data' is not a valid dictionary")
try:
first_name = data_dict['first_name']
last_name = data_dict['last_name']
email_addr = data_dict['email_addr']
user_id = data_dict['user_id']
phone_num = data_dict['phone_num']
print(data_dict)
# Обновляем или создаём клиента
async with db.begin():
client = await db.get(Client, user_id) # Ищем клиента по ID
if client:
# Обновляем существующего клиента
client.user_nicename = first_name # Исправлено: было user_login
client.phone = phone_num
client.json_data = str(data_dict)
else:
# Создаём нового клиента
client = Client(
id=user_id,
user_login=email_addr, # Или другой логин
user_nicename=f"{first_name} {last_name}",
user_email=email_addr,
phone=phone_num,
json_data=str(data_dict),
)
db.add(client)
await db.commit()
return client
except KeyError as e:
raise HTTPException(status_code=400, detail=f"Missing required field: {e}")
else:
raise HTTPException(status_code=up_client.status_code, detail="Failed to fetch data")