メインコンテンツまでスキップ

「Python」タグの記事が10件件あります

Pythonプログラミング言語、フレームワーク、データサイエンス

全てのタグを見る

asdfでPythonをバージョン管理

· 約8分
Mikyan
白い柴犬

Pythonのバージョン管理にasdfを使用することで、複数のPythonバージョンを簡単に管理できるようになりました。

asdfはPythonだけでなく、Node.js、Javaなど複数のランタイムを統一的に管理できる非常に便利なツールです。

以下の手順はasdf v0.18.0に基づいて説明します。

asdfとは

asdfは複数のプログラミング言語のランタイムバージョンを管理するためのツールです。pyenv、nvm、rbenvなどの個別のバージョン管理ツールを統一的に置き換えることができます。

主な利点

  • 複数の言語を一つのツールで管理
  • プロジェクトごとのバージョン設定
  • 豊富なプラグインエコシステム

asdfとvenvの違い

asdf: Pythonインタープリター自体のバージョン管理

  • Python 3.11.7、3.12.1など異なるPythonバージョンを切り替え

venv: パッケージ(ライブラリ)の分離管理

  • 同じPythonバージョンでもプロジェクトごとに異なるパッケージセットを使用
  • 依存関係の競合を防ぐ

両方を組み合わせることで、Pythonバージョンとパッケージの両方を適切に管理できます。

asdfとvenvの違い

asdf: Pythonインタープリター自体のバージョン管理

  • Python 3.11.7、3.12.1など異なるPythonバージョンを切り替え

venv: パッケージ(ライブラリ)の分離管理

  • 同じPythonバージョンでもプロジェクトごとに異なるパッケージセットを使用
  • 依存関係の競合を防ぐ

両方を組み合わせることで、Pythonバージョンとパッケージの両方を適切に管理できます。

asdfのインストール

macOSの場合

公式ガイドに従ってインストールします。

brew install asdf

zshの設定

~/.zshrcファイルに以下を追加:

# asdfの設定
export PATH="${ASDF_DATA_DIR:-$HOME/.asdf}/shims:$PATH"
export ASDF_DATA_DIR="$HOME/.asdf/data"

補完機能の設定

# 補完ディレクトリの作成
mkdir -p "${ASDF_DATA_DIR:-$HOME/.asdf}/completions"

# zsh補完ファイルの生成
asdf completion zsh > "${ASDF_DATA_DIR:-$HOME/.asdf}/completions/_asdf"

再度~/.zshrcに以下を追加:

# asdf補完の有効化
fpath=(${ASDF_DATA_DIR:-$HOME/.asdf}/completions $fpath)
autoload -Uz compinit && compinit

設定を反映:

source ~/.zshrc

Pythonプラグインのインストールと設定

プラグインの追加

# Pythonプラグインを追加
asdf plugin add python

# インストール済みプラグインの確認
asdf plugin list

利用可能なPythonバージョンの確認

# インストール可能なPythonバージョンを表示
asdf list all python

Pythonのインストール

# 最新版をインストール
asdf install python latest

# 特定バージョンをインストール(例)
asdf install python 3.11.7

# インストール済みバージョンの確認
asdf list python

バージョンの設定

asdf v0.18.0ではasdf setコマンドを使用します:

# グローバル設定(HOMEフォルダ以下全体で使用)
asdf set --home python latest

# 現在の設定確認
asdf current python

出力例:

Name            Version         Source
python 3.13.5 /Users/username/.tool-versions
# Pythonコマンドの場所確認
which python3

出力例:

/Users/username/.asdf/data/shims/python3

プロジェクト固有の設定

プロジェクトディレクトリで特定のPythonバージョンを使用する場合:

# プロジェクトディレクトリに移動
cd /path/to/your/project

# プロジェクト用のPythonバージョンを設定
asdf set python 3.11.7

# .tool-versionsファイルが作成される
cat .tool-versions

出力例:

python 3.11.7

このファイルがあるディレクトリとその配下では、指定されたPythonバージョンが自動的に使用されます。

よく使うコマンド一覧

# 現在使用中のバージョン確認
asdf current

# 特定言語の現在バージョン
asdf current python

# インストール済みバージョン一覧
asdf list python

# 利用可能バージョン一覧(最新10件)
asdf list all python | tail -10

# プラグインの更新
asdf plugin update python


# 古いバージョンの削除
asdf uninstall python 3.10.0

トラブルシューティング

Pythonが見つからない場合

# shimの再作成
asdf reshim python

# パスの確認
echo $PATH | grep asdf

権限エラーが発生する場合

# asdfディレクトリの権限確認
ls -la ~/.asdf/

# 必要に応じて権限修正
chmod -R 755 ~/.asdf/

設定が反映されない場合

# .zshrcの再読み込み
source ~/.zshrc

# 現在のシェル設定確認
echo $ASDF_DATA_DIR
echo $PATH | grep asdf

実際の使用例

新しいプロジェクトでの設定

# プロジェクトディレクトリを作成
mkdir my-python-project
cd my-python-project

# プロジェクト用のPythonバージョンを指定
asdf set --local python 3.11.7

# 仮想環境の作成
python -m venv venv
source venv/bin/activate

# パッケージのインストール
pip install requests flask

チームでの.tool-versionsファイル共有

# .tool-versionsファイルをGitで共有
echo "python 3.11.7" > .tool-versions
echo "nodejs 18.19.0" >> .tool-versions

