linkedin2/routers/product.py

430 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import FastAPI, APIRouter, Depends, Request, HTTPException, Form
from fastapi import Request, Query
from typing import List, Optional
from fastapi.templating import Jinja2Templates
from fastapi.responses import RedirectResponse, HTMLResponse
from fastapi.responses import JSONResponse
import jwt
from pydantic import BaseModel
from sqlalchemy.future import select
from sqlalchemy.orm import joinedload, selectinload
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import and_, between
from sqlalchemy.sql import func
from sqlalchemy import or_, and_, select
from sqlalchemy import null
from datetime import datetime, timedelta
from sqlalchemy import func
from routers.auth import get_current_user
from model.database import get_async_session, Job, Client, AppliedJob, User
# Router on which every endpoint in this module is registered.
router = APIRouter()
# Jinja2 template loader; pages are rendered from the local "templates" directory.
templates = Jinja2Templates(directory="templates")
@router.get("/product")
async def product(
    request: Request,
    username: str = Depends(get_current_user),
    session: AsyncSession = Depends(get_async_session),
):
    """Render ``product.html`` with every applied job and every user.

    Args:
        request: Incoming request; passed to the template and used for
            ``current_path``.
        username: Value produced by ``get_current_user``. Despite the ``str``
            annotation it is indexed with ``"role"`` and ``"username"`` below,
            so it is used as a mapping.
        session: Async database session.

    Returns:
        The rendered template response.
    """
    # Load all applied jobs together with their client, job and users relations.
    jobs_query = select(AppliedJob).options(
        joinedload(AppliedJob.client),
        joinedload(AppliedJob.job),
        joinedload(AppliedJob.users),
    )
    applied_jobs = (await session.execute(jobs_query)).scalars().all()

    # Distinct ids of the clients referenced by the loaded jobs.
    client_ids = set()
    for applied in applied_jobs:
        if applied.client:
            client_ids.add(applied.client.id)

    users = (await session.execute(select(User))).scalars().all()

    context = {
        "request": request,
        "size": "Work",
        "jobs": applied_jobs,
        "role": username["role"],
        "selected_client_ids": list(client_ids),
        "username": username['username'],
        "users": users,
        "current_path": request.url.path,
    }
    return templates.TemplateResponse("product.html", context)
@router.get("/productuj")
async def productuj(
    request: Request,
    username: str = Depends(get_current_user),
    session: AsyncSession = Depends(get_async_session),
):
    """Render ``productuj.html`` with applied jobs that have no assignee.

    A job is listed when its ``assignee`` is NULL and its ``status`` is NULL,
    ``"Scheduled"`` or ``"Requested"``.

    Args:
        request: Incoming request; passed to the template and used for
            ``current_path``.
        username: Value produced by ``get_current_user``. Despite the ``str``
            annotation it is indexed with ``"role"`` and ``"username"`` below,
            so it is used as a mapping.
        session: Async database session.

    Returns:
        The rendered template response.
    """
    status_is_open = or_(
        AppliedJob.status.is_(None),
        AppliedJob.status.in_(["Scheduled", "Requested"]),
    )
    has_no_assignee = AppliedJob.assignee.is_(None)

    jobs_query = (
        select(AppliedJob)
        .where(and_(status_is_open, has_no_assignee))
        .options(
            joinedload(AppliedJob.client),
            joinedload(AppliedJob.job),
            joinedload(AppliedJob.users),
        )
    )
    applied_jobs = (await session.execute(jobs_query)).scalars().all()
    users = (await session.execute(select(User))).scalars().all()

    context = {
        "request": request,
        "size": "Work",
        "jobs": applied_jobs,
        "role": username["role"],
        "username": username['username'],
        "users": users,
        "current_path": request.url.path,
    }
    return templates.TemplateResponse("productuj.html", context)
