Python SQL Alchemy
- Use sqlalchemy library to read/write db
- engine is the database connection gateway
- Base: The Declarative Base for ORM Models
Details
engine is the primary communication hub between your Python application and your actual database.
Connection Management: It's responsible for managing a pool of database connections. Instead of opening and closing a new connection for every single operation (which is slow and resource-intensive), the engine keeps a pool of ready-to-use connections.
Dialect Specifics: It understands the "dialect" of the specific database you're using (e.g., MySQL, PostgreSQL, SQLite). It translates SQLAlchemy's generic commands into the correct SQL syntax for that database.
Statement Execution: It's the underlying component that actually sends SQL statements to the database and receives results.
Transaction Management: It works with sessions to manage transactions.
Base object is the foundation upon which you build your SQLAlchemy ORM models. It links your Python classes to your database tables.
import os
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy.exc import SQLAlchemyError
from contextlib import contextmanager # For a clean session manager
# --- Database Configuration ---
# You'd typically get these from environment variables or a config file
DB_USER = os.environ.get("DB_USER", "myuser")
DB_PASSWORD = os.environ.get("DB_PASSWORD", "mypassword")
DB_HOST = os.environ.get("DB_HOST", "localhost")
DB_PORT = os.environ.get("DB_PORT", "3306")
DB_NAME = os.environ.get("DB_NAME", "my_test_db")
# MySQL connection string using PyMySQL driver
# Format: mysql+pymysql://user:password@host:port/dbname
DATABASE_URL = f"mysql+pymysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
print(f"Attempting to connect to: {DATABASE_URL}")
# --- 1. Create the Engine ---
# The engine manages the connection pool and dialect specifics.
# echo=True is great for debugging; it logs all SQL statements to console.
engine = create_engine(DATABASE_URL, echo=True, pool_pre_ping=True)
# --- 2. Define the Base ---
# Base is the declarative base class that our ORM models will inherit from.
Base = declarative_base()
# --- 3. Define the ORM Model ---
# This Python class maps to a database table.
class User(Base):
__tablename__ = 'users' # The actual table name in the database
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(50), nullable=False)
email = Column(String(100), unique=True, nullable=False)
def __repr__(self):
return f"<User(id={self.id}, name='{self.name}', email='{self.email}')>"
# --- 4. Create the Session Factory ---
# A sessionmaker factory creates Session objects.
# Sessions are the actual interface for database operations (transactions, queries).
Session = sessionmaker(bind=engine)
# --- Context Manager for Session (Best Practice) ---
# This ensures the session is properly closed even if errors occur.
@contextmanager
def get_session():
session = Session()
try:
yield session
session.commit() # Auto-commit on success
except SQLAlchemyError as e:
print(f"Database error occurred: {e}")
session.rollback() # Rollback on error
raise # Re-raise the exception after rollback
finally:
session.close() # Always close the session
# --- CRUD Operations ---
def create_tables():
print("\n--- Creating tables ---")
try:
# Base.metadata contains all table definitions inherited from Base.
# create_all creates these tables in the database linked by the engine.
Base.metadata.create_all(engine)
print("Tables created successfully.")
except SQLAlchemyError as e:
print(f"Error creating tables: {e}")
def create_user(name: str, email: str):
print(f"\n--- Creating user: {name} ({email}) ---")
with get_session() as session:
new_user = User(name=name, email=email)
session.add(new_user)
print(f"Added user: {new_user}")
return new_user
def read_users():
print("\n--- Reading users ---")
with get_session() as session:
users = session.query(User).all() # Query all users
if users:
for user in users:
print(user)
else:
print("No users found.")
return users
def read_user_by_email(email: str):
print(f"\n--- Reading user by email: {email} ---")
with get_session() as session:
user = session.query(User).filter_by(email=email).first() # Query by email
if user:
print(f"Found user: {user}")
else:
print(f"User with email '{email}' not found.")
return user
def update_user_email(user_id: int, new_email: str):
print(f"\n--- Updating user {user_id}'s email to {new_email} ---")
with get_session() as session:
user = session.query(User).filter_by(id=user_id).first()
if user:
old_email = user.email
user.email = new_email
# session.commit() is handled by the context manager on success
print(f"Updated user {user.name} email from {old_email} to {user.email}")
return user
else:
print(f"User with ID {user_id} not found.")
return None
def delete_user(user_id: int):
print(f"\n--- Deleting user with ID: {user_id} ---")
with get_session() as session:
user = session.query(User).filter_by(id=user_id).first()
if user:
session.delete(user)
# session.commit() is handled by the context manager on success
print(f"Deleted user: {user.name} ({user.id})")
return user
else:
print(f"User with ID {user_id} not found.")
return None
# --- Main Execution ---
if __name__ == "__main__":
# Ensure environment variables are set or defaults are acceptable
if "DB_USER" not in os.environ:
print("WARNING: DB_USER environment variable not set. Using default 'myuser'.")
if "DB_PASSWORD" not in os.environ:
print("WARNING: DB_PASSWORD environment variable not set. Using default 'mypassword'.")
# 1. Create tables (only run this once or when schema changes)
create_tables()
# 2. Create some users
user1 = create_user("Alice", "[email protected]")
user2 = create_user("Bob", "[email protected]")
# Try to create user with duplicate email to see error handling
try:
create_user("Charlie", "[email protected]")
except SQLAlchemyError:
print(" (Expected error: Duplicate email caught and rolled back)")
# 3. Read all users
read_users()
# 4. Read a specific user by email
read_user_by_email("[email protected]")
read_user_by_email("[email protected]")
# 5. Update a user
if user1: # Only if user1 was created successfully
update_user_email(user1.id, "[email protected]")
read_user_by_email("[email protected]")
# 6. Delete a user
if user2: # Only if user2 was created successfully
delete_user(user2.id)
read_users() # Show that Bob is gone
# Try deleting a non-existent user
delete_user(999)
print("\n--- All operations complete ---")
# In a real application, the engine would be disposed when the app shuts down.
# For this script, Python will clean it up on exit.