import sys import os sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) from linkedin_api import Linkedin import logging import schedule import time from utils.logging_setup import configure_global_logging from datetime import datetime import asyncio import json from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.future import select # from model.database import create_async_engine, Job from model.database import get_async_session, Job from dotenv import load_dotenv import os configure_global_logging() load_dotenv() # # Ваши учетные данные LinkedIn username = os.getenv('USERNAME') password = os.getenv('PASSWD') # Authenticate using any Linkedin user account credentials api = Linkedin(username, password) def pars_jobs(geo): search_jobs = api.search_jobs(location_geo_id = geo) search_jobes = [ { "title": item.get("title"), "entityUrn": item.get("entityUrn", "").split(":")[-1], } for item in search_jobs ] # print(search_job) file_path = "search_jobes2.json" with open(file_path, "w", encoding="utf-8") as json_file: json.dump(search_jobes, json_file, indent=4, ensure_ascii=False) print(f"Результаты успешно сохранены в {file_path}") def add_to_bd(): #[ ]: Написать функцию записи в БД pass def get_job(job_id): jobs = api.get_job(job_id) text = jobs['description']['text'] location = jobs['formattedLocation'] title = jobs['title'] listedAt = jobs['listedAt'] # Преобразование из миллисекунд в секунды и конвертация future_date = datetime.fromtimestamp(listedAt / 1000) # Текущая дата current_date = datetime.now() # Разница в днях difference = abs((future_date - current_date).days) jobPostingId = jobs['jobPostingId'] localizedName = jobs['workplaceTypesResolutionResults'] # Извлекаем все localizedName localized_names = [value['localizedName'] for value in localizedName.values()] localized_name = ", ".join(localized_names) # localized_name = workplaceTypesResolutionResults['urn:li:fs_workplaceType:2']['localizedName'] link = f'https://www.linkedin.com/jobs/view/{job_id}/' names = jobs['companyDetails']['com.linkedin.voyager.deco.jobs.web.shared.WebCompactJobPostingCompany']['companyResolutionResult']['name'] url = jobs['companyDetails']['com.linkedin.voyager.deco.jobs.web.shared.WebCompactJobPostingCompany']['companyResolutionResult']['url'] # [ ]: job_level job_type hourly_rate найти minimum_annual_salary и salary_currency добавил description names Компании url на компанию logging.info(f"title: {title}, location: {location}, jobPostingId: {jobPostingId}, difference: {difference}, location_type: {localized_name}, link: {link} ===== {names} {url}") #text:{text}, async def get_or_create_jobs(db: AsyncSession, job_id: int, titles: str): """ Проверяет, существует ли запись, если нет — создаёт """ try: query = select(Job).filter(Job.job_id == job_id) result = await db.execute(query) job = result.scalars().first() if not job: job = Job( job_id=job_id, job_title=titles ) db.add(job) await db.commit() await db.refresh(job) return job # Возвращаем объект except Exception as e: await db.rollback() # Откатываем транзакцию в случае ошибки print(f"Ошибка при добавлении вакансии {job_id}: {e}") return None async def get_vakansi(): """ Читает данные из JSON и записывает их в БД """ file_path = "search_jobes2.json" with open(file_path, "r", encoding="utf-8") as json_file: data = json.load(json_file) async with async_session_maker() as session: # Создаём сессию здесь! for d in data: title = d.get("title", "") job_id = d.get("entityUrn", "") if job_id: await get_or_create_jobs(session, int(job_id), title) # Сохраняем в БД print(f"{title} {job_id}") geo = '100025096' # pars_jobs(geo) # def main(): # get_vakansi() # logging.info("WORK") # get_job('4130181356') # pars_jobs(geo) # main() from sqlalchemy.orm import sessionmaker, declarative_base from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine DATABASE_URL = os.getenv('DATABASE_URL') engine = create_async_engine(DATABASE_URL, echo=True) async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) async_session_maker = sessionmaker( engine, class_=AsyncSession, expire_on_commit=False ) async def main(): await get_vakansi() await engine.dispose() # Корректно закрываем соединение перед завершением if __name__ == "__main__": asyncio.run(main()) # 🚀 Запускаем программу в единственном event loop # [x] Обмен по времени не удалять main() что бы при старте сразу отрабатывала) # Запуск функции каждые 5 минут # schedule.every(5).minutes.do(main) # Запуск функции каждые день # schedule.every().day.at("08:30").do(main) # # Основной цикл выполнения # while True: # schedule.run_pending() # Запускает запланированные задачи # time.sleep(1) # Пауза между проверками