[note] Python SQLAlchemy & Alembic
[toc]
參考資料
以 PostgreSQL 為例
Prerequisites
安裝需要用到的套件
$ poetry add SQLAlchemy psycopg2-binary
使用 Docker 啟動 PostgreSQL
$ docker run --name postgres -e POSTGRES_PASSWORD=postgres -e POSTGRES_USER=admin -e POSTGRES_DB=dev -p 5432:5432 -d postgres
使用 Alembic
CLI
# 初始化 alembic
$ alembic init alembic
# 建立 migration script 來追蹤 database schema 的改變
$ alembic revision --autogenerate -m "initial migration"
# alembic upgrade <revision>
$ alembic upgrade head # 除了用 head(latest revision)外,也可以用 +2 或 revision_id
# alembic downgrade <revision>
$ alembic downgrade -1 # 也可以用 revision_id 或 base(回到最初始的狀態)
$ alembic current # 顯示目前 migration 的狀態
$ alembic history # 顯示 migration history
修改 alembic 中的 env.py 檔
在 /alembic/env.py
中需要
- 定義連線到 database 的方法
- 讓 alembic 可以讀到 database 的 metadata(
target_metadata
)
diff --git a/alembic/env.py b/alembic/env.py
index dd0f6ba..346ea44 100644
--- a/alembic/env.py
+++ b/alembic/env.py
@@ -1,6 +1,7 @@
from logging.config import fileConfig
-from sqlalchemy import engine_from_config
+from environs import Env
+from sqlalchemy import engine_from_config, URL
from sqlalchemy import pool
from alembic import context
@@ -14,11 +15,29 @@ config = context.config
if config.config_file_name is not None:
fileConfig(config.config_file_name)
+env = Env()
+env.read_env(".env")
+
+url = URL.create(
+ drivername="postgresql",
+ username=env.str("POSTGRES_USER"),
+ password=env.str("POSTGRES_PASSWORD"),
+ host=env.str("DATABASE_HOST"),
+ database=env.str("POSTGRES_DB"),
+ port=5432
+)
+
+config.set_main_option(
+ "sqlalchemy.url", url.render_as_string(hide_password=False)
+)
+
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
-target_metadata = None
+import lesson_2
+
+target_metadata = lesson_2.Base.metadata
使用 SQLAlchemy
官方文件
- ORM Quick Start: A glimpse at what working with the ORM looks like
- SQLAlchemy Unified Tutorial: In depth tutorial for Core and ORM
Execute SQL in the App
與 database 建立連線
- 建立 connection URL
- 建立 engine
- 建立 session
from sqlalchemy import URL, create_engine, text
from sqlalchemy.orm import sessionmaker
# Build the connection URL from its individual components.
url = URL.create(
    drivername="postgresql",
    username="admin",
    password="postgres",
    host="localhost",
    database="dev",
    port=5432,
)
# hide_password=False renders the real password instead of masking it as "***".
url.render_as_string(hide_password=False)  # postgresql://admin:postgres@localhost:5432/dev

# echo=True makes the engine log the SQL it emits.
engine = create_engine(url, echo=True)
# Session is a factory; calling Session() opens a new session bound to the engine.
Session = sessionmaker(engine)

with Session() as session:
    # do something with the session...
    pass