# チームメンバーは以下でバージョンをインストール
asdf install

設定が反映されない場合

# .zshrcの再読み込み
source ~/.zshrc

# 現在のシェル設定確認
echo $ASDF_DATA_DIR
echo $PATH | grep asdf

実際の使用例

新しいプロジェクトでの設定

# プロジェクトディレクトリを作成
mkdir my-python-project
cd my-python-project

# 1. asdfでPythonバージョンを指定(インタープリター管理)
asdf set python 3.11.7

# 2. venvで仮想環境を作成(パッケージ管理)
python -m venv venv
source venv/bin/activate

# 3. プロジェクト固有のパッケージをインストール
pip install requests flask

# 結果:Python 3.11.7 + 独立したパッケージ環境

なぜvenvも必要なのか?

問題:venvを使わない場合

# プロジェクトA
cd project-a
asdf set python 3.11.7
pip install django==4.2.0 requests==2.28.0

# プロジェクトB(同じPython 3.11.7)
cd ../project-b
pip install flask==2.3.0 requests==2.31.0 # ← requests 2.28.0が上書きされる

# プロジェクトAに戻ると...
cd ../project-a
# django が requests 2.31.0 で動作しない可能性!

解決:asdf + venv

# プロジェクトA: Python 3.11.7 + 独立した環境
project-a/ → django 4.2.0 + requests 2.28.0

# プロジェクトB: Python 3.11.7 + 独立した環境
project-b/ → flask 2.3.0 + requests 2.31.0

チームでの.tool-versionsファイル共有

# .tool-versionsファイルをGitで共有
echo "python 3.11.7" > .tool-versions
echo "nodejs 18.19.0" >> .tool-versions

# チームメンバーは以下でバージョンをインストール
asdf install

まとめ

asdfを使用することで以下のメリットが得られます:

  • 統一的な管理: 複数の言語のバージョン管理を一つのツールで実現
  • プロジェクト固有設定: .tool-versionsファイルでプロジェクトごとの環境を管理
  • 簡単な切り替え: コマンド一つでバージョン切り替えが可能
  • チーム開発: .tool-versionsファイルでチーム全体の環境を統一

Python開発での環境構築が大幅に簡素化されるため、特に複数のプロジェクトを扱う開発者にはぜひ活用していただきたいツールです。

Python FAISSでベクトル検索

· 約6分
Mikyan
白い柴犬
  • FAISSを使って高次元ベクトルの中から、クエリベクトルに最も「似ている」ベクトルを高速に検索できる
  • RAGにおいて、FAISSを使って、関連文書を検出ことができる

詳細

高次元の課題 (Curse of Dimensionality): 次元数が増えると(例えば、数千次元のベクトル)、ベクトル間の距離計算は非常に高コストになり、従来のデータベース手法では効率的に検索できなくなります。FAISSは、この高次元空間での非効率性を克服するための様々な工夫(近似検索アルゴリズム)を提供します。

類似性検索のニーズ: 「意味的に似ているものを探す」「内容が近い画像を見つける」「類似する推薦アイテムを提案する」といった、人間の直感的な「似ている」を数値データから見つけ出すニーズが、AI/MLの発展とともに爆発的に増加しました。FAISSは、このニーズに応えるための高速なソリューションです。

スケーラビリティ: データ量が膨大になっても、秒単位やミリ秒単位で検索結果を返すことが求められます。FAISSは、インデックス構造の工夫や並列処理、GPU活用などにより、このスケーラビリティを実現します。

FAISSは、基本的にメモリ内で高速な類似検索を行いますが、大規模なデータセットに対応するために、メモリ管理やストレージとの連携に関する様々な機能や戦略を持っています。

利用

FAISSのAPIは、大きく分けてインデックスの作成と検索の2つのフェーズで構成されます。

  1. インデックスの作成(Add / Train) まず、検索対象となるベクトルデータをFAISSのインデックス構造に「追加」します。
import faiss
import numpy as np

# 1. ベクトルデータの準備
# 例として、128次元のベクトルが1000個あると仮定
dimension = 128
num_vectors = 1000
data_vectors = np.random.rand(num_vectors, dimension).astype('float32')

# 2. インデックスの選択と初期化
# 最もシンプルなインデックス: IndexFlatL2
# L2 はユークリッド距離を意味し、最も厳密な(近似ではない)検索を行う
index = faiss.IndexFlatL2(dimension)

print(f"インデックスが空か?: {index.is_trained}") # IndexFlatL2は訓練不要なのでTrue
print(f"インデックスのベクトルの数: {index.ntotal}") # 初期状態では0

# 3. インデックスへのベクトルの追加
index.add(data_vectors)

print(f"インデックスのベクトルの数: {index.ntotal}") # 1000

faiss.IndexFlatL2(dimension): これは最も基本的なインデックスで、全てのベクトルをそのままメモリに格納し、クエリが来たら総当たりで距離を計算します(ブルートフォース検索)。L2 はユークリッド距離(二点間の直線距離)を意味します。これは厳密な最近傍探索を行いますが、データ量が増えると非常に遅くなります。

index.is_trained: インデックスによっては、データを追加する前に「訓練(train)」が必要なものがあります(例: IndexIVFFlat, IndexPQ)。これは、ベクトル空間を効率的に分割するためのクラスタリングなどを行います。IndexFlatL2 は訓練不要なので最初から True です。

