跳至主要内容

Python Getting Started

資料來源
# 在 REPL 環境下載入某支檔案
$ python -i hello.py

Setup & Installation

PIP

官方文件
$ pip list  # 檢視目前電腦上安裝哪些 python package
$ pip install [package] # 安裝套件
$ pip uninstall [package] # 移除套件
$ pip show [package] # 檢視套件資訊

venv

# 建立 virtual env
# python -m venv [virtual_environment_name]
$ python -m venv .venv # 建立名為 .venv 的 virtual environment

# 啟動 virtual env
# source [virtual_environment_name]/bin/activate
$ source .venv/bin/activate

# 進入 virtual env 後,使用 deactivate 可以離開
> deactivate

# 把目前專案有安裝的套件寫進 requirement.txt 中
$ pip freeze > requirements.txt

# 安裝 requirement.txt 中列出的套件
$ pip install -r requirements.txt

# 解除安裝
$ pip uninstall -r requirements.txt -y

Poetry

參考:[note] Python Poetry @ PJCHENder

VSCode

Getting Started with Python in VS Code @ Youtube

Data Structure Basic

Variable

a = 3
type(a) # int

String

##
# Index and Slice
##
'tinker'[1:4] # 'ink'
'tinker'[1:4:2] # 'ik'
'tinker'[::-1] # 'reknit

##
# Formatting with the .format() method
##
'Good {}, {} Chen!'.format('morning', 'Mr.') # 'Good morning, Mr. Chen!'
'My favorite brand is {}, {}, and {}!'.format('Apple', 'Samsung', 'Google') # 'My favorite brand is Apple, Samsung, and Google!'
'My favorite brand is {2}, {1}, and {0}!'.format('Apple', 'Samsung', 'Google') # 'My favorite brand is Google, Samsung, and Apple!'

'Repeat after me: {0}! {0}! {0}!'.format('Ho') # 'Repeat after me: Ho! Ho! Ho!'
'Good {time}, {title} {name}!'.format(time='morning', title='Mr.', name='Chen') # 'Good morning, Mr. Chen!'

##
# Float Formatting
##
result = 100/777 # 0.1287001287001287

# Old Way, {value:width.precision f}
print("The result was {:1.3f}".format(result)) # The result was 0.129

# Formatted String Literals(f-strings)
name = 'Aaron'
age = 33
print(f'Hello, my name is {name}, and I\'m {age} years old.')

List

[0]*3  # [0, 0, 0]

my_list = [1, 3, 2]
another_list = [4, 6, 5]

# array concat
whole_list = my_list + another_list
whole_list # [1, 3, 2, 4, 6, 5]

# sort
whole_list.sort()
whole_list # [1, 2, 3, 4, 5, 6]
whole_list.reverse()
whole_list # [6, 5, 4, 3, 2, 1]

Dictionary

資訊

Mapping Types - Dict @ Python Doc > Built-in Type

Dictionary 基本操作

建立 Dictionary
  • dictionary 的 key 必須要是 hashable object,因此
    • 可以是 string、number、bool、fronzenset、function
    • 不行是 list、set、dictionary,因為它們都是 mutable types
tuple 是 hashable 的嗎?

能不能是 hashable 的實際上是取決於該資料是不是 immutable 的,因此 tuple 不是 hashable 的,取決於裡面的元素。在 Python 中有一個內建的 hash 函式,用它即可知道某資料是不是 hashable 的。例如,hash((1, 'a', True)) 是可以的;但如果 tuple 裡面有 list,hash((1, 'a', [1, 2])) 則會得到 TypeError: unhashable type: 'list'

hash 後的值不保證一定一樣

在 Python 內建的 hash 函式中它會回傳 int,基於某些安全性的理由,Python 會保證在同一次執行時,帶入 hash() 中的 input 相同時,能得到相同的 output,但是如果是不同次執行,則相同的 input 不保證會得到相同的 output,所以盡可能不要用 hash() 後的 hash value 來作為 input 是不是相同的判斷

  • dictionary 的 key order 在 Python 3.5+ 後會依照 insert 時的順序被保持
  • dictionary 的 value 可以是 any object,例如,lambda、list、dictionary
# 1. 使用 literal
user1 = {'name': 'John', 'age': 25}

# 2-1. 使用 constructor
user2 = dict(name='John', age=25) # 使用 constructor

# 2-2. 使用 tuple 透過 constructor 建立 dictionary
user_from_tuple = (('name', 'John'), ('age', 25))
user3 = dict(user_from_tuple)

# 2.3 使用 list 透過 constructor 建立 dictionary
user_from_list = [['name', 'John'], ['age', 25]]
user4 = dict(user_from_list)

# 3. 使用 comprehension
user_from_list = [['name', 'John'], ['age', 25]]
user5 = {k: v for k, v in user_from_list}
在 Dictionary 中新增或更新 Key-Value
# 使用 []
user = {'name': 'John', 'age': 25}
user["height"] = 180

