linkedin2/utils/app.py

187 lines
5.8 KiB
Python

import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from linkedin_api import Linkedin
import logging
import schedule
import time
from utils.logging_setup import configure_global_logging
from datetime import datetime
import asyncio
import json
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
# from model.database import create_async_engine, Job
from model.database import get_async_session, Job
from dotenv import load_dotenv
import os
configure_global_logging()
load_dotenv()
# # Ваши учетные данные LinkedIn
username = os.getenv('USERNAME')
password = os.getenv('PASSWD')
# Authenticate using any Linkedin user account credentials
api = Linkedin(username, password)
def pars_jobs(geo):
search_jobs = api.search_jobs(location_geo_id = geo)
search_jobes = [
{
"title": item.get("title"),
"entityUrn": item.get("entityUrn", "").split(":")[-1],
}
for item in search_jobs
]
# print(search_job)
file_path = "search_jobes2.json"
with open(file_path, "w", encoding="utf-8") as json_file:
json.dump(search_jobes, json_file, indent=4, ensure_ascii=False)
print(f"Результаты успешно сохранены в {file_path}")
def add_to_bd():
#[ ]: Написать функцию записи в БД
pass
def get_job(job_id):
jobs = api.get_job(job_id)
text = jobs['description']['text']
location = jobs['formattedLocation']
title = jobs['title']
listedAt = jobs['listedAt']
# Преобразование из миллисекунд в секунды и конвертация
future_date = datetime.fromtimestamp(listedAt / 1000)
# Текущая дата
current_date = datetime.now()
# Разница в днях
difference = abs((future_date - current_date).days)
jobPostingId = jobs['jobPostingId']
localizedName = jobs['workplaceTypesResolutionResults']
# Извлекаем все localizedName
localized_names = [value['localizedName'] for value in localizedName.values()]
localized_name = ", ".join(localized_names)
# localized_name = workplaceTypesResolutionResults['urn:li:fs_workplaceType:2']['localizedName']
link = f'https://www.linkedin.com/jobs/view/{job_id}/'
names = jobs['companyDetails']['com.linkedin.voyager.deco.jobs.web.shared.WebCompactJobPostingCompany']['companyResolutionResult']['name']
url = jobs['companyDetails']['com.linkedin.voyager.deco.jobs.web.shared.WebCompactJobPostingCompany']['companyResolutionResult']['url']
# [ ]: job_level job_type hourly_rate найти minimum_annual_salary и salary_currency добавил description names Компании url на компанию
logging.info(f"title: {title}, location: {location}, jobPostingId: {jobPostingId}, difference: {difference}, location_type: {localized_name}, link: {link} ===== {names} {url}") #text:{text},
async def get_or_create_jobs(db: AsyncSession, job_id: int, titles: str):
""" Проверяет, существует ли запись, если нет — создаёт """
try:
query = select(Job).filter(Job.job_id == job_id)
result = await db.execute(query)
job = result.scalars().first()
if not job:
job = Job(
job_id=job_id,
job_title=titles
)
db.add(job)
await db.commit()
await db.refresh(job)
return job # Возвращаем объект
except Exception as e:
await db.rollback() # Откатываем транзакцию в случае ошибки
print(f"Ошибка при добавлении вакансии {job_id}: {e}")
return None
async def get_vakansi():
""" Читает данные из JSON и записывает их в БД """
file_path = "search_jobes2.json"
with open(file_path, "r", encoding="utf-8") as json_file:
data = json.load(json_file)
async with async_session_maker() as session: # Создаём сессию здесь!
for d in data:
title = d.get("title", "")
job_id = d.get("entityUrn", "")
if job_id:
await get_or_create_jobs(session, int(job_id), title) # Сохраняем в БД
print(f"{title} {job_id}")
geo = '100025096'
# pars_jobs(geo)
# def main():
# get_vakansi()
# logging.info("WORK")
# get_job('4130181356')
# pars_jobs(geo)
# main()
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
DATABASE_URL = os.getenv('DATABASE_URL')
engine = create_async_engine(DATABASE_URL, echo=True)
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
async_session_maker = sessionmaker(
engine, class_=AsyncSession, expire_on_commit=False
)
async def main():
await get_vakansi()
await engine.dispose() # Корректно закрываем соединение перед завершением
if __name__ == "__main__":
asyncio.run(main()) # 🚀 Запускаем программу в единственном event loop
# [x] Обмен по времени не удалять main() что бы при старте сразу отрабатывала)
# Запуск функции каждые 5 минут
# schedule.every(5).minutes.do(main)
# Запуск функции каждые день
# schedule.every().day.at("08:30").do(main)
# # Основной цикл выполнения
# while True:
# schedule.run_pending() # Запускает запланированные задачи
# time.sleep(1) # Пауза между проверками