メインコンテンツまでスキップ

「authentication」タグの記事が1件あります

全てのタグを見る

FastAPI Authentication

· 約3分
Mikyan
白い柴犬
  • Use python-jose (`jose`) to encode and decode JWTs
  • Use passlib to hash passwords and verify them against stored hashes

Details

Utility functions

import os
from datetime import datetime, timedelta, timezone
from typing import Optional

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from passlib.context import CryptContext

# --- Configuration ---
# The signing key is read from the JWT_SECRET_KEY environment variable so the
# real secret never has to be committed. The placeholder fallback only keeps
# the example runnable locally; it must never be used in production.
# MAKE THE REAL KEY A LONG, RANDOM STRING!
SECRET_KEY = os.environ.get("JWT_SECRET_KEY") or "your-super-secret-jwt-key"
ALGORITHM = "HS256"  # Signing algorithm passed to jose for encode and decode
ACCESS_TOKEN_EXPIRE_MINUTES = 30  # Default access-token lifetime, in minutes

# Password hashing context
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# OAuth2PasswordBearer to extract token from Authorization header
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")  # "token" is the endpoint for login

# --- Password Hashing Functions ---
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Verify a plain password against a hashed password.

    Args:
        plain_password: The password as supplied by the caller.
        hashed_password: The stored hash to check it against.

    Returns:
        Whatever ``pwd_context.verify`` reports for the pair.
    """
    return pwd_context.verify(plain_password, hashed_password)

def get_password_hash(password: str) -> str:
    """Hash a plain password with the module-level ``pwd_context``.

    Args:
        password: The plain-text password to hash.

    Returns:
        The hash string produced by ``pwd_context.hash``.
    """
    return pwd_context.hash(password)

# --- JWT Token Functions ---
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """Create a signed JWT access token.

    Args:
        data: Claims to embed in the token. The dict is copied, so the
            caller's object is not mutated.
        expires_delta: Lifetime of the token. When ``None``, the default of
            ``ACCESS_TOKEN_EXPIRE_MINUTES`` minutes is used.

    Returns:
        The encoded JWT, signed with ``SECRET_KEY`` using ``ALGORITHM``.
    """
    # Compare against None rather than truthiness: timedelta(0) is falsy, and
    # an explicitly requested zero lifetime must not silently become the default.
    if expires_delta is None:
        expires_delta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

    to_encode = data.copy()
    # Timezone-aware UTC timestamp for the "exp" claim.
    to_encode["exp"] = datetime.now(timezone.utc) + expires_delta
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

def decode_access_token(token: str) -> dict:
    """Decode and validate a JWT access token.

    Args:
        token: The encoded JWT string.

    Returns:
        The decoded payload (claims) of the token.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            jose raises ``JWTError`` while decoding the token.
    """
    try:
        # Only the configured algorithm is accepted when verifying the token.
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError as exc:
        # Chain explicitly so the underlying decode failure stays visible in
        # tracebacks and logs while the client only sees a generic 401.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        ) from exc
    # You can add more validation here, e.g., check for 'sub' or 'user_id'
    return payload

# --- Dependency for current user ---
async def get_current_user(token: str = Depends(oauth2_scheme)) -> dict:
    """Dependency that resolves the current user from a JWT token.

    Args:
        token: Bearer token supplied by the ``oauth2_scheme`` dependency.

    Returns:
        A dict of the form ``{"user_id": <value of the "sub" claim>}``.

    Raises:
        HTTPException: 401 if ``decode_access_token`` rejects the token, or
            if the decoded payload has no ``"sub"`` claim.
    """
    payload = decode_access_token(token)
    # 'sub' is commonly used for subject (e.g., user ID); .get() returns None
    # when the claim is absent, hence the Optional annotation.
    user_id: Optional[str] = payload.get("sub")
    if user_id is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token payload",
            headers={"WWW-Authenticate": "Bearer"},
        )
    # In a real app, you would fetch the user from your database here
    # to ensure they still exist and are active.
    # For simplicity, we'll just return the user_id for now.
    return {"user_id": user_id}

Usage

# A helper function to authenticate user (combines with your "database" logic)
def authenticate_user(username: str, password: str) -> UserInDB | None:
    """Look up a user and check the supplied password.

    Args:
        username: Name passed to ``get_user`` to find the stored record.
        password: Plain-text password to verify against the stored hash.

    Returns:
        The user object when it exists and the password verifies,
        otherwise ``None``.
    """
    user = get_user(username)
    if not user:
        return None
    if not verify_password(password, user.hashed_password):
        return None
    return user

@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    """
    Login endpoint to generate JWT access token.
    Uses OAuth2PasswordRequestForm for standard username/password input.

    Returns a dict with the encoded token under "access_token" and
    "token_type" set to "bearer". Raises a 401 HTTPException when
    authenticate_user does not return a user.
    """
    user = authenticate_user(form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username},  # 'sub' claim typically holds the user identifier
        expires_delta=access_token_expires,
    )
    return {"access_token": access_token, "token_type": "bearer"}

@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: dict = Depends(get_current_user)):
    """
    Protected endpoint: Returns information about the current authenticated user.
    Requires a valid JWT token in the Authorization header.

    Raises a 404 HTTPException when the token's user id has no entry in
    fake_users_db.
    """
    # In a real application, you would fetch the full user object from the DB
    # using current_user["user_id"]
    username = current_user["user_id"]
    user_data = fake_users_db.get(username)
    if not user_data:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )
    return User(**user_data)

@app.get("/protected-route/")
async def protected_route(current_user: dict = Depends(get_current_user)):
    """
    Another protected endpoint.

    Returns a welcome message that includes the user id resolved by the
    get_current_user dependency.
    """
    return {"message": f"Welcome, {current_user['user_id']}! You accessed a protected route."}

# Example of a public endpoint
@app.get("/")
async def read_root():
    """Public endpoint: returns a static welcome message, no token required."""
    return {"message": "Welcome to the unauthenticated public endpoint!"}