[Golang] Channels
此篇為各筆記之整理,非原創內容,資料來源可見文後參考資料。
TL;DR
Unbuffered, open | Unbuffered, closed | Buffered, open | Buffered, close | nil | |
---|---|---|---|---|---|
Read | 會等到 channel 中有資料被寫入 | 回傳 zero value,可以使用 v, ok 來確保 channel 還沒被關閉 | 會持續讀取,除非 channel 中沒有任何資料才會停住 | 會回傳 channel 中還沒被讀取的資料。如果 channel 中已經沒有資料,則回傳 zero value,一樣可以用 v, ok 來檢查看看 channel 是否已被關閉 | 一直停住 |
Write | 會等到 channel 中的資料被讀取 | PANIC | 會持續寫入,除非 channel 的 buffer 滿了才會停住 | PANIC | 一直停住 |
Close | 可以 | PANIC | 可以,channel 中還沒被讀取的資料會被保存下來 | PANIC | PANIC |
- 使用 channel 的重要原則是:把資料寫進 channel 的那個 goroutine,要負責在沒有資料要繼續寫入的時候,把 channel 關掉;比較特殊的情況時,如果有多個 goroutine 會同時把資料寫進 channel 的話,可能需要搭配
sync.WaitGroup
使用 - unbuffered channel 指的是 buffer size 為 0 的 channel
- 對於 unbuffered channel 來說,不論是從 channel 讀資料(需等到被寫入),或把資料寫入 channel 中時(需等到被讀出),都會阻塞該 goroutine
- 對於 buffered channel 來說:
- 從 channel 讀值時若是 empty buffer 時才會阻塞,否則都是 non-blocking
- 把資料寫入 channel 中時,寫入 channel 中的 value 數目(n + 1)需要超過 buffer size(n),也就是溢出(overflow)時才會使得該 goroutine 被阻塞;而且一旦 buffer channel 中的值開始被讀取,就會被全部讀完
// go routine
go f(x, y, z)
// channels
// 和 maps, slices, channels 一樣需要在使用前被建立,這裡表示定義的 chan 會回傳 int
ch := make(chan int)
ch <- v // Send v to channel ch.
v, ok := <-ch // Receive from ch, and
// assign value to v.
建立 Channel
建立 channel 的方式:
var zeroC chan int // channel 的 zero value 是 nil
unbufferedC := make(chan int) // unbuffered channel 的 buffered size 是 0
bufferedC := make(chan int, 3) // capacity 為 3 的 buffered channel
如果是要把 channel 當成 function parameter 傳遞的話,可以定義該 channel 應該要是 sender、receiver 或都可以:
chan<-
:send only channel 的意思是,只能把東西丟(send)進去 channel<-chan
:receive only channel 的意思是,只能把東西從 channel 拿(receive)出來
// send only channel 的意思是,只能把東西丟(send)進去 channel
func send(c chan<- int) {
c <- 1
}
// receive only channel 的意思是,只能把東西從 channel 拿(receive)出來
func receive(c <-chan int) {
fmt.Println(<-c)
}
-
channel 的預設值(default value)是
nil
-
如果試圖讀取 nil channel 的話會 deadlock
-
如果試圖關閉(close)
nil channel
的話會 panic -
重複關閉一個已經關閉過的 channel 也會 panic
-
為了避免無預期 deadlock 或 panic,最好可以有一個 channel owner 的 goroutine,並遵循下面的原則:
- 在這個 goroutine 中建立 channel
- 這個建立 channel 的 goroutine 也要負責把資料寫進 channel
- 這個建立 channel 的 goroutine 也要負責把 channel 給關閉
這個負責把資料寫進 channel、並關掉 channel 的 goroutine 就是這個 channel 的 owner;其他的 goroutine 則都只會從這個 channel 讀資料出來而已。舉例來說:
package main
import "fmt"
func main() {
owner := func() <-chan int {
c := make(chan int)
// channel owner goroutine
go func() {
defer close(c)
for i := 0; i < 5; i++ {
fmt.Println("Send:", i)
c <- i
}
}()
return c
}
consumer := func(ch <-chan int) {
// read values from channel
for v := range ch {
fmt.Printf("Received: %d\n", v)
}
fmt.Println("Done receiving!")
}
ch := owner()
consumer(ch)
}
//Send: 0
//Send: 1
//Received: 0
//Received: 1
//Send: 2
//Send: 3
//Received: 2
//Received: 3
//Send: 4
//Received: 4
//Done receiving!
如此,可以確保在寫入資料到 channel 或關閉 channel 時,這個 channel 不會是 nil
的,以此避免 deadlock 或 panic 出現。
Unbuffered Channels
func main() {
// channel 的 zero value 是 nil
var zeroC chan int
fmt.Println(zeroC)
// 一般建立 channel 的方式
c := make(chan int) // unbuffered channel
fmt.Printf("type of c is %T\n", c) // type of c is chan int
fmt.Printf("value of c is %v\n", c) // value of c is 0xc000062060
}
- 所有的 unbuffered channel 操作預設都是 blocking 的(所以它算是 synchronous 的操作)
- 當有資料要寫入 channel 時,goroutine 會阻塞住,直到有其他的 goroutine 從該 channel 把值讀出來
- 當有資料要讀取 channel 中的值時,goroutine 也會阻塞,直到其他 goroutine 把值寫入 channel 中
- 也就是說,當我們是的把資料寫入 channel 或從 channel 中取出資料時,該 goroutine 都會阻塞住,並且將控制權交給其他可以運行的 goroutines
// 程式來源:https://medium.com/rungo/anatomy-of-channels-in-go-concurrency-in-go-1ec336086adb
func greet(c chan string) {
fmt.Println("Hello " + <-c + "!")
}
func main() {
fmt.Println("main() started")
c := make(chan string)
go greet(c)
// block here (把控制權交給其他 goroutine,這裡也就是 greet)
c <- "John"
fmt.Println("main() stopped")
}
⚠️ 雖然讀取值的時候會 blocking 等到有值出來,但並不表示來得及被 print 出來,以下面的程式為例:
func greet(c chan string) {
fmt.Println(<-c)
fmt.Println(<-c)
}
func main() {
fmt.Println("main() started")
c := make(chan string)
go greet(c)
c <- "John"
c <- "Mike"
fmt.Println("main() stopped")
}
// main() started
// John
// main() stopped
輸出結果只會看到 John
,這是因為雖然第二個 <-c
一樣會 blocking 並等待值送入 channel,但是當 Mike 的值送入 channel 中,而 greet goroutine 收到值要 print 出來時,main goroutine 已經執行結束了,因此最終我們看不到 Mike(若想看到 Mike 可以在 main goroutine 使用 time.Sleep(time.Millisecond)
)
Close Channel(關閉頻道)
- 當 Channel 被關閉後,如果對這個 channel 在送值進去,或者重複關閉這個 channel,都會導致 panic;但如果是讀取關閉的 channel 並不會。
- 如果是 buffered channel 在關閉時,channel 內還有資料未被讀取出來,則這些資料會被依照順序 return 出來。
- 當 channel(不論 buffered 或 unbuffered)關閉時,如果 channel 沒有未被讀取的資料,則會 return 這個 channel 型別的 zero value。
- 關閉 channel 是將資料寫進 channel 的那個 goroutine 的責任,不過,如果不是在
for-range
loop 這種需要等待 channel close 後才能往下執行的程式中,因為 channel 也只是個變數,所以一般來說,只要這個 channel 不再被取用,go 就會對其做 GC。
判斷 channel 是否已經關閉
c := make(chan string)
close(c) // 關閉 channel
val, ok := c // ok 如果是 false 表示 channel 已經被關閉
- 當 channel 已經被關閉時,
ok
會是false
,value
則會是 zero value - ⚠️ 只有 sender 可以使用
close
,receiver 使用的話會發生 panic。
Deadlock
由於 channel 的資料在讀/寫時,goroutine 會阻塞,並且將控制權交給其他可以運行的 goroutines,因此若沒有其他可以運行的 goroutines 時,就會發生 deadlock
的情況,整個程式則會 crash。
也就是說,如果你試著從 channel 中讀資料,但 channel 中並沒有可以被讀取的值時,它會使得當前的 goroutine 阻塞,並期待其他 goroutine 會把值塞入這個 channel,此時「讀取資料」的操作會阻塞。相似地,如果你想要傳送資料到某一個 channel 中,它同樣會阻塞當前的 goroutine,並期待其他的 goroutine 有人去讀取這個值,這時候「寫入資料(send operation)」的操作會被阻塞。
- 從 channel 中讀不到資料 -> 讀取資料的操作會阻塞 -> deadlock
- 寫入資料到 channel -> 沒人讀取此 channel 的值 -> 寫入資料的操作會阻塞 -> deadlock
// 只是寫入資料但沒有 channel 讀取 -> deadlock
func main() {
fmt.Println("main() started")
// 只是寫入資料但沒有 channel 讀取
c := make(chan string)
c <- "John"
fmt.Println("main() stopped")
}
相似地:
// 有 goroutine 要讀取 channel 但沒 goroutine 寫入資料到 channel -> deadlock
func greet(c chan string) {
// 要讀取 channel 但沒人寫入資料
fmt.Println("Hello " + <-c + "!")
}
func main() {
fmt.Println("main() started")
c := make(chan string)
// c <- "John"
greet(c)
fmt.Println("main() stopped")
}
Buffered channel
Buffered Channel 在 sender 和 receiver 中:
- 會指定 buffer 的大小(capacity)
- 它是屬於 asynchronous channel,也就是說 sender 和 receiver 不需要同時在 channel 上,只要 buffer 沒有滿,sender 就可以繼續寫入資料,程式就不會被阻塞;只要 buffer 不是空的,receiver 都可以讀取資料,程式就不會被阻塞。
- in-memory FIFO queue
buffered channel 寫值時,需要在 overflow 時才會 block goroutine
- unbuffered channel 指的是 buffered size 為 0 的 channel。
- unbuffered channel 不論是「從 channel 讀值」(需等到值被其他 goroutine 寫入),或「把值寫入 channel」(需等到值被其他 goroutine 讀出)都會阻塞當下的 goroutine。
- 當 buffer size 不是 0 的話,就屬於 buffered channel
- 「從 channel 讀值」時,只有在 buffered 是空的時才會 blocking
- 「把值寫入 channel」時,該 goroutine 並不會被阻塞,除非該 buffer 已經填滿(full)且溢出(overflow)。當 buffer 已經填滿(full)時,再把新的一筆資料傳入 channel 時會造成溢出(overflow),此時 goroutine 才會被阻塞。
- 讀值的動作一旦開始,就會一直到 buffer 變成 empty 為止才會結束。也就是說,讀取 channel 的那個 goroutine 需到等到 buffer 完全清空後才會阻塞。
舉例來說,在建立一個 buffered channel 後:
// 這個 chan 只能接收兩個長度的 buffer
// channel := make(chan [type], [size])
ch := make(chan int, 2)
- 寫值:直到該 channel 寫入到 n+1 個值以前,它都不會阻塞當前的 goroutine。
- 讀值:從該 channel 讀值時,若 buffer 是 empty 才會阻塞當前的 goroutine。
以實際的例子來說:
- 現在 buffered channel 的 size 是 3
- 在 buffered channel 中寫入 3 個值
- 由於寫入 channel 的值並沒有超出 buffered channel 的 size,因此 main goroutine 並不會被阻塞,使得 print goroutine 不會有機會取得控制權而被執行
// 程式來源:https://medium.com/rungo/anatomy-of-channels-in-go-concurrency-in-go-1ec336086adb
// 透過 squares goroutine 讀值
func print(c chan int) {
for i := 0; i <= 3; i++ {
fmt.Println(<-c)
}
}
// 在 main goroutine 寫值
func main() {
fmt.Println("main() started")
// 建立 buffered size 為 3 的 channel
c := make(chan int, 3)
go print(c)
// 寫入 3 個值
c <- 1
c <- 2
c <- 3
fmt.Println("main() close")
}
// Output:
// main() started
// main() close
但如果在 main goroutine 中多一個值寫入 channel 中(c <- 4
),此時 main goroutine 就會在這裡被 block 住:
- 在 main goroutine 中,使用了
c <- 4
後,因為超過 buffered channel 的 size,也就是溢出(overflow),因此在這裡會阻塞 - main goroutine 阻塞後,print goroutine 便有機會執行,一旦 print goroutine 開始讀取 channel 的值後,它就會把該 buffer 中的所有值都讀全部讀完
// 由於 main goroutine 被 block,print goroutine 有機會被執行
// 一旦 receiver 開始讀值,就會把所有 buffer 中的值全部讀完直到清空
func print(c chan int) {
for i := 0; i <= 3; i++ {
fmt.Println(<-c)
}
}
// 在 main goroutine 寫值
func main() {
fmt.Println("main() started")
c := make(chan int, 3)
go print(c)
c <- 1
c <- 2
c <- 3
c <- 4 // 因為超過 buffered size,這裡會 block
fmt.Println("main() close")
}
// main() started
// 1
// 2
// 3
// 4
// main() close
另一個範例:
/* Buffered Channels 即使寫值後,不用等待值被讀取,主程式就會結束 */
func main() {
c := make(chan bool, 1)
go func() {
fmt.Println("GO GO GO") // 有可能因為主程式已經執行完而看不到
// 使用 Buffered Channel 的話,不會等到 channel 中的值讀完才結束主程式
fmt.Printf("Receive value from channel %v\n", <-c)
}()
fmt.Println("Before Receive")
// STEP 1:寫入 channel
c <- true
// c <- false // block here
fmt.Println("After Receive")
}
// Before Receive
// [GO GO GO]
// [Receive value from channel true]
// After Receive
buffered channel 也有 length 和 capacity
- 和 slice 很類似,buffered channel 也有 length 和 capacity
length
指的是在 channel buffer 中還有多少數量的值還沒被讀取(queued),可以使用len(channel)
查看capacity
則是指 buffer 實際的 size,可以使用cap(channel)
查看
// 這段程式之所以不會產生 deadlock 是因為 channel 還沒有出現 overflow,所以不會 block
func main() {
c := make(chan int, 3)
c <- 1
c <- 2
fmt.Printf("Length of channel c is %v and capacity of channel c is %v \n", len(c), cap(c))
}
// Length of channel c is 2 and capacity of channel c is 3
使用 for range 可以讀取 close 後 buffered channel 中的值
func main() {
c := make(chan int, 3)
c <- 1
c <- 2
c <- 3
close(c)
for val := range c {
fmt.Println(val)
}
}
使用 Buffered Channel 的時機
- 當你知道你同時有多少個 goroutine 在執行
- 或是,但你希望限制 goroutine 的數量
- 又或著,你希望限制能有多少數量的 tasks 被 queue 起來
Unidirectional channels
除了可以建立可讀(read)可寫(write)的 channel 之外,還可以建立「只可讀(receive-only)」或「只可寫(send-only)」的 channel:
// 建立只可接收或只可發送的 unidirectional channels
func main() {
roc := make(<-chan int) // receive only channel
soc := make(chan<- int) // send only channel
fmt.Printf("receive only channel type is '%T' \n", roc)
fmt.Printf("send only channel type is '%T' \n", soc)
}
// receive only channel type is '<-chan int'
// send only channel type is 'chan<- int'
- 透過 unidirectional channels 可以增加型別的安全性(type-safety)
- 但如果我們希望在某一個 goroutine 中只能從 channel 讀取資料,但在 main goroutine 中可以對這個 channel 讀和寫資料時,可以透過 go 提供的語法來將 bi-directional channel 轉換成 unidirectional channel
// <-chan 表示 receive only channel type
func greet(roc <-chan string) {
fmt.Println("Hello " + <-roc + "!")
// receive only channel 不能傳送資料
// invalid operation: cannot send to receive-only type <-chan string
// roc <- "foo"
}
func main() {
fmt.Println("main() started")
c := make(chan string)
go greet(c)
c <- "John"
fmt.Println("main() stopped")
}
Multiple Channels
當前的 goroutine 阻塞時,就會切換到其他 goroutine
// 程式碼修改自:https://medium.com/rungo/anatomy-of-channels-in-go-concurrency-in-go-1ec336086adb
func square(c chan int) {
fmt.Println("[square] wait for testNum")
num := <-c
fmt.Println("[square] sent square to squareChan (blocking)")
c <- num * num // 3. blocking, switch to other go routine
}
func cube(c chan int) {
fmt.Println("[cube] wait for testNum (blocking)")
num := <-c
fmt.Println("[cube] sent square to cubeChan")
c <- num * num * num // blocking
}
func main() {
fmt.Println("[main] main() started")
squareChan := make(chan int)
cubeChan := make(chan int)
go square(squareChan)
go cube(cubeChan)
testNum := 3
fmt.Println("[main] sent testNum to squareChan (blocking)")
squareChan <- testNum // 1. block, switch to other goroutine
fmt.Println("[main] resuming")
fmt.Println("[main] sent testNum to cubeChan")
cubeChan <- testNum
fmt.Println("[main] resuming")
fmt.Println("[main] reading from channels (blocking)")
squareVal, cubVal := <-squareChan, <-cubeChan
fmt.Println(squareVal, cubVal)
fmt.Println("[main] main() stopped")
}
First-class channels
在 golang 中 channel 是 first-class values,和其他型別一樣,可以被當成是 struct
中的值、function 的參數、回傳值等等。
以下面的例子來說,make(chan chan string)
表示這個 channel 可以傳送和接收另一個(可以傳送和接收 string 的)channel
func greeter(cc chan chan string) {
c := make(chan string)
cc <- c
}
func greet(c chan string) {
fmt.Println("Hello " + <-c + "!")
}
func main() {
fmt.Println("main() started")
// a channel of data type channel of data type string
// 建立一個 channel 可以讀寫另一個(可以讀寫 string)的 channel
cc := make(chan chan string)
go greeter(cc)
c := <-cc
go greet(c)
c <- "John"
fmt.Println("main() stopped")
}
// main() started
// Hello John!
// main() stopped
讀取 Channels 中的資料
for loop 搭配 close:需要手動 break 迴圈
使用 for{}
來顯示內容,但需要手動 break loop:
for {
val, ok := <-channel
if ok == false {
// break the loop
break
} else {
// do something with val
fmt.Println(val)
}
}
實際範例:
func squares(c chan int) {
// 把 0 ~ 9 寫入 channel 後便把 channel 關閉
for i := 0; i <= 9; i++ {
c <- i
}
close(c)
}
func main() {
fmt.Println("main() started")
c := make(chan int)
// 發動 squares goroutine
go squares(c)
// 監聽 channel 的值:週期性的 block/unblock main goroutine 直到 squares goroutine close
for {
val, ok := <-c
if ok == false {
fmt.Println(val, ok, "<-- loop broke case channel closed")
break // exit loop
} else {
fmt.Println(val, ok)
}
}
fmt.Println("main() close")
}
for range 搭配 close:會自動結束迴圈
-
在上一段程式碼中,如果單純透過
for
來讀取 channel 中的資料,需要自行判斷 channel 是否已經 close,如果已經 close,則需要自行使用break
把 for loop 終止。在 Go 中則提供了for range
loop,只要該 channel 被關閉後,loop 則會自動終止。 -
Blocking:如果 channel 中沒有資料,則該 goroutine 會等在那裡,直到「有新的資料進來」或「channel 被關閉」
-
跳出迴圈的條件
- channel 被關閉
- 遇到
break
或return
-
需要特別留意,如果是在 main goroutine 中使用
for val := range channel {}
的寫法時,最後 channel 沒有被 close 的話程式會 deadlock。但如果是在其他的 goroutine 中使用,即使沒有close
也不會 deadlock,但為了不必要的 bug 產生,一般都還是將其關閉比較好。
// 使用 range 時,只會拿到 value,不會拿到 ok(第二個 return value)
for val := range channel {
// 當 channel 關閉時會自動 break loop
}
for val := range c
後面的 c
直接帶入 channel,不用再使用 <-c
(箭頭)。
透過 for i := range c
可以重複取出 channel 的值,直到該 channel 被關閉。和上面一段的單純使用 for
loop 的寫法相比,精簡許多:
func squares(c chan int) {
// 把 0 ~ 9 寫入 channel 後便把 channel 關閉
for i := 0; i <= 9; i++ {
c <- i
}
close(c)
}
func main() {
fmt.Println("main() started")
c := make(chan int)
// 發動 squares goroutine
go squares(c)
// 使用 for range 的寫法,一但 channel close,loop 會自動 break
for val := range c {
fmt.Println(val)
}
fmt.Println("main() close")
}
需要特別留意,使用 for val := range channel {}
的寫法時,如果最後 channel 沒有被 close 的話程式會 deadlock。
使用 for val := range c
的其他範例:
//STEP 2:建立 fibonacci 函式
func fibonacci(n int, c chan int) {
x, y := 0, 1
for i := 0; i < n; i++ {
c <- x
x, y = y, x+y
}
// ⚠️ 如果沒有 close 的話,在 STEP 4 的地方並不知道
close(c)
}
func main() {
// STEP 1:建立一個 length 為 10 的 buffered channel
c := make(chan int, 10)
//STEP 3:使用 go routine
go fibonacci(cap(c), c)
// STEP 4:只要 channel 還有資料就輸出
for i := range c {
fmt.Println(i)
}
// ⚠️ 這裡的內容會等到所有 c channel 資料都算出後才會執行到...
}
select case
和 switch case
不同,select case
中 case 接收的是 channel(而不是 boolean)。當程式執行到 select
的位置時,會阻塞(blocking)在那,直到有任何一個 case 收到 channel 傳來的資料後(除非有用 default
)才會 unblock,因此通常會有另一個 channel 用來實作 Timeout 機制。
select {
case res := <-ch1: // 如果需要取用 channel 中的值
fmt.Println(res)
case <-ch1: // 如果不需要取用 channel 中的值
fmt.Println("receive value")
}
和 for value := range channel
不同,select case
中,case
後的 channel 要記得加上 <-
把資料取出來。
select case
運作的流程是:
- 如果所有的 case 都沒有接收到 channel 傳來的資料,那麼 select 會一直阻塞(block)在那,直到有任何的 case 收到資料後才會繼續執行
- 如果同一時間有多個 case 收到 channel 傳來的資料,那將會從所有這些 non-blocking 的 cases 中隨機挑選一個,接著才繼續執行
在下面的例子中,由於使用的是 unbuffered channel,因此 channel 只要有 send 或 receive 的動作時都會 block:
// 程式來源:https://medium.com/rungo/anatomy-of-channels-in-go-concurrency-in-go-1ec336086adb
var start time.Time
func init() {
start = time.Now()
}
func service1(c chan string) {
time.Sleep(1 * time.Second)
c <- "Hello from service 1"
}
func service2(c chan string) {
time.Sleep(3 * time.Second)
c <- "Hello from service 2"
}
func main() {
fmt.Println("main() started", time.Since(start))
chan1 := make(chan string)
chan2 := make(chan string)
go service1(chan1)
go service2(chan2)
fmt.Println("[main] select(blocking)")
select {
case res := <-chan1:
fmt.Println("[main] get response from service 1", res, time.Since(start))
case res := <-chan2:
fmt.Println("[main] get response from service 2", res, time.Since(start))
}
fmt.Println("main() stopped", time.Since(start))
}
// main() started 585ns
// [main] select(blocking)
// [main] get response from service 1 Hello from service 1 1.001434164s
// main() stopped 1.001481394s
隨機選取
上面的程式中使用的是 unbuffered channel,所以對該 channel 任何的 send 或 receive 都會出現阻塞。
我們可以使用 buffered channel 來模擬實際上 web service 處理回應的情況:
- 由於 buffered channel 的 capacity 是 2,但傳入 channel 的 size 並沒有超過 2(沒有 overflow),因此程式會繼續執行而不會發生阻塞(non-blocking)
- 當 buffered channel 中有資料時,直到整個 buffer 都被清空為止前,從 buffered channel 讀取資料的動作都是 non-blocking 的,而且在下面的程式碼中又只讀取了一個值出來,因此整個 case 的操作都會是 non-blocking 的
- 由於 select 中的所有 case 都是 non-blocking 的,因此 select 會從所有的 case 中隨機挑一個加以執行
// 程式碼來源:https://medium.com/rungo/anatomy-of-channels-in-go-concurrency-in-go-1ec336086adb
func main() {
fmt.Println("main() started", time.Since(start))
chan1 := make(chan string, 2)
chan2 := make(chan string, 2)
// buffered channel:因為 channel 中的資料沒有 overflow (> 2),所以不會阻塞
chan1 <- "Value 1"
chan1 <- "Value 2"
chan2 <- "Value 1"
chan2 <- "Value 2"
// buffered channel 中有資料時,讀取資料會是 non-blocking 的
// 由於 select 中的 case 都是 non-blocking 的,因此會隨機挑選一個執行
select {
case res := <-chan1:
fmt.Println("[main] get response from service 1", res, time.Since(start))
case res := <-chan2:
fmt.Println("[main] get response from service 2", res, time.Since(start))
}
fmt.Println("main() stopped", time.Since(start))
}
當 select 中的 case 同時收到 channel 的資料時,會隨機選取一個 channel:
/* Go 語言使用 Select 四大用法:https://blog.wu-boy.com/2019/11/four-tips-with-select-in-golang/ */
func main() {
ch := make(chan int, 1)
ch <- 1
select {
case <-ch:
fmt.Println("random 01")
case <-ch:
fmt.Println("random 02")
case <-ch:
fmt.Println("random 03")
}
}
default:建立一個 unblocking 的 select
default
case 本身是非阻塞的(non-blocking),同時它也會使得 select
statement 總是變成 non-blocking,也就是說,不論是 buffered 或 unbuffered channel 都會變成非阻塞的。
當有任何資料可以從 channel 中取出時,select
就會執行該 case,但若沒有,就會直接進到 default
case。簡單的說,當 channel 本身就有值時,就不會走到 default
,但如果 channel 執行的當下沒有值,還需要等其他 goroutine 設值到 channel 的話,就會直接走到 default
。
簡單的說,當 channel 本身就有值時,就不會走到 default
,但如果 channel 執行的當下沒有值,還需要等其他 goroutine 設值到 channel 的話,就會直接走到 default
。
如果沒有資 料送進 channel,也就是註解掉 ch <- 1
的話,程式會出現 panic(fatal error: all goroutines are asleep - deadlock!
),這是因為它認為應該要從 channel 取到值,但卻沒有得到任何東西,雖然加上 default
後可以解決,但會使得 select case
不會被阻塞住,導致還沒收到 channel 的訊息前,main goroutine 就執行完畢了:
/* Go 語言使用 Select 四大用法:https://blog.wu-boy.com/2019/11/four-tips-with-select-in-golang/ */
func main() {
ch := make(chan int, 1)
//ch <- 1
select {
case <-ch:
fmt.Println("random 01")
case <-ch:
fmt.Println("random 02")
default: // 當沒有 value 從 channel 送出的話,會走 default
fmt.Println("exit: no value from channel")
}
}
Timeout 超時機制:使用 time.After
單純使用 default
case 並不是非常有用,有時我們希望的是有 timeout 的機制,也就是超過一定時間後,沒有收到任何回應時,才做預設的行為,這時候我們可以使用 time.After
來完成:
time.After()
和time.Tick()
都是會回傳time.Time
型別的 receive channel(<- channel
)
/* Go 語言使用 Select 四大用法:https://blog.wu-boy.com/2019/11/four-tips-with-select-in-golang/ */
func main() {
ch := make(chan int)
select {
case <-ch:
fmt.Println("receive value from channel")
// 超過一秒沒有收到主要 channel 的 value,就會收到 time.After 送來的訊息
case <-time.After(1 * time.Second):
fmt.Println("timeout after 1 second")
}
}
以原本的例子來說:
- service1 會需要花 3 秒
- service2 會需要花 5 秒
time.After()
設定 2 秒,因此在 service1 和 service2 還沒完成前就會觸發 timeout
var start time.Time
func init() {
start = time.Now()
}
func service1(c chan string) {
fmt.Println("service1() started", time.Since(start))
time.Sleep(3 * time.Second)
c <- "Hello from service1"
}
func service2(c chan string) {
fmt.Println("service2() started", time.Since(start))
time.Sleep(5 * time.Second)
c <- "Hello from service2"
}
func main() {
fmt.Println("main() started", time.Since(start))
chan1 := make(chan string, 1)
chan2 := make(chan string, 1)
go service1(chan1)
go service2(chan2)
select {
case res := <-chan1:
fmt.Println("get response from service 1", res, time.Since(start))
case res := <-chan2:
fmt.Println("get response from service 2", res, time.Since(start))
case <-time.After(2 * time.Second):
fmt.Println("No response received", time.Since(start))
}
}
Empty Select
如同 for{}
迴圈可以不帶任何條件一樣,select {}
也可以不搭配 case 使用(稱作,empty select)。
從前面的例子中可以看到,因為 select
statement 會一直阻塞(blocking),直到其中一個 case
unblocks 時,才會繼續往後執行,但因為 empty select 中並沒有任何的 case
statement,因此 main goroutine 將會永遠阻塞在那,如果沒有其他 goroutine 可以持續運行的話,最終導致 deadlock。
func service() {
fmt.Println("Hello from service ")
}
func main() {
fmt.Println("main() started")
go service()
// 這個 select 會永遠 block 在這
select {}
fmt.Println("main() stopped")
}
// main() started
// Hello from service
// fatal error: all goroutines are asleep - deadlock!
如果在 main goroutine 使用 empty select 後,main goroutine 將會完全阻塞,需要靠其他的 goroutine 持續運作才不至於進入 deadlock:
// main goroutine 持續 block 的情況下
// 需要靠其他 goroutine 持續運行才不會進入 deadlock
func service() {
for {
fmt.Println("Hello from service ")
time.Sleep(time.Millisecond * 400)
}
}
func main() {
fmt.Println("main() started")
go service()
// 這個 select 會永遠 block 在這
select {}
fmt.Println("main() stopped")
}
// main() started
// Hello from service
// Hello from service
// ...
另外透過 empty select 導致 main goroutine 阻塞的這種方式,可以在 server 啟動兩個不同的 service:
var start time.Time
func init() {
start = time.Now()
}
func service1() {
for {
fmt.Println("Hello from service1 ", time.Since((start)))
time.Sleep(time.Millisecond * 500)
}
}
func service2() {
for {
fmt.Println("Hello from service2 ", time.Since((start)))
time.Sleep(time.Millisecond * 700)
}
}
func main() {
fmt.Println("main() started")
go service1()
go service2()
// 這個 select 會永遠 block 在這,service1 和 service2 輪流輸出訊息
select {}
fmt.Println("main() stopped") // 這行不會被執行到
}
判斷是否超過 channel 的 buffer size
/* Go 語言使用 Select 四大用法:https://blog.wu-boy.com/2019/11/four-tips-with-select-in-golang/ */
func main() {
// STEP 1:建立一個只能裝 buffer size 為 1 資料
ch := make(chan int, 1)
ch <- 1
select {
case ch <- 2:
fmt.Println("channel value is", <-ch)
fmt.Println("channel value is", <-ch)
default:
// ch 中的內容超過 1 時,但若把 channel buffer size 的容量改成 2,就不會走到 default
fmt.Println("channel blocking")
}
}
for-select loop:讀取多個 channel 的 value
// A Tour of Go: https://tour.golang.org/concurrency/6
func main() {
tick := time.Tick(100 * time.Millisecond)
boom := time.After(500 * time.Millisecond)
for {
select {
case <-tick:
fmt.Println("tick.")
case <-boom:
fmt.Println("BOOM!")
return // 如果沒有 return 的話程式將不會結束,一直卡在 for loop 中
default:
fmt.Println(" .")
time.Sleep(50 * time.Millisecond)
}
}
}
在 for-select
loop 裡如果有 default
case,一般來說可能是程式寫錯,因為一旦其他 case 沒有辦法 READ 或 WRITE,則這個 default
就會一直被重複執 行到,除非有在某個 case 中呼叫 return
來跳出這個 function,或用 break <LOOP_NAME>
來跳出這個 for loop,否則將導致無窮迴圈,或造成 CPU overhead。
使用範例:
/* Go 語言使用 Select 四大用法:https://blog.wu-boy.com/2019/11/four-tips-with-select-in-golang/ */
func main() {
ch1 := make(chan string)
ch2 := make(chan int, 1)
defer func() {
fmt.Println("------ In defer ------")
close(ch1)
close(ch2)
}()
i := 0
//STEP 1:建立一個 Go Routine
go func() {
fmt.Println("In Go Routine")
// STEP 2:透過 for loop 來不停來監控不同 channels 傳回來的資料
LOOP:
for {
// STEP 3:透過 sleep 讓它每 500 毫秒檢查有無 channel 傳訊息進來
time.Sleep(500 * time.Millisecond)
i++
fmt.Printf("In Go Routine, i: %v, time: %v \n", i, time.Now().Unix())
// STEP 4:透過 select 判斷不同 channel 傳入的資料
select {
case m := <-ch1: // STEP 6:當收到 channel 1 傳入的資料時,就 break
fmt.Printf("In Go Routine, get message from channel 1: %v \n", m)
break LOOP
case m := <-ch2: // STEP 7:當收到 channel 2 傳入的資料時,單純輸出訊息
fmt.Printf("In Go Routine, get message from channel 2: %v\n", m)
default: // STEP 5:檢查時,如果沒有 channel 丟資料進 channel 則走 default
fmt.Println("In Go Routine to DEFAULT")
}
}
}()
ch2 <- 666 // STEP 8:在 sleep 前將訊息丟入 channel2
fmt.Println("Start Sleep")
// STEP 9:雖然這裡 sleep,但 go routine 中的 for 迴圈仍然不斷在檢查有無收到訊息
time.Sleep(4 * time.Second)
fmt.Println("After Sleep: send value to channel")
// STEP 10:四秒後把 "stop" 傳進 channel 1,for 迴圈收到訊息後 break
ch1 <- "stop"
fmt.Println("------ End ------")
}
範例程式碼
範例
// A Tour of Go: https://tour.golang.org/concurrency/2
func sum(s []int, c chan int) {
sum := 0
for _, v := range s {
sum += v
}
// STEP 3:把加總後的值丟回 channel
c <- sum // send sum to c
}
func main() {
s := []int{7, 2, 8, -9, 4, 0}
// STEP 1:建立一個 channel,該 channel 會傳出 int
c := make(chan int)
// STEP 2:使用 goroutine,並把 channel 當成參數傳入
go sum(s[:len(s)/2], c)
go sum(s[len(s)/2:], c)
// STEP 4:從 channel 取得計算好的結果
x, y := <-c, <-c
// ⚠️ 寫在這裡的內容 會在 channel 傳回結果後才會被執行...
fmt.Println(x, y, x+y)
}
範例
package main
import (
"fmt"
"net/http"
)
func main() {
links := []string{
"http://google.com",
"http://facebook.com",
"http://stackoverflow.com",
"http://golang.com",
"http://amazon.com",
}
for _, link := range links {
checkLink(link) // 這裡會被阻塞
}
}
func checkLink(link string) {
_, err := http.Get(link)
if err != nil {
fmt.Println(link, "might be down!")
return
}
fmt.Println(link, "is up!")
}
參考
- 👍 Anatomy of Channels in Go - Concurrency in Go @ rungo
- 👍 Achieving concurrency in Go @ rungo
- 👍 Anatomy of goroutines in Go -Concurrency in Go @ rungo
- Parallel vs concurrent in Node.js @ By Panu Pitkamaki
- Go: The Complete Developer's Guide (Golang) @ Udemy by Stephen Grider
- Go 語言使用 Select 四大用法 @ AppleBoy
- 在 Go 語言內管理 Concurrency 的三種方式 @ appleboy