index.add(vectors): 訓練が終わった(または不要な)インデックスに、実際にベクトルデータを追加します。

# 1. クエリベクトルの準備
# 例として、1つのクエリベクトルを準備
query_vector = np.random.rand(1, dimension).astype('float32') # 1つのクエリは (1, dimension) 形状

# 2. 検索の実行
# k=5: 最も近い5つのベクトルを検索
k = 5
distances, indices = index.search(query_vector, k)

print(f"検索されたベクトルのインデックス (元の data_vectors での): {indices}")
print(f"検索されたベクトルの距離: {distances}")

index.search(query_vectors, k):

query_vectors: 検索したいベクトル。複数個のクエリを一度に渡すことも可能です(例: (num_queries, dimension) のNumPy配列)。

k: 取得したい最近傍ベクトルの数。

戻り値:

distances: 検索された k 個のベクトルとクエリベクトルとの間の距離。

indices: 検索された k 個のベクトルが、インデックス追加時に持っていた元のインデックス(add したときの順番)を示します。

3. インデックスの保存とロード

作成したインデックスはファイルに保存し、後でロードして再利用できます。

# インデックスの保存
faiss.write_index(index, "my_faiss_index.bin")
print("インデックスを 'my_faiss_index.bin' に保存しました。")

# インデックスのロード
loaded_index = faiss.read_index("my_faiss_index.bin")
print(f"ロードされたインデックスのベクトルの数: {loaded_index.ntotal}")

Python Development Setup, pyenv, uv

· 約2分
Mikyan
白い柴犬

This article introduce how to setup python development environment for Web development.

For data science / machine learning development environment, you might prefer other approaches, like anaconda.

Details

Install uv

US is the modern solution and handles both Python versions AND package management. It's like having nvm + npm in one tool.

curl -LsSf https://astral.sh/uv/install.sh | sh

source $HOME/.local/bin/env

Use uv

# Install uv
curl -LsSf https://astral.sh/uv/install.sh | sh

# Install and use Python
uv python install 3.12
uv python pin 3.12 # Sets Python version for project

Use uv to initialize the project

cd my-project

uv init --python 3.12

Then the following files are generated:

tree
├── main.py
├── pyproject.toml
└── README.md

pyproject.toml is just like package.json

Initialize venv:

% uv venv
Using CPython 3.12.11
Creating virtual environment at: .venv
Activate with: source .venv/bin/activate
% source .venv/bin/activate

You can use venv in this project now.

# install deps
# it automatically use venv
uv sync

# add new package
uv add fastapi sqlalchemy alembic

Check the pyproject.toml file's

If want to init a fastapi project, using fastapi's template here might be a better choice.

git clone [email protected]:fastapi/full-stack-fastapi-template.git my-full-stack
cd my-full-stack
git remote set-url origin [email protected]:octocat/my-full-stack.git
git push -u origin master

Pytest

· 約7分
Mikyan
白い柴犬
  • pytest を使うことで、テストコードを簡潔かつ可読性の高いものにできる
  • フィクスチャを活用することで、テストの前提条件の準備や後処理を柔軟に記述可能
  • デコレーターによってデータ駆動テストをシンプルに記述できる
  • 詳細なテスト結果を提供し、デバッグが容易
  • 豊富なプラグインエコシステムにより、機能の拡張も容易

はじめに

Jest や JUnit などの他のテストフレームワークと比べて、pytest の導入は非常にシンプルです。

インストール

pip install -U pytest

セットアップ

pytest は、以下の簡単なルールに基づいてテストケースを自動的に検出します:

  • テストファイル名:test_ で始まるか、_test.py で終わるファイル
  • テスト関数/クラス:test_ で始まる関数、または Test で始まるクラスの中にある関数(ただし init は不可)

特別な設定ファイルなどは基本的に不要で、すぐにテストコードの記述を始められます。

一般的には、プロジェクトルートに tests/ ディレクトリを作成してテストコードを配置します。

my_project/
├── my_package/
│ ├── __init__.py
│ ├── module_a.py
│ └── module_b.py
├── tests/
│ ├── __init__.py # Can be empty, but often present
│ ├── unit/ # Optional: subdirectories for different test types
│ │ ├── test_module_a_unit.py
│ │ └── test_module_b_unit.py
│ ├── integration/ # Optional: for integration tests
│ │ └── test_api_integration.py
│ ├── functional/ # Optional: for end-to-end tests
│ │ └── test_user_flow.py
│ └── conftest.py # For shared fixtures (more on this below)
├── README.md
└── setup.py

テストの書き方

基本は assert を使って期待値を検証

# test_calculations.py
from my_functions import add, subtract, multiply

def test_add_positive_numbers():
assert add(2, 3) == 5

def test_add_negative_numbers():
assert add(-1, -5) == -6

@pytest.fixture を使った前処理・後処理の共通化

目的

  • テスト実行前の初期状態の準備(例:DB接続、設定ファイル、モックの準備)
  • テスト後のクリーンアップ処理(例:接続の切断、一時ファイル削除)
  • 共通処理の再利用による重複の排除

定義方法

  • 通常の Python 関数に @pytest.fixture をつけて定義します
  • yield の前に前処理、後に後処理を記述します
  • 使用する側では引数にフィクスチャ名を指定するだけで自動的に注入されます

利用方法

  • フィクスチャを使用したいテスト関数や他のフィクスチャの引数に、定義したフィクスチャ関数名を指定するだけです。Pytestが自動的に依存性を解決し、フィクスチャが提供する値を注入してくれます。

