linkedin2/utils/app.py

380 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from linkedin_api import Linkedin
import logging
import schedule
import time
from utils.logging_setup import configure_global_logging
from datetime import datetime
import requests
from bs4 import BeautifulSoup
import asyncio
import json
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
# from model.database import create_async_engine, Job
from model.database import get_async_session, Job
from dotenv import load_dotenv
import os
configure_global_logging()
load_dotenv()
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
"Accept-Language": "en-US,en;q=0.9",
}
# # Ваши учетные данные LinkedIn
username = os.getenv('USERNAME')
password = os.getenv('PASSWD')
# Authenticate using any Linkedin user account credentials
api = Linkedin(username, password)
async def pars_jobs(geo):
search_jobs = api.search_jobs(location_geo_id = geo)
search_jobes = [
{
"title": item.get("title"),
"entityUrn": item.get("entityUrn", "").split(":")[-1],
}
for item in search_jobs
]
file_path = "search_jobes2.json"
with open(file_path, "w", encoding="utf-8") as json_file:
json.dump(search_jobes, json_file, indent=4, ensure_ascii=False)
# file_path = "search_jobes3.json"
# with open(file_path, "w", encoding="utf-8") as json_file:
# json.dump(search_jobs, json_file, indent=4, ensure_ascii=False)
print(f"Результаты успешно сохранены в {file_path}")
def add_to_bd():
#[ ]: Написать функцию записи в БД
pass
async def typess(url):
response = requests.get(url, headers=headers)
soup = BeautifulSoup(response.text, "html.parser")
# Находим все элементы с описанием критериев вакансии
criteria = soup.find_all("span", class_="description__job-criteria-text")
if len(criteria) >= 2:
level = criteria[0].get_text(strip=True) # Уровень должности
job_type = criteria[1].get_text(strip=True) # Тип занятости
else:
level = "Не найдено"
job_type = "Не найдено"
# print(f"Job Level: {level}")
# print(f"Type of Employment: {job_type}")
return level, job_type
async def get_job(db: AsyncSession, job_id: str):
try:
jobs = api.get_job(job_id)
# Проверка наличия всех необходимых данных в ответе
required_keys = ["description", "formattedLocation", "title", "listedAt", "companyDetails"]
for key in required_keys:
if key not in jobs:
logging.error(f"❌ Ошибка: Ключ {key} отсутствует в API-ответе")
return None
# Извлечение данных
text = jobs['description']['text']
location = jobs['formattedLocation']
title = jobs['title']
listed_at = jobs['listedAt']
listed_ats = datetime.utcfromtimestamp(listed_at / 1000).strftime('%Y-%m-%d %H:%M:%S')
company_info = jobs.get("companyDetails", {}).get("com.linkedin.voyager.deco.jobs.web.shared.WebCompactJobPostingCompany", {}).get("companyResolutionResult", {})
company_name = company_info.get("name", "Unknown")
company_url = company_info.get("url", "")
link = f'https://www.linkedin.com/jobs/view/{job_id}/'
# about =
workplace_types = jobs.get("workplaceTypesResolutionResults", {})
if workplace_types:
first_key = next(iter(workplace_types)) # Берём первый (и единственный) ключ
workplace_data = workplace_types[first_key]
localized_name = workplace_data.get("localizedName", "Unknown")
entity_urn = workplace_data.get("entityUrn", "")
logging.info(f"🏢 Тип работы: {localized_name} ({entity_urn})")
else:
localized_name = "Unknown"
entity_urn = ""
# Теперь можно добавить эти значения в БД
# job.location_type = localized_name
# job.entity_urn = entity_urn
level, job_type = await typess(link)
# Проверка, есть ли вакансия в базе
query = select(Job).filter(Job.job_id == job_id)
result = await db.execute(query)
job = result.scalars().first()
if job:
logging.info(f"🔄 Обновление вакансии {job_id} в базе...")
job.text = json.dumps(jobs)
job.about = text
job.link = link
# job.days_posted = listed_ats
job.location = location
job.job_company = company_name
job.date_posted = listed_ats
job.link_company = company_url
job.location_type = localized_name
job.job_level = level
job.job_type = job_type
job.days_posted = 5
else:
logging.info(f"🆕 Добавление вакансии {job_id} в базу...")
job = Job(
job_id=job_id,
text=json.dumps(jobs),
link=link,
location=location,
job_company=company_name,
link_company=company_url
)
db.add(job)
# Коммит и обновление внутри активной сессии
await db.commit()
await db.refresh(job)
logging.info(f"✅ Вакансия {job_id} успешно сохранена")
return job
except Exception as e:
# await db.rollback() # Откат транзакции при ошибке
logging.error(f"❌ Ошибка при обработке вакансии {job_id}: {e}")
# return None
# except Exception as e:
# await db.rollback()
# logging.error(f"❌ Ошибка при обработке вакансии {job_id}: {e}")
# return None
# [ ]: job_level job_type hourly_rate найти minimum_annual_salary и salary_currency добавил description names Компании url на компанию
# logging.info(f"title: {title}, location: {location}, jobPostingId: {jobPostingId}, difference: {difference}, location_type: {localized_name}, link: {link} ===== {url}") #text:{text},
async def get_or_create_jobs(db: AsyncSession, job_id: int, titles: str):
""" Проверяет, существует ли запись, если нет — создаёт """
try:
query = select(Job).filter(Job.job_id == job_id)
result = await db.execute(query)
job = result.scalars().first()
if not job:
job = Job(
job_id=job_id,
job_title=titles,
days_posted=7
)
db.add(job)
await db.commit()
await db.refresh(job)
return job # Возвращаем объект
except Exception as e:
await db.rollback() # Откатываем транзакцию в случае ошибки
print(f"Ошибка при добавлении вакансии {job_id}: {e}")
return None
async def get_vakansi():
""" Читает данные из JSON и записывает их в БД """
file_path = "search_jobes2.json"
with open(file_path, "r", encoding="utf-8") as json_file:
data = json.load(json_file)
# async with async_session_maker() as session: # Создаём сессию здесь!
async for session in get_async_session():
for d in data:
title = d.get("title", "")
job_id = d.get("entityUrn", "")
if job_id:
await get_or_create_jobs(session, int(job_id), title) # Сохраняем в БД
print(f"{title} {job_id}")
# geo = '100025096' #ON
# geo = '101174742' #Canada
# geo = '103644278' #USA
# pars_jobs(geo)
# async def main():
# await get_vakansi()
#[]TODO!!!
async def process_jobs():
async for db in get_async_session(): # Асинхронный генератор сессий
query = select(Job).filter(Job.days_posted == 7)
result = await db.execute(query)
jobs = result.scalars().all() # Получаем ВСЕ записи в виде списка
for job in jobs:
print(job.job_id)
await get_job(db, job.job_id)
# if __name__ == "__main__":
# asyncio.run(main())
# import asyncio
# async def pars_jobs(geo):
# print(f"Parsing jobs for {geo}")
# await asyncio.sleep(1) # Имитация асинхронной операции
# async def get_vakansi():
# print("Fetching vacancies")
# await asyncio.sleep(1) # Имитация асинхронной операции
async def main():
geo_list = ['100025096', '101174742', '103644278'] # ON, Canada, USA
for geo in geo_list:
await pars_jobs(geo)
await get_vakansi()
await process_jobs() # Вызываем обработку вакансий
if __name__ == "__main__":
asyncio.run(main())
# from sqlalchemy.orm import sessionmaker, declarative_base
# from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
# DATABASE_URL = os.getenv('DATABASE_URL')
# engine = create_async_engine(DATABASE_URL, echo=True)
# async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
# async_session_maker = sessionmaker(
# engine, class_=AsyncSession, expire_on_commit=False
# )
# async def main():
# await get_vakansi()
# await engine.dispose() # Корректно закрываем соединение перед завершением
# if __name__ == "__main__":
# asyncio.run(main()) # 🚀 Запускаем программу в единственном event loop
# [x] Обмен по времени не удалять main() что бы при старте сразу отрабатывала)
# Запуск функции каждые 5 минут
# schedule.every(5).minutes.do(main)
# Запуск функции каждые день
# schedule.every().day.at("08:30").do(main)
# # Основной цикл выполнения
# while True:
# schedule.run_pending() # Запускает запланированные задачи
# time.sleep(1) # Пауза между проверками
# async def get_or_create_jobs(db: AsyncSession, job_id: int, titles: str):
# """ Проверяет, существует ли запись, если нет — создаёт """
# try:
# query = select(Job).filter(Job.job_id == job_id)
# result = await db.execute(query)
# job = result.scalars().first()
# if not job:
# job = Job(
# job_id=job_id,
# job_title=titles
# )
# db.add(job)
# await db.commit()
# await db.refresh(job)
# return job # Возвращаем объект
# except Exception as e:
# await db.rollback() # Откатываем транзакцию в случае ошибки
# print(f"Ошибка при добавлении вакансии {job_id}: {e}")
# return None
# async def get_vakansi():
# """ Читает данные из JSON и записывает их в БД """
# file_path = "search_jobes2.json"
# try:
# with open(file_path, "r", encoding="utf-8") as json_file:
# data = json.load(json_file)
# except Exception as e:
# print(f"Ошибка чтения JSON: {e}")
# return
# async for session in get_async_session(): # Создаём сессию здесь!
# for d in data:
# title = d.get("title", "")
# job_id = d.get("entityUrn", "")
# if job_id:
# await get_or_create_jobs(session, int(job_id), title) # Сохраняем в БД
# print(f"{title} {job_id}")
# async def main():
# await get_vakansi() # Здесь должен быть await!
# if __name__ == "__main__":
# asyncio.run(main())