使用 session 執行 raw SQL
# ...
# Session factory; assumes `engine` was created as in the connection snippet.
Session = sessionmaker(engine)
# Create Table
# IF NOT EXISTS keeps the statement safe to re-run; referred_id is a
# self-referencing foreign key that becomes NULL when the referenced user is deleted.
with Session() as session:
query = text("""
CREATE TABLE IF NOT EXISTS users (
telegram_id BIGINT PRIMARY KEY,
full_name VARCHAR(255) NOT NULL,
username VARCHAR(255),
language_code VARCHAR(255) NOT NULL,
created_at TIMESTAMP DEFAULT NOW(),
referred_id BIGINT,
FOREIGN KEY (referred_id) REFERENCES users(telegram_id) ON DELETE SET NULL
);
""")
session.execute(query)
# Commit explicitly so the statement is persisted.
session.commit()
# Insert Data
# Two rows are inserted; the second one refers to the first through referred_id.
with Session() as session:
insert_query = text("""
INSERT INTO users (telegram_id, full_name, username, language_code, referred_id)
VALUES (1, 'John Doe', 'john_doe', 'en', NULL),
(2, 'Jane Doe', 'jane_doe', 'en', 1);
""")
session.execute(insert_query)
# Commit explicitly so the inserted rows are persisted.
session.commit()
# Select Query
# Read-only query, so no commit is issued; the result is iterated row by row.
with Session() as session:
select_query = text("""
SELECT * FROM users;
""")
result = session.execute(select_query)
for row in result:
print(row)
在執行 execute()
後,會得到 CursorResult(SQLAlchemy 1.4 之前稱為 ResultProxy)
的物件,這個物件用來表示 database 的 cursor,讓我們可以存取資料庫中的資料:
with Session() as session:
    # execute() returns a CursorResult object.
    result = session.execute(text("SELECT * FROM users;"))
    print(
        f"execute result: {result}")  # <sqlalchemy.engine.cursor.CursorResult object at 0x102d562e0>

    # fetchall(): every row of the result, as a list.
    fetchall_result = session.execute(text("SELECT * FROM users;")).fetchall()
    print(
        f"fetchall_result: {fetchall_result}"
    )  # fetchall_result: [(1, 'John Doe', 'john_doe', 'en', datetime.datetime(2021, 9, 20, 12, 41, 53, 122000), None), (2, 'Jane Doe', 'jane_doe', 'en', datetime.datetime(2021, 9, 20, 12, 41, 53, 122000), 1)]

    # fetchone(): the next single row.
    fetchone_result = session.execute(text("SELECT * FROM users;")).fetchone()
    print(f"fetchone_result: {fetchone_result}")

    # first(): the first row of the result.
    first_result = session.execute(text("SELECT * FROM users;")).first()
    print(f"first_result: {first_result}")

    # scalar(): the first column of the first row; :telegram_id is a bound parameter.
    scalar_result = session.execute(
        text("SELECT username FROM users WHERE telegram_id = :telegram_id"),
        {"telegram_id": 1}).scalar()
    print(f"scalar_result: {scalar_result}")  # scalar_result: john_doe

    scalar_result = session.execute(text("SELECT COUNT(*) FROM users;")).scalar()
    print(f"scalar_result: {scalar_result}")  # scalar_result: 2
SELECT
scalars
當 stmt 一樣的時候,如果使用的是:
-
session.execute(stmt).all()
:資料會是 list of tuple,tuple 裡的第一個元素才會是 object- 如果是 SELECT ALL 的情況,則 tuple 裡的第一個元素會是 Class Object
- 如果是 SELECT 特定 Column 的情況,則 tuple 裡的第一個元素會是 SELECT 出來的第一個 Column
-
session.scalars(stmt).all()
:會「把 tuple 中的第一個元素(object)」取出來,所以資料會是 list of object。- 效果等同於
session.execute(stmt).scalars().all()
- 如果用的不是 Select All,而是選出特定的 Columns 而已,則要留意可能只能拿到第一個 Column 的值
- 效果等同於
stmt = select(Campaign).where(Campaign.id == "1").limit(1)

# execute() yields rows; the ORM object is the first element of each row.
# [(Campaign(),), (Campaign(),), (Campaign(),)]
rows = session.execute(stmt).fetchall()
for row in rows:
    campaign = row[0]
    pprint(campaign.__dict__)

# scalars() takes the first element out of each row, giving the objects directly.
# [Campaign(), Campaign(), Campaign()]
campaigns = session.scalars(stmt).all()
for campaign in campaigns:
    pprint(campaign.__dict__)
