diff --git a/utils/clients.py b/utils/clients.py index 50bd76a..dd6aa18 100644 --- a/utils/clients.py +++ b/utils/clients.py @@ -1,389 +1,460 @@ -import json -import ast -import requests - -from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.future import select -from sqlalchemy.orm import joinedload -from sqlalchemy import delete -from sqlalchemy import update -from sqlalchemy import or_ -from sqlalchemy import select, and_ -from sqlalchemy.exc import IntegrityError -from model.database import Client, AppliedJob, Job - -from dotenv import load_dotenv -import os - -# Загружаем переменные окружения -load_dotenv() - - -url_up = os.getenv('UP_DOMEN') - - - - -async def get_filtered_jobs(db: AsyncSession, user_job_titles, minimum_annual_salary, salary_currency, - user_location_type, user_locations, user_levels, user_job_types): # - # Строим фильтры для каждого из параметров пользователя - filters = [] - - # Фильтрация по job_title (примерное совпадение) - if user_job_titles: - title_filters = [Job.job_title.ilike(f"%{title}%") for title in user_job_titles] - filters.append(or_(*title_filters)) - - # Фильтрация по minimum_annual_salary (если указано) - if minimum_annual_salary is not None: - filters.append(Job.minimum_annual_salary >= minimum_annual_salary) - - # Фильтрация по salary_currency (если указано) - if salary_currency is not None: - filters.append(Job.salary_currency == salary_currency) - - # Фильтрация по location_type (если указано) - if user_location_type: - filters.append(Job.location_type == user_location_type) - - # Фильтрация по location (разделяем по каждому городу) - if user_locations: - location_filters = [Job.location.ilike(location) for location in user_locations] - filters.append(or_(*location_filters)) - - # Фильтрация по job_level (если указаны) - if user_levels: - filters.append(Job.job_level.in_(user_levels)) - - # Фильтрация по job_type (если указаны) - if user_job_types: - filters.append(Job.job_type.in_(user_job_types)) - - # Выполняем запрос с применением всех фильтров - query = select(Job).filter(*filters) - - # Выполняем асинхронный запрос - result = await db.execute(query) - - # Получаем все результаты - jobs = result.scalars().all() - - return jobs - -async def upsert_client(db: AsyncSession, user_id: str, user_login: str, user_nicename: str, user_email: str, phone: str, json_data: str): - # Проверяем, существует ли клиент с таким логином или email - async with db.begin(): - result = await db.execute( - select(Client).filter((Client.id == user_id))# | (Client.user_email == user_email)) - ) - client = result.scalars().first() # Получаем первый результат или None - - if client: - # Если клиент существует, обновляем его данные - client.user_nicename = user_login - client.phone = phone - client.json_data = json_data - # Можно добавить другие поля для обновления - - try: - await db.commit() # Применяем изменения в базе данных - await db.refresh(client) # Обновляем объект в Python - return client - except IntegrityError: - await db.rollback() # В случае ошибки откатываем изменения - raise - else: - # Если клиента нет, создаем нового - new_client = Client( - user_login=user_login, - user_nicename=user_nicename, - user_email=user_email, - json_data=json_data, - ) - db.add(new_client) - try: - await db.commit() # Сохраняем нового клиента в базе данных - await db.refresh(new_client) # Обновляем объект в Python - return new_client - except IntegrityError: - await db.rollback() # В случае ошибки откатываем изменения - raise - - - - -async def del_jobs(db: AsyncSession, user_id: str): - jobs = await db.execute( - select(AppliedJob).where( - AppliedJob.client_id == user_id, - or_( - AppliedJob.status.in_(["Scheduled", "Backlogged"]), - AppliedJob.status.is_(None) - ) - ) - ) - jobs = jobs.scalars().all() - - for job in jobs: - await db.delete(job) - - await db.commit() - - -# async def add_jobs(db: AsyncSession, user_id: str): -# # Фильтруем вакансии по переданным параметрам -# query = select(Job).filter(Job.active == 3) -# result = await db.execute(query) -# jobs = result.scalars().all() # Получаем список вакансий - -# if not jobs: -# return {"message": "Нет вакансий по данному фильтру"} - -# # Создаём записи в AppliedJob -# applied_jobs = [ -# AppliedJob(client_id=user_id, job_id=job.job_id, status="Scheduled") -# for job in jobs -# ] - -# db.add_all(applied_jobs) # Добавляем в сессию -# await db.commit() # Фиксируем изменения - - - -# return {"message": f"{len(applied_jobs)} вакансий добавлено в AppliedJob"} - - -async def add_jobs(db: AsyncSession, user_id: str): - # Получаем все активные вакансии (предполагаю, что ты хотел использовать True, а не 3) - query = select(Job).where(Job.active == 3) - result = await db.execute(query) - jobs = result.scalars().all() - - if not jobs: - return {"message": "Нет вакансий по данному фильтру"} - - applied_jobs = [] - scheduled_count = 0 - - for job in jobs: - # Проверяем, существует ли уже такая пара client_id + job_id - check_query = select(AppliedJob).where( - and_( - AppliedJob.client_id == user_id, - AppliedJob.job_id == job.job_id - ) - ) - check_result = await db.execute(check_query) - existing = check_result.scalar_one_or_none() - - if not existing: - # Первые 5 добавим со статусом Scheduled, остальные Backlogged - status = "Scheduled" if scheduled_count < 5 else "Backlogged" - applied_jobs.append( - AppliedJob(client_id=user_id, job_id=job.job_id, status=status) - ) - if status == "Scheduled": - scheduled_count += 1 - - if applied_jobs: - db.add_all(applied_jobs) - await db.commit() - return {"message": f"{len(applied_jobs)} вакансий добавлено в AppliedJob (Scheduled: {scheduled_count})"} - else: - return {"message": "Все вакансии уже были добавлены ранее"} - - -async def get_applied_jobs(db, client_id: int): - query = ( - select(AppliedJob) - .options(joinedload(AppliedJob.job)) # Подгружаем данные о вакансии - .where(AppliedJob.client_id == client_id) - ) - - result = await db.execute(query) - applied_jobs = result.scalars().all() - - if not applied_jobs: - return {"message": "Нет откликов на вакансии"} - - jobs_list = [] - - for applied in applied_jobs: - job = applied.job # Получаем объект Job из отношения AppliedJob.job - - jobs_list.append({ - "id": job.job_id, - "title": job.job_title, - "company": job.job_company, - "postedDate": job.date_posted, - "AppliedDate": applied.applied_on, - "location": job.location, - "jobType": job.job_type, - "salary": f"${job.minimum_annual_salary}K" if job.minimum_annual_salary else "Not specified", - "level": job.job_level, - "status": applied.status if applied.status else "Scheduled", - "requiredSkills": "", - "aboutJob": job.about, - "jobLink": job.link - }) - print(jobs_list) - return jobs_list - - - -async def client_list(client_id: int, db): - result = await db.execute( - select(Client).filter((Client.id == client_id))# | (Client.user_email == user_email)) - ) - clients = result.scalars().first() - clients_str= clients.json_data - # Если клиент найден - if clients: - client_dict = ast.literal_eval(clients_str) - # result = {client_id: client_dict} - print(f"===============================!!!!!{type(client_dict)}") - - return client_dict - else: - # Если клиент не найден, возвращаем None или обрабатываем ошибку - # return None - print("NOTTTTTTTTTTTTTTTTTTTTTTTTTTTTT") - - - -async def get_avtopilot(db, data): - try: - client_id = data.user_id # Используем client_id вместо user_id для согласованности - avtopilotss = data.avtopilot - - if avtopilotss is True: - # Для асинхронного обновления используем update().where().values() - stmt = ( - update(AppliedJob) - .where( - (AppliedJob.client_id == client_id) & - (AppliedJob.status == "Paused") - ) - .values(status="Scheduled") - ) - await db.execute(stmt) - await db.commit() - return True - - elif avtopilotss is False: - stmt = ( - update(AppliedJob) - .where( - (AppliedJob.client_id == client_id) & - (AppliedJob.status == "Scheduled") - ) - .values(status="Paused") - ) - await db.execute(stmt) - await db.commit() - return True - - else: - print("error") - return False - - except Exception as e: - print(f"Ошибка при обработке данных: {e}") - await db.rollback() - raise HTTPException(status_code=400, detail="Error processing data") - - -async def get_delite(db, data): - try: - client_id = data.user_id # Обратите внимание на точку вместо квадратных скобок - job_id = data.job_id # Так как data - это объект Pydantic модели - - print(f"Полученные данные: user_id={client_id}, avtopilotss={job_id}") - stmt = delete(AppliedJob).where( - (AppliedJob.client_id == client_id) & - (AppliedJob.job_id == job_id) - ) - - # Выполняем запрос с await - result = await db.execute(stmt) - await db.commit() - - # Получаем количество удаленных строк - deleted_count = result.rowcount - print(f"Удалено записей: {deleted_count}") - - - except Exception as e: - print(f"Ошибка при обработке данных: {e}") - raise HTTPException(status_code=400, detail="Error processing data") - - - - - -async def get_update(db: AsyncSession, data: update): - user_id = data.user_id - up_client = requests.get(f"{url_up}{user_id}") # Лучше заменить на httpx.AsyncClient - - if up_client.status_code == 200: - datas = up_client.json() - - # Проверяем, что 'json_data' есть в ответе - if 'json_data' not in datas: - raise HTTPException(status_code=400, detail="Missing 'json_data' in response") - - json_data = datas['json_data'] - - # Если json_data — строка, парсим её в словарь - if isinstance(json_data, str): - try: - data_dict = json.loads(json_data) - except json.JSONDecodeError: - raise HTTPException(status_code=400, detail="Invalid JSON in 'json_data'") - else: - data_dict = json_data # Если уже словарь, используем как есть - - # Проверяем, что data_dict — словарь - if not isinstance(data_dict, dict): - raise HTTPException(status_code=400, detail="'json_data' is not a valid dictionary") - - try: - first_name = data_dict['first_name'] - last_name = data_dict['last_name'] - email_addr = data_dict['email_addr'] - user_id = data_dict['user_id'] - phone_num = data_dict['phone_num'] - print(data_dict) - - # Обновляем или создаём клиента - async with db.begin(): - client = await db.get(Client, user_id) # Ищем клиента по ID - - if client: - # Обновляем существующего клиента - client.user_nicename = first_name # Исправлено: было user_login - client.phone = phone_num - client.json_data = str(data_dict) - else: - # Создаём нового клиента - client = Client( - id=user_id, - user_login=email_addr, # Или другой логин - user_nicename=f"{first_name} {last_name}", - user_email=email_addr, - phone=phone_num, - json_data=str(data_dict), - ) - db.add(client) - - - await db.commit() - return client - - except KeyError as e: - raise HTTPException(status_code=400, detail=f"Missing required field: {e}") - else: - raise HTTPException(status_code=up_client.status_code, detail="Failed to fetch data") - - - +import json +import ast +import requests + +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.future import select +from sqlalchemy.orm import joinedload +from sqlalchemy import delete +from sqlalchemy import update +from sqlalchemy import or_ +from sqlalchemy import select, and_ +from sqlalchemy.exc import IntegrityError +from model.database import Client, AppliedJob, Job + +from sqlalchemy import func, or_ +import urllib.parse +from dotenv import load_dotenv +import os + +# Загружаем переменные окружения +load_dotenv() + + +url_up = os.getenv('UP_DOMEN') + + + +async def get_filtered_jobs(db: AsyncSession, user_job_titles, minimum_annual_salary, salary_currency, + user_location_type, user_locations, user_levels, user_job_types): + title_filters = [] + location_filters = [] + + # Очистка и нормализация входных данных + def clean_text(text): + return urllib.parse.unquote(text).strip().lower() + + # 2. Фильтр по должностям (OR-группа) + if user_job_titles: + seen_titles = set() # Для избежания дубликатов + for title in user_job_titles: + clean_title = clean_text(title) + if clean_title in seen_titles: + continue + seen_titles.add(clean_title) + + words = clean_title.split() + if len(words) > 1: + # Для составных названий ищем все слова через AND + word_filters = [func.lower(Job.job_title).contains(word) for word in words] + title_filters.append(and_(*word_filters)) + else: + title_filters.append(func.lower(Job.job_title).contains(clean_title)) + + # 3. Фильтр по локациям (OR-группа) + if user_locations: + seen_locations = set() + for location in user_locations: + clean_loc = clean_text(location) + if clean_loc in seen_locations: + continue + seen_locations.add(clean_loc) + + location_filters.append(func.lower(Job.location).contains(clean_loc)) + + # Обработка составных локаций (City, Country) + if ',' in clean_loc: + main_part = clean_loc.split(',')[0].strip() + location_filters.append(func.lower(Job.location).contains(main_part)) + + # 4. Комбинирование фильтров + final_filters = [] + if title_filters: + final_filters.append(or_(*title_filters)) + if location_filters: + final_filters.append(or_(*location_filters)) + + if not final_filters: + return [] + + # 5. Формирование и выполнение запроса + query = select(Job).filter(*final_filters) + + # Для отладки + print("Final query:", str(query)) + + result = await db.execute(query) + jobs = result.scalars().all() + + print(f"Found {len(jobs)} matching jobs") + for job in jobs: + print(f"Match: {job.job_title} | {job.location}") + + return jobs + + + +async def upsert_client(db: AsyncSession, user_id: str, user_login: str, user_nicename: str, user_email: str, phone: str, json_data: str): + # Проверяем, существует ли клиент с таким логином или email + async with db.begin(): + result = await db.execute( + select(Client).filter((Client.id == user_id))# | (Client.user_email == user_email)) + ) + client = result.scalars().first() # Получаем первый результат или None + + if client: + # Если клиент существует, обновляем его данные + client.user_nicename = user_login + client.phone = phone + client.json_data = json_data + # Можно добавить другие поля для обновления + + try: + await db.commit() # Применяем изменения в базе данных + await db.refresh(client) # Обновляем объект в Python + return client + except IntegrityError: + await db.rollback() # В случае ошибки откатываем изменения + raise + else: + # Если клиента нет, создаем нового + new_client = Client( + user_login=user_login, + user_nicename=user_nicename, + user_email=user_email, + json_data=json_data, + ) + db.add(new_client) + try: + await db.commit() # Сохраняем нового клиента в базе данных + await db.refresh(new_client) # Обновляем объект в Python + return new_client + except IntegrityError: + await db.rollback() # В случае ошибки откатываем изменения + raise + + + + +async def del_jobs(db: AsyncSession, user_id: str): + jobs = await db.execute( + select(AppliedJob).where( + AppliedJob.client_id == user_id, + or_( + AppliedJob.status.in_(["Scheduled", "Backlogged"]), + AppliedJob.status.is_(None) + ) + ) + ) + jobs = jobs.scalars().all() + + for job in jobs: + await db.delete(job) + + await db.commit() + + +# async def add_jobs(db: AsyncSession, user_id: str): +# # Фильтруем вакансии по переданным параметрам +# query = select(Job).filter(Job.active == 3) +# result = await db.execute(query) +# jobs = result.scalars().all() # Получаем список вакансий + +# if not jobs: +# return {"message": "Нет вакансий по данному фильтру"} + +# # Создаём записи в AppliedJob +# applied_jobs = [ +# AppliedJob(client_id=user_id, job_id=job.job_id, status="Scheduled") +# for job in jobs +# ] + +# db.add_all(applied_jobs) # Добавляем в сессию +# await db.commit() # Фиксируем изменения + + + +# return {"message": f"{len(applied_jobs)} вакансий добавлено в AppliedJob"} + + +async def add_jobs(db: AsyncSession, user_id: str, up): + print(up) + + try: + user_data = up + print(type(user_data)) + job_preferences = user_data.get('job_preferences', {}) + + job_titles = job_preferences.get('job_titles', []) + locations = job_preferences.get('locations', []) + + location_types = [k for k, v in job_preferences.get('location_types', {}).items() if v is True] + job_levels = [k for k, v in job_preferences.get('job_levels', {}).items() if v is True] + job_types = [k for k, v in job_preferences.get('job_types', {}).items() if v is True] + + print("Job Titles:", job_titles) + print("Locations:", locations) + print("Location Types (True only):", location_types) + print("Job Levels (True only):", job_levels) + print("Job Types (True only):", job_types) + + except json.JSONDecodeError as e: + print("Ошибка декодирования JSON:", e) + + jobs = await get_filtered_jobs( + db, + user_job_titles=job_titles, + minimum_annual_salary=None, + salary_currency=None, + user_location_type=None, + user_locations=locations, + user_levels=None, + user_job_types=None + ) + + for job in jobs: + print(job.job_title, job.location, job.job_level, job.job_type) + # # Получаем все активные вакансии (предполагаю, что ты хотел использовать True, а не 3) + # # query = select(Job).where(Job.active == 3) + # # result = await db.execute(query) + # # jobs = result.scalars().all() + + if not jobs: + return {"message": "Нет вакансий по данному фильтру"} + + scheduled_count = 0 + added_count = 0 + + for job in jobs: + # Проверяем, существует ли уже такая пара client_id + job_id + check_query = select(AppliedJob).where( + and_( + AppliedJob.client_id == user_id, + AppliedJob.job_id == job.job_id + ) + ) + check_result = await db.execute(check_query) + existing = check_result.scalar_one_or_none() + + if not existing: + status = "Scheduled" if scheduled_count < 5 else "Backlogged" + new_applied = AppliedJob( + client_id=user_id, + job_id=job.job_id, + status=status + ) + db.add(new_applied) + await db.commit() # коммитим каждую запись сразу + scheduled_count += 1 if status == "Scheduled" else 0 + added_count += 1 + + if added_count > 0: + return { + "message": f"{added_count} вакансий добавлено в AppliedJob (Scheduled: {scheduled_count}, Backlogged: {added_count - scheduled_count})" + } + else: + return {"message": "Все вакансии уже были добавлены ранее"} + + +async def get_applied_jobs(db, client_id: int): + query = ( + select(AppliedJob) + .options(joinedload(AppliedJob.job)) # Подгружаем данные о вакансии + .where(AppliedJob.client_id == client_id) + ) + + result = await db.execute(query) + applied_jobs = result.scalars().all() + + if not applied_jobs: + return {"message": "Нет откликов на вакансии"} + + jobs_list = [] + + for applied in applied_jobs: + job = applied.job # Получаем объект Job из отношения AppliedJob.job + + jobs_list.append({ + "id": job.job_id, + "title": job.job_title, + "company": job.job_company, + "postedDate": job.date_posted, + "AppliedDate": applied.applied_on, + "location": job.location, + "jobType": job.job_type, + "salary": f"${job.minimum_annual_salary}K" if job.minimum_annual_salary else "Not specified", + "level": job.job_level, + "status": applied.status if applied.status else "Scheduled", + "requiredSkills": "", + "aboutJob": job.about, + "jobLink": job.link + }) + print(jobs_list) + return jobs_list + + + +async def client_list(client_id: int, db): + result = await db.execute( + select(Client).filter((Client.id == client_id))# | (Client.user_email == user_email)) + ) + clients = result.scalars().first() + clients_str= clients.json_data + # Если клиент найден + if clients: + client_dict = ast.literal_eval(clients_str) + # result = {client_id: client_dict} + print(f"===============================!!!!!{type(client_dict)}") + + return client_dict + else: + # Если клиент не найден, возвращаем None или обрабатываем ошибку + # return None + print("NOTTTTTTTTTTTTTTTTTTTTTTTTTTTTT") + + + +async def get_avtopilot(db, data): + try: + client_id = data.user_id # Используем client_id вместо user_id для согласованности + avtopilotss = data.avtopilot + + if avtopilotss is True: + # Для асинхронного обновления используем update().where().values() + stmt = ( + update(AppliedJob) + .where( + (AppliedJob.client_id == client_id) & + (AppliedJob.status == "Paused") + ) + .values(status="Scheduled") + ) + await db.execute(stmt) + await db.commit() + return True + + elif avtopilotss is False: + stmt = ( + update(AppliedJob) + .where( + (AppliedJob.client_id == client_id) & + (AppliedJob.status == "Scheduled") + ) + .values(status="Paused") + ) + await db.execute(stmt) + await db.commit() + return True + + else: + print("error") + return False + + except Exception as e: + print(f"Ошибка при обработке данных: {e}") + await db.rollback() + raise HTTPException(status_code=400, detail="Error processing data") + + +async def get_delite(db, data): + try: + client_id = data.user_id # Обратите внимание на точку вместо квадратных скобок + job_id = data.job_id # Так как data - это объект Pydantic модели + + print(f"Полученные данные: user_id={client_id}, avtopilotss={job_id}") + stmt = delete(AppliedJob).where( + (AppliedJob.client_id == client_id) & + (AppliedJob.job_id == job_id) + ) + + # Выполняем запрос с await + result = await db.execute(stmt) + await db.commit() + + # Получаем количество удаленных строк + deleted_count = result.rowcount + print(f"Удалено записей: {deleted_count}") + + + except Exception as e: + print(f"Ошибка при обработке данных: {e}") + raise HTTPException(status_code=400, detail="Error processing data") + + + + + +async def get_update(db: AsyncSession, data: update): + user_id = data.user_id + print(data) + up_client = requests.get(f"{url_up}{user_id}") # Лучше заменить на httpx.AsyncClient + print("22222") + if up_client.status_code == 200: + datas = up_client.json() + + # Проверяем, что 'json_data' есть в ответе + if 'json_data' not in datas: + raise HTTPException(status_code=400, detail="Missing 'json_data' in response") + + json_data = datas['json_data'] + + # Если json_data — строка, парсим её в словарь + if isinstance(json_data, str): + try: + data_dict = json.loads(json_data) + except json.JSONDecodeError: + raise HTTPException(status_code=400, detail="Invalid JSON in 'json_data'") + else: + data_dict = json_data # Если уже словарь, используем как есть + + # Проверяем, что data_dict — словарь + if not isinstance(data_dict, dict): + raise HTTPException(status_code=400, detail="'json_data' is not a valid dictionary") + + try: + first_name = data_dict['first_name'] + last_name = data_dict['last_name'] + email_addr = data_dict['email_addr'] + user_id = data_dict['user_id'] + phone_num = data_dict['phone_num'] + # print(data_dict) + + # Обновляем или создаём клиента + async with db.begin(): + client = await db.get(Client, user_id) # Ищем клиента по ID + + if client: + # Обновляем существующего клиента + client.user_nicename = first_name # Исправлено: было user_login + client.phone = phone_num + client.json_data = str(data_dict) + else: + # Создаём нового клиента + client = Client( + id=user_id, + user_login=email_addr, # Или другой логин + user_nicename=f"{first_name} {last_name}", + user_email=email_addr, + phone=phone_num, + json_data=str(data_dict), + ) + db.add(client) + + + await db.commit() + return client + + except KeyError as e: + raise HTTPException(status_code=400, detail=f"Missing required field: {e}") + else: + raise HTTPException(status_code=up_client.status_code, detail="Failed to fetch data") + + + + + +async def get_users(db: AsyncSession, data: update): + user_id = data.user_id + client = await db.get(Client, user_id) + return + print(client.json_data) \ No newline at end of file