# dict.setdefault(key, value)
# 如果存在就不動作,如果不存在就新增,類似建立預設值
user.setdefault('name', 'Aaron') # 因為 name 已經存在,所以值依然會是 'John'
user.setdefault('height', 170) # 因為 height 不存在,所以值會是 170

# 使用 update,如果存在就更新,不存在就新增
user.update({
"height": 180,
"width": 76
})

# 使用 unpacking operator
new_user = {
**user,
"height": 180,
"width": 76
}

# 使用 |
user = {'name': 'John', 'age': 25}
info = {"height": 180, "width": 76}
new_user = user | info
取值
user = {'name': 'John', 'age': 25}
user["name"] # "John"
user["foo"] # KeyError: 'foo'

# 使用 get:如果沒有該 key 不會噴錯
# get(key, default=None)
user.get("foo") # None
user.get("foo", "default_value") # default_value
刪除 key 與清空
user = {'name': 'John', 'age': 25}

del user["name"] # 刪除 name 這個 key-value pair
user.clear() # 把 user 中的 key-value pairs 清空,變成 empty dictionary
pop, popitem:把 key-value pair 中 dictionary 中取出(後刪除)
# dict.pop(key, [default_value])
user = {'name': 'John', 'age': 25}
user_name = user.pop("name") # "John",並把 user 中的 {"name": "John"} 移除
user_age = user.pop("age", 30) # 25, 如果沒有 age 這個 key,則用 default value(30)

dict.popitem() 是 LIFO,所以可以當作 stack 使用:

# dict.popitem() 把最後面的一組 key-value pair 拿出來
user = {'name': 'John', 'age': 25}
age_info = user.popitem() # ('age', 25)
複製 dictionary
user = {'name': 'John', 'age': 25}

# 使用 copy
new_user = user.copy()

# 使用 dict
new_user = dict(user)

# 使用 unpacking operator
new_user = {**user}

常用方法

Dict Comprehensions
names = ['Jenny', 'Aaron', 'Sam', 'Grace']
ages = [23, 25, 20, 27]
participants = {k:v for k, v in zip(names, ages)} # {'Jenny': 23, 'Aaron': 25, 'Sam': 20, 'Grace': 27}
dict.fromkeys():從其他 iterable object 建立 dictionary
# 根據字串、串列或 Tuple 來建立 Dict 的 key
# classmethod fromkeys(iterable, value=None, /)
dict.fromkeys(['name', 'age'])
dict.fromkeys(['name', 'age'], '') # {'name': '', 'age': ''}
in:判斷 dictionary 中是否有該 key
user = {'name': 'John', 'age': 25}
"name" in user # True
"foo" not in user # False
unpacking operator (**):可以用來合併 dictionary
defaults = {'color': 'red', 'size': 'medium', 'price': 100}
updates = {'color': 'blue', 'price': 150}

final = {**defaults, **updates} # {'color': 'blue', 'size': 'medium', 'price': 150}
迭代 dictionary
user = {'name': 'John', 'age': 25}
len(user) # 2

# 直接迭代 dictionary 會拿到 key
for key in user:
print(k)

# 取得所有 keys
user.keys() # dict_keys(['name', 'age'])
list(user) # ["name", "age"]

user.values() # dict_values(['John', 25])
list(user.values()) # ['John', 25]
for value in user.values():
print(value)


user.items() # dict_items([('name', 'John'), ('age', 25)])
list(user.items()) # [('name', 'John'), ('age', 25)]

for k, v in user.items():
print(f"{k}: {v}")
警告

要特別留意的是,使用 keys()values()、或 items() 拿到的資料並不是真的帶有 key、value 的 list,而是一個「view」讓我們可以透過它看到目前 dictionary 的資料。因此即使把它存成變數,一旦原本的 dictionary 改變,這個變數得到的內容也會改變。

Tuple

基本操作

建立 Tuple
# 使用 ()
single = (1,)
mixed = (1, "hello", True)
empty = ()

# 使用 tuple constructor
list_to_tuple = tuple([1, 2, 3]) # (1, 2, 3)
string_to_tuple = tuple("hello") # ('h', 'e', 'l', 'l', 'o')
range_to_tuple = tuple(range(3)) # (0, 1, 2)

Enum

import enum


# 定義 enum
class Color(str, enum.Enum):
RED = "red"
GREEN = "green"
BLUE = "blue"


print(Color.RED.value) # red


# 把 enum 轉成 list
colors = [color.value for color in Color]
print(colors) # ['red', 'green', 'blue']

Operators

Comparisons Operators

1 < 2 < 3  # True
1 < 2 and 2 < 3 # True
1 == 1 or 2 == 2 # True
not ( 1 == 1 ) # False
not ( 400 > 5000 ) # True