スコープの指定

フィクスチャはデフォルトで function スコープですが、scope 引数でその実行頻度を変更できます。

  • function (デフォルト): 各テスト関数ごとに1回実行。
  • class: テストクラス内の全テストメソッドに対して1回実行。
  • module: モジュール(ファイル)内の全テストに対して1回実行。
  • session: テストセッション全体で1回だけ実行。

フィクスチャの依存関係

フィクスチャは他のフィクスチャを引数として受け取ることができます。

conftest.py を使ったフィクスチャの共有

フィクスチャは、conftest.py という特別なファイルに定義することで、複数のテストファイル間で共有することができます。

conftest.py に定義されたフィクスチャは、明示的なインポートなしに、同じディレクトリやサブディレクトリ内のすべてのテストファイルから利用可能です。これにより、プロジェクト全体で共通のセットアップロジックを一元管理できます。

@pytest.fixture(scope="module") # モジュールスコープに設定
def module_db_connection():
"""モジュール内で一度だけ実行されるDB接続のフィクスチャ"""
print("\n[Module Scope] DB接続を確立しました。")
db_conn = {"status": "connected", "data": []}
yield db_conn
print("[Module Scope] DB接続を切断しました。")

@pytest.fixture
def empty_db():
"""A fixture that provides an empty dictionary, simulating an empty database."""
print("\nSetting up empty_db...") # This will print during test execution
_mock_db.clear() # Ensure it's empty before each test using this fixture
yield _mock_db # Yield the resource to the test
print("Tearing down empty_db...") # This runs after the test finishes
_mock_db.clear() # Clean up after the test

@pytest.fixture
def populated_db():
"""A fixture that provides a populated dictionary, simulating a database with some data."""
print("\nSetting up populated_db...")
_mock_db.clear()
_mock_db["user1"] = {"name": "Alice", "email": "[email protected]"}
_mock_db["user2"] = {"name": "Bob", "email": "[email protected]"}
yield _mock_db
print("Tearing down populated_db...")
_mock_db.clear()

def test_add_user_to_empty_db(module_db_connection, empty_db):
"""Test adding a user to an initially empty database."""
print("Running test_add_user_to_empty_db...")
empty_db["user3"] = {"name": "Charlie", "email": "[email protected]"}
assert "user3" in empty_db
assert len(empty_db) == 1

def test_retrieve_user_from_populated_db(populated_db):
"""Test retrieving an existing user from a populated database."""
print("Running test_retrieve_user_from_populated_db...")
user = populated_db.get("user1")
assert user is not None
assert user["name"] == "Alice"
assert user["email"] == "[email protected]"

@pytest.mark.parametrize でデータ駆動テスト

同じロジックを異なるデータセットで繰り返しテストしたい場合、parametrize を使えばコードを繰り返す必要がありません。

import pytest

def is_palindrome(s):
"""Checks if a string is a palindrome."""
cleaned_s = "".join(filter(str.isalnum, s)).lower()
return cleaned_s == cleaned_s[::-1]

@pytest.mark.parametrize("input_string, expected_output", [
("racecar", True),
("madam", True),
("A man, a plan, a canal: Panama", True), # With punctuation and spaces
("hello", False),
("Python", False),
("", True), # Empty string is a palindrome
("a", True), # Single character is a palindrome
])
def test_is_palindrome(input_string, expected_output):
"""Test the is_palindrome function with various inputs."""
assert is_palindrome(input_string) == expected_output

その他

テストをスキップする方法。 実際のプロジェクトを開発する際に、テストをスキップするのは、実務上たまに使ってるユースケースです。

Pytestを使ったら簡単にできます。

# test_feature_status.py
import pytest

def divide(a, b):
if b == 0:
raise ZeroDivisionError("Cannot divide by zero")
return a / b

@pytest.mark.skip(reason="This feature is not yet implemented")
def test_new_feature_logic():
"""A test for a feature that's still under development."""
assert 1 == 2 # This test would fail, but it's skipped

@pytest.mark.skipif(
pytest.__version__ < "8.0",
reason="Requires pytest version 8.0 or higher"
)
def test_new_pytest_feature():
"""This test only runs if a specific Pytest version is met."""
assert True

@pytest.mark.xfail(reason="Bug #1234: Division by zero is not handled gracefully yet")
def test_divide_by_zero_xfail():
"""This test is expected to fail due to a known bug."""
assert divide(10, 0) == 0 # This will raise ZeroDivisionError, but it's xfailed

総評

pytest はシンプルながらも強力な機能を持ち、テストコードの質を大きく向上させることができます。特にフィクスチャやパラメータ化テストを活用することで、実用的で保守性の高いテストが可能です。

FastAPI Authentication

· 約3分
Mikyan
白い柴犬
  • Use jose to encode / decode jwt
  • Use passlib to verify hashed password

Details

Utility functions

from typing import Optional

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from passlib.context import CryptContext

# --- Configuration ---
# You should get these from environment variables in a real application
SECRET_KEY = "your-super-secret-jwt-key" # MAKE THIS A LONG, RANDOM STRING!
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30 # Example: 30 minutes

# Password hashing context
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# OAuth2PasswordBearer to extract token from Authorization header
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") # "token" is the endpoint for login

