跳至主要内容

[note] Python SQLAlchemy & Alembic

[toc]

參考資料

以 PostgreSQL 為例

Prerequisites

安裝需要用到的套件

$ poetry add SQLAlchemy psycopg2-binary

使用 Docker 啟動 PostgreSQL

$ docker run --name postgres -e POSTGRES_PASSWORD=postgres -e POSTGRES_USER=admin -e POSTGRES_DB=dev -p 5432:5432 -d postgres

使用 Alembic

CLI

# 初始化 alembic
$ alembic init alembic

# 建立 migration script 來追蹤 database schema 的改變
$ alembic revision --autogenerate -m "initial migration"

# alembic upgrade <revision>
$ alembic upgrade head # 除了用 head(latest revision)外,也可以用 +2 或 revision_id

# alembic downgrade <revision>
$ alembic downgrade -1 # 也可以用 revision_id 或 base(回到最初始的狀態)

$ alembic heads # 顯示目前所有的 heads
$ alembic merge heads # merge heads

$ alembic current # 顯示目前 migration 的狀態
$ alembic history # 顯示 migration history

修改 alembic 中的 env.py 檔

/alembic/env.py 中需要

  • 定義連線到 database 的方法
  • 讓 alembic 可以讀到 database 的 metadata(target_metadata
diff --git a/alembic/env.py b/alembic/env.py
index dd0f6ba..346ea44 100644
--- a/alembic/env.py
+++ b/alembic/env.py
@@ -1,6 +1,7 @@
from logging.config import fileConfig

-from sqlalchemy import engine_from_config
+from environs import Env
+from sqlalchemy import engine_from_config, URL
from sqlalchemy import pool

from alembic import context
@@ -14,11 +15,29 @@ config = context.config
if config.config_file_name is not None:
fileConfig(config.config_file_name)

+env = Env()
+env.read_env(".env")
+
+url = URL.create(
+ drivername="postgresql",
+ username=env.str("POSTGRES_USER"),
+ password=env.str("POSTGRES_PASSWORD"),
+ host=env.str("DATABASE_HOST"),
+ database=env.str("POSTGRES_DB"),
+ port=5432
+)
+
+config.set_main_option(
+ "sqlalchemy.url", url.render_as_string(hide_password=False)
+)
+
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
-target_metadata = None
+import lesson_2
+
+target_metadata = lesson_2.Base.metadata

使用 SQLAlchemy

官方文件

Execute SQL in the App

與 database 建立連線

  • 建立 connection URL
  • 建立 engine
  • 建立 session
from sqlalchemy import URL, create_engine, text
from sqlalchemy.orm import sessionmaker

url = URL.create(
drivername="postgresql",
username="admin",
password="postgres",
host="localhost",
database="dev",
port=5432
)

url.render_as_string(hide_password=False) # postgresql://admin:***@localhost:5432/dev

engine = create_engine(url, echo=True)
Session = sessionmaker(engine)

with Session() as session:
# do something with the session...

使用 session 執行 raw SQL

# ...
Session = sessionmaker(engine)

# Create Table
with Session() as session:
query = text("""
CREATE TABLE IF NOT EXISTS users (
telegram_id BIGINT PRIMARY KEY,
full_name VARCHAR(255) NOT NULL,
username VARCHAR(255),
language_code VARCHAR(255) NOT NULL,
created_at TIMESTAMP DEFAULT NOW(),
referred_id BIGINT,
FOREIGN KEY (referred_id) REFERENCES users(telegram_id) ON DELETE SET NULL
);
""")
session.execute(query)
session.commit()

# Insert Data
with Session() as session:
insert_query = text("""
INSERT INTO users (telegram_id, full_name, username, language_code, referred_id)
VALUES (1, 'John Doe', 'john_doe', 'en', NULL),
(2, 'Jane Doe', 'jane_doe', 'en', 1);
""")

session.execute(insert_query)
session.commit()

# Select Query
with Session() as session:
select_query = text("""
SELECT * FROM users;
""")
result = session.execute(select_query)
for row in result:
print(row)

在執行 execute() 後,會得到 ResultProxy 的物件,這個物件用來表示 database 的 cursor,讓我們可以存取資料夾中的資料:

with Session() as session:
result = session.execute(text("SELECT * FROM users;"))
print(
f"execute result: {result}") # <sqlalchemy.engine.cursor.CursorResult object at 0x102d562e0>

fetchall_result = session.execute(text("SELECT * FROM users;")).scalar()
print(
f"fetchall_result: {fetchall_result}"
) # fetchall_result: [(1, 'John Doe', 'john_doe', 'en', datetime.datetime(2021, 9, 20, 12, 41, 53, 122000), None), (2, 'Jane Doe', 'jane_doe', 'en', datetime.datetime(2021, 9, 20, 12, 41, 53, 122000), 1)]

fetchone_result = session.execute(text("SELECT * FROM users;")).fetchone()
print(f"fetchone_result: {fetchone_result}")

first_result = session.execute(text("SELECT * FROM users;")).first()
print(f"first_result: {first_result}")

scalar_result = session.execute(
text("SELECT username FROM users WHERE telegram_id = :telegram_id"),
{"telegram_id": 1}).scalar()
print(f"scalar_result: {scalar_result}") # scalar_result: john_doe

scalar_result = session.execute(text("SELECT COUNT(*) FROM users;")).scalar()
print(f"scalar_result: {scalar_result}") # scalar_result: 2

SELECT

scalars

當 stmt 一樣的時候,如果使用的是:

  • session.execute(stmt).all():資料會是 list of tuple,tuple 裡的第一個元素才會是 object

    • 如果是 SELECT ALL 的情況,則 tuple 裡的第一個元素會是 Class Object
    • 如果是 SELECT 特定 Column 的情況,則 tuple 裡的第一個元素會是 SELECT 出來的第一個 Column
  • session.scalars(stmt).all():會「把 tuple 中的第一個元素(object)」取出來,所以資料會是 list of object。

    • 效果等同於 session.execute(stmt).scalars().ll()
    • 如果用的不是 Select All,而是選出特定的 Columns 而已,則要留意可能只能拿到第一個 Column 的值
stmt = select(Campaign).where(Campaign.id == "1").limit(1)

# [(Campaign(),), (Campaign(),), (Campaign(),)]
rows = session.execute(stmt).fetchAll()
for row in rows:
campaign = row[0]
pprint(campaign.__dict__)

# [Campaign(), Campaign(), Campaign()]
campaigns = session.scalars(stmt).all()
for campaign in campaigns:
pprint(campaign.__dict__)
def get_users(user_ids: list[str]):
session = ClickHouse().session
# users = session.query(User).filter(User.id.in_(user_ids)).all()

"""
Select Columns
"""
stmt = select(User.id, User.name).where(User.id.in_(user_ids))

# [('0f04989d-0fda-4392-a5a9-9e18e22bb851', 'Aaron'), ...]
users = session.execute(stmt).all()

# ['0f04989d-0fda-4392-a5a9-9e18e22bb851', ...]
users = session.scalars(stmt).all()

# ['0f04989d-0fda-4392-a5a9-9e18e22bb851', ...]
users = session.execute(stmt).scalars().all()

"""
Select All
"""
# stmt = select(User).where(User.id.in_(user_ids))

# [(<model.user.User object at 0x104e6c350>,) ...]
users = session.execute(stmt).all()

# [<model.user.User object at 0x106b624b0>, ...]
users = session.scalars(stmt).all()

# [<model.user.User object at 0x1070186e0>, ...]
users = session.execute(stmt).scalars().all()

return users

建立 Model

建立 User model:

  • 使用 DeclarativeBase 建立 Base Model
  • 針對 Table 如果有重複的操作,像是都需要添加特定的欄位,則可以使用 Mixin,例如,一旦繼承了這裡的 TimestampMixin,該 Table 就會長出 created_atupdated_at 的欄位
  • 針對 Column(field)如果有重複的操作,像是設定成 PK,則可以使用 Annotated
import datetime
from typing import Optional, Annotated

from sqlalchemy import BIGINT, TIMESTAMP, func, ForeignKey, INTEGER, String, DECIMAL
from sqlalchemy.dialects.mysql import VARCHAR
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


# 建立 Base Model
class Base(DeclarativeBase):
pass


# 使用 Mixin:套用這個 mixin 的 table 就會產生對應的欄位
class TimestampMixin:
created_at: Mapped[datetime] = mapped_column(TIMESTAMP, server_default=func.now())
updated_at: Mapped[datetime] = mapped_column(
TIMESTAMP,
server_default=func.now(),
onupdate=func.now(),
)


# 使用 Annotated:如果有許多欄位的「格式」都是一樣的,可以建立共用的 Annotated
user_fk = Annotated[
int, mapped_column(BIGINT, ForeignKey("users.telegram_id", ondelete="CASCADE"))
]
int_pk = Annotated[int, mapped_column(INTEGER, primary_key=True)]
str_255 = Annotated[str, mapped_column(String(255))]


class User(Base, TimestampMixin):
__tablename__ = "users"


telegram_id: Mapped[int] = mapped_column(BIGINT, primary_key=True, autoincrement=False)
full_name: Mapped[str_255]
user_name: Mapped[Optional[str_255]]
language_code: Mapped[str] = mapped_column(VARCHAR(10))
referrer_id: Mapped[Optional[user_fk]]


class Order(Base, TimestampMixin):
__tablename__ = "orders"

order_id: Mapped[int_pk]
user_id: Mapped[user_fk]