import sys import os sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) from linkedin_api import Linkedin import logging import schedule import time from utils.logging_setup import configure_global_logging from datetime import datetime import requests from bs4 import BeautifulSoup import asyncio import json from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.future import select # from model.database import create_async_engine, Job from model.database import get_async_session, Job from dotenv import load_dotenv import os configure_global_logging() load_dotenv() headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36", "Accept-Language": "en-US,en;q=0.9", } # # Ваши учетные данные LinkedIn username = os.getenv('USERNAME') password = os.getenv('PASSWD') # Authenticate using any Linkedin user account credentials api = Linkedin(username, password) def pars_jobs(geo): search_jobs = api.search_jobs(location_geo_id = geo) search_jobes = [ { "title": item.get("title"), "entityUrn": item.get("entityUrn", "").split(":")[-1], } for item in search_jobs ] file_path = "search_jobes2.json" with open(file_path, "w", encoding="utf-8") as json_file: json.dump(search_jobes, json_file, indent=4, ensure_ascii=False) # file_path = "search_jobes3.json" # with open(file_path, "w", encoding="utf-8") as json_file: # json.dump(search_jobs, json_file, indent=4, ensure_ascii=False) print(f"Результаты успешно сохранены в {file_path}") def add_to_bd(): #[ ]: Написать функцию записи в БД pass async def typess(url): response = requests.get(url, headers=headers) soup = BeautifulSoup(response.text, "html.parser") # Находим все элементы с описанием критериев вакансии criteria = soup.find_all("span", class_="description__job-criteria-text") if len(criteria) >= 2: level = criteria[0].get_text(strip=True) # Уровень должности job_type = criteria[1].get_text(strip=True) # Тип занятости else: level = "Не найдено" job_type = "Не найдено" # print(f"Job Level: {level}") # print(f"Type of Employment: {job_type}") return level, job_type async def get_job(db: AsyncSession, job_id: str): try: jobs = api.get_job(job_id) # Проверка наличия всех необходимых данных в ответе required_keys = ["description", "formattedLocation", "title", "listedAt", "companyDetails"] for key in required_keys: if key not in jobs: logging.error(f"❌ Ошибка: Ключ {key} отсутствует в API-ответе") return None # Извлечение данных text = jobs['description']['text'] location = jobs['formattedLocation'] title = jobs['title'] listed_at = jobs['listedAt'] listed_ats = datetime.utcfromtimestamp(listed_at / 1000).strftime('%Y-%m-%d %H:%M:%S') company_info = jobs.get("companyDetails", {}).get("com.linkedin.voyager.deco.jobs.web.shared.WebCompactJobPostingCompany", {}).get("companyResolutionResult", {}) company_name = company_info.get("name", "Unknown") company_url = company_info.get("url", "") link = f'https://www.linkedin.com/jobs/view/{job_id}/' # about = workplace_types = jobs.get("workplaceTypesResolutionResults", {}) if workplace_types: first_key = next(iter(workplace_types)) # Берём первый (и единственный) ключ workplace_data = workplace_types[first_key] localized_name = workplace_data.get("localizedName", "Unknown") entity_urn = workplace_data.get("entityUrn", "") logging.info(f"🏢 Тип работы: {localized_name} ({entity_urn})") else: localized_name = "Unknown" entity_urn = "" # Теперь можно добавить эти значения в БД # job.location_type = localized_name # job.entity_urn = entity_urn level, job_type = await typess(link) # Проверка, есть ли вакансия в базе query = select(Job).filter(Job.job_id == job_id) result = await db.execute(query) job = result.scalars().first() if job: logging.info(f"🔄 Обновление вакансии {job_id} в базе...") job.text = json.dumps(jobs) job.about = text job.link = link # job.days_posted = listed_ats job.location = location job.job_company = company_name job.date_posted = listed_ats job.link_company = company_url job.location_type = localized_name job.job_level = level job.job_type = job_type job.days_posted = 5 else: logging.info(f"🆕 Добавление вакансии {job_id} в базу...") job = Job( job_id=job_id, text=json.dumps(jobs), link=link, location=location, job_company=company_name, link_company=company_url ) db.add(job) # Коммит и обновление внутри активной сессии await db.commit() await db.refresh(job) logging.info(f"✅ Вакансия {job_id} успешно сохранена") return job except Exception as e: # await db.rollback() # Откат транзакции при ошибке logging.error(f"❌ Ошибка при обработке вакансии {job_id}: {e}") # return None # except Exception as e: # await db.rollback() # logging.error(f"❌ Ошибка при обработке вакансии {job_id}: {e}") # return None # [ ]: job_level job_type hourly_rate найти minimum_annual_salary и salary_currency добавил description names Компании url на компанию # logging.info(f"title: {title}, location: {location}, jobPostingId: {jobPostingId}, difference: {difference}, location_type: {localized_name}, link: {link} ===== {url}") #text:{text}, async def get_or_create_jobs(db: AsyncSession, job_id: int, titles: str): """ Проверяет, существует ли запись, если нет — создаёт """ try: query = select(Job).filter(Job.job_id == job_id) result = await db.execute(query) job = result.scalars().first() if not job: job = Job( job_id=job_id, job_title=titles, days_posted=7 ) db.add(job) await db.commit() await db.refresh(job) return job # Возвращаем объект except Exception as e: await db.rollback() # Откатываем транзакцию в случае ошибки print(f"Ошибка при добавлении вакансии {job_id}: {e}") return None async def get_vakansi(): """ Читает данные из JSON и записывает их в БД """ file_path = "search_jobes2.json" with open(file_path, "r", encoding="utf-8") as json_file: data = json.load(json_file) # async with async_session_maker() as session: # Создаём сессию здесь! async for session in get_async_session(): for d in data: title = d.get("title", "") job_id = d.get("entityUrn", "") if job_id: await get_or_create_jobs(session, int(job_id), title) # Сохраняем в БД print(f"{title} {job_id}") # geo = '100025096' # pars_jobs(geo) # async def main(): # await get_vakansi() #[]TODO!!! async def main(): async for db in get_async_session(): # Асинхронный генератор сессий query = select(Job).filter(Job.days_posted == 7) result = await db.execute(query) jobs = result.scalars().all() # Получаем ВСЕ записи в виде списка for job in jobs: print(job.job_id) await get_job(db, job.job_id) # if __name__ == "__main__": # asyncio.run(main()) if __name__ == "__main__": asyncio.run(main()) # from sqlalchemy.orm import sessionmaker, declarative_base # from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine # DATABASE_URL = os.getenv('DATABASE_URL') # engine = create_async_engine(DATABASE_URL, echo=True) # async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) # async_session_maker = sessionmaker( # engine, class_=AsyncSession, expire_on_commit=False # ) # async def main(): # await get_vakansi() # await engine.dispose() # Корректно закрываем соединение перед завершением # if __name__ == "__main__": # asyncio.run(main()) # 🚀 Запускаем программу в единственном event loop # [x] Обмен по времени не удалять main() что бы при старте сразу отрабатывала) # Запуск функции каждые 5 минут # schedule.every(5).minutes.do(main) # Запуск функции каждые день # schedule.every().day.at("08:30").do(main) # # Основной цикл выполнения # while True: # schedule.run_pending() # Запускает запланированные задачи # time.sleep(1) # Пауза между проверками # async def get_or_create_jobs(db: AsyncSession, job_id: int, titles: str): # """ Проверяет, существует ли запись, если нет — создаёт """ # try: # query = select(Job).filter(Job.job_id == job_id) # result = await db.execute(query) # job = result.scalars().first() # if not job: # job = Job( # job_id=job_id, # job_title=titles # ) # db.add(job) # await db.commit() # await db.refresh(job) # return job # Возвращаем объект # except Exception as e: # await db.rollback() # Откатываем транзакцию в случае ошибки # print(f"Ошибка при добавлении вакансии {job_id}: {e}") # return None # async def get_vakansi(): # """ Читает данные из JSON и записывает их в БД """ # file_path = "search_jobes2.json" # try: # with open(file_path, "r", encoding="utf-8") as json_file: # data = json.load(json_file) # except Exception as e: # print(f"Ошибка чтения JSON: {e}") # return # async for session in get_async_session(): # Создаём сессию здесь! # for d in data: # title = d.get("title", "") # job_id = d.get("entityUrn", "") # if job_id: # await get_or_create_jobs(session, int(job_id), title) # Сохраняем в БД # print(f"{title} {job_id}") # async def main(): # await get_vakansi() # Здесь должен быть await! # if __name__ == "__main__": # asyncio.run(main())