# --- Password Hashing Functions ---
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verifies a plain password against a hashed password."""
return pwd_context.verify(plain_password, hashed_password)

def get_password_hash(password: str) -> str:
"""Hashes a plain password."""
return pwd_context.hash(password)

# --- JWT Token Functions ---
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
"""Creates a JWT access token."""
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt

def decode_access_token(token: str) -> dict:
"""Decodes and validates a JWT access token."""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
# You can add more validation here, e.g., check for 'sub' or 'user_id'
return payload
except JWTError:
raise credentials_exception

# --- Dependency for current user ---
async def get_current_user(token: str = Depends(oauth2_scheme)) -> dict:
"""
Dependency to get the current user from a JWT token.
Raises an HTTPException if the token is invalid or expired.
"""
payload = decode_access_token(token)
user_id: str = payload.get("sub") # 'sub' is commonly used for subject (e.g., user ID)
if user_id is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token payload",
headers={"WWW-Authenticate": "Bearer"},
)
# In a real app, you would fetch the user from your database here
# to ensure they still exist and are active.
# For simplicity, we'll just return the user_id for now.
return {"user_id": user_id}

Usage

# A helper function to authenticate user (combines with your "database" logic)
def authenticate_user(username: str, password: str) -> UserInDB | None:
user = get_user(username)
if not user:
return None
if not verify_password(password, user.hashed_password):
return None
return user

@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
"""
Login endpoint to generate JWT access token.
Uses OAuth2PasswordRequestForm for standard username/password input.
"""
user = authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, # 'sub' claim typically holds the user identifier
expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}

@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: dict = Depends(get_current_user)):
"""
Protected endpoint: Returns information about the current authenticated user.
Requires a valid JWT token in the Authorization header.
"""
# In a real application, you would fetch the full user object from the DB
# using current_user["user_id"]
username = current_user["user_id"]
user_data = fake_users_db.get(username)
if not user_data:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
return User(**user_data)

@app.get("/protected-route/")
async def protected_route(current_user: dict = Depends(get_current_user)):
"""
Another protected endpoint.
"""
return {"message": f"Welcome, {current_user['user_id']}! You accessed a protected route."}

# Example of a public endpoint
@app.get("/")
async def read_root():
return {"message": "Welcome to the unauthenticated public endpoint!"}

Python SQL Alchemy

· 約5分
Mikyan
白い柴犬
  • Use sqlalchemy library to read/write db
  • engine is the database connection gateway
  • Base: The Declarative Base for ORM Models

Details

engine is the primary communication hub between your Python application and your actual database.

Connection Management: It's responsible for managing a pool of database connections. Instead of opening and closing a new connection for every single operation (which is slow and resource-intensive), the engine keeps a pool of ready-to-use connections.

Dialect Specifics: It understands the "dialect" of the specific database you're using (e.g., MySQL, PostgreSQL, SQLite). It translates SQLAlchemy's generic commands into the correct SQL syntax for that database.

Statement Execution: It's the underlying component that actually sends SQL statements to the database and receives results.

Transaction Management: It works with sessions to manage transactions.

Base object is the foundation upon which you build your SQLAlchemy ORM models. It links your Python classes to your database tables.

import os
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy.exc import SQLAlchemyError
from contextlib import contextmanager # For a clean session manager

# --- Database Configuration ---
# You'd typically get these from environment variables or a config file
DB_USER = os.environ.get("DB_USER", "myuser")
DB_PASSWORD = os.environ.get("DB_PASSWORD", "mypassword")
DB_HOST = os.environ.get("DB_HOST", "localhost")
DB_PORT = os.environ.get("DB_PORT", "3306")
DB_NAME = os.environ.get("DB_NAME", "my_test_db")

# MySQL connection string using PyMySQL driver
# Format: mysql+pymysql://user:password@host:port/dbname
DATABASE_URL = f"mysql+pymysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"

print(f"Attempting to connect to: {DATABASE_URL}")

# --- 1. Create the Engine ---
# The engine manages the connection pool and dialect specifics.
# echo=True is great for debugging; it logs all SQL statements to console.
engine = create_engine(DATABASE_URL, echo=True, pool_pre_ping=True)

# --- 2. Define the Base ---
# Base is the declarative base class that our ORM models will inherit from.
Base = declarative_base()

# --- 3. Define the ORM Model ---
# This Python class maps to a database table.
class User(Base):
__tablename__ = 'users' # The actual table name in the database

id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(50), nullable=False)
email = Column(String(100), unique=True, nullable=False)

def __repr__(self):
return f"<User(id={self.id}, name='{self.name}', email='{self.email}')>"

# --- 4. Create the Session Factory ---
# A sessionmaker factory creates Session objects.
# Sessions are the actual interface for database operations (transactions, queries).
Session = sessionmaker(bind=engine)

# --- Context Manager for Session (Best Practice) ---
# This ensures the session is properly closed even if errors occur.
@contextmanager
def get_session():
session = Session()
try:
yield session
session.commit() # Auto-commit on success
except SQLAlchemyError as e:
print(f"Database error occurred: {e}")
session.rollback() # Rollback on error
raise # Re-raise the exception after rollback
finally:
session.close() # Always close the session

# --- CRUD Operations ---

def create_tables():
print("\n--- Creating tables ---")
try:
# Base.metadata contains all table definitions inherited from Base.
# create_all creates these tables in the database linked by the engine.
Base.metadata.create_all(engine)
print("Tables created successfully.")
except SQLAlchemyError as e:
print(f"Error creating tables: {e}")

def create_user(name: str, email: str):
print(f"\n--- Creating user: {name} ({email}) ---")
with get_session() as session:
new_user = User(name=name, email=email)
session.add(new_user)
print(f"Added user: {new_user}")
return new_user

def read_users():
print("\n--- Reading users ---")
with get_session() as session:
users = session.query(User).all() # Query all users
if users:
for user in users:
print(user)
else:
print("No users found.")
return users

def read_user_by_email(email: str):
print(f"\n--- Reading user by email: {email} ---")
with get_session() as session:
user = session.query(User).filter_by(email=email).first() # Query by email
if user:
print(f"Found user: {user}")
else:
print(f"User with email '{email}' not found.")
return user

def update_user_email(user_id: int, new_email: str):
print(f"\n--- Updating user {user_id}'s email to {new_email} ---")
with get_session() as session:
user = session.query(User).filter_by(id=user_id).first()
if user:
old_email = user.email
user.email = new_email
# session.commit() is handled by the context manager on success
print(f"Updated user {user.name} email from {old_email} to {user.email}")
return user
else:
print(f"User with ID {user_id} not found.")
return None

def delete_user(user_id: int):
print(f"\n--- Deleting user with ID: {user_id} ---")
with get_session() as session:
user = session.query(User).filter_by(id=user_id).first()
if user:
session.delete(user)
# session.commit() is handled by the context manager on success
print(f"Deleted user: {user.name} ({user.id})")
return user
else:
print(f"User with ID {user_id} not found.")
return None

# --- Main Execution ---
if __name__ == "__main__":
# Ensure environment variables are set or defaults are acceptable
if "DB_USER" not in os.environ:
print("WARNING: DB_USER environment variable not set. Using default 'myuser'.")
if "DB_PASSWORD" not in os.environ:
print("WARNING: DB_PASSWORD environment variable not set. Using default 'mypassword'.")

# 1. Create tables (only run this once or when schema changes)
create_tables()

# 2. Create some users
user1 = create_user("Alice", "[email protected]")
user2 = create_user("Bob", "[email protected]")
# Try to create user with duplicate email to see error handling
try:
create_user("Charlie", "[email protected]")
except SQLAlchemyError:
print(" (Expected error: Duplicate email caught and rolled back)")

# 3. Read all users
read_users()

# 4. Read a specific user by email
read_user_by_email("[email protected]")
read_user_by_email("[email protected]")

# 5. Update a user
if user1: # Only if user1 was created successfully
update_user_email(user1.id, "[email protected]")
read_user_by_email("[email protected]")

# 6. Delete a user
if user2: # Only if user2 was created successfully
delete_user(user2.id)
read_users() # Show that Bob is gone

# Try deleting a non-existent user
delete_user(999)

print("\n--- All operations complete ---")
# In a real application, the engine would be disposed when the app shuts down.
# For this script, Python will clean it up on exit.

Python File System Operations

· 約3分
Mikyan
白い柴犬
  • Use pathlib to handle file paths and operations by default
  • For finegraind read/write over file I/O (streaming), use context manager
  • Use the tempfile Module for Temporary Files/Directories
  • Use shutil for High-level Operations

Details

Use pathlib is the modern, oo way to handle file paths, and operations.

from pathlib import Path

# Create Path Objects:
my_file = Path("data")

# Use Path methods
my_file.exists()

# Content I/O

my_file.read_text()
#Path.write_text()
#Path.write_text()
#Path.write_bytes()

Create a Path Object

# From CWD
#
current_dir = Path.cwd()
# From Home
home_dir = Path.home()
# From absolute Paths
abs_path = Path("/usr/local/bin/python")
# From relative paths (relative to CWD)
relative_path = Path("data/input.csv")

# Create path by manipulation
base_dir = Path('/opt/my_app')
config_file = base_dire / "config" / "settings.yaml"

parent_dir = config_file.parent

Dealing with file name

# Get file / directory name
config_file.name

# Getting Stem
config_file.stem # settings

# Getting suffix
config_file.suffix
config_file.suffixes

# Get absolute path
config_file.resolve()
# or
config_file.absolute()

# Get relative path
relative_to = config_file.relative_to(project_root)

Check / Query File System

my_file.exists()

my_file.is_file()
my_file.is_dir()
my_file.is_symlink()

# Statistics
stats = temp_file.stat()

Operations

# Create directories
new_dir.mkdir()

# create empty file
empty_file.touch()

# delete file

file_to_delete.unlink()


# delete empty directories

empty_folder.rmdir()

# rename / move file or directories

old_path.rename(new_path)

# Changing suffix
config_file.with_suffix('.yml')

File Content I/O

config_path = Path("config.txt")
config_path.write_text("debug=True\nlog_level=INFO")
content = config_path.read_text()

binary_data_file = Path("binary_data.bin")
binary_data_file.write_bytes(b'\x01\x02\x03\x04')
data = binary_data_file.read_bytes()
print(f"Binary data: {data}")

directory iteration / traversal

# List
project_root.iterdir()

# Globbing
project_root.glob("*.py")

# Walking Directory Tree (Python 3.12+)
project_root.walk()

Use Context Managers (with open(...)) for File I/O

When you need more fine-grained control over file reading/writing, (streaming large files, specific encoding, or binary modes), use the with statement.

try:
with open("my_large_file.csv", "w", encoding="utf-8") as f:
f.write("Header1,Header2\n")
for i in range(1000):
f.write(f"data_{i},value_{i}\n")
except IOError as e:
print(f"Error writing file: {e}")

Use the tempfile Module for Temporary Files/Directories

import tempfile
from pathlib import Path

# Using a temporary directory
with tempfile.TemporaryDirectory() as tmp_dir_str:
tmp_dir = Path(tmp_dir_str)
temp_file = tmp_dir / "temp_report.txt"
temp_file.write_text("Ephemeral data.")
print(f"Created temporary file at: {temp_file}")
# At the end of the 'with' block, tmp_dir_str and its contents are deleted
print("Temporary directory removed.")

Use shutil for High-level Operations

shutil Focuses on operations that involing moving, copying, or deleting entire trees of files and directories, or other utility functions that go beyond a single Path obejct's scope.

import shutil

source_dir = Path("my_data")
destination_dir = Path("backup_data")


try:
shutil.copytree(source_dir, destination_dir)
print(f"Copied '{source_dir}' to '{destination_dir}'")
except FileExistsError:
print(f"Destination '{destination_dir}' already exists. Skipping copy.")
except Exception as e:
print(f"Error copying tree: {e}")
import shutil
from pathlib import Path

dir_to_delete = Path("backup_data") # Assuming this exists from the copytree example

if dir_to_delete.exists():
print(f"Deleting '{dir_to_delete}'...")
shutil.rmtree(dir_to_delete)
print("Directory deleted.")
else:
print(f"Directory '{dir_to_delete}' does not exist.")

Zip / Tarring

shutil even can create compressed archieves, and unpack them.

archive_path = shutil.make_archive(archive_name, 'zip', source_dir)
print(f"Created archive: {archive_path}")

Copy File Metadata

  • shutil.copystat(src, dst) copy permission bits, last access time, last modification time and flags from one file to another
  • shutil.copy2(src, dst) copies the file and metadata

Getting Disk Usage

usage = shutil.disk_usage(Path(".")) # Check current directory's disk
print(f"Total: {usage.total / (1024**3):.2f} GB")
print(f"Used: {usage.used / (1024**3):.2f} GB")
print(f"Free: {usage.free / (1024**3):.2f} GB")

Do not

  • Avoid os.system() or subprocess.run() for file operations in most case

Python Async Programming

· 約2分
Mikyan
白い柴犬

Python's asynchronous programming is built around the asyncio module, and async/await keywords.

Concept

coroutine is a special type of function that represents a computation that can be paused and resumed.

A coroutine is defined with async def.

For example the following function is a coroutine

async def my_coroutine():
print("Coroutine started")
await asyncio.sleep(1) # This is a pause point
print("Coroutine resumed after 1 second")
return "Done!"
  • Inside an async def function, the await keyword is used to pause the execution of the current coroutine.
  • When a coroutine awaits something, it singals to the event loop that it's waiting for an I/O operation or some other asynchronous event to complete
  • While the current coroutine is paused, the event loop can switch its attention to other coroutines or tasks that are ready to run, ensuring efficient use of the CPU.

Why async def functions can be paused

  • A regular def function is executed directly by the Python interpreter, when you call it the interpreter's program counter moves through its instructions sequentially. If it encounters something that blocks, the entire thread stops until that blocking operation is done.

  • An Async def function, when called doesn't immediately execute its body. Instead it returns a coroutine object. This object is a special kind of generator that the asyncio event loop knows how to manage.

  • use the await keyword to singal an intentional pause.

  • if there is no await inside an async def function, it will run like regular synchronous function until completion.

The Event loop is the orchestrator.

  • The asyncio event loop is continuously monitoring a set of registered coroutines/tasks. It's like a dispatcher.

  • State Preservation: (Generators)

Conceptually, Python coroutines are built on top of generators. When a generator yields a value, its local state (variables, instruction pointer) is saved. When next() is called on it again, it resumes from where it left off.

Similarly, when an async def function awaits, its internal state is saved. When the awaited operation completes, the coroutine is "sent" a signal to resume, and it continues execution from the line immediately following the await.

Why Async is important for web framework

Python Pydantic

· 約4分
Mikyan
白い柴犬

Pydantic は、Pythonのクラス構文と型ヒント(Type Hint)を活用して、以下のような堅牢なデータ処理を可能にするライブラリです。

  • データの検証(Validation): 型ヒントに基づき、値の型や制約条件を自動チェック

  • 型の自動変換(Coercion): 可能であれば、入力値を期待される型に自動変換

  • データのシリアライズ/デシリアライズ: 辞書やJSONからのモデル生成、辞書やJSON形式での出力

  • バリデーションロジックのカスタマイズ: フィールド単位・モデル単位で検証ロジックを定義可能

1. モデルの定義

基本は BaseModel を継承したクラスを作るだけです。必要に応じて Field() を使うことで、制約(例: 最大文字数、正の数など)や説明文を追加できます。

from typing import Optional
from pydantic import BaseModel, Field, EmailStr

class User(BaseModel):
name: str
age: int
email: str

# Valid data
user = User(name="Alice", age=30, email="[email protected]")
print(user)

# 不正なデータは自動的にエラーになります:

try:
User(name="Bob", age="twenty", email="bob@invalid")
except Exception as e:
print(e)

2. フィールド制約と説明文の付与

class Product(BaseModel):
id: int = Field(..., gt=0, description="Unique product identifier")
name: str = Field(..., min_length=2, max_length=100)
price: float = Field(..., gt=0.0)
description: Optional[str] = None # Optional field
seller_email: EmailStr # Pydantic's built-in email validation

product = Product(id=1, name="Laptop", price=1200.50, seller_email="[email protected]")
print(product)

3. モデルの生成方法

  • 辞書からの検証付きインスタンス生成
  • JSON文字列からの生成

辞書からの作成は、Constructorまたはmodel_validateを使って、モデルを作成できます。

model_validate_json はJsonの文字列でモデルを作成できます。

user_data = {
"name": "Alice",
"age": 30,
"email": "[email protected]"
}
user_model = User(**user_data)

user_model = User.model_validate(user_data)


class Movie(BaseModel):
title: str
year: int
director: str
genres: list[str]

# Your JSON string data
json_string = '''
{
"title": "Inception",
"year": 2010,
"director": "Christopher Nolan",
"genres": ["Sci-Fi", "Action", "Thriller"]
}
'''
movie_model = Movie.model_validate_json(json_string)

4. モデルのシリアライズ: model_validate(), model_validate_json()

  • model_dump: Python辞書へ変換
  • model_dump_json: JSON文字列で出力
from pydantic import BaseModel

class City(BaseModel):
name: str
population: int

tokyo = City(name="Tokyo", population=14000000)
print(tokyo.model_dump())
print(tokyo.model_dump_json(indent=2)) # Pretty print JSON

5. カスタムバリデーション

  • @field_validator 関数を使って、フィールド単位のバリデータ
  • @model_validator 関数を使って、モデル全体の検証
from datetime import date
from pydantic import BaseModel, ValidationError, field_validator, model_validator

class Event(BaseModel):
name: str
start_date: date
end_date: date

@field_validator('name')
@classmethod
def check_name_is_not_empty(cls, v):
if not v.strip():
raise ValueError('Event name cannot be empty')
return v

@model_validator(mode='after') # 'after' means after field validation
def check_dates_order(self):
if self.start_date > self.end_date:
raise ValueError('Start date must be before end date')
return self

try:
event1 = Event(name="Conference", start_date="2025-07-20", end_date="2025-07-22")
print(event1)
except ValidationError as e:
print(e)

try:
Event(name="Bad Event", start_date="2025-07-25", end_date="2025-07-23")
except ValidationError as e:
print(e)

6. 入れ子モデル(ネスト構造)

from pydantic import BaseModel
from typing import List

class Address(BaseModel):
street: str
city: str
zip_code: str

class Customer(BaseModel):
customer_id: int
name: str
shipping_addresses: List[Address]

customer_data = {
"customer_id": 123,
"name": "Jane Doe",
"shipping_addresses": [
{"street": "123 Main St", "city": "Anytown", "zip_code": "12345"},
{"street": "456 Oak Ave", "city": "Otherville", "zip_code": "67890"}
]
}

customer = Customer.model_validate(customer_data)
print(customer)

7. JSON Schemaの自動生成

from pydantic import BaseModel

class Task(BaseModel):
id: int
title: str
completed: bool = False

print(Task.model_json_schema(indent=2))

おわりに

Pydanticは以下のような用途に特に有効です:

  • APIのリクエスト・レスポンスモデル(FastAPIとの統合が特に強力)

  • 設定ファイルや外部入力の安全な読み込み

  • 型安全で自己文書化されたデータモデルの構築

  • 静的型チェックが弱いPythonにおいて、Pydanticは強力な型バリデーションと開発者体験を提供してくれます。

参考リンク

document on how to use it.

Python Type Hint

· 約2分
Mikyan
白い柴犬

Since Python 3.5, Python introduced Type hint. And it become more and more powerful.

With it You can set the type for your variable for readibility.

Type hints are hints, not enforcements. Python still runs the code even if types don't match.

Usage

# Primitives
name: str = "Tom"
age: int = 30
salary: float = 500.5
is_active: bool = True

# Collections
numbers: list = [1,2,3]
scores: tuple = (90, 85, 88)
unique: set = {1, 2, 3}
data: dict = {"key": "value"}


# Specific Collection Types

from typing import List, Dict, Tuple, Set

names: List[str] = ["Alice", "Bob", "Charlie"]
user: Dict[str, str] = {
"name": "John",
"email": "[email protected]"
}
person: Tuple[str, int, bool] = ("Alice", 30, True)
unique_ids: Set[int] = {1, 2, 3, 4, 5}

# after python 3.9 the following are also work
names: list[str] = ["Alice", "Bob", "Charlie"]
user: dict[str, str] = {
"name": "John",
"email": "[email protected]"
}person: tuple[str, int, bool] = ("Alice", 30, True)
unique_ids: set[int] = {1, 2, 3, 4, 5}

# Optional

from typing import Optional

# can be string or None
middle_name: Optional[str] = None

# Union
from typing import Union
number: Union[int, float] = 10
number = 10.5


# Literal for exact values
from typing import Literal
Status = Literal["pending", "approved", "rejected"]

def process_order(status: Status) -> None:
pass

# TypedDict
from typing import TypedDict
# TypedDict for dictionary structures
class UserDict(TypedDict):
name: str
age: int
email: str


# Class
user: User = get_user(123)

# method
def calculate_bmi(weight: float, height: float) -> float:
return weight / (height ** 2)

# Self
from typing import Self

class User:
def copy(self) -> Self: # Returns same class type
return User()