Useful Operators

range(start, stop, step)

# 0, 1, 2
for i in range(3):
print(i)

# 3, 4
for i in range(3,5):
print(i)

# 1,3,5
for i in range(1,6,2):
print(i)

# [0, 2, 4]
list(range(0, 5, 2))

enumerator

把 string 變成 tuple

word = 'abc'

# 0: a
# 1: b
# 2: c
for idx, value in enumerate(word):
print(f'{idx}: {value}')

zip

把多個 list 組成一個 tuple

name = ['Aaron', 'John', 'Mary']
age = [12, 13, 14]
height = [170, 180, 160]

# Aaron is 12 years old with 170 cm
# John is 13 years old with 180 cm
# Mary is 14 years old with 160 cm
for name, age, height in zip(name, age, height):
print(f'{name} is {age} years old with {height} cm')

# <zip object at 0x1023a1300>
zip(name, age, height)

# [('Aaron', 12, 170), ('John', 13, 180), ('Mary', 14, 160)]
list(zip(name, age, height))

in

1 in [1, 2, 3]  # True

'a' in 'abc' # True
'a' in [1, 2, 3] # False

user = {
"name": "John",
"age": 30,
"city": "New York"
}
'name' in user # True
'John' in user.values()

Statements

Conditional Statements

state = 'active'

if state == 'active':
print('The state is active')
elif state == 'inactive':
print('The state is inactive')
else:
print('The state is unknown')

For Loops

Iterate a list

brands = ['Ford', 'BMW', 'Volvo']
for brand in brands:
print(brand)

Combine with conditional statement:

brands = ['Ford', 'BMW', 'Volvo']
for brand in brands:
if brand == 'Volvo':
print(brand + ' - I like it!')
else:
print(brand + ' - I don\'t like it!')

Iterate a string

str = 'Hello World'
for letter in str:
print(letter)

Iterate a tuple

for t in (1, 2, 3):
print(t)

Tuple Unpacking

my_list = [(1, 2), (3, 4), (5, 6), (7, 8), (9, 10)]

for a, b in my_list:
print(a + b)

Iterate an dictionary

user = {
"name": "John",
"age": 30,
"city": "New York"
}

for key in user:
print(f'key: {key}')
print(f'value: {user[key]}')

for key, value in user.items():
print(f'key: {key}')
print(f'value: {value}')

for value in user.values():
print(value)

While Loops

x = 0
while x < 5:
# x = x + 1
x += 1
print(f'The current value of x is {x}')
# could combine with "else"
else:
print('X IS NOT LESS THAN 5')

Basic I/O

Read and Write files

  • mode='r': read only
  • mode='w': write only (will overwrite existing files or create new)
  • mode='a': append only
  • mode='r+': read and write
  • mode='w+': write and read (overwrite existing files or create new)
myfile = open('myfile.txt')
myfile.read()
myfile.seek(0) # reset cursor
myfile.readlines() # returns a list of lines
myfile.close() # close file

# 不用怕忘記 close 的寫法
# create or overwrite a file
with open('myfile.txt', mode="w") as my_new_file:
contents = my_new_file.write(
'this is a text file\nthis is the second line\nthis is the third line')

# read file
with open('myfile.txt', mode='r') as f:
contents = f.read()
print(contents)

# append file
with open('myfile.txt', mode='a') as f:
f.write('\nthis is the fourth line')

with open('myfile.txt', mode='r') as f:
contents = f.read()
print(contents)

Modules and Packages(模組與套件)

定義 Modules 和 Packages

  • Module(模組)指的就是 .py 「檔案」,在 Python 中不需定義要 export 哪些變數,可以自由 import
  • Package(套件)則是一個包含許多 modules 的「資料夾」,要變成 package,只需要把檔案(模組)整理在資料夾中,並在資料夾中加上 __init__.py 的檔案
  • 一個 Package 中還可以有 sub package,只要在該資料夾中同樣加上 __init__.py 的即可。
資訊

簡單來說,在 Python 裡,module 指的是「檔案」,package 指的是「資料夾」。

舉例來說,我們的資料結構如下:

.
├── main_package
│ ├── __init__.py
│ ├── main_module.py # 裡面有定義 report_main_method()
│ └── sub_package
│ ├── __init__.py
│ └── sub_module.py # 裡面有定義 report_sub_method()
├── my_module.py # 裡面有定義 my_function()
└── main.py

如果想要在 main 中使用 my_module 模組中的 report_my_method 方法:

"""
可以直接匯入「整個模組」
"""
import my_module

my_module.report_my_method()


"""
也可以「匯入模組中」的「部分方法」
"""
from my_module import report_my_method

report_my_method()

如果想要在 main 中使用 main_package 套件的 main_module 模組裡 report_main_method 方法,可以這樣匯入:

"""
可以匯入「整個套件」
"""
import main_package

main_package.main_module.report_main_method()

"""
可以匯入「套件中」的「部分模組」
"""

from main_package import main_module

main_module.report_main_method()

"""
也可以匯入套件中的整個「模組」
"""
import main_package.main_module

main_package.main_module.report_main_method()

"""
也可以「匯入模組中」的「部分方法」
"""

from main_package.main_module import report_main_method

report_main_method()

sub_package 中的方法,只需要這樣 import 即可

# package 則是 folder name
from main_package import main_script
main_script.report_main()

from main_package.sub_package import sub_script

my_function()

sub_script.report_sub_script()
資訊

如果使用 from package import item 的寫法,這個 item 可以是 subpackage、function、class 或 variable;如果使用的是 import item.subitem.subsubitem 這種寫法的話,除了最後一個 item(subsubitem)外,其他都必須是 package,而最後這個 item 可以是 module 或 package,但不能是 function、class 或 variable。

  • 匯入的模組也是物件,可以使用 dir(my_module) 來檢視這個模組中有哪些方法可以使用
  • 被匯入的模組(這隻檔案)會被 Python 執行,即使只匯入部分方法也是
  • 使用 sys.path 可以查看 Python 搜尋模組的路徑
  • 使用 sys.modules 可以查看哪些模組被匯入
判斷這支檔案是直接被執行,或是被匯入時執行到

如果要判斷某個檔案是不是直接被執行,而不是透過 import 的方式被載入後才執行,可以使用 __name__ == "__main__",當這個檔案是直接被執行時,會得到 true,否則,它會是模組的名稱。

如果有需要的話,也可以使用 as (alias)來替import 的 module 改名字:

import numpy as np
from matplotlib.pyplot import plot as plt

np.array([1, 2, 3])
plt([1, 2, 3], [4, 5, 6])

明確定義其他人 import module 的方式

因為 module 中的所有變數、方法都可以被 import,所以如果我們希望明確定義哪些是「希望」開放給其他人使用的 module 時,可以搭配 as 定義在 __init__.py 中。例如:

# fastapi/__init__.py

"""FastAPI framework, high performance, easy to learn, fast to code, ready for production"""

__version__ = "0.111.0"

from starlette import status as status

# 其他人在使用時,可以不用寫 "from fastapi import applications"
# 而是寫 "from fastapi import FastAPI"
# from <module> import <method>
from .applications import FastAPI as FastAPI
# ...

Strings

官方文件

Text Sequence Type @ Built-in Types

Functions

一些特殊的屬性

keywords:__name__, __doc__, __annotations__
  • 使用 fn.__name__ 可以的到該函式的名稱
  • 使用 fn.__doc__ 可以檢視該函式的使用說明(如果有寫 Docstring 的話)
  • 使用 fn.__annotations__ 可以檢視函式參數的型別註記

Positional Arguments vs. Keyword Arguments

def greet(first_name, last_name):
return print(f"Hello, {first_name} {last_name}!")


# 使用 positional arguments
greet("John", "Doe")

# 使用 keyword arguments
greet(last_name="Doe", first_name="John")

