Python Getting Started
- :thumbsup: 為你自己學 PYTHON
- Python @ CodeCademy
- Complete-Python-3-Bootcamp @ GitHub
# 在 REPL 環境下載入某支檔案
$ python -i hello.py
Setup & Installation
PIP
- PIP
- Version Specifiers:安裝特定版本的套件
$ pip list # 檢視目前電腦上安裝哪些 python package
$ pip install [package] # 安裝套件
$ pip uninstall [package] # 移除套件
$ pip show [package] # 檢視套件資訊
venv
# 建立 virtual env
# python -m venv [virtual_environment_name]
$ python -m venv .venv # 建立名為 .venv 的 virtual environment
# 啟動 virtual env
# source [virtual_environment_name]/bin/activate
$ source .venv/bin/activate
# 進入 virtual env 後,使用 deactivate 可以離開
> deactivate
# 把目前專案有安裝的套件寫進 requirement.txt 中
$ pip freeze > requirements.txt
# 安裝 requirement.txt 中列出的套件
$ pip install -r requirements.txt
# 解除安裝
$ pip uninstall -r requirements.txt -y
Poetry
參考:[note] Python Poetry @ PJCHENder
VSCode
Getting Started with Python in VS Code @ Youtube
Data Structure Basic
Variable
a = 3
type(a) # int
String
##
# Index and Slice
##
'tinker'[1:4] # 'ink'
'tinker'[1:4:2] # 'ik'
'tinker'[::-1] # 'reknit
##
# Formatting with the .format() method
##
'Good {}, {} Chen!'.format('morning', 'Mr.') # 'Good morning, Mr. Chen!'
'My favorite brand is {}, {}, and {}!'.format('Apple', 'Samsung', 'Google') # 'My favorite brand is Apple, Samsung, and Google!'
'My favorite brand is {2}, {1}, and {0}!'.format('Apple', 'Samsung', 'Google') # 'My favorite brand is Google, Samsung, and Apple!'
'Repeat after me: {0}! {0}! {0}!'.format('Ho') # 'Repeat after me: Ho! Ho! Ho!'
'Good {time}, {title} {name}!'.format(time='morning', title='Mr.', name='Chen') # 'Good morning, Mr. Chen!'
##
# Float Formatting
##
result = 100/777 # 0.1287001287001287
# Old Way, {value:width.precision f}
print("The result was {:1.3f}".format(result)) # The result was 0.129
# Formatted String Literals(f-strings)
name = 'Aaron'
age = 33
print(f'Hello, my name is {name}, and I\'m {age} years old.')
List
[0]*3 # [0, 0, 0]
my_list = [1, 3, 2]
another_list = [4, 6, 5]
# array concat
whole_list = my_list + another_list
whole_list # [1, 3, 2, 4, 6, 5]
# sort
whole_list.sort()
whole_list # [1, 2, 3, 4, 5, 6]
whole_list.reverse()
whole_list # [6, 5, 4, 3, 2, 1]
Dictionary
Mapping Types - Dict @ Python Doc > Built-in Type
Dictionary 基本操作
建立 Dictionary
- dictionary 的 key 必須要是 hashable object,因此
- 可以是 string、number、bool、frozenset、function
- 不行是 list、set、dictionary,因為它們都是 mutable types
能不能是 hashable 的實際上是取決於該資料是不是 immutable 的,因此 tuple 不是 hashable 的,取決於裡面的元素。在 Python 中有一個內建的 hash 函式,用它即可知道某資料是不是 hashable 的。例如,hash((1, 'a', True)) 是可以的;但如果 tuple 裡面有 list,hash((1, 'a', [1, 2])) 則會得到 TypeError: unhashable type: 'list'。
在 Python 內建的 hash 函式中它會回傳 int,基於某些安全性的理由,Python 會保證在同一次執行時,帶入 hash() 中的 input 相同時,能得到相同的 output,但是如果是不同次執行,則相同的 input 不保證會得到相同的 output,所以盡可能不要用 hash() 後的 hash value 來作為 input 是不是相同的判斷。
- dictionary 的 key order 在 Python 3.5+ 後會依照 insert 時的順序被保持
- dictionary 的 value 可以是 any object,例如,lambda、list、dictionary
# 1. 使用 literal
user1 = {'name': 'John', 'age': 25}
# 2-1. 使用 constructor
user2 = dict(name='John', age=25) # 使用 constructor
# 2-2. 使用 tuple 透過 constructor 建立 dictionary
user_from_tuple = (('name', 'John'), ('age', 25))
user3 = dict(user_from_tuple)
# 2.3 使用 list 透過 constructor 建立 dictionary
user_from_list = [['name', 'John'], ['age', 25]]
user4 = dict(user_from_list)
# 3. 使用 comprehension
user_from_list = [['name', 'John'], ['age', 25]]
user5 = {k: v for k, v in user_from_list}
在 Dictionary 中新增或更新 Key-Value
# 使用 []
user = {'name': 'John', 'age': 25}
user["height"] = 180
# dict.setdefault(key, value)
# 如果存在就不動作,如果不存在就新增,類似建立預設值
user.setdefault('name', 'Aaron') # 因為 name 已經存在,所以值依然會是 'John'
user.setdefault('height', 170) # 因為 height 不存在,所以值會是 170
# 使用 update,如果存在就更新,不存在就新增
user.update({
"height": 180,
"width": 76
})
user |= info # update 也可以改成用 |=
# 使用 unpacking operator
new_user = {
**user,
"height": 180,
"width": 76
}
# 使用 |
user = {'name': 'John', 'age': 25}
info = {"height": 180, "width": 76}
new_user = user | info
取值
user = {'name': 'John', 'age': 25}
user["name"] # "John"
user["foo"] # KeyError: 'foo'
# 使用 get:如果沒有該 key 不會噴錯
# get(key, default=None)
user.get("foo") # None
user.get("foo", "default_value") # default_value
刪除 key 與清空
user = {'name': 'John', 'age': 25}
del user["name"] # 刪除 name 這個 key-value pair
user.clear() # 把 user 中的 key-value pairs 清空,變成 empty dictionary
pop, popitem:把 key-value pair 中 dictionary 中取出(後刪除)
# dict.pop(key, [default_value])
user = {'name': 'John', 'age': 25}
user_name = user.pop("name") # "John",並把 user 中的 {"name": "John"} 移除
user_age = user.pop("age", 30) # 25, 如果沒有 age 這個 key,則用 default value(30)
dict.popitem() 是 LIFO,所以可以當作 stack 使用:
# dict.popitem() 把最後面的一組 key-value pair 拿出來
user = {'name': 'John', 'age': 25}
age_info = user.popitem() # ('age', 25)
複製 dictionary
user = {'name': 'John', 'age': 25}
# 使用 copy
new_user = user.copy()
# 使用 dict
new_user = dict(user)
# 使用 unpacking operator
new_user = {**user}
常用方法
Dict Comprehensions
names = ['Jenny', 'Aaron', 'Sam', 'Grace']
ages = [23, 25, 20, 27]
participants = {k:v for k, v in zip(names, ages)} # {'Jenny': 23, 'Aaron': 25, 'Sam': 20, 'Grace': 27}
dict.fromkeys():從其他 iterable object 建立 dictionary
# 根據字串、串列或 Tuple 來建立 Dict 的 key
# classmethod fromkeys(iterable, value=None, /)
dict.fromkeys(['name', 'age'])
dict.fromkeys(['name', 'age'], '') # {'name': '', 'age': ''}
in:判斷 dictionary 中是否有該 key
user = {'name': 'John', 'age': 25}
"name" in user # True
"foo" not in user # False
unpacking operator (**):可以用來合併 dictionary
defaults = {'color': 'red', 'size': 'medium', 'price': 100}
updates = {'color': 'blue', 'price': 150}
final = {**defaults, **updates} # {'color': 'blue', 'size': 'medium', 'price': 150}
迭代 dictionary
user = {'name': 'John', 'age': 25}
len(user) # 2
# 直接迭代 dictionary 會拿到 key
for key in user:
print(k)
# 取得所有 keys
user.keys() # dict_keys(['name', 'age'])
list(user) # ["name", "age"]
user.values() # dict_values(['John', 25])
list(user.values()) # ['John', 25]
for value in user.values():
print(value)
user.items() # dict_items([('name', 'John'), ('age', 25)])
list(user.items()) # [('name', 'John'), ('age', 25)]
for k, v in user.items():
print(f"{k}: {v}")
要特別留意的是,使用 keys()、values()、或 items() 拿到的資料並不是真的帶有 key、value 的 list,而是一個「view」讓我們可以透過它看到目前 dictionary 的資料。因此即使把它存成變數,一旦原本的 dictionary 改變,這個變數得到的內容也會改變。
Tuple
基本操作
建立 Tuple
# 使用 ()
single = (1,)
mixed = (1, "hello", True)
empty = ()
# 使用 tuple constructor
list_to_tuple = tuple([1, 2, 3]) # (1, 2, 3)
string_to_tuple = tuple("hello") # ('h', 'e', 'l', 'l', 'o')
range_to_tuple = tuple(range(3)) # (0, 1, 2)
Enum
import enum
# 定義 enum
class Color(str, enum.Enum):
RED = "red"
GREEN = "green"
BLUE = "blue"
print(Color.RED.value) # red
# 把 enum 轉成 list
colors = [color.value for color in Color]
print(colors) # ['red', 'green', 'blue']
Set
set1 = {1, 2, 3, 4, 5}
set2 = {3, 4, 5, 6, 7}
set1 - set2 # {1, 2}
set2 - set1 # {6, 7}
set1 & set2 # {3, 4, 5}
set1 | set2 # {1, 2, 3, 4, 5, 6, 7}
set1 ^ set2 # {1, 2, 6, 7}
Operators
Comparisons Operators
1 < 2 < 3 # True
1 < 2 and 2 < 3 # True
1 == 1 or 2 == 2 # True
not ( 1 == 1 ) # False
not ( 400 > 5000 ) # True
Useful Operators
range(start, stop, step)
# 0, 1, 2
for i in range(3):
print(i)
# 3, 4
for i in range(3,5):
print(i)
# 1,3,5
for i in range(1,6,2):
print(i)
# [0, 2, 4]
list(range(0, 5, 2))
enumerate
為 iterable 物件的每個元素加上索引
word = 'abc'
# 0: a
# 1: b
# 2: c
for idx, value in enumerate(word):
print(f'{idx}: {value}')
zip
將多個 iterable 的對應元素組合成 tuple
name = ['Aaron', 'John', 'Mary']
age = [12, 13, 14]
height = [170, 180, 160]
# Aaron is 12 years old with 170 cm
# John is 13 years old with 180 cm
# Mary is 14 years old with 160 cm
for name, age, height in zip(name, age, height):
print(f'{name} is {age} years old with {height} cm')
# <zip object at 0x1023a1300>
zip(name, age, height)
# [('Aaron', 12, 170), ('John', 13, 180), ('Mary', 14, 160)]
list(zip(name, age, height))
in
1 in [1, 2, 3] # True
'a' in 'abc' # True
'a' in [1, 2, 3] # False
user = {
"name": "John",
"age": 30,
"city": "New York"
}
'name' in user # True
'John' in user.values()
Statements
Conditional Statements
state = 'active'
if state == 'active':
print('The state is active')
elif state == 'inactive':
print('The state is inactive')
else:
print('The state is unknown')
For Loops
Iterate a list
brands = ['Ford', 'BMW', 'Volvo']
for brand in brands:
print(brand)
Combine with conditional statement:
brands = ['Ford', 'BMW', 'Volvo']
for brand in brands:
if brand == 'Volvo':
print(brand + ' - I like it!')
else:
print(brand + ' - I don\'t like it!')
Iterate a string
str = 'Hello World'
for letter in str:
print(letter)
Iterate a tuple
for t in (1, 2, 3):
print(t)
Tuple Unpacking
my_list = [(1, 2), (3, 4), (5, 6), (7, 8), (9, 10)]
for a, b in my_list:
print(a + b)
Iterate an dictionary
user = {
"name": "John",
"age": 30,
"city": "New York"
}
for key in user:
print(f'key: {key}')
print(f'value: {user[key]}')
for key, value in user.items():
print(f'key: {key}')
print(f'value: {value}')
for value in user.values():
print(value)
While Loops
x = 0
while x < 5:
# x = x + 1
x += 1
print(f'The current value of x is {x}')
# could combine with "else"
else:
print('X IS NOT LESS THAN 5')
Basic I/O
Read and Write files
mode='r': read onlymode='w': write only (will overwrite existing files or create new)mode='a': append onlymode='r+': read and writemode='w+': write and read (overwrite existing files or create new)
myfile = open('myfile.txt')
myfile.read()
myfile.seek(0) # reset cursor
myfile.readlines() # returns a list of lines
myfile.close() # close file
# 不用怕忘記 close 的寫法
# create or overwrite a file
with open('myfile.txt', mode="w") as my_new_file:
contents = my_new_file.write(
'this is a text file\nthis is the second line\nthis is the third line')
# read file
with open('myfile.txt', mode='r') as f:
contents = f.read()
print(contents)
# append file
with open('myfile.txt', mode='a') as f:
f.write('\nthis is the fourth line')
with open('myfile.txt', mode='r') as f:
contents = f.read()
print(contents)
Modules and Packages(模組與套件)
定義 Modules 和 Packages
- Module(模組)指的就是
.py「檔案」,在 Python 中不需定義要 export 哪些變數,可以自由 import - Package(套件)則是一個包含許多 modules 的「資料夾」,要變成 package,只需要把檔案(模組)整理在資料夾中,並在資料夾中加上
__init__.py的檔案 - 一個 Package 中還可以有 sub package,只要在該資料夾中同樣加上
__init__.py的即可。
簡單來說,在 Python 裡,module 指的是「檔案」,package 指的是「資料夾」。
舉例來說,我們的資料結構如下:
.
├── main_package
│ ├── __init__.py
│ ├── main_module.py # 裡面有定義 report_main_method()
│ └── sub_package
│ ├── __init__.py
│ └── sub_module.py # 裡面有定義 report_sub_method()
├── my_module.py # 裡面有定義 my_function()
└── main.py
如果想要在 main 中使用 my_module 模組中的 report_my_method 方法:
"""
可以直接匯入「整個模組」
"""
import my_module
my_module.report_my_method()
"""
也可以「匯入模組中」的「部分方法」
"""
from my_module import report_my_method
report_my_method()
如果想要在 main 中使用 main_package 套件的 main_module 模組裡 report_main_method 方法,可以這樣匯入:
"""
可以匯入「整個套件」
"""
import main_package
main_package.main_module.report_main_method()
"""
可以匯入「套件中」的「部分模組」
"""
from main_package import main_module
main_module.report_main_method()
"""
也可以匯入套件中的整個「模組」
"""
import main_package.main_module
main_package.main_module.report_main_method()
"""
也可以「匯入模組中」的「部分方法」
"""
from main_package.main_module import report_main_method
report_main_method()
和 sub_package 中的方法,只需要這樣 import 即可
# package 則是 folder name
from main_package import main_script
main_script.report_main()
from main_package.sub_package import sub_script
my_function()
sub_script.report_sub_script()
如果使用 from package import item 的寫法,這個 item 可以是 subpackage、function、class 或 variable;如果使用的是 import item.subitem.subsubitem 這種寫法的話,除了最後一個 item(subsubitem)外,其他都必須是 package,而最後這個 item 可以是 module 或 package,但不能是 function、class 或 variable。
- 匯入的模組也是物件,可以使用
dir(my_module)來檢視這個模組中有哪些方法可以使用 - 被匯入的模組(這隻檔案)會被 Python 執行,即使只匯入部分方法也是
- 使用
sys.path可以查看 Python 搜尋模組的路徑(透過sys.path.append('<path>')或sys.path.extend(['<path>'])可以動態添加 Python 尋找模組的路徑) - 使用
sys.modules可以查看哪些模組被匯入
如果要判斷某個檔案是不是直接被執行,而不是透過 import 的方式被載入後才執行,可以使用 __name__ == "__main__",當這個檔案是直接被執行時,會得到 true,否則,它會是模組的名稱。
如果有需要的話,也可以使用 as (alias)來替import 的 module 改名字:
import numpy as np
from matplotlib.pyplot import plot as plt
np.array([1, 2, 3])
plt([1, 2, 3], [4, 5, 6])
明確定義其他人 import module 的方式
因為 module 中的所有變數、方法都可以被 import,所以如果我們希望明確定義哪些是「希望」開放給其他人使用的 module 時,可以搭配 as 定義在 __init__.py 中。例如:
# fastapi/__init__.py
"""FastAPI framework, high performance, easy to learn, fast to code, ready for production"""
__version__ = "0.111.0"
from starlette import status as status
# 其他人在使用時,可以不用寫 "from fastapi import applications"
# 而是寫 "from fastapi import FastAPI"
# from <module> import <method>
from .applications import FastAPI as FastAPI
# ...
Strings
Text Sequence Type @ Built-in Types
Functions
一些特殊的屬性
keywords:__name__, __doc__, __annotations__
- 使用
fn.__name__可以的到該函式的名稱 - 使用
fn.__doc__可以檢視該函式的使用說明(如果有寫 Docstring 的話) - 使用
fn.__annotations__可以檢視函式參數的型別註記
Positional Arguments vs. Keyword Arguments
def greet(first_name, last_name):
return print(f"Hello, {first_name} {last_name}!")
# 使用 positional arguments
greet("John", "Doe")
# 使用 keyword arguments
greet(last_name="Doe", first_name="John")
在定義函式時可以使用 /、* 來指定哪些參數只能是 positional arguments、哪些參數只能是 keyword arguments:
/之前的參數一定要用 Positional Arguments*之後的參數一定要用 Keyword Arguments
def example_function(a, b, /, c, *, d, e):
print(f"a: {a}, b: {b}, c: {c}, d: {d}, e: {e}")
example_function(1, 2, 3, d=4, e=5) # Correct
example_function(1, 2, c=3, d=4, e=5) # Correct
取得所有傳入的參數
keywords: *args、**kwargs
在 Python 函式中,可以在參數前面加上 *,以此來捕捉使用者傳入的任意數量的「Positional Arguments」。這些 Positional Arguments 會被打包成一個「元組(tuple)」,讓你可以處理任意數量的輸入:
# 使用 "*args" 可以把所有輸入的參數變成一個 tuple
# "args" 只是一個變數名稱,可以自己取名
def display_items(*items):
print("Items in your collection:")
for idx, item in enumerate(items, 1):
print(f"Item {idx}: {item}")
display_items("Laptop", "Smartphone", "Tablet", "Headphones")
"""
Items in your collection:
Item 1: Laptop
Item 2: Smartphone
Item 3: Tablet
Item 4: Headphones
"""
在參數前面加上 * 可以取得 Positional Arguments;如果要取得 Keyword Arguments 的話,則要在參數前面加上 **,例如 **kwargs。這些 Keyword Arguments 會被包成一個 dict:
# 在參數前面加上 "**" 可以取得 keyword arguments
def collect_inputs(*args, **kwargs):
print("You entered these positional arguments:", args)
for i, arg in enumerate(args, 1):
print(f"Positional Argument {i}: {arg}")
print("You also entered these keyword arguments:")
for key, value in kwargs.items():
print(f"{key}: {value}")
collect_inputs("apple", "banana", "cherry", name="David", age=25)
"""
You entered these positional arguments: ('apple', 'banana', 'cherry')
Positional Argument 1: apple
Positional Argument 2: banana
Positional Argument 3: cherry
You also entered these keyword arguments:
name: David
age: 25
"""
*args 會是 tuple;**kwargs 會是 dict。
將傳入的參數「解開來」
上面我們是在定義函式的時候使用 * 或 **,在呼叫函式的時候,也可以使用 *,但意義上是不同的,如果在呼叫函式時使用 *,表示要把帶入的參數「解開來」,例如:
fruits = ["apple", "banana", "cherry"]
collect_inputs(*fruits, name="David", age=25)
# 等同於
collect_inputs("apple", "banana", "cherry", name="David", age=25)
使用 * 可以把 list 的資料解開來;如果是想要把 dict 的資料解開來的話,則一樣可以用 **:
david = {
"name": "David",
"age": 25
}
collect_inputs("apple", "banana", "cherry", **david)
# 等同於
collect_inputs("apple", "banana", "cherry", name="David", age=25)
當然,也可以兩個一起用:
fruits = ["apple", "banana", "cherry"]
david = {
"name": "David",
"age": 25
}
collect_inputs(*fruits, **david)
# 等同於
collect_inputs("apple", "banana", "cherry", name="David", age=25)
預設值(default values)要留意的地方
在 Python 中,要留意 default value 會被保存在 function 中,如果我們在 function 中 mutate 了 default value,則這個 function 在下一次呼叫時,會用的是新的、被改變過的 default value:
def mutate_default(my_list: list = []):
my_list.append(1)
return my_list
# my_list 的 default value 會被保存在 function 中,不會每次在 function 呼叫時就重新建立新的
mutate_default() # [1]
mutate_default() # [1, 1]
mutate_default() # [1, 1, 1]
在定義函式時,這個預設值的值就已經被建立(決定),而不是在函式被執行時才被決定。(參考:參數預設值 @ 為你自己學 Python)
要改善這個問題,其中常見的解法是把預設值設為 None,然後:
def mutate_default(my_list = None):
if my_list is None:
my_list = []
my_list.append(1)
return my_list
lambda
在 Lambda 表達式裡:
- 不能做賦值或宣告變數
- 不能使用型別註記
透過 lambda 可以:
- 把函式指派給某個變數
- 建立出匿名函式
# lambda [函式參數]: 函式內容(不需要特別加 return)
add = lambda x, y: x + y
result = add(2, 3) # 5
# 一樣可以幫參數帶入預設值
add = lambda x, y=3: x + y
result = add(2) # 5
lambda 特別適合用在一次性的函式,如此就不需要特別幫這個函式命名。例如:
employees = [
{"name": "Alice", "salary": 70000},
{"name": "Bob", "salary": 50000},
{"name": "Charlie", "salary": 120000},
]
# Sorting employees by salary using a lambda function
sorted_employees = sorted(employees, key=lambda x: x["salary"])
如果使用 lambda_fn.__name__ 的話,會拿到的是 '<lambda>' 而不是該 lambda 函式的名稱:
lambda_fn = lambda a: a + 1
lambda_fn.__name__ # '<lambda>'
decorators
- 函數裝飾器 @ 為你自己學 Python:說明如何建立 decorators
generators
def is_prime(num):
if num <= 1:
return False
for i in range(2, int(num ** 0.5) + 1):
if num % i == 0:
return False
return True
def find_primes(n):
for i in range(1, n + 1):
if is_prime(i):
# 使用 yield 會讓程式停在這
yield i
如果要一次拿出一個 generators 的值:
primes = find_primes(10)
next(primes) # 2
next(primes) # 3
next(primes) # 5
next(primes) # 7
next(primes) # StopIteration
如果要一次拿出所有 generator 的值,並保存在 list 中:
[prime for prime in find_primes(10)]
# [2, 3, 5, 7]
或者可以單純用 for:
for prime in find_primes(10):
print(prime)
# 2
# 3
# 5
# 7
Error and Exception Handling
Python 中內建的 Error 多是繼承自 BaseException 這個類別:
TypeError.__mro__ # (<class 'TypeError'>, <class 'Exception'>, <class 'BaseException'>, <class 'object'>)
Raise Error
拋出 Python 內建的錯誤
raise NameError # 沒有客製化錯誤訊息
raise NameError("Custom Error Message")
拋出 Exception
如果內建的錯誤類別不適用,可以使用更泛用的 Exception 類別:
raise Exception("Custom Error Message")
Try / Except
在 Python 中,如果在 try 中有錯誤發生時,程式會立即終止並跳到 except 中執行:
try:把可能會發生錯誤的程式包在裡面except:攔截並處理錯誤,搭配as可以取得錯誤的 Error 物件else:如果沒有錯誤則執行這裡finally:不論有沒有錯誤都會執行這裡
try:
print("Hello, World")
except NameError as err: #
# 攔截並處理 NameError 的例外
print("An exception occurred: {}".format(err))
except KeyError as err:
# 攔截並處理 KeyError 的例外
print("An exception occurred: {}".format(err))
except Exception:
# 攔截並處理任何類型的例外
print("An exception occurred")
else:
# 如果沒有發生錯誤才會執行
print("No exception occurred")
finally:
# 不論有沒有發生錯誤都會執行
print("Finally block executed")
攔截並處理特定類型的例外
try:
pass
except NameError as err: # 只攔截處理 NameError
print('A NameError occurred')
print(err)
同時攔截並處理多個類型的例外
在 except 後使用 tuple 就可以攔截並處理多個不同類型的錯誤:
try:
pass
except (NameError, ZeroDivisionError) as e:
print(e)
如果不同類型的錯誤需要有不同處理,則可以寫成:
try:
pass
except NameError:
print('A NameError occurred')
except KeyError:
print('A KeyError occurred')
except Exception:
print('A unknown Error occurred')
客製化錯誤(User-defined Exceptions)
雖然沒有強制,但一般命名會依照 Python Error 的慣例,以Error 結尾:
class CustomError(Exception):
pass
如果希望這個 Error 能接收客製化的錯誤訊息:
class OutOfStockError(Exception):
def __init__(self, product):
self.product = product
def __str__(self):
return f"The '{self.product}' is out of stock"
def main():
try:
raise OutOfStockError("iPhone")
except OutOfStockError as err:
print(err)
Parallel and Concurrent Python
在使用 Parallel 或 Concurrent 的方式來優化前,應該先考慮有沒有其他方式能優化執行速度,例如使用 NumPy 或 pandas,真的不行的話,才會考慮使用這裡提到的 multi-processing、multi-threading 或 asynchronous 的方式。
- CPU-bound: multi-processing
- I/O-bound: asyncio > multi-threading
對於 I/O-bound 的任務,只有在不支援 asyncio 的情況下才來使用 multi-threading 的做法。
Asyncio
asyncio.create_task(ASYNC_FN())- 把一個 async function 變成 task,執行
asyncio.create_task的時候該 async function 就會被排入 event loop 中被執行
- 把一個 async function 變成 task,執行
asyncio.run(ASYNC_FN())- 會建立一個 event loop,主要是給最外層沒有
asynckeyword 的 function(例如main)使用,否則會得到這個錯誤「SyntaxError: 'await' outside async function」
- 會建立一個 event loop,主要是給最外層沒有
asyncio.gather(ASYNC_FN_1(), ASYNC_FN_2(), ...):類似 JavaScript 中的Promise.all
Python 的 async 和 JavaScript 類似,都是基於 event loop 處理非同步任務,但 Python 需要 asyncio 來啟動 event loop,且要搭配 asyncio 可用的函式(例如,asyncio.sleep),適合用在 heavy I/O 。如果是 computation heavy 的任務(例如,CPI-bound 運算),即使使用 async 依然會導致 main thread 卡住(blocking),需要的話,需要使用 thread 或 process。
範例:asyncio 的基本使用
import asyncio
async def foo():
print("Foo start")
await asyncio.sleep(2)
print("Foo done")
async def bar():
print("Bar start")
await asyncio.sleep(1)
print("Bar done")
async def main():
task1 = asyncio.create_task(foo()) # Schedule foo()
task2 = asyncio.create_task(bar()) # Schedule bar()
# 雖然可以直接用 await foo() 但這樣就沒有利用到 concurrently 的效果
# await foo()
await task1 # Wait for foo() to finish
await task2 # Wait for bar() to finish
asyncio.run(main()) # Start the event loop
範例二:使用 await for...in (sequential) 或 asyncio.gather (concurrent)
import asyncio
async def async_hello(name: str) -> str:
"""Simulates an async greeting with a delay."""
await asyncio.sleep(1)
print(f"Hello {name}")
return name
async def main_sequential():
"""Runs tasks sequentially without concurrency."""
results = [await async_hello(name) for name in ["A", "B", "C"]]
print(*results)
async def main_concurrent():
"""Runs tasks concurrently for better performance."""
# 直接帶入 async job
# results = await asyncio.gather(async_hello("A"), async_hello("B"), async_hello("C"))
#
# 或透過 generator 帶入 job(節省記憶體)
# names = ["A", "B", "C"]
# results = await asyncio.gather(*(async_hello(name) for name in names))
#
# 或透過 list 帶入 job(可讀性較高)
tasks = [async_hello("A"), async_hello("B"), async_hello("C")]
results = await asyncio.gather(*tasks)
print(results)
if __name__ == "__main__":
asyncio.run(main_sequential())
asyncio.run(main_concurrent())
Thread
在 Python 中雖然提供 Thread 可以使用,然而在 CPython 中的 Global Interpreter Lock(GIL) 為了確保執行緒安全和記憶體一致性,限制了多 thread 的效能,讓一次只能有一個 thread 被執行,這導致在 CPython 中,對於 CPU-bound tasks 沒辦法實際達到 multi-threading 該有的效能。
Thread 適合用在 I/O-bound 且該函式庫尚未支援 asyncio 的情況,否則的話,直接使用 asyncio 即可。
如果是 I/O-bound(例如 sleep()、網路請求、文件讀寫等),ThreadPoolExecutor 可以提高執行速度。
如果是 CPU-bound(例如數據計算、壓縮、加密等),ThreadPoolExecutor 無法提高效能,甚至會因為 GIL 而變慢,這時應該使用 ProcessPoolExecutor。
建立和執行 Thread
from threading import Thread
import time
def do_work(sleep_time: float) -> None:
print("Start worker")
time.sleep(sleep_time)
print("End worker")
# 創建一個執行緒來執行 do_work 函式,並傳遞 2 秒的睡眠時間
worker_thread = Thread(target=do_work, name="t1", args=(2.0,))
print(f"Ident: {worker_thread.ident}") # None
print(f"Alive: {worker_thread.is_alive()}") # False
print(f"Name: {worker_thread.name}") # t1
# 啟動執行緒
worker_thread.start()
print(f"Ident: {worker_thread.ident}") # 系統分配的執行緒 ID,6132363264
print(f"Alive: {worker_thread.is_alive()}") # 執行緒啟動狀態,True
print(f"Name: {worker_thread.name}") # t1
# 等待執行緒完成工作
worker_thread.join() # 讓主執行緒等待 t1 結束
print("主執行緒繼續執行其他任務")
使用 ThreadPoolExecutor
使用 executor.map() 在大多數情況下可以讓程式碼更簡潔,且會自動依照輸入順序返回結果。但要注意一點:如果其中有任務拋出例外,executor.map() 會終止並拋出這個例外,導致後續任務的結果無法被取得。如果你需要在部分任務失敗時仍然獲得其他任務的結果,那麼使用 executor.submit() 配合 as_completed() 的方式會更靈活,因為你可以個別捕捉每個任務的例外並處理。
因此,選擇哪種方式取決於你的需求:
- 若能確保所有任務都能成功或你不需要個別捕捉失敗的任務,則
executor.map()是一個簡潔的選擇。 - 若需要確保部分任務失敗不影響整體處理,則應使用
executor.submit()配合as_completed()來處理例外。
可以使用 executor.submit() 來將 task 提交給 Thread Pool,並使用 .result() 來獲得結果:
submit()允許你在不同的時間提交多個 tasks,並且能夠為每個 task 獲取一個 Future 物件,進一步操作這些任務的狀態和結果- 錯誤攔截:需要特別留意,如果是使用
executor.submit()且沒有使用future.result()來取得結果,如果 thread 裡面發生錯誤,在 main thread 裡不會發現任何異狀
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
def task(name, delay):
# 定義一個模擬長時間執行的任務
print(f'Task {name} started')
time.sleep(delay)
print(f'Task {name} completed')
return f'Result from {name}'
def main():
start = time.perf_counter()
# 使用 ThreadPoolExecutor 來管理執行緒池
# 創建一個有 3 個執行緒的 ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=3) as executor:
# 提交任務
futures = [
executor.submit(task, 'A', 2),
executor.submit(task, 'B', 1),
executor.submit(task, 'C', 3),
]
# 針對不同 thread 的錯誤進行處理
for future in as_completed(futures):
try:
future.result()
except Exception:
print(traceback.format_exc())
end = time.perf_counter()
print(f'Execution duration: {end - start}')
或者也可以使用 executor.map():
- 與
submit()不同,map()是一次性提交所有任務並返回結果,它會按順序返回所有結果 - 使用
map()的話,只要有任何一個 thread 發生錯誤,則 main thread 都會收到這個錯誤,但無法針對 thread 的個別錯誤進行處理
from concurrent.futures import ThreadPoolExecutor
import time
# 定義一個簡單的任務
def task(n):
time.sleep(1)
return f"Task {n} completed"
def main():
# 創建 ThreadPoolExecutor,並使用 map() 方法
with ThreadPoolExecutor(max_workers=3) as executor:
results = executor.map(task, range(1, 6)) # 將 1 到 5 作為參數提交給 task 函數
# 輸出結果
for result in results:
print(result)
Process
Multi-processing 適合用在 CPU-bound tasks。
建立新的 Process 比起建立新的 Thread 要花時間,且每個 Process 之間並不會共享記憶體。
建立和執行 Process
這種寫法沒辦法取得 Process 回傳的結果:
import math
numbers = [
18014398777917439,
18014398777917439,
18014398777917439,
18014398777917439,
]
def is_prime(n: int) -> bool:
if n < 2:
return False
if n in (2, 3, 5, 7, 11, 13, 17):
return True
if (
n % 2 == 0
or n % 3 == 0
or n % 5 == 0
or n % 7 == 0
or n % 11 == 0
or n % 13 == 0
or n % 17 == 0
):
return False
upper_limit = int(math.sqrt(n)) + 1
for i in range(19, upper_limit, 2):
if n % i == 0:
return False
return True
import time
from multiprocessing import Process
def main_with_processes():
processes = [Process(target=is_prime, args=(n,)) for n in numbers]
start = time.perf_counter()
[p.start() for p in processes]
[p.join() for p in processes]
end = time.perf_counter()
print(f"time: {end - start}")
[p.close() for p in processes]
def main_without_multiprocessing():
start = time.perf_counter()
results = [is_prime(n) for n in numbers]
print(results)
end = time.perf_counter()
print(f"time: {end - start}")
if __name__ == "__main__":
main_with_processes()
main_without_multiprocessing()
使用 Process Pool 或 Process Pool Executor
使用 Pool 才能取得 Process 的回傳值:
from multiprocessing import Pool
def main_with_process_pool():
start = time.perf_counter()
with Pool() as pool:
result = pool.map(is_prime, numbers)
print(result)
end = time.perf_counter()
print(f"time: {end - start}")
也可以使用 ProcessPoolExecutor:
from concurrent.futures import ProcessPoolExecutor
def main_with_process_poll_executor():
start = time.perf_counter()
with ProcessPoolExecutor() as executor:
primes = executor.map(is_prime, numbers)
for number, prime in zip(numbers, primes):
print(f"{number} is prime: {prime}")
end = time.perf_counter()
print(f"time: {end - start}")
Generator
Generator 的特性
- 不會立即算出 value,而是在要被使用時才會計算出來
- 相較於 List,如果需要處理大量資料時,因為 generator 不會把所有值都先算好保存在記憶體中,因此相較於 list 可以省下許多記憶體
Generator Expression
# Generator expression to compute squares lazily
squares = (x * x for x in range(5)) # <generator object <genexpr> at 0x...>
print(next(squares)) # 0
# 或者用 for loop 全部 iterate 出來
for num in (x * x for x in range(5)):
print(num)
變數作用域
keywords: globals()、locals(), global, nonlocal
作用域 @ 為你自己學 Python