69 lines
3.0 KiB
Python
69 lines
3.0 KiB
Python
from fastapi import FastAPI, Depends, HTTPException, status, Form, Response, Request, APIRouter
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy.future import select
|
|
from passlib.context import CryptContext
|
|
import jwt
|
|
from datetime import datetime, timedelta
|
|
from model.database import get_async_session, User
|
|
|
|
# --- Token and password-hashing configuration ------------------------------
# NOTE(review): both signing keys below are hard-coded placeholder strings;
# presumably real deployments override them -- confirm before shipping.

# Signing algorithm passed to every jwt.encode / jwt.decode call in this module.
ALGORITHM = "HS256"

# Key used to sign tokens produced by create_access_token().
SECRET_KEY = "your_secret_key"

# Key used by the /refresh endpoint to verify refresh tokens.
REFRESH_SECRET_KEY = "your_refresh_secret_key"

# Default lifetime (in minutes) applied by create_access_token() when the
# caller does not pass an explicit expires_delta.
ACCESS_TOKEN_EXPIRE_MINUTES = 1

# Shared passlib context used to verify user passwords (bcrypt scheme).
pwd_context = CryptContext(
    schemes=["bcrypt"],
    deprecated="auto",
)
|
|
|
|
def create_access_token(data: dict, expires_delta: timedelta | None = None):
    """Return a signed JWT carrying ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed in the token. The dict is copied first, so the
            caller's object is never modified.
        expires_delta: Lifetime of the token. When omitted (or falsy), the
            default of ``ACCESS_TOKEN_EXPIRE_MINUTES`` minutes is used.

    Returns:
        The value produced by ``jwt.encode`` using ``SECRET_KEY`` and
        ``ALGORITHM``.
    """
    # Local import keeps this block self-contained: the file-level import
    # only brings in ``datetime`` and ``timedelta``.
    from datetime import timezone

    lifetime = expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

    # Use a timezone-aware "now". datetime.utcnow() is deprecated since
    # Python 3.12 and returns a naive value that is easy to misinterpret as
    # local time; an aware UTC datetime yields the same ``exp`` timestamp.
    expire = datetime.now(timezone.utc) + lifetime

    to_encode = data.copy()
    to_encode["exp"] = expire
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
|
|
|
|
async def authenticate_user(db: AsyncSession, username: str, password: str):
    """Look up ``username`` and check ``password`` against the stored hash.

    Args:
        db: Async session used to run the lookup query.
        username: Value matched against ``User.username``.
        password: Plain-text password checked with ``pwd_context.verify``.

    Returns:
        The first matching ``User`` when the password verifies, else ``None``.
    """
    query = select(User).where(User.username == username)
    rows = await db.execute(query)
    candidate = rows.scalars().first()

    # No such user: nothing to verify.
    if not candidate:
        return None

    # User exists but the supplied password does not match the stored hash.
    if not pwd_context.verify(password, candidate.hashed_password):
        return None

    return candidate
|
|
|
|
# Router for the authentication endpoints; routes registered on it are served
# under the /auth prefix and tagged "auth".
router = APIRouter(
    prefix="/auth",
    tags=["auth"],
)
|
|
|
|
@router.post("/logins")
async def logins(username: str = Form(...), password: str = Form(...), db: AsyncSession = Depends(get_async_session)):
    """Authenticate form credentials and issue access/refresh token cookies.

    Args:
        username: Login name submitted as a form field.
        password: Plain-text password submitted as a form field.
        db: Async database session injected by FastAPI.

    Returns:
        A JSON response with a success message and two ``httponly`` cookies:
        ``access_token`` and ``refresh_token``.

    Raises:
        HTTPException: 401 when the credentials do not match a user.
    """
    # Function-scope imports: neither name is imported at the top of the file.
    from datetime import timezone

    from fastapi.responses import JSONResponse

    user = await authenticate_user(db, username, password)
    if not user:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Неправильный логин или пароль")

    access_token = create_access_token({"sub": user.username, "role": user.role})

    # The /refresh endpoint verifies refresh tokens with REFRESH_SECRET_KEY,
    # so the token must be signed with that key. create_access_token() always
    # signs with SECRET_KEY, which made every refresh attempt fail signature
    # verification.
    refresh_token = jwt.encode(
        {"sub": user.username, "exp": datetime.now(timezone.utc) + timedelta(days=7)},
        REFRESH_SECRET_KEY,
        algorithm=ALGORITHM,
    )

    # Cookies only reach the client when they are set on the response object
    # that is actually returned. Previously they were set on a throwaway
    # Response() while a plain dict was returned, so no cookie was ever sent.
    response = JSONResponse(content={"message": "Успешный вход"})
    response.set_cookie(key="access_token", value=access_token, httponly=True)
    response.set_cookie(key="refresh_token", value=refresh_token, httponly=True)
    return response
|
|
|
|
@router.post("/logout")
async def logout(response: Response):
    """Delete both auth cookies on the outgoing response.

    Args:
        response: Response object injected by FastAPI; the cookie deletions
            are recorded on it.

    Returns:
        A dict with a confirmation message.
    """
    for cookie_name in ("access_token", "refresh_token"):
        response.delete_cookie(cookie_name)
    return {"message": "Вы вышли"}
|
|
|
|
@router.post("/refresh")
async def refresh_token(request: Request, response: Response):
    """Issue a new access token from the ``refresh_token`` cookie.

    The cookie is decoded with ``REFRESH_SECRET_KEY``; its ``sub`` claim is
    used to build a new access token, which is set as an ``httponly`` cookie
    and also returned in the body.

    Args:
        request: Incoming request; only its cookies are read.
        response: Response object on which the new cookie is set.

    Returns:
        ``{"access_token": <new token>}``.

    Raises:
        HTTPException: 401 when the cookie is missing, expired, otherwise
            invalid, or carries no ``sub`` claim.
    """
    token = request.cookies.get("refresh_token")
    if not token:
        raise HTTPException(status_code=401, detail="❌ Refresh token отсутствует")

    # Only the decode call can raise the jwt errors handled here.
    try:
        claims = jwt.decode(token, REFRESH_SECRET_KEY, algorithms=[ALGORITHM])
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=401, detail="❌ Refresh token истёк")
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="❌ Ошибка refresh токена")

    username = claims.get("sub")
    if not username:
        raise HTTPException(status_code=401, detail="❌ Ошибка токена")

    # NOTE(review): the reissued token carries only "sub", whereas the login
    # endpoint also embeds "role" -- confirm whether consumers rely on it.
    fresh_token = create_access_token({"sub": username})
    response.set_cookie(key="access_token", value=fresh_token, httponly=True)

    return {"access_token": fresh_token}
|