@router.get("/productmj")
async def productmj(request: Request,
                    username: str = Depends(get_current_user),
                    session: AsyncSession = Depends(get_async_session)):
    """Render ``productmj.html`` with the current user's in-progress jobs.

    Looks up the id of the ``User`` whose ``username`` matches the current
    user, then lists the applied jobs whose ``status`` is ``"In-Progress"``
    and whose ``assignee`` equals that id.

    Args:
        request: Incoming request; passed to the template and used for
            ``current_path``.
        username: Value produced by ``get_current_user``. Despite the ``str``
            annotation it is indexed with ``"role"`` and ``"username"`` below,
            so it is used as a mapping.
        session: Async database session.

    Returns:
        The rendered template response.

    Raises:
        HTTPException: 404 when no ``User`` row matches the current username.
    """
    size = "Work"

    # AppliedJob.assignee is compared against a user id, so resolve the id first.
    user_result = await session.execute(
        select(User.id).where(User.username == username['username'])
    )
    user_id = user_result.scalar()
    if user_id is None:
        # Raise an HTTP error rather than a bare ValueError, which FastAPI
        # would turn into an opaque 500 response.
        raise HTTPException(
            status_code=404,
            detail=f"Пользователь с username '{username['username']}' не найден",
        )

    result = await session.execute(
        select(AppliedJob)
        .where(
            and_(
                AppliedJob.status.in_(["In-Progress"]),
                AppliedJob.assignee == user_id,
            )
        )
        .options(
            joinedload(AppliedJob.client),
            joinedload(AppliedJob.job),
            joinedload(AppliedJob.users),
        )
    )
    applied_jobs = result.scalars().all()

    users_result = await session.execute(select(User))
    users = users_result.scalars().all()

    return templates.TemplateResponse(
        "productmj.html",
        {
            "request": request,
            "size": size,
            "jobs": applied_jobs,
            "role": username["role"],
            "username": username['username'],
            "users": users,
            "current_path": request.url.path,
        },
    )
@router.get("/productoj")
async def productoj(
    request: Request,
    username: str = Depends(get_current_user),
    session: AsyncSession = Depends(get_async_session),
):
    """Render ``productoj.html`` with all open applied jobs.

    A job is listed when its ``status`` is ``"Scheduled"``, ``"Requested"`` or
    ``"In-Progress"``; no filter is applied on the assignee.

    Args:
        request: Incoming request; passed to the template and used for
            ``current_path``.
        username: Value produced by ``get_current_user``. Despite the ``str``
            annotation it is indexed with ``"role"`` and ``"username"`` below,
            so it is used as a mapping.
        session: Async database session.

    Returns:
        The rendered template response.
    """
    open_statuses = ["Scheduled", "Requested", "In-Progress"]

    jobs_query = (
        select(AppliedJob)
        .where(AppliedJob.status.in_(open_statuses))
        .options(
            joinedload(AppliedJob.client),
            joinedload(AppliedJob.job),
            joinedload(AppliedJob.users),
        )
    )
    applied_jobs = (await session.execute(jobs_query)).scalars().all()
    users = (await session.execute(select(User))).scalars().all()

    context = {
        "request": request,
        "size": "Work",
        "jobs": applied_jobs,
        "role": username["role"],
        "username": username['username'],
        "users": users,
        "current_path": request.url.path,
    }
    return templates.TemplateResponse("productoj.html", context)
