# linkedin2/routers/auth1.py
#
# File-viewer metadata carried over from the paste:
#   69 lines, 3.0 KiB, Python

from datetime import datetime, timedelta, timezone

import jwt
from fastapi import FastAPI, Depends, HTTPException, status, Form, Response, Request, APIRouter
from fastapi.responses import JSONResponse
from passlib.context import CryptContext
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select

from model.database import get_async_session, User
# Key passed to jwt.encode() by create_access_token().
# NOTE(review): this looks like a placeholder value — presumably it should be
# loaded from configuration/environment before deployment; confirm.
SECRET_KEY = "your_secret_key"
# Key the /refresh endpoint passes to jwt.decode() to verify the refresh-token cookie.
# NOTE(review): placeholder value as well — confirm how it is provisioned.
REFRESH_SECRET_KEY = "your_refresh_secret_key"
# JWT signing algorithm used for both encoding and decoding in this module.
ALGORITHM = "HS256"
# Default token lifetime (in minutes) applied by create_access_token() when the
# caller does not supply an explicit expires_delta.
ACCESS_TOKEN_EXPIRE_MINUTES = 1
# passlib context used by authenticate_user() to check a password against the stored hash.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def create_access_token(data: dict, expires_delta: timedelta | None = None):
    """Build a signed JWT containing *data* plus an ``exp`` claim.

    Args:
        data: Claims to embed in the token. The mapping is copied, so the
            caller's dict is never mutated.
        expires_delta: Lifetime of the token. When ``None`` the default of
            ``ACCESS_TOKEN_EXPIRE_MINUTES`` minutes is used. An explicit
            zero-length delta is honoured rather than replaced by the default.

    Returns:
        The token produced by ``jwt.encode`` using ``SECRET_KEY`` and
        ``ALGORITHM``.
    """
    # Compare against None explicitly: timedelta(0) is falsy, and an `or`
    # fallback would silently swap it for the default lifetime.
    if expires_delta is None:
        expires_delta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

    to_encode = data.copy()
    # Timezone-aware "now"; datetime.utcnow() is deprecated and returns a
    # naive value that is easy to misinterpret as local time.
    to_encode["exp"] = datetime.now(timezone.utc) + expires_delta
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
async def authenticate_user(db: AsyncSession, username: str, password: str):
    """Look up a user by username and check the supplied password.

    Args:
        db: Async session used to run the lookup query.
        username: Value matched against ``User.username``.
        password: Plain-text password verified against the stored
            ``hashed_password`` via ``pwd_context``.

    Returns:
        The first matching ``User`` row when the password verifies,
        otherwise ``None`` (no such user, or wrong password).
    """
    query = select(User).where(User.username == username)
    rows = await db.execute(query)
    candidate = rows.scalars().first()

    # Guard clauses: bail out as soon as authentication cannot succeed.
    if not candidate:
        return None
    if not pwd_context.verify(password, candidate.hashed_password):
        return None
    return candidate
# Router for the endpoints defined below; every route is served under the
# "/auth" prefix and tagged "auth".
router = APIRouter(prefix="/auth", tags=["auth"])
@router.post("/logins")
async def logins(
    username: str = Form(...),
    password: str = Form(...),
    db: AsyncSession = Depends(get_async_session),
):
    """Authenticate form credentials and issue access/refresh JWT cookies.

    Args:
        username: Login name submitted as a form field.
        password: Plain-text password submitted as a form field.
        db: Async database session injected by FastAPI.

    Returns:
        A ``JSONResponse`` with a success message and two HTTP-only cookies:
        ``access_token`` and ``refresh_token``.

    Raises:
        HTTPException: 401 when the credentials do not match a user.
    """
    user = await authenticate_user(db, username, password)
    if not user:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Неправильный логин или пароль")

    access_token = create_access_token({"sub": user.username, "role": user.role})

    # The /refresh endpoint verifies the cookie with REFRESH_SECRET_KEY, so the
    # refresh token has to be signed with that same key. Signing it through
    # create_access_token() (which uses SECRET_KEY) made every refresh fail
    # signature verification.
    refresh_claims = {
        "sub": user.username,
        "exp": datetime.now(timezone.utc) + timedelta(days=7),
    }
    refresh_token = jwt.encode(refresh_claims, REFRESH_SECRET_KEY, algorithm=ALGORITHM)

    # Cookies must be set on the response object that is actually returned;
    # setting them on a throwaway Response and returning a dict drops them.
    response = JSONResponse(content={"message": "Успешный вход"})
    response.set_cookie(key="access_token", value=access_token, httponly=True)
    response.set_cookie(key="refresh_token", value=refresh_token, httponly=True)
    return response
@router.post("/logout")
async def logout(response: Response):
    """Clear the authentication cookies and confirm the logout.

    Args:
        response: Response object injected by FastAPI; the cookie deletions
            are recorded on it.

    Returns:
        A dict with a confirmation message.
    """
    # Same order as the cookies are issued: access token first, then refresh.
    for cookie_name in ("access_token", "refresh_token"):
        response.delete_cookie(cookie_name)
    return {"message": "Вы вышли"}
@router.post("/refresh")
async def refresh_token(request: Request, response: Response):
    """Exchange the refresh-token cookie for a new access token.

    Args:
        request: Incoming request; the ``refresh_token`` cookie is read from it.
        response: Response object injected by FastAPI; the new
            ``access_token`` cookie is set on it.

    Returns:
        A dict containing the newly issued access token.

    Raises:
        HTTPException: 401 when the cookie is missing, the token is expired
            or otherwise invalid, or it carries no ``sub`` claim.
    """
    token = request.cookies.get("refresh_token")
    if not token:
        raise HTTPException(status_code=401, detail="❌ Refresh token отсутствует")

    # Only the decode call can raise the jwt errors handled here.
    try:
        claims = jwt.decode(token, REFRESH_SECRET_KEY, algorithms=[ALGORITHM])
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=401, detail="❌ Refresh token истёк")
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="❌ Ошибка refresh токена")

    subject = claims.get("sub")
    if not subject:
        raise HTTPException(status_code=401, detail="❌ Ошибка токена")

    # NOTE(review): the token issued here carries only "sub", whereas the
    # login endpoint also embeds "role" — confirm whether consumers of the
    # access token rely on that claim after a refresh.
    new_access_token = create_access_token({"sub": subject})
    response.set_cookie(key="access_token", value=new_access_token, httponly=True)
    return {"access_token": new_access_token}