在定義函式時可以使用 /* 來指定哪些參數只能是 positional arguments、哪些參數只能是 keyword arguments:

  • / 之前的參數一定要用 Positional Arguments
  • * 之後的參數一定要用 Keyword Arguments
def example_function(a, b, /, c, *, d, e):
print(f"a: {a}, b: {b}, c: {c}, d: {d}, e: {e}")


example_function(1, 2, 3, d=4, e=5) # Correct
example_function(1, 2, c=3, d=4, e=5) # Correct

取得所有傳入的參數

keywords: *args**kwargs

在 Python 函式中,可以在參數前面加上 *,以此來捕捉使用者傳入的任意數量的「Positional Arguments」。這些 Positional Arguments 會被打包成一個「元組(tuple)」,讓你可以處理任意數量的輸入:

# 使用 "*args" 可以把所有輸入的參數變成一個 tuple
# "args" 只是一個變數名稱,可以自己取名
def display_items(*items):
print("Items in your collection:")
for idx, item in enumerate(items, 1):
print(f"Item {idx}: {item}")

display_items("Laptop", "Smartphone", "Tablet", "Headphones")

"""
Items in your collection:
Item 1: Laptop
Item 2: Smartphone
Item 3: Tablet
Item 4: Headphones
"""

在參數前面加上 * 可以取得 Positional Arguments;如果要取得 Keyword Arguments 的話,則要在參數前面加上 **,例如 **kwargs這些 Keyword Arguments 會被包成一個 dict

# 在參數前面加上 "**" 可以取得 keyword arguments
def collect_inputs(*args, **kwargs):
print("You entered these positional arguments:", args)
for i, arg in enumerate(args, 1):
print(f"Positional Argument {i}: {arg}")

print("You also entered these keyword arguments:")
for key, value in kwargs.items():
print(f"{key}: {value}")


collect_inputs("apple", "banana", "cherry", name="David", age=25)

"""
You entered these positional arguments: ('apple', 'banana', 'cherry')
Positional Argument 1: apple
Positional Argument 2: banana
Positional Argument 3: cherry
You also entered these keyword arguments:
name: David
age: 25
"""
提示

*args 會是 tuple;**kwargs 會是 dict。

將傳入的參數「解開來」

上面我們是在定義函式的時候使用 ***,在呼叫函式的時候,也可以使用 *,但意義上是不同的,如果在呼叫函式時使用 *,表示要把帶入的參數「解開來」,例如:

fruits = ["apple", "banana", "cherry"]
collect_inputs(*fruits, name="David", age=25)

# 等同於
collect_inputs("apple", "banana", "cherry", name="David", age=25)

使用 * 可以把 list 的資料解開來;如果是想要把 dict 的資料解開來的話,則一樣可以用 **

david = {
"name": "David",
"age": 25
}
collect_inputs("apple", "banana", "cherry", **david)

# 等同於
collect_inputs("apple", "banana", "cherry", name="David", age=25)

當然,也可以兩個一起用:

fruits = ["apple", "banana", "cherry"]
david = {
"name": "David",
"age": 25
}
collect_inputs(*fruits, **david)

# 等同於
collect_inputs("apple", "banana", "cherry", name="David", age=25)

預設值(default values)要留意的地方

在 Python 中,要留意 default value 會被保存在 function 中,如果我們在 function 中 mutate 了 default value,則這個 function 在下一次呼叫時,會用的是新的、被改變過的 default value:

def mutate_default(my_list: list = []):
my_list.append(1)
return my_list

# my_list 的 default value 會被保存在 function 中,不會每次在 function 呼叫時就重新建立新的
mutate_default() # [1]
mutate_default() # [1, 1]
mutate_default() # [1, 1, 1]
提示

在定義函式時,這個預設值的值就已經被建立(決定),而不是在函式被執行時才被決定。(參考:參數預設值 @ 為你自己學 Python

要改善這個問題,其中常見的解法是把預設值設為 None,然後:

def mutate_default(my_list = None):
if my_list is None:
my_list = []
my_list.append(1)
return my_list

lambda

在 Lambda 表達式裡:

  • 不能做賦值或宣告變數
  • 不能使用型別註記

透過 lambda 可以:

  • 把函式指派給某個變數
  • 建立出匿名函式
# lambda [函式參數]: 函式內容(不需要特別加 return)
add = lambda x, y: x + y
result = add(2, 3) # 5

# 一樣可以幫參數帶入預設值
add = lambda x, y=3: x + y
result = add(2) # 5

lambda 特別適合用在一次性的函式,如此就不需要特別幫這個函式命名。例如:

employees = [
{"name": "Alice", "salary": 70000},
{"name": "Bob", "salary": 50000},
{"name": "Charlie", "salary": 120000},
]

# Sorting employees by salary using a lambda function
sorted_employees = sorted(employees, key=lambda x: x["salary"])

如果使用 lambda_fn.__name__ 的話,會拿到的是 '<lambda>' 而不是該 lambda 函式的名稱:

lambda_fn = lambda a: a + 1
lambda_fn.__name__ # '<lambda>'

decorators

generators

def is_prime(num):
if num <= 1:
return False
for i in range(2, int(num ** 0.5) + 1):
if num % i == 0:
return False
return True


def find_primes(n):
for i in range(1, n + 1):
if is_prime(i):
# 使用 yield 會讓程式停在這
yield i
# highlight-send

如果要一次拿出一個 generators 的值:

primes = find_primes(10)
next(primes) # 2
next(primes) # 3
next(primes) # 5
next(primes) # 7
next(primes) # StopIteration

如果要一次拿出所有 generator 的值,並保存在 list 中:

[prime for prime in find_primes(10)]
# [2, 3, 5, 7]

或者可以單純用 for

for prime in find_primes(10):
print(prime)

# 2
# 3
# 5
# 7

Error and Exception Handling

Python 中內建的 Error 多是繼承自 BaseException 這個類別:

TypeError.__mro__  # (<class 'TypeError'>, <class 'Exception'>, <class 'BaseException'>, <class 'object'>)

Raise Error

拋出 Python 內建的錯誤

raise NameError  # 沒有客製化錯誤訊息
raise NameError("Custom Error Message")

拋出 Exception

如果內建的錯誤類別不適用,可以使用更泛用的 Exception 類別:

raise Exception("Custom Error Message")

Try / Except

在 Python 中,如果在 try 中有錯誤發生時,程式會立即終止並跳到 except 中執行:

  • try:把可能會發生錯誤的程式包在裡面
  • except:攔截並處理錯誤,搭配 as 可以取得錯誤的 Error 物件
  • else:如果沒有錯誤則執行這裡
  • finally:不論有沒有錯誤都會執行這裡
try:
print("Hello, World")
except NameError as err: #
# 攔截並處理 NameError 的例外
print("An exception occurred: {}".format(err))
except KeyError as err:
# 攔截並處理 KeyError 的例外
print("An exception occurred: {}".format(err))
except Exception:
# 攔截並處理任何類型的例外
print("An exception occurred")
else:
# 如果沒有發生錯誤才會執行
print("No exception occurred")
finally:
# 不論有沒有發生錯誤都會執行
print("Finally block executed")

攔截並處理特定類型的例外

try:
pass
except NameError as err: # 只攔截處理 NameError
print('A NameError occurred')
print(err)

同時攔截並處理多個類型的例外

except 後使用 tuple 就可以攔截並處理多個不同類型的錯誤:

try:
pass
except (NameError, ZeroDivisionError) as e:
print(e)

如果不同類型的錯誤需要有不同處理,則可以寫成:

try:
pass
except NameError:
print('A NameError occurred')
except KeyError:
print('A KeyError occurred')
except Exception:
print('A unknown Error occurred')

客製化錯誤(User-defined Exceptions)

雖然沒有強制,但一般命名會依照 Python Error 的慣例,以Error 結尾:

class CustomError(Exception):
pass

如果希望這個 Error 能接收客製化的錯誤訊息:

class OutOfStockError(Exception):
def __init__(self, product):
self.product = product

def __str__(self):
return f"The '{self.product}' is out of stock"


def main():
try:
raise OutOfStockError("iPhone")
except OutOfStockError as err:
print(err)

Parallel and Concurrent Python

在使用 Parallel 或 Concurrent 的方式來優化前,應該先考慮有沒有其他方式能優化執行速度,例如使用 NumPypandas,真的不行的話,才會考慮使用這裡提到的 multi-processing、multi-threading 或 asynchronous 的方式。

  • CPU-bound: multi-processing
  • I/O-bound: asyncio > multi-threading

對於 I/O-bound 的任務,只有在不支援 asyncio 的情況下才來使用 multi-threading 的做法。

Asyncio

官方文件
  • asyncio.create_task(ASYNC_FN())
    • 把一個 async function 變成 task,執行 asyncio.create_task 的時候該 async function 就會被排入 event loop 中被執行
  • asyncio.run(ASYNC_FN())
    • 會建立一個 event loop,主要是給最外層沒有 async keyword 的 function(例如 main)使用,否則會得到這個錯誤「SyntaxError: 'await' outside async function」
  • asyncio.gather(ASYNC_FN_1(), ASYNC_FN_2(), ...):類似 JavaScript 中的 Promise.all
注意

Python 的 async 和 JavaScript 類似,都是基於 event loop 處理非同步任務,但 Python 需要 asyncio 來啟動 event loop,且要搭配 asyncio 可用的函式(例如,asyncio.sleep),適合用在 heavy I/O 。如果是 computation heavy 的任務(例如,CPI-bound 運算),即使使用 async 依然會導致 main thread 卡住(blocking),需要的話,需要使用 threadprocess

範例:asyncio 的基本使用

import asyncio

async def foo():
print("Foo start")
await asyncio.sleep(2)
print("Foo done")

async def bar():
print("Bar start")
await asyncio.sleep(1)
print("Bar done")

async def main():
task1 = asyncio.create_task(foo()) # Schedule foo()
task2 = asyncio.create_task(bar()) # Schedule bar()

# 雖然可以直接用 await foo() 但這樣就沒有利用到 concurrently 的效果
# await foo()

await task1 # Wait for foo() to finish
await task2 # Wait for bar() to finish

asyncio.run(main()) # Start the event loop

範例二:使用 await for...in (sequential) 或 asyncio.gather (concurrent)

import asyncio


async def async_hello(name: str) -> str:
"""Simulates an async greeting with a delay."""
await asyncio.sleep(1)
print(f"Hello {name}")
return name


async def main_sequential():
"""Runs tasks sequentially without concurrency."""
results = [await async_hello(name) for name in ["A", "B", "C"]]
print(*results)


async def main_concurrent():
"""Runs tasks concurrently for better performance."""

# 直接帶入 async job
# results = await asyncio.gather(async_hello("A"), async_hello("B"), async_hello("C"))
#
# 或透過 generator 帶入 job(節省記憶體)
# names = ["A", "B", "C"]
# results = await asyncio.gather(*(async_hello(name) for name in names))
#
# 或透過 list 帶入 job(可讀性較高)
tasks = [async_hello("A"), async_hello("B"), async_hello("C")]
results = await asyncio.gather(*tasks)
print(results)


if __name__ == "__main__":
asyncio.run(main_sequential())
asyncio.run(main_concurrent())

Thread

在 Python 中雖然提供 Thread 可以使用,然而在 CPython 中的 Global Interpreter Lock(GIL) 為了確保執行緒安全和記憶體一致性,限制了多 thread 的效能,讓一次只能有一個 thread 被執行,這導致在 CPython 中,對於 CPU-bound tasks 沒辦法實際達到 multi-threading 該有的效能。

提示

Thread 適合用在 I/O-bound 且該函式庫尚未支援 asyncio 的情況,否則的話,直接使用 asyncio 即可。

警告

如果是 I/O-bound(例如 sleep()、網路請求、文件讀寫等),ThreadPoolExecutor 可以提高執行速度

如果是 CPU-bound(例如數據計算、壓縮、加密等),ThreadPoolExecutor 無法提高效能,甚至會因為 GIL 而變慢,這時應該使用 ProcessPoolExecutor。

建立和執行 Thread

from threading import Thread
import time


def do_work(sleep_time: float) -> None:
print("Start worker")
time.sleep(sleep_time)
print("End worker")


# 創建一個執行緒來執行 do_work 函式,並傳遞 2 秒的睡眠時間
worker_thread = Thread(target=do_work, name="t1", args=(2.0,))
print(f"Ident: {worker_thread.ident}") # None
print(f"Alive: {worker_thread.is_alive()}") # False
print(f"Name: {worker_thread.name}") # t1

# 啟動執行緒
worker_thread.start()
print(f"Ident: {worker_thread.ident}") # 系統分配的執行緒 ID,6132363264
print(f"Alive: {worker_thread.is_alive()}") # 執行緒啟動狀態,True
print(f"Name: {worker_thread.name}") # t1

# 等待執行緒完成工作
worker_thread.join() # 讓主執行緒等待 t1 結束

print("主執行緒繼續執行其他任務")

使用 ThreadPoolExecutor

可以使用 executor.submit() 來將 task 提交給 Thread Pool,並使用 .result() 來獲得結果:

  • submit() 允許你在不同的時間提交多個 tasks,並且能夠為每個 task 獲取一個 Future 物件,進一步操作這些任務的狀態和結果
from concurrent.futures import ThreadPoolExecutor
import time


def task(name, delay):
# 定義一個模擬長時間執行的任務
print(f"Task {name} started")
time.sleep(delay)
print(f"Task {name} completed")
return f"Result from {name}"


def main():
start = time.perf_counter()

# 使用 ThreadPoolExecutor 來管理執行緒池
# 創建一個有 3 個執行緒的 ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=3) as executor:
# 提交任務
futures = [
executor.submit(task, "A", 2),
executor.submit(task, "B", 1),
executor.submit(task, "C", 3)
]

# 等待並獲取結果
for future in futures:
print(future.result())

end = time.perf_counter()
print(f"Execution duration: {end-start}")

或者也可以使用 executor.map()

  • submit() 不同,map() 是一次性提交所有任務並返回結果,它會按順序返回所有結果
from concurrent.futures import ThreadPoolExecutor
import time

# 定義一個簡單的任務
def task(n):
time.sleep(1)
return f"Task {n} completed"

def main():
# 創建 ThreadPoolExecutor,並使用 map() 方法
with ThreadPoolExecutor(max_workers=3) as executor:
results = executor.map(task, range(1, 6)) # 將 1 到 5 作為參數提交給 task 函數

# 輸出結果
for result in results:
print(result)

Process

Multi-processing 適合用在 CPU-bound tasks。

建立新的 Process 比起建立新的 Thread 要花時間,且每個 Process 之間並不會共享記憶體。

建立和執行 Process

這種寫法沒辦法取得 Process 回傳的結果:

import math

numbers = [
18014398777917439,
18014398777917439,
18014398777917439,
18014398777917439,
]


def is_prime(n: int) -> bool:
if n < 2:
return False
if n in (2, 3, 5, 7, 11, 13, 17):
return True
if (
n % 2 == 0
or n % 3 == 0
or n % 5 == 0
or n % 7 == 0
or n % 11 == 0
or n % 13 == 0
or n % 17 == 0
):
return False
upper_limit = int(math.sqrt(n)) + 1
for i in range(19, upper_limit, 2):
if n % i == 0:
return False
return True
import time
from multiprocessing import Process


def main_with_processes():
processes = [Process(target=is_prime, args=(n,)) for n in numbers]

start = time.perf_counter()

[p.start() for p in processes]
[p.join() for p in processes]

end = time.perf_counter()

print(f"time: {end - start}")

[p.close for p in processes]


def main_without_multiprocessing():
start = time.perf_counter()
results = [is_prime(n) for n in numbers]
print(results)

end = time.perf_counter()

print(f"time: {end - start}")


if __name__ == "__main__":
main()

main_without_multiprocessing()

使用 Process Pool 或 Process Pool Executor

使用 Pool 才能取得 Process 的回傳值:

from multiprocessing import Pool


def main_with_process_pool():
start = time.perf_counter()

with Pool() as pool:
result = pool.map(is_prime, numbers)
print(result)

end = time.perf_counter()

print(f"time: {end - start}")

也可以使用 ProcessPoolExecutor

from concurrent.futures import ProcessPoolExecutor


def main_with_process_poll_executor():
start = time.perf_counter()

with ProcessPoolExecutor() as executor:
primes = executor.map(is_prime, numbers)
for number, prime in zip(numbers, primes):
print(f"{number} is prime: {prime}")

end = time.perf_counter()

print(f"time: {end - start}")

Generator

Generator 的特性

  • 不會立即算出 value,而是在要被使用時才會計算出來
  • 相較於 List,如果需要處理大量資料時,因為 generator 不會把所有值都先算好保存在記憶體中,因此相較於 list 可以省下許多記憶體

Generator Expression

# Generator expression to compute squares lazily
squares = (x * x for x in range(5)) # <generator object <genexpr> at 0x...>
print(next(squares)) # 0

# 或者用 for loop 全部 iterate 出來
for num in (x * x for x in range(5)):
print(num)

變數作用域

keywords: globals()locals(), global, nonlocal
參考資料

作用域 @ 為你自己學 Python

在 Python 中,因為沒有用來明確「宣告變數」的 keyword,所以如果指定、了解變數的作用域就是一個很重要的觀念。

一般來說,只要在函式做建立了一個新的變數,這個變數就會是區域變數(local variable),即是外面已經有相同名稱的全域變數也不會撞名:

# 有一個 global variable 叫做 "name"
name = "Aaron"


def change_name():
# 只要在函式中建立了一個新的變數,它就會是區域變數
# 這裡的 name 會是 local variable,而不會去改到 global variable
name = "PJCHENder"
print(name) # PJCHENder


change_name()
print(name) # Aaron

如果希望能夠改到全域變數,需要使用關鍵字 global 來讓 Python 知道,這裡要存取的是全域變數:

# 有一個 global variable 叫做 "name"
name = "Aaron"


def change_name():
# 明確告知 Python 我要使用的是 global variable
global name
# 這裡的 name 就會改到全域的 name
name = "PJCHENder"
print(name) # PJCHENder


change_name()
print(name) # PJCHENder

如果要改的不是「全域變數」,也不是「區域變數」,而是往外一層的「封閉變數」的話,這時候就得用 nonlocal 這個關鍵字:

name = "Aaron"


def outer():
# 這個是 local variable,不會改到 global variable
name = "PJCHEN"

def inner():
# 這裡的 name 是 outer() 的 local variable
nonlocal name
name += "der"

inner()
print(name) # PJCHENder


outer()
print(name) # Aaron

除此之外,使用 globals()locals() 來取得當下的全域變數(global variables)和區域變數(local variables):

print(globals())
print(locals())

常用 Library

time

time.perf_counter_ns() # 可以拿來算函式執行的時間

collections

defaultdict

defaultdict 會自動為不存在的鍵創建一個默認值(default factory)。

defaultdict(list):當 dict 中,該 key 不存在時,會自動建立 key 並以 empty list 為其 value:

from collections import defaultdict

# 假設我們有這些數據
data = [('apple', 10), ('banana', 20), ('apple', 15), ('banana', 25), ('cherry', 30)]

# 使用 defaultdict(list) 來分組
grouped = defaultdict(list)

for fruit, quantity in data:
grouped[fruit].append(quantity)

print(grouped)
# 輸出: defaultdict(<class 'list'>, {'apple': [10, 15], 'banana': [20, 25], 'cherry': [30]})

defaultdict(int):當 dict 中,該 key 不存在時,會自動建立 key 並以 int 0 為其 value:

from collections import defaultdict

# 用來計數的 defaultdict
count_dict = defaultdict(int)

# 計數單詞的出現次數
words = ['apple', 'banana', 'apple', 'orange', 'banana', 'apple']

for word in words:
count_dict[word] += 1

print(count_dict)
# 輸出: defaultdict(<class 'int'>, {'apple': 3, 'banana': 2, 'orange': 1})

dict_with_default = defaultdict(lambda: 'unknown')

from collections import defaultdict

# 用來初始化為 'unknown' 的 defaultdict
dict_with_default = defaultdict(lambda: 'unknown')

# 訪問不存在的鍵時,會自動返回 'unknown'
print(dict_with_default['name']) # 輸出 'unknown'

dict_with_default['name'] = 'Alice'
print(dict_with_default['name']) # 輸出 'Alice'

留言