def _date_range_conditions(column, choice, custom_choice, raw_from, raw_to):
    """Translate one date-filter selection into conditions on ``column``.

    Every preset is expressed as a half-open range ``[start, end)`` computed
    in Python, so all presets behave the same way and do not depend on
    backend-specific SQL date functions.

    Args:
        column: ORM column to filter on.
        choice: Submitted preset (``"Today"``, ``"Yesterday"``,
            ``"Last 7 days"`` or ``custom_choice``); anything else adds no
            filter.
        custom_choice: Preset value that selects the custom range.
        raw_from: Start of the custom range as ``YYYY-MM-DD``.
        raw_to: End of the custom range as ``YYYY-MM-DD`` (inclusive).

    Returns:
        A list of SQL conditions; empty when no filter applies.

    Raises:
        HTTPException: 400 when a custom date cannot be parsed.
    """
    one_day = timedelta(days=1)
    today = datetime.now().date()

    if choice == custom_choice and raw_from and raw_to:
        try:
            start = datetime.strptime(raw_from, '%Y-%m-%d')
            end = datetime.strptime(raw_to, '%Y-%m-%d')
        except (TypeError, ValueError) as exc:
            raise HTTPException(
                status_code=400,
                detail=f"Invalid date range for '{choice}': expected YYYY-MM-DD",
            ) from exc
        # Exclusive upper bound one day later keeps the whole end day in range
        # even when the column stores a time component.
        return [column >= start, column < end + one_day]
    if choice == 'Today':
        return [column >= today, column < today + one_day]
    if choice == 'Yesterday':
        return [column >= today - one_day, column < today]
    if choice == 'Last 7 days':
        # Today plus the six preceding days.
        return [column >= today - timedelta(days=6), column < today + one_day]
    return []


def _parse_id_list(raw_values, field_name):
    """Convert submitted id strings to ints, skipping blank entries.

    Raises:
        HTTPException: 400 when a non-blank value is not an integer.
    """
    ids = []
    for raw in raw_values:
        if raw == '':
            # Blank option (e.g. "all") carries no id.
            continue
        try:
            ids.append(int(raw))
        except (TypeError, ValueError) as exc:
            raise HTTPException(
                status_code=400,
                detail=f"Invalid value for '{field_name}': {raw!r}",
            ) from exc
    return ids


@router.post("/productf")
async def product_filtered(
        request: Request,
        username: str = Depends(get_current_user),
        session: AsyncSession = Depends(get_async_session)
):
    """Render ``productf.html`` with applied jobs matching the submitted filters.

    Reads the posted form and filters by requested / posted / applied date,
    client ids, assignee ids and statuses. The submitted filter values are
    passed back to the template unchanged.

    Args:
        request: Incoming request carrying the filter form.
        username: Value produced by ``get_current_user``. Despite the ``str``
            annotation it is indexed with ``"role"`` and ``"username"`` below,
            so it is used as a mapping.
        session: Async database session.

    Returns:
        The rendered template response.

    Raises:
        HTTPException: 400 for malformed dates or ids; 500 when the database
            query fails.
    """
    size = "Work"

    # Group form values by field name; multi-selects submit repeated keys.
    form_data = await request.form()
    form_dict = {}
    for key, value in form_data.multi_items():
        form_dict.setdefault(key, []).append(value)

    def first_value(key):
        # First submitted value for a single-valued field, '' when absent.
        return form_dict.get(key, [''])[0]

    date_requested = first_value('date_requested')
    date_requested_from = first_value('date_requested_from')
    date_requested_to = first_value('date_requested_to')
    date_posted = first_value('date_posted')
    date_posted_from = first_value('date_posted_from')
    date_posted_to = first_value('date_posted_to')
    date_applied = first_value('date_applied')
    date_applied_from = first_value('date_applied_from')
    date_applied_to = first_value('date_applied_to')
    client = form_dict.get('client', [''])
    assignees = form_dict.get('assignee', [])
    status = form_dict.get('status', [''])

    conditions = []
    # NOTE: ``data_requested`` (not ``date_``) is the attribute name used on Job.
    conditions += _date_range_conditions(
        Job.data_requested, date_requested, 'custom_date_requested',
        date_requested_from, date_requested_to,
    )
    conditions += _date_range_conditions(
        Job.date_posted, date_posted, 'custom_date_posted',
        date_posted_from, date_posted_to,
    )
    conditions += _date_range_conditions(
        AppliedJob.applied_on, date_applied, 'custom_date_applied',
        date_applied_from, date_applied_to,
    )

    client_ids = _parse_id_list(client, 'client')
    if client_ids:
        conditions.append(AppliedJob.client_id.in_(client_ids))

    assignee_ids = _parse_id_list(assignees, 'assignee')
    if assignee_ids:
        conditions.append(AppliedJob.assignee.in_(assignee_ids))

    selected_statuses = [value for value in status if value]
    if selected_statuses:
        conditions.append(AppliedJob.status.in_(selected_statuses))

    stmt = select(AppliedJob).options(
        selectinload(AppliedJob.job),
        selectinload(AppliedJob.client),
        selectinload(AppliedJob.users)
    )
    if conditions:
        # Job is joined so the Job date columns can be referenced in WHERE.
        stmt = stmt.join(Job).where(and_(*conditions))

    try:
        result = await session.execute(stmt)
        applied_jobs = result.scalars().all()
        users_result = await session.execute(select(User))
        users = users_result.scalars().all()
    except Exception as e:
        await session.rollback()
        raise HTTPException(status_code=500, detail=str(e)) from e

    return templates.TemplateResponse(
        "productf.html",
        {
            "request": request,
            "size": size,
            "jobs": applied_jobs,
            "role": username["role"],
            "username": username['username'],
            "users": users,
            "status": status,
            "assigned_users_ids": assignees,
            "client_id": client,
            "date_requested": date_requested,
            "date_requested_from": date_requested_from,
            "date_requested_to": date_requested_to,
            "date_posted": date_posted,
            "date_posted_from": date_posted_from,
            "date_posted_to": date_posted_to,
            "date_applied": date_applied,
            "date_applied_from": date_applied_from,
            "date_applied_to": date_applied_to,
            "current_path": request.url.path
        }
    )
