
Python SQLAlchemy

  • Use the sqlalchemy library to read from and write to databases
  • engine is the database connection gateway
  • Base: The Declarative Base for ORM Models

Details

engine is the primary communication hub between your Python application and your actual database.

Connection Management: It's responsible for managing a pool of database connections. Instead of opening and closing a new connection for every single operation (which is slow and resource-intensive), the engine keeps a pool of ready-to-use connections.

Dialect Specifics: It understands the "dialect" of the specific database you're using (e.g., MySQL, PostgreSQL, SQLite). It translates SQLAlchemy's generic commands into the correct SQL syntax for that database.

Statement Execution: It's the underlying component that actually sends SQL statements to the database and receives results.

Transaction Management: It works with sessions to manage transactions.

The Base object is the foundation upon which you build your SQLAlchemy ORM models. It links your Python classes to your database tables.
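The engine can also be used directly, without the ORM, to run raw SQL. A minimal, self-contained sketch (the in-memory SQLite URL is used purely for illustration; any valid database URL works):

import sqlalchemy
from sqlalchemy import create_engine, text

engine = create_engine("sqlite:///:memory:")

# Borrow a connection from the pool, execute a statement, and return the connection.
with engine.connect() as conn:
    result = conn.execute(text("SELECT 1"))
    print(result.scalar())  # -> 1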

import os
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy.exc import SQLAlchemyError
from contextlib import contextmanager # For a clean session manager

# --- Database Configuration ---
# You'd typically get these from environment variables or a config file
DB_USER = os.environ.get("DB_USER", "myuser")
DB_PASSWORD = os.environ.get("DB_PASSWORD", "mypassword")
DB_HOST = os.environ.get("DB_HOST", "localhost")
DB_PORT = os.environ.get("DB_PORT", "3306")
DB_NAME = os.environ.get("DB_NAME", "my_test_db")

# MySQL connection string using PyMySQL driver
# Format: mysql+pymysql://user:password@host:port/dbname
DATABASE_URL = f"mysql+pymysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"

print(f"Attempting to connect to: {DATABASE_URL}")

# --- 1. Create the Engine ---
# The engine manages the connection pool and dialect specifics.
# echo=True is great for debugging; it logs all SQL statements to console.
engine = create_engine(DATABASE_URL, echo=True, pool_pre_ping=True)

# --- 2. Define the Base ---
# Base is the declarative base class that our ORM models will inherit from.
Base = declarative_base()

# --- 3. Define the ORM Model ---
# This Python class maps to a database table.
class User(Base):
    __tablename__ = 'users'  # The actual table name in the database

    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(50), nullable=False)
    email = Column(String(100), unique=True, nullable=False)

    def __repr__(self):
        return f"<User(id={self.id}, name='{self.name}', email='{self.email}')>"

# --- 4. Create the Session Factory ---
# A sessionmaker factory creates Session objects.
# Sessions are the actual interface for database operations (transactions, queries).
Session = sessionmaker(bind=engine, expire_on_commit=False)  # keep returned objects readable after the session commits and closes

# --- Context Manager for Session (Best Practice) ---
# This ensures the session is properly closed even if errors occur.
@contextmanager
def get_session():
    session = Session()
    try:
        yield session
        session.commit()  # Auto-commit on success
    except SQLAlchemyError as e:
        print(f"Database error occurred: {e}")
        session.rollback()  # Rollback on error
        raise  # Re-raise the exception after rollback
    finally:
        session.close()  # Always close the session

# --- CRUD Operations ---

def create_tables():
    print("\n--- Creating tables ---")
    try:
        # Base.metadata contains all table definitions inherited from Base.
        # create_all creates these tables in the database linked by the engine.
        Base.metadata.create_all(engine)
        print("Tables created successfully.")
    except SQLAlchemyError as e:
        print(f"Error creating tables: {e}")

def create_user(name: str, email: str):
    print(f"\n--- Creating user: {name} ({email}) ---")
    with get_session() as session:
        new_user = User(name=name, email=email)
        session.add(new_user)
        print(f"Added user: {new_user}")
        return new_user

def read_users():
    print("\n--- Reading users ---")
    with get_session() as session:
        users = session.query(User).all()  # Query all users
        if users:
            for user in users:
                print(user)
        else:
            print("No users found.")
        return users

def read_user_by_email(email: str):
    print(f"\n--- Reading user by email: {email} ---")
    with get_session() as session:
        user = session.query(User).filter_by(email=email).first()  # Query by email
        if user:
            print(f"Found user: {user}")
        else:
            print(f"User with email '{email}' not found.")
        return user

def update_user_email(user_id: int, new_email: str):
    print(f"\n--- Updating user {user_id}'s email to {new_email} ---")
    with get_session() as session:
        user = session.query(User).filter_by(id=user_id).first()
        if user:
            old_email = user.email
            user.email = new_email
            # session.commit() is handled by the context manager on success
            print(f"Updated user {user.name} email from {old_email} to {user.email}")
            return user
        else:
            print(f"User with ID {user_id} not found.")
            return None

def delete_user(user_id: int):
    print(f"\n--- Deleting user with ID: {user_id} ---")
    with get_session() as session:
        user = session.query(User).filter_by(id=user_id).first()
        if user:
            session.delete(user)
            # session.commit() is handled by the context manager on success
            print(f"Deleted user: {user.name} ({user.id})")
            return user
        else:
            print(f"User with ID {user_id} not found.")
            return None

# --- Main Execution ---
if __name__ == "__main__":
    # Ensure environment variables are set or defaults are acceptable
    if "DB_USER" not in os.environ:
        print("WARNING: DB_USER environment variable not set. Using default 'myuser'.")
    if "DB_PASSWORD" not in os.environ:
        print("WARNING: DB_PASSWORD environment variable not set. Using default 'mypassword'.")

    # 1. Create tables (only run this once or when schema changes)
    create_tables()

    # 2. Create some users
    user1 = create_user("Alice", "alice@example.com")
    user2 = create_user("Bob", "bob@example.com")
    # Try to create a user with a duplicate email to see error handling
    try:
        create_user("Charlie", "alice@example.com")
    except SQLAlchemyError:
        print(" (Expected error: Duplicate email caught and rolled back)")

    # 3. Read all users
    read_users()

    # 4. Read a specific user by email
    read_user_by_email("alice@example.com")
    read_user_by_email("nobody@example.com")

    # 5. Update a user
    if user1:  # Only if user1 was created successfully
        update_user_email(user1.id, "alice.new@example.com")
        read_user_by_email("alice.new@example.com")

    # 6. Delete a user
    if user2:  # Only if user2 was created successfully
        delete_user(user2.id)
        read_users()  # Show that Bob is gone

    # Try deleting a non-existent user
    delete_user(999)

    print("\n--- All operations complete ---")
    # In a real application, the engine would be disposed when the app shuts down.
    # For this script, Python will clean it up on exit.
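The queries above use the legacy session.query() API; SQLAlchemy 1.4+ also supports the select() style. A minimal sketch reusing the User model and get_session helper from this script (the email value is just illustrative):

from sqlalchemy import select

with get_session() as session:
    stmt = select(User).where(User.email == "alice@example.com")
    user = session.execute(stmt).scalar_one_or_none()
    print(user)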

Python File System Operations

  • Use pathlib to handle file paths and operations by default
  • For fine-grained read/write control over file I/O (streaming), use a context manager
  • Use the tempfile Module for Temporary Files/Directories
  • Use shutil for High-level Operations

Details

pathlib is the modern, object-oriented way to handle file paths and operations.

from pathlib import Path

# Create Path Objects:
my_file = Path("data")

# Use Path methods
my_file.exists()

# Content I/O

my_file.read_text()
# Path.write_text()
# Path.write_bytes()

Create a Path Object

# From CWD
current_dir = Path.cwd()
# From home
home_dir = Path.home()
# From absolute paths
abs_path = Path("/usr/local/bin/python")
# From relative paths (relative to CWD)
relative_path = Path("data/input.csv")

# Create paths by manipulation
base_dir = Path('/opt/my_app')
config_file = base_dir / "config" / "settings.yaml"

parent_dir = config_file.parent

Dealing with file names

# Get file / directory name
config_file.name

# Getting Stem
config_file.stem # settings

# Getting suffix
config_file.suffix
config_file.suffixes

# Get absolute path
config_file.resolve()
# or
config_file.absolute()

# Get a relative path
relative_to = config_file.relative_to(base_dir)  # base_dir defined above

Check / Query File System

my_file.exists()

my_file.is_file()
my_file.is_dir()
my_file.is_symlink()

# Statistics
stats = my_file.stat()

Operations

# Create directories
new_dir.mkdir()

# create empty file
empty_file.touch()

# delete file

file_to_delete.unlink()


# delete empty directories

empty_folder.rmdir()

# rename / move file or directories

old_path.rename(new_path)

# Changing suffix
config_file.with_suffix('.yml')

File Content I/O

config_path = Path("config.txt")
config_path.write_text("debug=True\nlog_level=INFO")
content = config_path.read_text()

binary_data_file = Path("binary_data.bin")
binary_data_file.write_bytes(b'\x01\x02\x03\x04')
data = binary_data_file.read_bytes()
print(f"Binary data: {data}")

Directory Iteration / Traversal

# List
project_root.iterdir()

# Globbing
project_root.glob("*.py")

# Walking Directory Tree (Python 3.12+)
project_root.walk()
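A small sketch of how glob and rglob differ (project_root here is simply the current directory, chosen for illustration):

from pathlib import Path

project_root = Path(".")

# Non-recursive: only .py files directly under project_root
for path in project_root.glob("*.py"):
    print(path)

# Recursive: .py files in all subdirectories as well
for path in project_root.rglob("*.py"):
    print(path)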

Use Context Managers (with open(...)) for File I/O

When you need more fine-grained control over file reading/writing (streaming large files, specific encodings, or binary modes), use the with statement.

try:
    with open("my_large_file.csv", "w", encoding="utf-8") as f:
        f.write("Header1,Header2\n")
        for i in range(1000):
            f.write(f"data_{i},value_{i}\n")
except IOError as e:
    print(f"Error writing file: {e}")

Use the tempfile Module for Temporary Files/Directories

import tempfile
from pathlib import Path

# Using a temporary directory
with tempfile.TemporaryDirectory() as tmp_dir_str:
tmp_dir = Path(tmp_dir_str)
temp_file = tmp_dir / "temp_report.txt"
temp_file.write_text("Ephemeral data.")
print(f"Created temporary file at: {temp_file}")
# At the end of the 'with' block, tmp_dir_str and its contents are deleted
print("Temporary directory removed.")

Use shutil for High-level Operations

shutil focuses on operations that involve moving, copying, or deleting entire trees of files and directories, plus other utility functions that go beyond a single Path object's scope.

import shutil

source_dir = Path("my_data")
destination_dir = Path("backup_data")


try:
    shutil.copytree(source_dir, destination_dir)
    print(f"Copied '{source_dir}' to '{destination_dir}'")
except FileExistsError:
    print(f"Destination '{destination_dir}' already exists. Skipping copy.")
except Exception as e:
    print(f"Error copying tree: {e}")

import shutil
from pathlib import Path

dir_to_delete = Path("backup_data")  # Assuming this exists from the copytree example

if dir_to_delete.exists():
    print(f"Deleting '{dir_to_delete}'...")
    shutil.rmtree(dir_to_delete)
    print("Directory deleted.")
else:
    print(f"Directory '{dir_to_delete}' does not exist.")

Zip / Tarring

shutil can even create compressed archives, and unpack them.

archive_name = "my_data_backup"  # base name of the archive to create
archive_path = shutil.make_archive(archive_name, 'zip', source_dir)
print(f"Created archive: {archive_path}")

Copy File Metadata

  • shutil.copystat(src, dst) copies permission bits, last access time, last modification time, and flags from one file to another
  • shutil.copy2(src, dst) copies the file and its metadata

Getting Disk Usage

usage = shutil.disk_usage(Path(".")) # Check current directory's disk
print(f"Total: {usage.total / (1024**3):.2f} GB")
print(f"Used: {usage.used / (1024**3):.2f} GB")
print(f"Free: {usage.free / (1024**3):.2f} GB")

Do not

  • Avoid os.system() or subprocess.run() for file operations in most cases

Python Async Programming


Python's asynchronous programming is built around the asyncio module and the async/await keywords.

Concept

A coroutine is a special type of function that represents a computation that can be paused and resumed.

A coroutine is defined with async def.

For example, the following function is a coroutine:

import asyncio

async def my_coroutine():
    print("Coroutine started")
    await asyncio.sleep(1)  # This is a pause point
    print("Coroutine resumed after 1 second")
    return "Done!"
  • Inside an async def function, the await keyword is used to pause the execution of the current coroutine.
  • When a coroutine awaits something, it signals to the event loop that it's waiting for an I/O operation or some other asynchronous event to complete
  • While the current coroutine is paused, the event loop can switch its attention to other coroutines or tasks that are ready to run, ensuring efficient use of the CPU.

Why async def functions can be paused

  • A regular def function is executed directly by the Python interpreter; when you call it, the interpreter's program counter moves through its instructions sequentially. If it encounters something that blocks, the entire thread stops until that blocking operation is done.

  • An async def function, when called, doesn't immediately execute its body. Instead it returns a coroutine object. This object is a special kind of generator that the asyncio event loop knows how to manage.

  • Use the await keyword to signal an intentional pause.

  • If there is no await inside an async def function, it runs like a regular synchronous function until completion.

The Event loop is the orchestrator.

  • The asyncio event loop is continuously monitoring a set of registered coroutines/tasks. It's like a dispatcher.

  • State Preservation: (Generators)

Conceptually, Python coroutines are built on top of generators. When a generator yields a value, its local state (variables, instruction pointer) is saved. When next() is called on it again, it resumes from where it left off.

Similarly, when an async def function awaits, its internal state is saved. When the awaited operation completes, the coroutine is "sent" a signal to resume, and it continues execution from the line immediately following the await.

Why async is important for web frameworks

Web servers spend most of their time waiting on I/O: sockets, databases, external APIs. With an async framework, a single thread's event loop can keep serving other requests while one request is waiting, which supports far more concurrent connections than a one-thread-per-request model.
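A minimal sketch of that payoff, reusing my_coroutine from above: three coroutines awaited concurrently finish in roughly one second total, not three (timings approximate):

import asyncio

async def main():
    # The event loop interleaves the three coroutines at their await points.
    results = await asyncio.gather(my_coroutine(), my_coroutine(), my_coroutine())
    print(results)  # ['Done!', 'Done!', 'Done!']

asyncio.run(main())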

Python Pydantic


Pydantic is a library that uses Python's class syntax and type hints to enable robust data handling, such as:

  • Validation: values are automatically checked against their type hints and constraints

  • Coercion: where possible, input values are automatically converted to the expected type

  • Serialization/deserialization: models can be built from dicts or JSON and dumped back out as dicts or JSON

  • Custom validation logic: validators can be defined per field or per model

1. Defining a model

The basic pattern is simply a class that inherits from BaseModel. Where needed, Field() adds constraints (e.g. maximum length, positive numbers) and descriptions.

from typing import Optional
from pydantic import BaseModel, Field, EmailStr

class User(BaseModel):
    name: str
    age: int
    email: str

# Valid data
user = User(name="Alice", age=30, email="alice@example.com")
print(user)

# Invalid data raises an error automatically:

try:
    User(name="Bob", age="twenty", email="bob@invalid")
except Exception as e:
    print(e)

2. Field constraints and descriptions

class Product(BaseModel):
    id: int = Field(..., gt=0, description="Unique product identifier")
    name: str = Field(..., min_length=2, max_length=100)
    price: float = Field(..., gt=0.0)
    description: Optional[str] = None  # Optional field
    seller_email: EmailStr  # Pydantic's built-in email validation

product = Product(id=1, name="Laptop", price=1200.50, seller_email="seller@example.com")
print(product)
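A small sketch of what happens when constraints are violated (the values here are arbitrary, chosen to fail each check):

from pydantic import ValidationError

try:
    Product(id=0, name="X", price=-1, seller_email="not-an-email")
except ValidationError as e:
    # One error per failing field: id (gt=0), name (min_length), price (gt=0.0), seller_email
    print(e)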

3. Creating model instances

  • Validated instance creation from a dict
  • Creation from a JSON string

From a dict, you can build a model with either the constructor or model_validate.

model_validate_json builds a model from a JSON string.

user_data = {
    "name": "Alice",
    "age": 30,
    "email": "alice@example.com"
}
user_model = User(**user_data)

user_model = User.model_validate(user_data)


class Movie(BaseModel):
    title: str
    year: int
    director: str
    genres: list[str]

# Your JSON string data
json_string = '''
{
    "title": "Inception",
    "year": 2010,
    "director": "Christopher Nolan",
    "genres": ["Sci-Fi", "Action", "Thriller"]
}
'''
movie_model = Movie.model_validate_json(json_string)

4. Serializing models: model_dump(), model_dump_json()

  • model_dump: convert to a Python dict
  • model_dump_json: dump as a JSON string
from pydantic import BaseModel

class City(BaseModel):
    name: str
    population: int

tokyo = City(name="Tokyo", population=14000000)
print(tokyo.model_dump())
print(tokyo.model_dump_json(indent=2))  # Pretty print JSON
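model_dump also accepts filtering options; a minimal sketch (the field chosen is arbitrary):

# Exclude or include specific fields when dumping
print(tokyo.model_dump(exclude={"population"}))  # {'name': 'Tokyo'}
print(tokyo.model_dump(include={"name"}))        # {'name': 'Tokyo'}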

5. Custom validation

  • @field_validator defines a validator for a single field
  • @model_validator defines validation across the whole model
from datetime import date
from pydantic import BaseModel, ValidationError, field_validator, model_validator

class Event(BaseModel):
    name: str
    start_date: date
    end_date: date

    @field_validator('name')
    @classmethod
    def check_name_is_not_empty(cls, v):
        if not v.strip():
            raise ValueError('Event name cannot be empty')
        return v

    @model_validator(mode='after')  # 'after' means after field validation
    def check_dates_order(self):
        if self.start_date > self.end_date:
            raise ValueError('Start date must be before end date')
        return self

try:
    event1 = Event(name="Conference", start_date="2025-07-20", end_date="2025-07-22")
    print(event1)
except ValidationError as e:
    print(e)

try:
    Event(name="Bad Event", start_date="2025-07-25", end_date="2025-07-23")
except ValidationError as e:
    print(e)

6. Nested models

from pydantic import BaseModel
from typing import List

class Address(BaseModel):
    street: str
    city: str
    zip_code: str

class Customer(BaseModel):
    customer_id: int
    name: str
    shipping_addresses: List[Address]

customer_data = {
    "customer_id": 123,
    "name": "Jane Doe",
    "shipping_addresses": [
        {"street": "123 Main St", "city": "Anytown", "zip_code": "12345"},
        {"street": "456 Oak Ave", "city": "Otherville", "zip_code": "67890"}
    ]
}

customer = Customer.model_validate(customer_data)
print(customer)

7. Automatic JSON Schema generation

import json
from pydantic import BaseModel

class Task(BaseModel):
    id: int
    title: str
    completed: bool = False

# model_json_schema() returns a dict; use json.dumps for pretty printing
print(json.dumps(Task.model_json_schema(), indent=2))

Closing notes

Pydantic is particularly useful for:

  • API request/response models (the FastAPI integration is especially strong)

  • Safely loading configuration files and other external input

  • Building type-safe, self-documenting data models

In a language with weak static type checking like Python, Pydantic provides strong runtime type validation and a great developer experience.


Python Type Hint


Python introduced type hints in version 3.5, and they have become more and more powerful since.

With them you can annotate your variables and functions for readability.

Type hints are hints, not enforcements. Python still runs the code even if types don't match.

Usage

# Primitives
name: str = "Tom"
age: int = 30
salary: float = 500.5
is_active: bool = True

# Collections
numbers: list = [1,2,3]
scores: tuple = (90, 85, 88)
unique: set = {1, 2, 3}
data: dict = {"key": "value"}


# Specific Collection Types

from typing import List, Dict, Tuple, Set

names: List[str] = ["Alice", "Bob", "Charlie"]
user: Dict[str, str] = {
    "name": "John",
    "email": "john@example.com"
}
person: Tuple[str, int, bool] = ("Alice", 30, True)
unique_ids: Set[int] = {1, 2, 3, 4, 5}

# After Python 3.9 the built-in collection types also work
names: list[str] = ["Alice", "Bob", "Charlie"]
user: dict[str, str] = {
    "name": "John",
    "email": "john@example.com"
}
person: tuple[str, int, bool] = ("Alice", 30, True)
unique_ids: set[int] = {1, 2, 3, 4, 5}

# Optional

from typing import Optional

# can be string or None
middle_name: Optional[str] = None

# Union
from typing import Union
number: Union[int, float] = 10
number = 10.5


# Literal for exact values
from typing import Literal
Status = Literal["pending", "approved", "rejected"]

def process_order(status: Status) -> None:
    pass

# TypedDict
from typing import TypedDict
# TypedDict for dictionary structures
class UserDict(TypedDict):
    name: str
    age: int
    email: str


# Class
user: User = get_user(123)

# method
def calculate_bmi(weight: float, height: float) -> float:
    return weight / (height ** 2)

# Self (Python 3.11+)
from typing import Self

class User:
    def copy(self) -> Self:  # Returns the same class type
        return User()
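Because hints are not enforced at runtime, a static checker is what actually catches mismatches. A minimal sketch, assuming mypy is installed (pip install mypy); the file name is hypothetical:

# bmi.py
def calculate_bmi(weight: float, height: float) -> float:
    return weight / (height ** 2)

result: int = calculate_bmi(70.0, 1.75)  # runs fine, but the annotation is wrong

# `python bmi.py` executes without complaint, while `mypy bmi.py`
# reports an incompatible assignment (float assigned to int).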

Java Development Setup, SDKMAN, Gradle, VS Code


SDKMAN! is a popular Java version manager for developers. It allows you to easily install, switch between, and manage multiple Java versions and development tools.

Install SDKMAN!

# install
curl -s "https://get.sdkman.io" | bash
# init
source "$HOME/.sdkman/bin/sdkman-init.sh"

Install Java

# list available Java versions
sdk list java

# Install Java 21
sdk install java 21.0.1-tem

# Set as default
sdk default java 21.0.1-tem

# Verify installation
java -version

Fix JAVA_HOME Environment Variable

SDKMAN! sometimes doesn't set JAVA_HOME automatically. Add this to your shell configuration:

# For zsh (default on newer macOS)
echo 'export JAVA_HOME="$SDKMAN_DIR/candidates/java/current"' >> ~/.zshrc
echo 'export PATH="$JAVA_HOME/bin:$PATH"' >> ~/.zshrc
source ~/.zshrc

# Verify JAVA_HOME is set
echo $JAVA_HOME

Install Gradle

sdk install gradle

gradle --version

Install Spring boot

sdk install springboot

Start coding Java with VSCode

Install the Extension Pack for Java which includes 7 essential extensions:

  • Language Support for Java
  • Debugger for Java
  • Test Runner for Java
  • Maven for Java
  • Project Manager for Java
  • Visual Studio IntelliCode
  • Gradle for Java

Update user settings: Command + Shift + P > Open user settings (JSON) with the following settings:

{
// Java Runtime Configuration
"java.jdt.ls.java.home": "${env:JAVA_HOME}",
"java.configuration.runtimes": [
{
"name": "JavaSE-21",
"path": "${env:JAVA_HOME}",
"default": true
}
],
// Java Language Server Settings
"java.configuration.detectJdksAtStart": true,
"java.configuration.updateBuildConfiguration": "automatic",
"java.server.launchMode": "Standard",
"java.autobuild.enabled": true,
"java.maxConcurrentBuilds": 1,

// Code Completion and Analysis
"java.completion.enabled": true,
"java.completion.guessMethodArguments": true,
"java.completion.favoriteStaticMembers": [
"org.junit.jupiter.api.Assertions.*",
"org.mockito.Mockito.*",
"org.springframework.test.web.servlet.request.MockMvcRequestBuilders.*",
"org.springframework.test.web.servlet.result.MockMvcResultMatchers.*"
],
"java.compile.nullAnalysis.mode": "automatic",
"java.sources.organizeImports.staticStarThreshold": 5,
"java.sources.organizeImports.starThreshold": 5,

// Code Formatting
"java.format.enabled": true,
"java.format.settings.url": "https://raw.githubusercontent.com/google/styleguide/gh-pages/eclipse-java-google-style.xml",
"java.format.settings.profile": "GoogleStyle",
"java.format.comments.enabled": true,

// Gradle Settings
"java.import.gradle.enabled": true,
"java.import.gradle.wrapper.enabled": true,
"java.import.gradle.offline.enabled": false,
"java.import.gradle.arguments": "--refresh-dependencies",
"gradle.javaDebug.cleanOutput": true,
"gradle.debug": false,

// Testing
"java.test.defaultConfig": "",
"java.test.config": {},

// Debugging
"java.debug.logLevel": "warn",
"java.debug.settings.enableRunDebugCodeLens": true,
"java.debug.settings.showHex": false,
"java.debug.settings.showStaticVariables": false,
"java.debug.settings.showQualifiedNames": false,
"java.debug.settings.maxStringLength": 0,
"java.debug.settings.exceptionBreakpoint.skipClasses": [
"$JDK",
"junit.*",
"org.junit.*",
"org.springframework.*"
],

// Code Lens
"java.references.includeAccessors": true,
"java.references.includeDecompiledSources": true,
"java.implementationsCodeLens.enabled": true,
"java.referencesCodeLens.enabled": true,

// Error Reporting and Telemetry
"java.errors.incompleteClasspath.severity": "warning",
"java.telemetry.enabled": false,

// Performance
"java.jdt.ls.vmargs": "-XX:+UseParallelGC -XX:GCTimeRatio=4 -XX:AdaptiveSizePolicyWeight=90 -Dsun.zip.disableMemoryMapping=true -Xmx2G -Xms100m",

// Code Completion Filtering (keep packages you don't commonly use out of suggestions)
"java.completion.filteredTypes": [
"java.awt.*",
"com.sun.*",
"sun.*",
"jdk.*",
"org.graalvm.*",
"io.micrometer.shaded.*",
"javax.swing.*"
],

// Spring Boot specific (useful for microservices)
"spring-boot.ls.checkjvm": false,
"spring.initializr.defaultLanguage": "Java",
"spring.initializr.defaultJavaVersion": "21",
"spring.initializr.defaultPackaging": "Jar",
}


Java Interface powerful features


Since Java 8, Java interfaces have become more and more powerful.

  • (Java 8) Default methods: provide optional functionality to all implementations
  • (Java 8) Static methods: utility methods that belong to the interface contract itself and CANNOT be overridden; useful for utility functions that don't need instance context
  • (Java 9) Private methods: keep internal helper logic hidden from implementers
  • Constants: shared contract values; public static final, cannot be overridden
  • (Java 17) Sealed interfaces: you control exactly which classes can implement your interface (or extend your class), giving a closed hierarchy. This enables better pattern matching, safer APIs, and clearer domain modeling
  • (Java 8) @FunctionalInterface: indicates the interface can be implemented with a lambda

Example:

public interface PaymentProcessor {
    // Abstract method - must be implemented
    void processPayment(double amount);

    // Default method - optional to override
    default void logTransaction(double amount) {
        System.out.println("Processing payment of: $" + amount);
        // Common logging logic here
    }

    // Another default method
    default boolean validateAmount(double amount) {
        return amount > 0 && amount < 10000;
    }
}

public abstract class AbstractPaymentProcessor {
    private int defaultAmount = 500;

    // Abstract method - must be implemented by subclasses
    public abstract void processPayment(double amount);

    // Concrete method - optional to override
    public void logTransaction(double amount) {
        System.out.println("Processing payment of: $" + amount);
        // Common logging logic here
    }

    // Another concrete method
    public boolean validateAmount(double amount) {
        return amount > 0 && amount < 10000;
    }
}

About Closures in JavaScript


A closure is a JavaScript function bundled together with its surrounding lexical environment. Closures give a function the ability to access its outer scope.

With closures you can hide private state and preserve state across asynchronous code.

They are commonly used in function factories, callbacks, event handlers, and the module pattern.

They provide a function with its own private environment.

Use cases

In fact, Lodash functions use closures heavily.

memorize

function memorize(func) {
    const cache = new Map();
    return function (...args) {
        const key = args[0];
        if (cache.has(key)) {
            return cache.get(key);
        }
        const result = func.apply(this, args);
        cache.set(key, result);
        return result;
    }
}
function useState(init: string) {
    let value = init;
    function setValue(newValue: string) {
        value = newValue; // assign to the closed-over variable, not a new local
    }
    return [value, setValue]
}
// Keep the "already executed" record outside the function so it survives between calls
const executed = new Map();
function useEffect(func, dependency) {
    const key = dependency.join(',');
    if (executed.get(key)) {
        return;
    } else {
        executed.set(key, true);
        func();
    }
}

function executeUseEffect() {
    let a = 1, b = 2;
    useEffect(() => {
        console.log(a + b)
    }, [a, b]);
    useEffect(() => {
        console.log(a + b)
    }, [a, b]);

    b = 3;
    useEffect(() => {
        console.log(a + b)
    }, [a, b]);
}

Running Processes from Node.js: A Guide to child_process


When working with external processes in Node.js, choose the right tool for your specific needs:

  • Use exec() - for short-lived commands with small output that you want to run in a shell environment
  • Use spawn() - for long-running processes, large output, controlling input via stdin, or streaming data in real time
  • Use execFile() - to execute a specific file without going through a shell, for better security
  • Use fork() - to run a Node.js module in a separate process with IPC communication
  • Always sanitize user input - to prevent injection attacks when running commands in shell mode
  • Consider util.promisify() - to convert callback-based methods into Promises for cleaner async/await syntax

Introduction to processes and child processes

What is a process?

A process is a running instance of a program. Every application on your system runs as one or more processes, each with its own memory space and system resources. In Node.js, your application runs as a single process with a unique process ID (PID).

Understanding child processes

A child process is a new process created by an existing process (the parent). In the context of Node.js:

  • Your main Node.js application is the parent process
  • External programs you execute become child processes
  • Child processes have their own PID and memory space
  • Communication happens through the standard I/O streams (stdin, stdout, stderr)

Why use child processes?

Child processes let you:

  • Execute system commands and external programs
  • Run CPU-intensive tasks without blocking the main event loop
  • Leverage existing tools and utilities written in other languages
  • Scale your application across multiple cores

Node.js child_process API overview

Node.js provides a built-in child_process module with four main methods for creating child processes:

const { spawn, exec, execFile, fork } = require('child_process');

The ChildProcess class

All child-process methods return an instance of the ChildProcess class. This class extends EventEmitter and provides:

  • Event handling: 'exit', 'close', 'error', 'disconnect'
  • Stream access: the stdin, stdout, and stderr properties
  • Process control: the kill() and disconnect() methods
  • Process information: the pid, connected, and exitCode properties
const child = spawn('ls', ['-la']);

child.stdout.on('data', (data) => {
console.log(`stdout: ${data}`);
});

child.on('exit', (code) => {
console.log(`プロセスが終了しました。終了コード: ${code}`);
});

Common options

Most child-process methods accept an options object with common properties:

const options = {
    cwd: '/path/to/working/directory', // working directory
    env: { ...process.env, NODE_ENV: 'production' }, // environment variables
    shell: true, // run inside a shell
    timeout: 5000, // maximum execution time
    maxBuffer: 1024 * 1024, // max buffer size for stdout/stderr
    stdio: 'inherit' // how to handle stdin/stdout/stderr
};

Similarities between the methods

Return type (asynchronous)

All asynchronous methods return a ChildProcess instance:

const child1 = spawn('echo', ['hello']);
const child2 = exec('echo hello');
const child3 = execFile('echo', ['hello']);
const child4 = fork('./child-script.js');

Synchronous variants

Most methods have a synchronous version that blocks execution:

  • spawnSync()
  • execSync()
  • execFileSync()

The synchronous methods return a result object:

{
    pid: 12345,
    output: [], // array of stdio results
    stdout: '', // standard output (Buffer or String)
    stderr: '', // standard error (Buffer or String)
    status: 0, // exit code (null if terminated by a signal)
    signal: null, // signal that terminated the process
    error: undefined // Error object if the process failed
}

Event-driven architecture

All child processes emit the same key events:

child.on('spawn', () => {});     // process started
child.on('exit', (code, signal) => {}); // process exited
child.on('error', (error) => {}); // an error occurred
child.on('close', (code, signal) => {}); // all stdio streams closed

Differences between the methods

spawn()

Purpose: low-level process creation with streaming I/O

Characteristics:

  • Does not use a shell by default
  • Returns a ChildProcess immediately
  • Streams data in real time
  • No automatic buffering
  • The most memory-efficient option
const child = spawn('grep', ['pattern'], {
stdio: ['pipe', 'pipe', 'pipe']
});

child.stdout.on('data', (data) => {
console.log(`発見: ${data}`);
});

Best for: long-running processes, large output, real-time data processing

exec()

Purpose: run a command in a shell with buffered output

Characteristics:

  • Always uses a shell
  • Buffers the entire output in memory
  • Provides a callback with the complete stdout/stderr
  • Subject to the maxBuffer limit
  • Supports shell features (pipes, redirection, etc.)
exec('ls -la | grep .js', (error, stdout, stderr) => {
if (error) {
console.error(`エラー: ${error}`);
return;
}
console.log(`ファイル: ${stdout}`);
});

Best for: simple shell commands, small output, one-off operations

Note: like the other methods, exec() also returns a ChildProcess instance, so you can still access the stdin, stdout, and stderr streams; however, because of its internal buffering, large output can hurt performance.

execFile()

Purpose: direct file execution without a shell

Characteristics:

  • Does not use a shell by default
  • Safer than exec()
  • Buffers output, like exec()
  • Executes the file directly
  • A shell can be enabled with the shell: true option
execFile('node', ['--version'], (error, stdout, stderr) => {
if (error) {
console.error(`エラー: ${error}`);
return;
}
console.log(`Nodeバージョン: ${stdout}`);
});

Best for: running programs safely, avoiding shell injection

fork()

Purpose: create a new Node.js process with an IPC channel

Characteristics:

  • For Node.js modules only
  • Establishes an IPC (inter-process communication) channel
  • Inherits the Node.js environment
  • Built-in message passing
  • Separate V8 instance
// parent.js
const child = fork('./worker.js');
child.send({ task: 'process-data', data: largeDataSet });
child.on('message', (result) => {
console.log('受信:', result);
});

// worker.js
process.on('message', (msg) => {
if (msg.task === 'process-data') {
const result = processData(msg.data);
process.send({ result });
}
});

Best for: CPU-intensive tasks, running Node.js modules, worker processes

Practical examples

Timeout handling

Set an appropriate timeout for long-running processes:

const { spawn } = require('child_process');

function runWithTimeout(command, args, timeoutMs = 5000) {
return new Promise((resolve, reject) => {
const child = spawn(command, args);
let output = '';
let errorOutput = '';

// タイムアウトタイマー設定
const timer = setTimeout(() => {
child.kill('SIGTERM');
reject(new Error(`プロセスがタイムアウトしました: ${timeoutMs}ms`));
}, timeoutMs);

child.stdout.on('data', (data) => {
output += data.toString();
});

child.stderr.on('data', (data) => {
errorOutput += data.toString();
});

child.on('close', (code) => {
clearTimeout(timer);
if (code === 0) {
resolve(output);
} else {
reject(new Error(`プロセスが異常終了: コード ${code}, エラー: ${errorOutput}`));
}
});

child.on('error', (error) => {
clearTimeout(timer);
reject(error);
});
});
}

// 使用例
runWithTimeout('ping', ['-c', '3', 'google.com'], 10000)
.then(output => console.log('結果:', output))
.catch(error => console.error('エラー:', error.message));

Aborting a process with AbortController

In Node.js 15+, you can use AbortController to abort a process:

const { spawn } = require('child_process');
const { AbortController } = require('abort-controller');

async function runWithAbort(command, args, signal) {
return new Promise((resolve, reject) => {
const child = spawn(command, args);
let output = '';

// AbortSignalが発火したときの処理
if (signal) {
signal.addEventListener('abort', () => {
child.kill('SIGTERM');
reject(new Error('プロセスが中断されました'));
});
}

child.stdout.on('data', (data) => {
output += data.toString();
});

child.on('close', (code) => {
if (code === 0) {
resolve(output);
} else {
reject(new Error(`プロセスが失敗: ${code}`));
}
});

child.on('error', reject);
});
}

// 使用例
const controller = new AbortController();
const { signal } = controller;

// 5秒後に中断
setTimeout(() => controller.abort(), 5000);

runWithAbort('sleep', ['10'], signal)
.then(output => console.log('完了:', output))
.catch(error => console.error('エラー:', error.message));

Working with interactive processes

An example of interacting with a process via stdin:

const { spawn } = require('child_process');

function interactiveProcess() {
const child = spawn('node', ['-i'], {
stdio: ['pipe', 'pipe', 'pipe']
});

// コマンドを送信
child.stdin.write('console.log("Hello from child process");\n');
child.stdin.write('process.version;\n');
child.stdin.write('.exit\n');

child.stdout.on('data', (data) => {
console.log('出力:', data.toString());
});

child.stderr.on('data', (data) => {
console.error('エラー:', data.toString());
});

child.on('close', (code) => {
console.log(`対話型プロセスが終了: ${code}`);
});
}

interactiveProcess();

Setting environment variables and the working directory

const { spawn } = require('child_process');
const path = require('path');

function runWithCustomEnv() {
const child = spawn('node', ['-e', 'console.log(process.env.CUSTOM_VAR, process.cwd())'], {
cwd: path.join(__dirname, 'subdir'), // 作業ディレクトリを変更
env: {
...process.env,
CUSTOM_VAR: 'カスタム値',
NODE_ENV: 'development'
}
});

child.stdout.on('data', (data) => {
console.log('出力:', data.toString());
});

child.on('error', (error) => {
console.error('実行エラー:', error);
});
}

runWithCustomEnv();

Processing large amounts of data efficiently

Performance optimization when handling large files or streams:

const { spawn } = require('child_process');
const fs = require('fs');

function processLargeFile(inputFile, outputFile) {
return new Promise((resolve, reject) => {
const child = spawn('gzip', ['-c'], {
stdio: ['pipe', 'pipe', 'pipe']
});

const readStream = fs.createReadStream(inputFile);
const writeStream = fs.createWriteStream(outputFile);

// ストリームをパイプで接続
readStream.pipe(child.stdin);
child.stdout.pipe(writeStream);

// エラーハンドリング
readStream.on('error', reject);
writeStream.on('error', reject);
child.stderr.on('data', (data) => {
console.error('gzipエラー:', data.toString());
});

child.on('close', (code) => {
if (code === 0) {
resolve(`ファイル圧縮完了: ${outputFile}`);
} else {
reject(new Error(`圧縮失敗: ${code}`));
}
});
});
}

// 使用例
processLargeFile('large-file.txt', 'large-file.txt.gz')
.then(message => console.log(message))
.catch(error => console.error(error));

Process pool pattern

Managing multiple concurrent processes:

const { spawn } = require('child_process');

class ProcessPool {
constructor(maxConcurrency = 3) {
this.maxConcurrency = maxConcurrency;
this.running = new Set();
this.queue = [];
}

async execute(command, args) {
return new Promise((resolve, reject) => {
const task = { command, args, resolve, reject };

if (this.running.size < this.maxConcurrency) {
this.runTask(task);
} else {
this.queue.push(task);
}
});
}

runTask(task) {
const { command, args, resolve, reject } = task;
const child = spawn(command, args);

this.running.add(child);
let output = '';
let errorOutput = '';

child.stdout.on('data', (data) => {
output += data.toString();
});

child.stderr.on('data', (data) => {
errorOutput += data.toString();
});

child.on('close', (code) => {
this.running.delete(child);

if (code === 0) {
resolve(output);
} else {
reject(new Error(`プロセス失敗: ${errorOutput}`));
}

// キューから次のタスクを実行
if (this.queue.length > 0) {
const nextTask = this.queue.shift();
this.runTask(nextTask);
}
});

child.on('error', (error) => {
this.running.delete(child);
reject(error);

if (this.queue.length > 0) {
const nextTask = this.queue.shift();
this.runTask(nextTask);
}
});
}
}

// 使用例
const pool = new ProcessPool(2);

const tasks = [
pool.execute('echo', ['タスク1']),
pool.execute('echo', ['タスク2']),
pool.execute('echo', ['タスク3']),
pool.execute('echo', ['タスク4'])
];

Promise.all(tasks)
.then(results => {
console.log('すべてのタスク完了:', results);
})
.catch(error => {
console.error('タスクエラー:', error);
});

Security considerations

Preventing shell injection

Always sanitize user input when using shell mode:

// ❌ Dangerous - never do this
const userInput = req.body.filename;
exec(`cat ${userInput}`, callback); // vulnerable to injection attacks

// ✅ Safe - pass arguments as an array to spawn (no shell involved)
spawn('cat', [userInput]);

// ✅ Safe - validate and sanitize the input first
const safeFilename = path.basename(userInput).replace(/[^a-zA-Z0-9.-]/g, '');
exec(`cat ${safeFilename}`, callback);

Methods vulnerable to shell injection

  • exec() (always uses a shell)
  • spawn() with shell: true
  • execFile() with shell: true

Safer alternatives

  • Use spawn() or execFile() without shell mode
  • Validate and sanitize all user input
  • Use a whitelist of allowed values
  • Consider a library such as shell-escape for proper escaping

Modern async/await usage

Converting callback-based methods into Promises:

const { promisify } = require('util');
const execAsync = promisify(exec);

async function getNodeVersion() {
try {
const { stdout } = await execAsync('node --version');
return stdout.trim();
} catch (error) {
console.error('Nodeバージョンの取得に失敗:', error);
throw error;
}
}

Final thoughts

The Node.js child_process module provides powerful tools for running external processes. Choose the right method for your needs:

  • Performance-critical applications: use spawn() for streaming and a low memory footprint
  • Simple shell commands: use exec() for convenience with small output
  • Security-conscious applications: prefer execFile() or spawn() without shell mode
  • Scaling Node.js: use fork() for parallelizable CPU-intensive tasks

Always consider the security implications, especially when handling user input, and take advantage of modern async/await patterns for cleaner, more maintainable code.