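OAuth2 scopes
You can use OAuth2 scopes directly with FastAPI; they are integrated to work seamlessly.
This allows you to have a more fine-grained permission system that follows the OAuth2 standard and is integrated into your OpenAPI application (and the API docs).
OAuth2 with scopes is the mechanism used by many big authentication providers, such as Facebook, Google, GitHub, Microsoft, Twitter, etc. They use it to grant specific permissions to users and applications.
Every time you "log in with" Facebook, Google, GitHub, Microsoft, or Twitter, that application is using OAuth2 with scopes.
In this section you will see how to manage authentication and authorization with the same OAuth2 with scopes in your FastAPI application.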
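Warning
This is a more or less advanced section. If you are just starting, you can skip it.
You don't necessarily need OAuth2 scopes, and you can handle authentication and authorization however you want.
But OAuth2 with scopes can be nicely integrated into your API (with OpenAPI) and your API docs.
Nevertheless, you still enforce those scopes, or any other security/authorization requirement, however you need to in your code.
In many cases, OAuth2 with scopes can be overkill.
But if you know you need it, or you are curious, keep reading.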
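OAuth2 scopes and OpenAPI
The OAuth2 specification defines "scopes" as a list of strings separated by spaces.
The content of each of these strings can have any format, but it should not contain spaces.
These scopes represent "permissions".
In OpenAPI (e.g. the API docs), you can define "security schemes".
When one of these security schemes uses OAuth2, you can also declare and use scopes.
Each "scope" is just a string (without spaces).
They are normally used to declare specific security permissions, for example:
users:read or users:write are common examples.
instagram_basic is used by Facebook / Instagram.
https://www.googleapis.com/auth/drive is used by Google.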
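Info
In OAuth2, a "scope" is just a string that declares a specific permission required.
It doesn't matter if it has other characters like : or if it is a URL.
Those details are implementation specific.
For OAuth2 they are just strings.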
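Since a set of scopes travels as one space-separated string, converting between that string and a list is a common first step. The following is a minimal sketch using only the standard library; the helper names `parse_scope_string` and `format_scope_string` are hypothetical and are not part of FastAPI or of any OAuth2 library.

```python
from typing import List


def parse_scope_string(scope: str) -> List[str]:
    """Split a space-separated OAuth2 scope string into individual scopes."""
    # str.split() with no argument splits on runs of whitespace and drops empty parts.
    return scope.split()


def format_scope_string(scopes: List[str]) -> str:
    """Join scopes with single spaces, rejecting empty scopes or scopes containing whitespace."""
    for item in scopes:
        if not item or any(ch.isspace() for ch in item):
            raise ValueError(f"invalid scope: {item!r}")
    return " ".join(scopes)


# The format of each scope is free-form: a colon-separated name, a bare word, or a URL.
requested = parse_scope_string(
    "users:read instagram_basic https://www.googleapis.com/auth/drive"
)
print(requested)
print(format_scope_string(["me", "items"]))
```

The check in `format_scope_string` only enforces the "no spaces" rule described above; whatever each scope means is left to the application.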
Global view
First, let's quickly see the parts that change from the examples in the main Tutorial - User Guide for OAuth2 with Password (and hashing), Bearer with JWT tokens. Now using OAuth2 scopes:
from datetime import datetime, timedelta, timezone
from typing import List, Union

from fastapi import Depends, FastAPI, HTTPException, Security, status
from fastapi.security import (
    OAuth2PasswordBearer,
    OAuth2PasswordRequestForm,
    SecurityScopes,
)
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel, ValidationError

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Chains",
        "email": "alicechains@example.com",
        "hashed_password": "$2b$12$gSvqqUPvlXP2tfVFaWK1Be7DlH.PKZbv5H8KnzzVgXXbVxpva.pFm",
        "disabled": True,
    },
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: Union[str, None] = None
    scopes: List[str] = []


class User(BaseModel):
    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None


class UserInDB(User):
    hashed_password: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={"me": "Read information about the current user.", "items": "Read items."},
)

app = FastAPI()


def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    return pwd_context.hash(password)


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(
    security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme)
):
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_scopes = payload.get("scopes", [])
        token_data = TokenData(scopes=token_scopes, username=username)
    except (JWTError, ValidationError):
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    for scope in security_scopes.scopes:
        if scope not in token_data.scopes:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )
    return user


async def get_current_active_user(
    current_user: User = Security(get_current_user, scopes=["me"])
):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username, "scopes": form_data.scopes},
        expires_delta=access_token_expires,
    )
    return {"access_token": access_token, "token_type": "bearer"}


@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user


@app.get("/users/me/items/")
async def read_own_items(
    current_user: User = Security(get_current_active_user, scopes=["items"])
):
    return [{"item_id": "Foo", "owner": current_user.username}]


@app.get("/status/")
async def read_system_status(current_user: User = Depends(get_current_user)):
    return {"status": "ok"}
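Now let's review those changes step by step.
OAuth2 security scheme
The first change is that we now declare the OAuth2 security scheme with two available scopes, me and items.
The scopes parameter receives a dict with each scope as a key and its description as the value: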
# Relevant lines from the full listing above; the rest of the file is unchanged.
oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={"me": "Read information about the current user.", "items": "Read items."},
)
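Because we now declare those scopes, they show up in the API docs when you log in/authorize.
And you can select which scopes you want to grant access to: me and items.
This is the same mechanism used when you grant permissions while logging in with Facebook, Google, GitHub, etc.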
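To make the link with the API docs concrete, here is a rough sketch of the kind of OpenAPI security scheme object that describes an OAuth2 password flow with these two scopes. The helper `build_password_flow_scheme` is hypothetical and written only for illustration; FastAPI builds its schema internally, and its exact output may differ in detail.

```python
from typing import Dict


def build_password_flow_scheme(token_url: str, scopes: Dict[str, str]) -> dict:
    """Return an OpenAPI 3 security scheme object for the OAuth2 password flow."""
    return {
        "type": "oauth2",
        "flows": {
            "password": {
                "tokenUrl": token_url,
                # Scope name -> human-readable description, as shown in the docs UI.
                "scopes": dict(scopes),
            }
        },
    }


scheme = build_password_flow_scheme(
    "token",
    {"me": "Read information about the current user.", "items": "Read items."},
)
print(sorted(scheme["flows"]["password"]["scopes"]))
```

The descriptions are what a docs UI can display next to each checkbox when the user authorizes.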
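JWT token with scopes
Now, modify the token path operation to return the scopes requested.
We are still using the same OAuth2PasswordRequestForm. It includes a scopes property with a list of str, containing each scope it received in the request.
And we return the scopes as part of the JWT token.
Danger
For simplicity, here we just add the scopes received directly to the token.
But in your application, for security, you should make sure you only add the scopes that the user is actually allowed to have, or the ones you have predefined.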
# Relevant lines from the full listing above; the rest of the file is unchanged.
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username, "scopes": form_data.scopes},
        expires_delta=access_token_expires,
    )
    return {"access_token": access_token, "token_type": "bearer"}
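The warning above says a real application should only grant scopes the user is allowed to have. One possible way to do that is to filter the requested scopes before building the token claims. This is a sketch under assumptions: the `ALLOWED_SCOPES` table and the helpers `grant_scopes` and `build_token_claims` are hypothetical names, and in practice the permissions would come from your own user store.

```python
from typing import Dict, List, Set

# Hypothetical per-user permissions; a real application would load these from a database.
ALLOWED_SCOPES: Dict[str, Set[str]] = {
    "johndoe": {"me", "items"},
    "alice": {"me"},
}


def grant_scopes(username: str, requested: List[str]) -> List[str]:
    """Keep only the requested scopes that this user is allowed to have."""
    allowed = ALLOWED_SCOPES.get(username, set())
    return [scope for scope in requested if scope in allowed]


def build_token_claims(username: str, requested: List[str]) -> dict:
    """Build the claims that would be passed as `data` to the token-creation function."""
    return {"sub": username, "scopes": grant_scopes(username, requested)}


print(build_token_claims("alice", ["me", "items"]))
```

Silently dropping scopes is one design choice; an application could instead reject the request when a scope is not permitted.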
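Declare scopes in path operations and dependencies
Now we declare that the path operation for /users/me/items/ requires the scope items.
For this, we import and use Security from fastapi.
You can use Security to declare dependencies (just like Depends), but Security also receives a scopes parameter with a list of scopes (strings).
In this case, we pass the dependency function get_current_active_user to Security (the same way we would with Depends).
But we also pass a list of scopes, in this case with just one scope: items (it could have more).
And the dependency function get_current_active_user can also declare sub-dependencies, not only with Depends but also with Security. It declares its own sub-dependency function (get_current_user) and more scope requirements.
In this case, it requires the scope me (it could require more than one scope).
Note
You don't necessarily need to add different scopes in different places.
We do it here to demonstrate how FastAPI handles scopes declared at different levels.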
# Relevant lines from the full listing above; the rest of the file is unchanged.
from fastapi import Depends, FastAPI, HTTPException, Security, status


async def get_current_active_user(
    current_user: User = Security(get_current_user, scopes=["me"])
):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.get("/users/me/items/")
async def read_own_items(
    current_user: User = Security(get_current_active_user, scopes=["items"])
):
    return [{"item_id": "Foo", "owner": current_user.username}]
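Technical Details
Security is actually a subclass of Depends, and it has just one extra parameter that we'll see later.
But by using Security instead of Depends, FastAPI will know that it can declare security scopes, use them internally, and document the API with OpenAPI.
But when you import Query, Path, Depends, Security and others from fastapi, those are actually functions that return special classes.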
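The idea of scopes declared at different levels can be pictured with a small model. This is a deliberately simplified sketch and not FastAPI's actual implementation: each path operation is represented as a chain of declarations, and the hypothetical helper `scopes_seen_by` collects every scope declared on the way down to a given dependency. Sets are used because the sketch makes no claim about ordering.

```python
from typing import List, Set, Tuple

# (dependency being requested, scopes declared at that level)
Declaration = Tuple[str, List[str]]


def scopes_seen_by(chain: List[Declaration], dependency: str) -> Set[str]:
    """Union of the scopes declared from the path operation down to `dependency`."""
    collected: Set[str] = set()
    for name, scopes in chain:
        collected.update(scopes)
        if name == dependency:
            return collected
    raise KeyError(dependency)


# /users/me/items/ : Security(get_current_active_user, scopes=["items"]),
# which itself declares Security(get_current_user, scopes=["me"]).
items_chain = [("get_current_active_user", ["items"]), ("get_current_user", ["me"])]
# /users/me/ : Depends(get_current_active_user), no scopes at the first level.
me_chain = [("get_current_active_user", []), ("get_current_user", ["me"])]
# /status/ : Depends(get_current_user), no scopes anywhere.
status_chain = [("get_current_user", [])]

for chain in (items_chain, me_chain, status_chain):
    print(sorted(scopes_seen_by(chain, "get_current_user")))
```

In this model the same `get_current_user` function ends up with different requirements depending on which path operation reached it.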
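Use SecurityScopes
Now update the dependency get_current_user.
This is the one used by the dependencies above.
Here we use the same OAuth2 scheme we created before, declaring it as a dependency: oauth2_scheme.
Because this dependency function doesn't have any scope requirements itself, we can use Depends with oauth2_scheme; we don't have to use Security when we don't need to specify security scopes.
We also declare a special parameter of type SecurityScopes, imported from fastapi.security.
This SecurityScopes class is similar to Request (Request was used to get the request object directly).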
# Relevant lines from the full listing above; the rest of the file is unchanged.
from fastapi.security import (
    OAuth2PasswordBearer,
    OAuth2PasswordRequestForm,
    SecurityScopes,
)


async def get_current_user(
    security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme)
):
    ...  # body omitted here; see the full listing above
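Use the scopes
The parameter security_scopes will be of type SecurityScopes.
It will have a scopes property with a list containing all the scopes required by itself and by all the dependencies that use it as a sub-dependency. That means all the "dependants". This might sound confusing; it is explained again later.
The security_scopes object (of class SecurityScopes) also provides a scope_str attribute with a single string containing those scopes separated by spaces (we are going to use it).
We create an HTTPException that we can reuse (raise) later at several points.
In this exception, we include the scopes required (if any) as a string separated by spaces (using scope_str). We put that string containing the scopes in the WWW-Authenticate header (this is part of the spec).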
# Relevant lines from the full listing above; the rest of the file is unchanged.
async def get_current_user(
    security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme)
):
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )
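The header construction described in this section can be isolated into a small function to see what the client receives. This is a standalone sketch using only the standard library; `www_authenticate_header` is a hypothetical helper, and the space-joined string stands in for what `scope_str` provides.

```python
from typing import Dict, List


def www_authenticate_header(required_scopes: List[str]) -> Dict[str, str]:
    """Build the WWW-Authenticate header for a Bearer challenge, with scopes if any."""
    if required_scopes:
        # Equivalent of scope_str: the required scopes joined by single spaces.
        scope_str = " ".join(required_scopes)
        value = f'Bearer scope="{scope_str}"'
    else:
        value = "Bearer"
    return {"WWW-Authenticate": value}


print(www_authenticate_header(["me", "items"]))
print(www_authenticate_header([]))
```

Including the scopes in the challenge tells the client which permissions it was missing, rather than only that the request was rejected.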
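Verify the username and data shape
We verify that we get a username, and extract the scopes.
Then we validate that data with the Pydantic model (catching the ValidationError exception), and if we get an error reading the JWT token or validating the data with Pydantic, we raise the HTTPException we created before.
For that, we update the Pydantic model TokenData with a new property, scopes.
By validating the data with Pydantic we can make sure that we have, for example, exactly a list of str with the scopes and a str with the username, rather than, for example, a dict or something else that could break the application at some later point and become a security risk.
We also verify that we have a user with that username, and if not, we raise that same exception we created before.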
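To show what "validating the data shape" guards against, here is a hand-written stand-in for the check, using only the standard library. It is an illustration, not a replacement for the Pydantic model used in this tutorial: `TokenClaims`, `InvalidClaims` and `validate_claims` are hypothetical names, and this check is stricter than a model that may coerce some values.

```python
from typing import Any, Dict, List, NamedTuple


class TokenClaims(NamedTuple):
    username: str
    scopes: List[str]


class InvalidClaims(Exception):
    """Raised when the decoded token payload does not have the expected shape."""


def validate_claims(payload: Dict[str, Any]) -> TokenClaims:
    """Check that 'sub' is a str and 'scopes' is a list of str."""
    username = payload.get("sub")
    if not isinstance(username, str):
        raise InvalidClaims("missing or non-string 'sub'")
    scopes = payload.get("scopes", [])
    if not isinstance(scopes, list) or not all(isinstance(s, str) for s in scopes):
        raise InvalidClaims("'scopes' must be a list of strings")
    return TokenClaims(username=username, scopes=scopes)


print(validate_claims({"sub": "johndoe", "scopes": ["me", "items"]}))
```

A payload whose scopes value is a dict, for example, is rejected here instead of reaching the later membership checks with an unexpected type.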
from datetime import datetime, timedelta, timezone
from typing import List, Union

from fastapi import Depends, FastAPI, HTTPException, Security, status
from fastapi.security import (
    OAuth2PasswordBearer,
    OAuth2PasswordRequestForm,
    SecurityScopes,
)
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel, ValidationError

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Chains",
        "email": "alicechains@example.com",
        "hashed_password": "$2b$12$gSvqqUPvlXP2tfVFaWK1Be7DlH.PKZbv5H8KnzzVgXXbVxpva.pFm",
        "disabled": True,
    },
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: Union[str, None] = None
    scopes: List[str] = []


class User(BaseModel):
    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None


class UserInDB(User):
    hashed_password: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={"me": "Read information about the current user.", "items": "Read items."},
)

app = FastAPI()


def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    return pwd_context.hash(password)


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(
    security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme)
):
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_scopes = payload.get("scopes", [])
        token_data = TokenData(scopes=token_scopes, username=username)
    except (JWTError, ValidationError):
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    for scope in security_scopes.scopes:
        if scope not in token_data.scopes:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )
    return user


async def get_current_active_user(
    current_user: User = Security(get_current_user, scopes=["me"])
):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username, "scopes": form_data.scopes},
        expires_delta=access_token_expires,
    )
    return {"access_token": access_token, "token_type": "bearer"}


@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user


@app.get("/users/me/items/")
async def read_own_items(
    current_user: User = Security(get_current_active_user, scopes=["items"])
):
    return [{"item_id": "Foo", "owner": current_user.username}]


@app.get("/status/")
async def read_system_status(current_user: User = Depends(get_current_user)):
    return {"status": "ok"}
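Verify the scopes
We now verify that all the scopes required by this dependency and all its dependants (including path operations) are included in the scopes provided in the received token; otherwise, we raise an HTTPException.
For this, we use security_scopes.scopes, which contains a list with all these scopes as str.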
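The check described above can be sketched in isolation with the standard library. This is an illustrative reduction of the loop in get_current_user, not FastAPI code: missing_scopes and www_authenticate_value are hypothetical helper names, and the header format simply copies the one used in the listing.

```python
from typing import Iterable, List


def missing_scopes(required: Iterable[str], granted: Iterable[str]) -> List[str]:
    """Return the required scopes that the token does not carry, in order."""
    granted_set = set(granted)
    return [scope for scope in required if scope not in granted_set]


def www_authenticate_value(required: List[str]) -> str:
    """Build the WWW-Authenticate value from the required scopes."""
    if required:
        scope_str = " ".join(required)  # space-separated, like scope_str in the listing
        return f'Bearer scope="{scope_str}"'
    return "Bearer"


required = ["me", "items"]           # what the dependency tree asks for
token_scopes = ["me"]                # what the token actually grants
lacking = missing_scopes(required, token_scopes)
header = www_authenticate_value(required)
```

With these inputs, lacking is non-empty, which corresponds to the branch where the tutorial raises the "Not enough permissions" HTTPException and sends header back to the client.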
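Dependency tree and scopes
Let's review this dependency tree and the scopes again.
Because the get_current_active_user dependency has get_current_user as a sub-dependency, the scope "me" declared at get_current_active_user is included in the list of required scopes in the security_scopes.scopes passed to get_current_user.
The path operation itself also declares a scope, "items", so this is also in the security_scopes.scopes list passed to get_current_user.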
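Here's what the hierarchy of dependencies and scopes looks like:
- The path operation read_own_items has:
  - Required scopes ["items"] with the dependency:
  - get_current_active_user:
    - The dependency function get_current_active_user has:
      - Required scopes ["me"] with the dependency:
      - get_current_user:
        - The dependency function get_current_user has:
          - No scopes required by itself.
          - A dependency using oauth2_scheme.
          - A security_scopes parameter of type SecurityScopes:
            - This security_scopes parameter has a property scopes with a list containing all the scopes declared above, so:
              - security_scopes.scopes will contain ["me", "items"] for the path operation read_own_items.
              - security_scopes.scopes will contain ["me"] for the path operation read_users_me, because it is declared in the dependency get_current_active_user.
              - security_scopes.scopes will contain [] (nothing) for the path operation read_system_status, because it does not declare any Security with scopes, and its dependency, get_current_user, does not declare any scopes either.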
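Tip
The important and "magic" thing here is that get_current_user has a different list of scopes to check for each path operation.
It all depends on the scopes declared in each path operation and in each dependency in the dependency tree for that specific path operation.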
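The accumulation described in this section can be modelled with a few lines of plain Python. This is a toy model of the hierarchy above, assuming each node declares scopes on at most one dependency; it is not how FastAPI resolves dependencies internally, and TREE and required_scopes are hypothetical names. The result is a set because the sketch makes no claim about ordering.

```python
from typing import Dict, List, Optional, Set, Tuple

# node -> (scopes declared on the dependency it uses, name of that dependency)
TREE: Dict[str, Tuple[List[str], Optional[str]]] = {
    "read_own_items": (["items"], "get_current_active_user"),
    "read_users_me": ([], "get_current_active_user"),
    "read_system_status": ([], "get_current_user"),
    "get_current_active_user": (["me"], "get_current_user"),
    "get_current_user": ([], None),
}


def required_scopes(path_operation: str) -> Set[str]:
    """Collect every scope declared between a path operation and the root dependency."""
    collected: Set[str] = set()
    node: Optional[str] = path_operation
    while node is not None:
        scopes, node = TREE[node]
        collected.update(scopes)
    return collected


for name in ("read_own_items", "read_users_me", "read_system_status"):
    print(name, sorted(required_scopes(name)))
```

Each path operation ends up with its own set of required scopes even though all three share the same get_current_user function, which is the point of the tip above.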
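More details about SecurityScopes
You can use SecurityScopes at any point and in multiple places; it does not have to be at the "root" dependency.
It always has the security scopes declared in the current Security dependencies and all the dependants for that specific path operation and that specific dependency tree.
Because SecurityScopes has all the scopes declared by dependants, you can use it to verify that a token has the required scopes in a central dependency function, and then declare different scope requirements in different path operations.
They are checked independently for each path operation.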
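Check it
If you open the API docs, you can authenticate and specify which scopes you want to authorize.
If you don't select any scope, you will be "authenticated", but when you try to access /users/me/ or /users/me/items/ you will get an error saying that you don't have enough permissions. You will still be able to access /status/.
And if you select the scope me but not the scope items, you will be able to access /users/me/ but not /users/me/items/.
That is what would happen to a third-party application that tried to access one of these path operations with a token provided by a user, depending on how many permissions the user gave the application.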
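The outcomes listed above follow directly from the required scopes of each path. As a rough sketch (plain Python, not a call to the running application), the table below hard-codes the scopes each path requires according to the dependency tree in this tutorial; REQUIRED and accessible_paths are hypothetical names.

```python
from typing import Dict, Iterable, List, Set

# Scopes each path requires, taken from the dependency tree described earlier.
REQUIRED: Dict[str, Set[str]] = {
    "/users/me/": {"me"},
    "/users/me/items/": {"me", "items"},
    "/status/": set(),
}


def accessible_paths(selected: Iterable[str]) -> List[str]:
    """Return the paths whose required scopes are all among the selected ones."""
    granted = set(selected)
    return sorted(path for path, needed in REQUIRED.items() if needed <= granted)


nothing_selected = accessible_paths([])
only_me = accessible_paths(["me"])
everything = accessible_paths(["me", "items"])
```

Here nothing_selected contains only /status/, only_me adds /users/me/, and everything includes all three paths, matching the behaviour you should see in the API docs.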
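About third party integrations
In this example we are using the OAuth2 "password" flow.
This is appropriate when we are logging in to our own application, probably with our own frontend.
Because we control it, we can trust it to receive the username and password.
But if you are building an OAuth2 application that others would connect to (i.e., if you are building an authentication provider equivalent to Facebook, Google, GitHub, etc.) you should use one of the other flows.
The most common is the implicit flow.
The most secure is the code flow, but it is more complex to implement because it requires more steps. Because it is more complex, many providers end up suggesting the implicit flow.
Note
It is common for each authentication provider to name its flows differently, to make them part of its brand.
But in the end, they are implementing the same OAuth2 standard.
FastAPI includes utilities for all these OAuth2 authentication flows in fastapi.security.oauth2.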
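Security in decorator dependencies
In the same way that you can define a list of Depends in the decorator's dependencies parameter (as explained in Dependencies in path operation decorators), you can also use Security with scopes there.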