def get_users(user_ids: list[str]):
    """Show how ``execute()`` and ``scalars()`` shape the result of one statement.

    The same lookup is run first selecting two columns and then selecting the
    whole ``User`` entity, each time through ``execute().all()``,
    ``scalars().all()`` and ``execute().scalars().all()``.

    Args:
        user_ids: IDs to match against ``User.id``.

    Returns:
        The result of the last query: the matching ``User`` objects.
    """
    session = ClickHouse().session

    # Equivalent lookup with the legacy Query API:
    # users = session.query(User).filter(User.id.in_(user_ids)).all()

    # --- Select Columns ---
    stmt = select(User.id, User.name).where(User.id.in_(user_ids))

    # [('0f04989d-0fda-4392-a5a9-9e18e22bb851', 'Aaron'), ...]
    users = session.execute(stmt).all()

    # Only the first selected column survives scalars().
    # ['0f04989d-0fda-4392-a5a9-9e18e22bb851', ...]
    users = session.scalars(stmt).all()

    # ['0f04989d-0fda-4392-a5a9-9e18e22bb851', ...]
    users = session.execute(stmt).scalars().all()

    # --- Select All ---
    # Rebind stmt to the entity query; otherwise the calls below would repeat
    # the column query above and never return User objects.
    stmt = select(User).where(User.id.in_(user_ids))

    # [(<model.user.User object at 0x104e6c350>,) ...]
    users = session.execute(stmt).all()

    # [<model.user.User object at 0x106b624b0>, ...]
    users = session.scalars(stmt).all()

    # [<model.user.User object at 0x1070186e0>, ...]
    users = session.execute(stmt).scalars().all()

    return users
建立 Model
建立 User
model:
- 使用
DeclarativeBase
建立Base
Model - 針對 Table 如果有重複的操作,像是都需要添加特定的欄位,則可以使用
Mixin
,例如,一旦繼承了這裡的TimestampMixin
,該 Table 就會長出created_at
和updated_at
的欄位 - 針對 Column(field)如果有重複的操作,像是設定成 PK,則可以使用 Annotated
import datetime
from typing import Optional, Annotated
from sqlalchemy import BIGINT, TIMESTAMP, func, ForeignKey, INTEGER, String, DECIMAL
from sqlalchemy.dialects.mysql import VARCHAR
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
# Base model: the declarative base every model below inherits from
class Base(DeclarativeBase):
    """Declarative base class shared by the ORM models in this file."""
# Mixin: a table whose model inherits from it gets the columns declared here
class TimestampMixin:
    """Adds ``created_at`` and ``updated_at`` timestamp columns to a model."""

    # ``datetime`` is imported as a module in this file, so the column's
    # Python type must be spelled ``datetime.datetime``.
    # server_default=func.now() puts the default on the database side.
    created_at: Mapped[datetime.datetime] = mapped_column(
        TIMESTAMP,
        server_default=func.now(),
    )
    # onupdate=func.now() sets a new timestamp whenever the row is updated.
    updated_at: Mapped[datetime.datetime] = mapped_column(
        TIMESTAMP,
        server_default=func.now(),
        onupdate=func.now(),
    )
# Annotated aliases: when many columns share the same definition, declare it
# once here and reuse it inside Mapped[...]

# BIGINT foreign key to users.telegram_id with ON DELETE CASCADE.
user_fk = Annotated[
    int,
    mapped_column(
        BIGINT,
        ForeignKey("users.telegram_id", ondelete="CASCADE"),
    ),
]

# INTEGER primary key.
int_pk = Annotated[
    int,
    mapped_column(INTEGER, primary_key=True),
]

# String column with a length of 255.
str_255 = Annotated[
    str,
    mapped_column(String(255)),
]
class User(Base, TimestampMixin):
    """ORM model mapped to the ``users`` table.

    Also carries the columns declared on ``TimestampMixin``.
    """

    __tablename__ = "users"

    # BIGINT primary key with autoincrement switched off.
    telegram_id: Mapped[int] = mapped_column(
        BIGINT,
        primary_key=True,
        autoincrement=False,
    )
    full_name: Mapped[str_255]
    # NOTE(review): the raw-SQL example in this note names the matching columns
    # `username` and `referred_id`; confirm which spelling is intended.
    user_name: Mapped[Optional[str_255]]
    # NOTE(review): VARCHAR is imported from the MySQL dialect in this file
    # although the note targets PostgreSQL; confirm the import.
    language_code: Mapped[str] = mapped_column(VARCHAR(10))
    # Optional self-reference to users.telegram_id through the user_fk alias.
    referrer_id: Mapped[Optional[user_fk]]
class Order(Base, TimestampMixin):
    """ORM model mapped to the ``orders`` table.

    Also carries the columns declared on ``TimestampMixin``.
    """

    __tablename__ = "orders"

    # Integer primary key (int_pk alias).
    order_id: Mapped[int_pk]
    # Foreign key to users.telegram_id (user_fk alias).
    user_id: Mapped[user_fk]