|
4 | 4 | import secrets |
5 | 5 | from datetime import datetime, timedelta, timezone |
6 | 6 | from email.message import EmailMessage |
| 7 | +from enum import Enum |
7 | 8 | from functools import wraps |
8 | 9 |
|
9 | 10 | import aiosmtplib |
|
18 | 19 | from fastapi.responses import RedirectResponse |
19 | 20 | from pydantic import BaseModel, field_validator |
20 | 21 | from sqlalchemy.ext.asyncio import AsyncSession |
| 22 | +from sqlalchemy import select |
21 | 23 |
|
22 | 24 | from db import get_db |
23 | 25 | from models.user import User |
|
28 | 30 | r = redis.Redis(password=os.getenv("REDIS_PASSWORD", ""), host=HOST) |
29 | 31 |
|
30 | 32 |
|
| 33 | +class Permission(Enum): |
| 34 | + """User permissions""" |
| 35 | + |
| 36 | + ADMIN = 0 |
| 37 | + |
| 38 | + |
31 | 39 | class OtpClientRequest(BaseModel): |
32 | 40 | """OTP send request from client""" |
33 | 41 |
|
@@ -115,44 +123,51 @@ async def wrapper(request: Request, *args, **kwargs): |
115 | 123 | # return wrapper |
116 | 124 |
|
117 | 125 |
|
118 | | -# @decorator |
119 | | -def require_admin(func): |
120 | | - """Require admin status""" |
121 | | - |
122 | | - async def wrapper(*args, request: Request, **kwargs): |
123 | | - if not await is_user_authenticated(request) or not await is_user_admin(): |
124 | | - return RedirectResponse("/login", status_code=418) |
125 | | - elif await is_user_authenticated(request) and not await is_user_admin(): |
126 | | - return RedirectResponse("/home", status_code=403) |
127 | | - return await func(request, *args, **kwargs) |
| 126 | +def permission_dependency(permission: Permission): |
| 127 | + """Permission dependency""" |
128 | 128 |
|
129 | | - return wrapper |
| 129 | + async def verifier( |
| 130 | + request: Request, |
| 131 | + session: AsyncSession = Depends(get_db), |
| 132 | + ): |
| 133 | + payload = await is_user_authenticated(request) |
| 134 | + result = await session.execute(select(User).where(User.email == payload["sub"])) |
| 135 | + user = result.scalar_one_or_none() |
| 136 | + if not user: |
| 137 | + raise HTTPException(status_code=401) |
| 138 | + if permission.value not in (user.permissions or []): |
| 139 | + raise HTTPException(status_code=403) |
| 140 | + request.state.user = user |
| 141 | + |
| 142 | + return verifier |
130 | 143 |
|
131 | 144 |
|
132 | 145 | # @decorator |
133 | | -def require_reviewer(func): |
134 | | - """Require reviewer status""" |
135 | | - |
136 | | - async def wrapper(*args, request: Request, **kwargs): |
137 | | - if not await is_user_authenticated(request) or not await is_user_reviewer(): |
138 | | - return RedirectResponse("/login", status_code=418) |
139 | | - elif await is_user_authenticated(request) and not await is_user_reviewer(): |
140 | | - return RedirectResponse("/home", status_code=403) |
141 | | - return await func(request, *args, **kwargs) |
| 146 | +# def require_admin(func): |
| 147 | +# """Require admin status""" |
142 | 148 |
|
143 | | - return wrapper |
| 149 | +# async def wrapper(*args, request: Request, **kwargs): |
| 150 | +# if not await is_user_authenticated(request) or not await is_user_admin(): |
| 151 | +# return RedirectResponse("/login", status_code=418) |
| 152 | +# elif await is_user_authenticated(request) and not await is_user_admin(): |
| 153 | +# return RedirectResponse("/home", status_code=403) |
| 154 | +# return await func(request, *args, **kwargs) |
144 | 155 |
|
| 156 | +# return wrapper |
145 | 157 |
|
146 | | -async def is_user_admin() -> bool: |
147 | | - """Check if user is an admin""" |
148 | | - # TODO: admin check |
149 | | - return False |
150 | 158 |
|
| 159 | +# @decorator |
| 160 | +# def require_reviewer(func): |
| 161 | +# """Require reviewer status""" |
151 | 162 |
|
152 | | -async def is_user_reviewer() -> bool: |
153 | | - """Check if user is a reviewer""" |
154 | | - # TODO: reviewer check |
155 | | - return False |
| 163 | +# async def wrapper(*args, request: Request, **kwargs): |
| 164 | +# if not await is_user_authenticated(request) or not await is_user_reviewer(): |
| 165 | +# return RedirectResponse("/login", status_code=418) |
| 166 | +# elif await is_user_authenticated(request) and not await is_user_reviewer(): |
| 167 | +# return RedirectResponse("/home", status_code=403) |
| 168 | +# return await func(request, *args, **kwargs) |
| 169 | + |
| 170 | +# return wrapper |
156 | 171 |
|
157 | 172 |
|
158 | 173 | async def is_user_authenticated(request: Request) -> dict: |
|
0 commit comments