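OAuth2 with Password (and hashing), Bearer with JWT tokens
Now that we have the whole security flow, let's make the application actually secure, using JWT tokens and secure password hashing.
This is code you can actually use in your application: save the password hashes in your database, and so on.
We are going to start from where we left off in the previous chapter and extend it.
About JWT
JWT means "JSON Web Tokens".
It is a standard for encoding a JSON object as a long, dense string without spaces. It looks like this: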
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c
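It is not encrypted, so anyone can recover the information from its contents.
But it is signed. So, when you receive a token that you issued, you can verify that you actually issued it.
That way, you can create a token with an expiration of, let's say, 1 week. Then, when the user comes back the next day with the token, you know that user is still logged in to your system.
After a week, the token will have expired, and the user will not be authorized and will have to sign in again to get a new token. And if the user (or a third party) tried to modify the token to change the expiration, you would be able to detect it, because the signatures would not match.
If you want to play with JWT tokens and see how they work, check https://jwt.io.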
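To make "not encrypted, but signed" concrete, here is a minimal sketch that uses only the Python standard library rather than python-jose. The helper names (`read_unverified`, `sign_hs256`, `signature_matches`) and the demo key are assumptions made for illustration; in the tutorial itself, python-jose does this work.

```python
import base64
import hashlib
import hmac
import json


def b64url_decode(segment: str) -> bytes:
    # JWT segments drop the "=" padding, so restore it before decoding.
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def b64url_encode(raw: bytes) -> str:
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def read_unverified(token: str) -> dict:
    # No key is needed: the payload is only base64url-encoded JSON.
    _header, payload, _signature = token.split(".")
    return json.loads(b64url_decode(payload))


def sign_hs256(payload: dict, key: bytes) -> str:
    # Hypothetical helper: builds header.payload and appends an HMAC-SHA256 signature.
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url_encode(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    mac = hmac.new(key, signing_input.encode("ascii"), hashlib.sha256).digest()
    return f"{signing_input}.{b64url_encode(mac)}"


def signature_matches(token: str, key: bytes) -> bool:
    # Recompute the signature over header.payload and compare in constant time.
    signing_input, _, signature = token.rpartition(".")
    expected = hmac.new(key, signing_input.encode("ascii"), hashlib.sha256).digest()
    return hmac.compare_digest(b64url_encode(expected), signature)


# The sample token shown above can be read without knowing any secret.
sample = (
    "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9."
    "eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ."
    "SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c"
)
claims = read_unverified(sample)
print(claims["name"])  # John Doe

# Changing the expiration without the key breaks the signature.
demo_key = b"demo-key-for-illustration-only"  # assumption: any secret bytes work here
token = sign_hs256({"sub": "johndoe", "exp": 1516239022}, demo_key)
header_b64, _payload_b64, signature_b64 = token.split(".")
forged_payload = b64url_encode(json.dumps({"sub": "johndoe", "exp": 9999999999}).encode("utf-8"))
forged = f"{header_b64}.{forged_payload}.{signature_b64}"
print(signature_matches(token, demo_key))   # True
print(signature_matches(forged, demo_key))  # False
```

Reading the payload proves nothing about who issued the token; only the signature check does, which is why the tutorial always decodes with the secret key.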
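Install python-jose
We need to install python-jose to generate and verify the JWT tokens in Python:
$ pip install "python-jose[cryptography]"
python-jose requires a cryptographic backend as an extra.
Here we are using the recommended one: pyca/cryptography.
Tip
This tutorial previously used PyJWT.
It was updated to use python-jose instead, as it provides all the features of PyJWT plus some extras that you might need later when building integrations with other tools.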
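Password hashing
"Hashing" means converting some content (a password in this case) into a sequence of bytes (just a string) that looks like gibberish.
Whenever you pass exactly the same content (exactly the same password), you get exactly the same gibberish.
But you cannot convert from the gibberish back to the password.
Why use password hashing
If your database is stolen, the thief won't have your users' plaintext passwords, only the hashes.
So, the thief won't be able to try those passwords in another system (since many users use the same password everywhere, this would be dangerous).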
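The hashing properties described above can be tried out with the standard library alone. The sketch below uses PBKDF2 from `hashlib` as a stand-in; it is not Bcrypt and not what passlib does internally, and the function names and iteration count are assumptions chosen for illustration. It also shows one detail worth knowing: real password hashes mix in a random salt, so "same password, same gibberish" holds only for the same salt.

```python
import hashlib
import hmac
import os

ITERATIONS = 100_000  # illustrative work factor, not a tuned recommendation


def hash_password(password: str, salt: bytes) -> bytes:
    # One-way derivation: there is no function that turns the result back into the password.
    return hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, ITERATIONS)


def check_password(password: str, salt: bytes, stored: bytes) -> bool:
    # Verification re-hashes the candidate and compares in constant time.
    return hmac.compare_digest(hash_password(password, salt), stored)


salt = os.urandom(16)
stored = hash_password("secret", salt)

print(hash_password("secret", salt) == stored)            # True
print(check_password("Secret", salt, stored))             # False
print(hash_password("secret", os.urandom(16)) == stored)  # False (different salt)
```

Because the salt differs per user, two users with the same password end up with different stored hashes, which makes precomputed lookup tables far less useful to a thief.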
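Install passlib
PassLib is a great Python package for handling password hashes.
It supports many secure hashing algorithms and utilities to work with them.
The recommended algorithm is "Bcrypt".
So, install PassLib with Bcrypt:
$ pip install "passlib[bcrypt]"
Tip
With passlib, you could even configure it to read passwords created by Django, a Flask security plug-in, or many others.
So, you would be able to, for example, share the same data from a Django application in a database with a FastAPI application. Or gradually migrate a Django application using the same database.
And your users would be able to log in from your Django app or from your FastAPI app, at the same time.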
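Hash and verify the passwords
Import the tools we need from passlib.
Create a PassLib "context". This is what will be used to hash and verify passwords.
Tip
The PassLib context also has functionality to use different hashing algorithms, including deprecated old ones only to allow verifying them, etc.
For example, you could use it to read and verify passwords generated by another system (like Django) but hash any new passwords with a different algorithm like Bcrypt.
And be compatible with all of them at the same time.
Create a utility function to hash a password coming from the user.
And another utility to verify whether a received password matches the stored hash.
And another one to authenticate and return a user.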
from datetime import datetime, timedelta, timezone
from typing import Union

from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    }
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: Union[str, None] = None


class User(BaseModel):
    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None


class UserInDB(User):
    hashed_password: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

app = FastAPI()


def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    return pwd_context.hash(password)


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user


async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}


@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user


@app.get("/users/me/items/")
async def read_own_items(current_user: User = Depends(get_current_active_user)):
    return [{"item_id": "Foo", "owner": current_user.username}]
On Python 3.10+, the same file can drop the line "from typing import Union" and write each optional type with the | operator: str | None instead of Union[str, None], bool | None instead of Union[bool, None], and timedelta | None instead of Union[timedelta, None]. Everything else stays the same.
Note
If you check the new (fake) database fake_users_db, you will see what the hashed password looks like now: "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW".
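If you are curious what that string contains, the sketch below splits it into its fields. The layout (variant, cost, then a 22-character salt followed by a 31-character digest) is the usual Bcrypt hash format; `BcryptParts` and `split_bcrypt_hash` are hypothetical names introduced here, not part of passlib.

```python
from typing import NamedTuple


class BcryptParts(NamedTuple):
    variant: str  # algorithm identifier, "2b" in the tutorial's hash
    cost: int     # work factor: the hash uses 2**cost rounds of key expansion
    salt: str     # 22 characters, stored alongside the digest
    digest: str   # 31 characters, the actual hash output


def split_bcrypt_hash(hashed: str) -> BcryptParts:
    # The string has the form $<variant>$<cost>$<salt><digest>.
    _empty, variant, cost, rest = hashed.split("$")
    return BcryptParts(variant, int(cost), rest[:22], rest[22:])


parts = split_bcrypt_hash("$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW")
print(parts.variant, parts.cost)           # 2b 12
print(len(parts.salt), len(parts.digest))  # 22 31
```

Since the salt and cost travel inside the stored string, verifying a password needs nothing beyond the plain password and that one string, which is why verify_password takes only those two arguments.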
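Handle JWT tokens
Import the modules installed.
Create a random secret key that will be used to sign the JWT tokens.
To generate a secure random secret key, use the command:
$ openssl rand -hex 32
09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7
And copy the output to the variable SECRET_KEY (don't use the one in the example).
Create a variable ALGORITHM with the algorithm used to sign the JWT token and set it to "HS256".
Create a variable for the expiration of the token.
Define a Pydantic model that will be used in the token endpoint for the response.
Create a utility function to generate a new access token.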
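If openssl is not at hand, the standard library can produce a key of the same shape. The sketch below also shows how the expiration ends up as a claim; `build_claims` is a hypothetical helper for illustration, whereas the tutorial's create_access_token passes a datetime for exp and lets python-jose encode it.

```python
import secrets
from datetime import datetime, timedelta, timezone

# Python-side counterpart of `openssl rand -hex 32`: 32 random bytes as 64 hex characters.
secret_key = secrets.token_hex(32)


def build_claims(subject: str, lifetime: timedelta = timedelta(minutes=15)) -> dict:
    # "exp" is expressed here as seconds since the Unix epoch, computed in UTC.
    expires_at = datetime.now(timezone.utc) + lifetime
    return {"sub": subject, "exp": int(expires_at.timestamp())}


claims = build_claims("johndoe", timedelta(minutes=30))
print(len(secret_key))  # 64
print(claims["sub"])    # johndoe
```

In a real deployment the key would typically be loaded from configuration or an environment variable rather than generated at import time, since a key that changes on every restart invalidates all previously issued tokens.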
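Update the dependencies
Update get_current_user to receive the same token as before, but this time using JWT tokens.
Decode the received token, verify it, and return the current user.
If the token is invalid, return an HTTP error right away.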
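Once the signature has been checked, what remains is a handful of checks on the decoded claims. The sketch below isolates that step with the standard library only. `InvalidToken` and `username_from_claims` are hypothetical names, and requiring exp to be present is an assumption of this sketch; in the tutorial, jwt.decode raises JWTError and get_current_user turns any failure into an HTTP 401.

```python
import time
from typing import Optional


class InvalidToken(Exception):
    """Hypothetical error type; the tutorial raises an HTTP 401 instead."""


def username_from_claims(claims: dict, now: Optional[float] = None) -> str:
    # `now` is injectable so the expiry check can be exercised deterministically.
    now = time.time() if now is None else now
    exp = claims.get("exp")
    if not isinstance(exp, (int, float)) or exp <= now:
        raise InvalidToken("token is expired or has no exp claim")
    username = claims.get("sub")
    if not isinstance(username, str) or not username:
        raise InvalidToken("token has no subject")
    return username


print(username_from_claims({"sub": "johndoe", "exp": 200}, now=100))  # johndoe
```

Every failure path raises the same error type on purpose: the caller should not reveal to the client which check failed.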
Update the /token path operation
Create a timedelta with the expiration time of the token.
Create a real JWT access token and return it.
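Technical details about the JWT "subject" sub
The JWT specification says that there's a key sub, with the subject of the token.
It's optional to use it, but that's where you would put the user's identification, so we are using it here.
JWT might be used for other things apart from identifying a user and allowing them to perform operations directly on your API.
For example, you could identify a "car" or a "blog post".
Then you could add permissions about that entity, like "drive" (for the car) or "edit" (for the blog).
And then, you could give that JWT token to a user (or bot), and they could use it to perform those actions (drive the car, or edit the blog post) without even needing to have an account, just with the JWT token your API generated for that.
Using these ideas, JWT can be used for far more sophisticated scenarios.
In those cases, several of those entities could have the same ID, let's say foo (a user foo, a car foo, and a blog post foo).
So, to avoid ID collisions, when creating the JWT token for the user, you could prefix the value of the sub key, e.g. with username:. So, in this example, the value of sub could have been: username:johndoe.
The important thing to keep in mind is that the sub key should hold an identifier that is unique across the entire application, and it should be a string.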
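The prefixing idea can be sketched in a few lines. `make_subject` and `parse_subject` are hypothetical helpers introduced for illustration; the tutorial's code stores the plain username in sub.

```python
from typing import Tuple


def make_subject(kind: str, identifier: str) -> str:
    # The kind must not contain the separator, or parsing would be ambiguous.
    if not kind or ":" in kind:
        raise ValueError("kind must be non-empty and must not contain ':'")
    return f"{kind}:{identifier}"


def parse_subject(sub: str) -> Tuple[str, str]:
    # Split on the first ":" only, so identifiers may themselves contain colons.
    kind, separator, identifier = sub.partition(":")
    if not separator or not kind:
        raise ValueError("subject has no kind prefix")
    return kind, identifier


print(make_subject("username", "johndoe"))  # username:johndoe
print(parse_subject("username:johndoe"))    # ('username', 'johndoe')
print(make_subject("car", "foo") == make_subject("username", "foo"))  # False
```

With this scheme, the dependency that reads the token can reject a subject whose kind is not the one the endpoint expects.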
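Check it
Run the server and go to the docs: http://127.0.0.1:8000/docs.
You'll see the interactive user interface.
Authorize the application the same way as before.
Using the credentials:
Username: johndoe
Password: secret
Check
Notice that nowhere in the code is the plaintext password "secret"; we only have the hashed version.
Call the endpoint /users/me/, and you will get a response like:
{
  "username": "johndoe",
  "email": "johndoe@example.com",
  "full_name": "John Doe",
  "disabled": false
}
If you open the developer tools, you can see that the data sent only includes the token. The password is only sent in the first request, to authenticate the user and get that access token, but not afterwards.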
Note
Notice the Authorization header, with a value that starts with Bearer.
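For reference, here is roughly what extracting the token from that header involves. This is a standard-library sketch with a hypothetical function name; in the tutorial, OAuth2PasswordBearer performs this step for you.

```python
from typing import Optional


def bearer_token(authorization: Optional[str]) -> Optional[str]:
    # Expected shape: "Bearer <token>"; the scheme name is compared case-insensitively.
    if not authorization:
        return None
    scheme, _, credentials = authorization.partition(" ")
    credentials = credentials.strip()
    if scheme.lower() != "bearer" or not credentials:
        return None
    return credentials


print(bearer_token("Bearer abc.def.ghi"))  # abc.def.ghi
print(bearer_token("Basic dXNlcjpwdw=="))  # None
print(bearer_token(None))                  # None
```

Returning None rather than raising keeps the helper reusable; the caller decides whether a missing token means a 401 or an anonymous request.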
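Advanced usage with scopes
OAuth2 has the notion of "scopes".
You can use them to add a specific set of permissions to a JWT token.
Then you can give this token to a user directly or to a third party, to interact with your API with a set of restrictions.
You can learn how to use them and how they are integrated into FastAPI later in the Advanced User Guide.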
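As a rough preview of the idea, a scope check boils down to comparing the permissions carried by the token with those an operation requires. The claim name "scopes", the scope names, and `has_scopes` are all assumptions made for this sketch; the Advanced User Guide shows how FastAPI actually integrates scopes.

```python
from typing import Iterable


def has_scopes(claims: dict, required: Iterable[str]) -> bool:
    # Assumption: the token stores its granted scopes as a list of strings under "scopes".
    granted = set(claims.get("scopes", []))
    return set(required) <= granted


claims = {"sub": "username:johndoe", "scopes": ["me", "items"]}
print(has_scopes(claims, ["me"]))           # True
print(has_scopes(claims, ["me", "admin"]))  # False
print(has_scopes({"sub": "bot"}, ["me"]))   # False
```

A token without any scopes passes only operations that require none, so a restricted token handed to a third party cannot do more than it was granted.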
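Recap
With what you have seen up to now, you can set up a secure FastAPI application using standards like OAuth2 and JWT.
In almost any framework, handling security becomes a rather complex subject quite quickly.
Many packages that simplify it a lot have to make many compromises with the data model, the database, and the available features. And some of the packages that simplify things too much actually have security flaws underneath.
FastAPI doesn't make any compromise with any database, data model, or tool.
It gives you all the flexibility to choose the ones that fit your project best.
And you can directly use many well-maintained and widely used packages like passlib and python-jose, because FastAPI doesn't require any complex mechanisms to integrate external packages.
But it provides you with the tools to simplify the process as much as possible without compromising flexibility, robustness, or security.
And you can use and implement secure, standard protocols, like OAuth2, in a relatively simple way.
You can learn more in the Advanced User Guide about how to use OAuth2 "scopes" for a more fine-grained permission system, following these same standards. OAuth2 with scopes is the mechanism used by many big authentication providers, like Facebook, Google, GitHub, Microsoft, Twitter, etc., to authorize third-party applications to interact with their APIs on behalf of their users.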