# Pydantic request model: JSON body accepted by the /update_status/ endpoint.
class StatusUpdate(BaseModel):
    # Id of the AppliedJob row to update.
    job_id: int
    # New status value to store on that row.
    status: str
# The current-user dependency is attached at decorator level because its value
# is not needed here; it makes this write endpoint go through the same
# get_current_user check as the page routes in this module.
@router.post("/update_status/", dependencies=[Depends(get_current_user)])
async def update_status(data: StatusUpdate,
                        session: AsyncSession = Depends(get_async_session)):
    """Set the status of one applied job.

    Args:
        data: Request body with the job id and the new status.
        session: Async database session.

    Returns:
        A dict with a confirmation message, the job id and the stored status.

    Raises:
        HTTPException: 404 when no applied job has the given id.
    """
    result = await session.execute(
        select(AppliedJob).where(AppliedJob.id == data.job_id)
    )
    job = result.scalars().first()
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")

    job.status = data.status
    await session.commit()
    return {"message": "Статус обновлён", "job_id": job.id, "new_status": job.status}
# Pydantic request model: JSON body accepted by the /update_assignee/ endpoint.
class AssigneeUpdate(BaseModel):
    # Id of the AppliedJob row to update.
    job_id: int
    # Id of the User to store as the job's assignee.
    assignee_id: int
# The current-user dependency is attached at decorator level because its value
# is not needed here; it makes this write endpoint go through the same
# get_current_user check as the page routes in this module.
@router.post("/update_assignee/", dependencies=[Depends(get_current_user)])
async def update_assignee(data: AssigneeUpdate,
                          session: AsyncSession = Depends(get_async_session)):
    """Assign one applied job to a user.

    Args:
        data: Request body with the job id and the assignee's user id.
        session: Async database session.

    Returns:
        A dict with a confirmation message, the job id and the stored assignee.

    Raises:
        HTTPException: 404 when the job or the user does not exist.
    """
    job_result = await session.execute(
        select(AppliedJob).where(AppliedJob.id == data.job_id)
    )
    job = job_result.scalars().first()
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")

    # Verify the target user exists before storing its id on the job.
    user_result = await session.execute(
        select(User).where(User.id == data.assignee_id)
    )
    if not user_result.scalars().first():
        raise HTTPException(status_code=404, detail="User not found")

    # The attribute on the model is ``assignee`` (not ``assignee_id``).
    job.assignee = data.assignee_id
    await session.commit()
    return {"message": "Исполнитель обновлён", "job_id": job.id, "new_assignee": job.assignee}