跳至主要内容

[Golang] Channels

此篇為各筆記之整理,非原創內容,資料來源可見文後參考資料。

TL;DR

Unbuffered, openUnbuffered, closedBuffered, openBuffered, closenil
Read會等到 channel 中有資料被寫入回傳 zero value,可以使用 v, ok 來確保 channel 還沒被關閉會持續讀取,除非 channel 中沒有任何資料才會停住會回傳 channel 中還沒被讀取的資料。如果 channel 中已經沒有資料,則回傳 zero value,一樣可以用 v, ok 來檢查看看 channel 是否已被關閉一直停住
Write會等到 channel 中的資料被讀取PANIC會持續寫入,除非 channel 的 buffer 滿了才會停住PANIC一直停住
Close可以PANIC可以,channel 中還沒被讀取的資料會被保存下來PANICPANIC

上表資料來源:12. Concurrency in Go | Learning Go, 2nd Edition

  • 使用 channel 的重要原則是:把資料寫進 channel 的那個 goroutine,要負責在沒有資料要繼續寫入的時候,把 channel 關掉;比較特殊的情況時,如果有多個 goroutine 會同時把資料寫進 channel 的話,可能需要搭配 sync.WaitGroup 使用
  • unbuffered channel 指的是 buffer size 為 0 的 channel
  • 對於 unbuffered channel 來說,不論是從 channel 讀資料(需等到被寫入),或把資料寫入 channel 中時(需等到被讀出),都會阻塞該 goroutine
  • 對於 buffered channel 來說:
    • 從 channel 讀值時若是 empty buffer 時才會阻塞,否則都是 non-blocking
    • 把資料寫入 channel 中時,寫入 channel 中的 value 數目(n + 1)需要超過 buffer size(n),也就是溢出(overflow)時才會使得該 goroutine 被阻塞;而且一旦 buffer channel 中的值開始被讀取,就會被全部讀完
// go routine
go f(x, y, z)

// channels
// 和 maps, slices, channels 一樣需要在使用前被建立,這裡表示定義的 chan 會回傳 int
ch := make(chan int)

ch <- v // Send v to channel ch.
v, ok := <-ch // Receive from ch, and
// assign value to v.

PJCHENder Gist

建立 Channel

建立 channel 的方式:

var zeroC chan int   // channel 的 zero value 是 nil
unbufferedC := make(chan int) // unbuffered channel 的 buffered size 是 0
bufferedC := make(chan int, 3) // capacity 為 3 的 buffered channel

如果是要把 channel 當成 function parameter 傳遞的話,可以定義該 channel 應該要是 sender、receiver 或都可以:

  • chan<-:send only channel 的意思是,只能把東西丟(send)進去 channel
  • <-chan:receive only channel 的意思是,只能把東西從 channel 拿(receive)出來
// send only channel 的意思是,只能把東西丟(send)進去 channel
func send(c chan<- int) {
c <- 1
}

// receive only channel 的意思是,只能把東西從 channel 拿(receive)出來
func receive(c <-chan int) {
fmt.Println(<-c)
}
  • channel 的預設值(default value)是 nil

    • 如果試圖讀取 nil channel 的話會 deadlock

    • 如果試圖關閉(close)nil channel 的話會 panic

    • 重複關閉一個已經關閉過的 channel 也會 panic

為了避免無預期 deadlock 或 panic,最好可以有一個 channel owner 的 goroutine,並遵循下面的原則:

  • 在這個 goroutine 中建立 channel
  • 這個建立 channel 的 goroutine 也要負責把資料寫進 channel
  • 這個建立 channel 的 goroutine 也要負責把 channel 給關閉

這個負責把資料寫進 channel、並關掉 channel 的 goroutine 就是這個 channel 的 owner;其他的 goroutine 則都只會從這個 channel 讀資料出來而已。舉例來說:

package main

import "fmt"

func main() {
owner := func() <-chan int {
c := make(chan int)

// channel owner goroutine
go func() {
defer close(c)
for i := 0; i < 5; i++ {
fmt.Println("Send:", i)
c <- i
}
}()

return c
}

consumer := func(ch <-chan int) {
// read values from channel
for v := range ch {
fmt.Printf("Received: %d\n", v)
}
fmt.Println("Done receiving!")
}

ch := owner()
consumer(ch)
}

//Send: 0
//Send: 1
//Received: 0
//Received: 1
//Send: 2
//Send: 3
//Received: 2
//Received: 3
//Send: 4
//Received: 4
//Done receiving!

如此,可以確保在寫入資料到 channel 或關閉 channel 時,這個 channel 不會是 nil 的,以此避免 deadlock 或 panic 出現。

Unbuffered Channels

func main() {
// channel 的 zero value 是 nil
var zeroC chan int
fmt.Println(zeroC)

// 一般建立 channel 的方式
c := make(chan int) // unbuffered channel
fmt.Printf("type of c is %T\n", c) // type of c is chan int
fmt.Printf("value of c is %v\n", c) // value of c is 0xc000062060
}
  • 所有的 unbuffered channel 操作預設都是 blocking 的(所以它算是 synchronous 的操作)
    • 當有資料要寫入 channel 時,goroutine 會阻塞住,直到有其他的 goroutine 從該 channel 把值讀出來
    • 當有資料要讀取 channel 中的值時,goroutine 也會阻塞,直到其他 goroutine 把值寫入 channel 中
  • 也就是說,當我們是的把資料寫入 channel 或從 channel 中取出資料時,該 goroutine 都會阻塞住,並且將控制權交給其他可以運行的 goroutines
// 程式來源:https://medium.com/rungo/anatomy-of-channels-in-go-concurrency-in-go-1ec336086adb
func greet(c chan string) {
fmt.Println("Hello " + <-c + "!")
}

func main() {
fmt.Println("main() started")
c := make(chan string)

go greet(c)

// block here (把控制權交給其他 goroutine,這裡也就是 greet)
c <- "John"
fmt.Println("main() stopped")
}

⚠️ 雖然讀取值的時候會 blocking 等到有值出來,但並不表示來得及被 print 出來,以下面的程式為例:

func greet(c chan string) {
fmt.Println(<-c)
fmt.Println(<-c)
}

func main() {
fmt.Println("main() started")

c := make(chan string)

go greet(c)
c <- "John"
c <- "Mike"

fmt.Println("main() stopped")
}

// main() started
// John
// main() stopped

輸出結果只會看到 John,這是因為雖然第二個 <-c 一樣會 blocking 並等待值送入 channel,但是當 Mike 的值送入 channel 中,而 greet goroutine 收到值要 print 出來時,main goroutine 已經執行結束了,因此最終我們看不到 Mike(若想看到 Mike 可以在 main goroutine 使用 time.Sleep(time.Millisecond)

Close Channel(關閉頻道)

  • 當 Channel 被關閉後,如果對這個 channel 在送值進去,或者重複關閉這個 channel,都會導致 panic;但如果是讀取關閉的 channel 並不會。
  • 如果是 buffered channel 在關閉時,channel 內還有資料未被讀取出來,則這些資料會被依照順序 return 出來。
  • 當 channel(不論 buffered 或 unbuffered)關閉時,如果 channel 沒有未被讀取的資料,則會 return 這個 channel 型別的 zero value。
  • 關閉 channel 是將資料寫進 channel 的那個 goroutine 的責任,不過,如果不是在 for-range loop 這種需要等待 channel close 後才能往下執行的程式中,因為 channel 也只是個變數,所以一般來說,只要這個 channel 不再被取用,go 就會對其做 GC。

判斷 channel 是否已經關閉

c := make(chan string)
close(c) // 關閉 channel
val, ok := c // ok 如果是 false 表示 channel 已經被關閉
  • 當 channel 已經被關閉時,ok 會是 falsevalue 則會是 zero value
  • ⚠️ 只有 sender 可以使用 close,receiver 使用的話會發生 panic。

Deadlock

由於 channel 的資料在讀/寫時,goroutine 會阻塞,並且將控制權交給其他可以運行的 goroutines,因此若沒有其他可以運行的 goroutines 時,就會發生 deadlock 的情況,整個程式則會 crash。

也就是說,如果你試著從 channel 中讀資料,但 channel 中並沒有可以被讀取的值時,它會使得當前的 goroutine 阻塞,並期待其他 goroutine 會把值塞入這個 channel,此時「讀取資料」的操作會阻塞。相似地,如果你想要傳送資料到某一個 channel 中,它同樣會阻塞當前的 goroutine,並期待其他的 goroutine 有人去讀取這個值,這時候「寫入資料(send operation)」的操作會被阻塞。

  • 從 channel 中讀不到資料 -> 讀取資料的操作會阻塞 -> deadlock
  • 寫入資料到 channel -> 沒人讀取此 channel 的值 -> 寫入資料的操作會阻塞 -> deadlock
// 只是寫入資料但沒有 channel 讀取 -> deadlock
func main() {
fmt.Println("main() started")

// 只是寫入資料但沒有 channel 讀取
c := make(chan string)
c <- "John"

fmt.Println("main() stopped")
}

相似地:

// 有 goroutine 要讀取 channel 但沒 goroutine 寫入資料到 channel -> deadlock
func greet(c chan string) {
// 要讀取 channel 但沒人寫入資料
fmt.Println("Hello " + <-c + "!")
}

func main() {
fmt.Println("main() started")

c := make(chan string)
// c <- "John"
greet(c)

fmt.Println("main() stopped")
}

Buffered channel

Buffered Channel 在 sender 和 receiver 中:

  • 會指定 buffer 的大小(capacity)
  • 它是屬於 asynchronous channel,也就是說 sender 和 receiver 不需要同時在 channel 上,只要 buffer 沒有滿,sender 就可以繼續寫入資料,程式就不會被阻塞;只要 buffer 不是空的,receiver 都可以讀取資料,程式就不會被阻塞。
  • in-memory FIFO queue

buffered channel 寫值時,需要在 overflow 時才會 block goroutine

  • unbuffered channel 指的是 buffered size 為 0 的 channel。
  • unbuffered channel 不論是「從 channel 讀值」(需等到值被其他 goroutine 寫入),或「把值寫入 channel」(需等到值被其他 goroutine 讀出)都會阻塞當下的 goroutine。
  • 當 buffer size 不是 0 的話,就屬於 buffered channel
    • 「從 channel 讀值」時,只有在 buffered 是空的時才會 blocking
    • 「把值寫入 channel」時,該 goroutine 並不會被阻塞,除非該 buffer 已經填滿(full)且溢出(overflow)。當 buffer 已經填滿(full)時,再把新的一筆資料傳入 channel 時會造成溢出(overflow),此時 goroutine 才會被阻塞。
    • 讀值的動作一旦開始,就會一直到 buffer 變成 empty 為止才會結束。也就是說,讀取 channel 的那個 goroutine 需到等到 buffer 完全清空後才會阻塞。

舉例來說,在建立一個 buffered channel 後:

// 這個 chan 只能接收兩個長度的 buffer
// channel := make(chan [type], [size])
ch := make(chan int, 2)
  • 寫值:直到該 channel 寫入到 n+1 個值以前,它都不會阻塞當前的 goroutine。
  • 讀值:從該 channel 讀值時,若 buffer 是 empty 才會阻塞當前的 goroutine

以實際的例子來說:

  • 現在 buffered channel 的 size 是 3
  • 在 buffered channel 中寫入 3 個值
  • 由於寫入 channel 的值並沒有超出 buffered channel 的 size,因此 main goroutine 並不會被阻塞,使得 print goroutine 不會有機會取得控制權而被執行
// 程式來源:https://medium.com/rungo/anatomy-of-channels-in-go-concurrency-in-go-1ec336086adb
// 透過 squares goroutine 讀值
func print(c chan int) {
for i := 0; i <= 3; i++ {
fmt.Println(<-c)
}
}

// 在 main goroutine 寫值
func main() {
fmt.Println("main() started")

// 建立 buffered size 為 3 的 channel
c := make(chan int, 3)

go print(c)

// 寫入 3 個值
c <- 1
c <- 2
c <- 3

fmt.Println("main() close")
}

// Output:
// main() started
// main() close

但如果在 main goroutine 中多一個值寫入 channel 中(c <- 4),此時 main goroutine 就會在這裡被 block 住:

  • 在 main goroutine 中,使用了 c <- 4 後,因為超過 buffered channel 的 size,也就是溢出(overflow),因此在這裡會阻塞
  • main goroutine 阻塞後,print goroutine 便有機會執行,一旦 print goroutine 開始讀取 channel 的值後,它就會把該 buffer 中的所有值都讀全部讀完
// 由於 main goroutine 被 block,print goroutine 有機會被執行
// 一旦 receiver 開始讀值,就會把所有 buffer 中的值全部讀完直到清空
func print(c chan int) {
for i := 0; i <= 3; i++ {
fmt.Println(<-c)
}
}

// 在 main goroutine 寫值
func main() {
fmt.Println("main() started")

c := make(chan int, 3)
go print(c)

c <- 1
c <- 2
c <- 3
c <- 4 // 因為超過 buffered size,這裡會 block

fmt.Println("main() close")
}

// main() started
// 1
// 2
// 3
// 4
// main() close

另一個範例:

/* Buffered Channels 即使寫值後,不用等待值被讀取,主程式就會結束 */
func main() {
c := make(chan bool, 1)

go func() {
fmt.Println("GO GO GO") // 有可能因為主程式已經執行完而看不到

// 使用 Buffered Channel 的話,不會等到 channel 中的值讀完才結束主程式
fmt.Printf("Receive value from channel %v\n", <-c)
}()

fmt.Println("Before Receive")

// STEP 1:寫入 channel
c <- true
// c <- false // block here

fmt.Println("After Receive")
}

// Before Receive
// [GO GO GO]
// [Receive value from channel true]
// After Receive

buffered channel 也有 length 和 capacity

  • 和 slice 很類似,buffered channel 也有 length 和 capacity
    • length 指的是在 channel buffer 中還有多少數量的值還沒被讀取(queued),可以使用 len(channel) 查看
    • capacity 則是指 buffer 實際的 size,可以使用 cap(channel) 查看
// 這段程式之所以不會產生 deadlock 是因為 channel 還沒有出現 overflow,所以不會 block
func main() {
c := make(chan int, 3)
c <- 1
c <- 2
fmt.Printf("Length of channel c is %v and capacity of channel c is %v \n", len(c), cap(c))
}

// Length of channel c is 2 and capacity of channel c is 3

使用 for range 可以讀取 close 後 buffered channel 中的值

func main() {
c := make(chan int, 3)
c <- 1
c <- 2
c <- 3
close(c)

for val := range c {
fmt.Println(val)
}
}

使用 Buffered Channel 的時機

  • 當你知道你同時有多少個 goroutine 在執行
  • 或是,但你希望限制 goroutine 的數量
  • 又或著,你希望限制能有多少數量的 tasks 被 queue 起來

Unidirectional channels

除了可以建立可讀(read)可寫(write)的 channel 之外,還可以建立「只可讀(receive-only)」或「只可寫(send-only)」的 channel

// 建立只可接收或只可發送的 unidirectional channels
func main() {
roc := make(<-chan int) // receive only channel
soc := make(chan<- int) // send only channel

fmt.Printf("receive only channel type is '%T' \n", roc)
fmt.Printf("send only channel type is '%T' \n", soc)
}
// receive only channel type is '<-chan int'
// send only channel type is 'chan<- int'
  • 透過 unidirectional channels 可以增加型別的安全性(type-safety)
  • 但如果我們希望在某一個 goroutine 中只能從 channel 讀取資料,但在 main goroutine 中可以對這個 channel 讀和寫資料時,可以透過 go 提供的語法來將 bi-directional channel 轉換成 unidirectional channel
// <-chan 表示 receive only channel type
func greet(roc <-chan string) {
fmt.Println("Hello " + <-roc + "!")

// receive only channel 不能傳送資料
// invalid operation: cannot send to receive-only type <-chan string
// roc <- "foo"
}

func main() {
fmt.Println("main() started")
c := make(chan string)

go greet(c)

c <- "John"
fmt.Println("main() stopped")
}

Multiple Channels

當前的 goroutine 阻塞時,就會切換到其他 goroutine

// 程式碼修改自:https://medium.com/rungo/anatomy-of-channels-in-go-concurrency-in-go-1ec336086adb
func square(c chan int) {
fmt.Println("[square] wait for testNum")
num := <-c
fmt.Println("[square] sent square to squareChan (blocking)")
c <- num * num // 3. blocking, switch to other go routine
}

func cube(c chan int) {
fmt.Println("[cube] wait for testNum (blocking)")
num := <-c
fmt.Println("[cube] sent square to cubeChan")
c <- num * num * num // blocking
}

func main() {
fmt.Println("[main] main() started")

squareChan := make(chan int)
cubeChan := make(chan int)

go square(squareChan)
go cube(cubeChan)

testNum := 3

fmt.Println("[main] sent testNum to squareChan (blocking)")
squareChan <- testNum // 1. block, switch to other goroutine

fmt.Println("[main] resuming")

fmt.Println("[main] sent testNum to cubeChan")
cubeChan <- testNum

fmt.Println("[main] resuming")

fmt.Println("[main] reading from channels (blocking)")
squareVal, cubVal := <-squareChan, <-cubeChan

fmt.Println(squareVal, cubVal)
fmt.Println("[main] main() stopped")
}

First-class channels

在 golang 中 channel 是 first-class values,和其他型別一樣,可以被當成是 struct 中的值、function 的參數、回傳值等等。

以下面的例子來說,make(chan chan string) 表示這個 channel 可以傳送和接收另一個(可以傳送和接收 string 的)channel

func greeter(cc chan chan string) {
c := make(chan string)
cc <- c
}

func greet(c chan string) {
fmt.Println("Hello " + <-c + "!")
}

func main() {
fmt.Println("main() started")

// a channel of data type channel of data type string
// 建立一個 channel 可以讀寫另一個(可以讀寫 string)的 channel
cc := make(chan chan string)

go greeter(cc)

c := <-cc

go greet(c)
c <- "John"

fmt.Println("main() stopped")
}
// main() started
// Hello John!
// main() stopped

讀取 Channels 中的資料

for loop 搭配 close:需要手動 break 迴圈

使用 for{} 來顯示內容,但需要手動 break loop:

for {
val, ok := <-channel
if ok == false {
// break the loop
break
} else {
// do something with val
fmt.Println(val)
}
}

實際範例:

func squares(c chan int) {
// 把 0 ~ 9 寫入 channel 後便把 channel 關閉
for i := 0; i <= 9; i++ {
c <- i
}

close(c)
}

func main() {
fmt.Println("main() started")
c := make(chan int)

// 發動 squares goroutine
go squares(c)

// 監聽 channel 的值:週期性的 block/unblock main goroutine 直到 squares goroutine close
for {
val, ok := <-c
if ok == false {
fmt.Println(val, ok, "<-- loop broke case channel closed")
break // exit loop
} else {
fmt.Println(val, ok)
}
}

fmt.Println("main() close")
}

for range 搭配 close:會自動結束迴圈

  • 在上一段程式碼中,如果單純透過 for 來讀取 channel 中的資料,需要自行判斷 channel 是否已經 close,如果已經 close,則需要自行使用 break 把 for loop 終止。在 Go 中則提供了 for range loop,只要該 channel 被關閉後,loop 則會自動終止。

  • Blocking:如果 channel 中沒有資料,則該 goroutine 會等在那裡,直到「有新的資料進來」或「channel 被關閉」

  • 跳出迴圈的條件

    • channel 被關閉
    • 遇到 breakreturn
  • 需要特別留意,如果是在 main goroutine 中使用 for val := range channel {} 的寫法時,最後 channel 沒有被 close 的話程式會 deadlock。但如果是在其他的 goroutine 中使用,即使沒有 close 也不會 deadlock,但為了不必要的 bug 產生,一般都還是將其關閉比較好。

// 使用 range 時,只會拿到 value,不會拿到 ok(第二個 return value)
for val := range channel {
// 當 channel 關閉時會自動 break loop
}
注意

for val := range c 後面的 c 直接帶入 channel,不用再使用 <-c(箭頭)。

透過 for i := range c 可以重複取出 channel 的值,直到該 channel 被關閉。和上面一段的單純使用 for loop 的寫法相比,精簡許多:

func squares(c chan int) {
// 把 0 ~ 9 寫入 channel 後便把 channel 關閉
for i := 0; i <= 9; i++ {
c <- i
}

close(c)
}

func main() {
fmt.Println("main() started")
c := make(chan int)

// 發動 squares goroutine
go squares(c)

// 使用 for range 的寫法,一但 channel close,loop 會自動 break
for val := range c {
fmt.Println(val)
}

fmt.Println("main() close")
}
注意

需要特別留意,使用 for val := range channel {} 的寫法時,如果最後 channel 沒有被 close 的話程式會 deadlock。

使用 for val := range c 的其他範例:

//STEP 2:建立 fibonacci 函式
func fibonacci(n int, c chan int) {

x, y := 0, 1
for i := 0; i < n; i++ {
c <- x
x, y = y, x+y
}

// ⚠️ 如果沒有 close 的話,在 STEP 4 的地方並不知道
close(c)
}

func main() {
// STEP 1:建立一個 length 為 10 的 buffered channel
c := make(chan int, 10)

//STEP 3:使用 go routine
go fibonacci(cap(c), c)

// STEP 4:只要 channel 還有資料就輸出
for i := range c {
fmt.Println(i)
}

// ⚠️ 這裡的內容會等到所有 c channel 資料都算出後才會執行到...
}

select case

switch case 不同,select case 中 case 接收的是 channel(而不是 boolean)。當程式執行到 select 的位置時,會阻塞(blocking)在那,直到有任何一個 case 收到 channel 傳來的資料後(除非有用 default)才會 unblock,因此通常會有另一個 channel 用來實作 Timeout 機制

select {
case res := <-ch1: // 如果需要取用 channel 中的值
fmt.Println(res)
case <-ch1: // 如果不需要取用 channel 中的值
fmt.Println("receive value")
}
注意

for value := range channel 不同,select case 中,case 後的 channel 要記得加上 <- 把資料取出來。

select case 運作的流程是:

  • 如果所有的 case 都沒有接收到 channel 傳來的資料,那麼 select 會一直阻塞(block)在那,直到有任何的 case 收到資料後才會繼續執行
  • 如果同一時間有多個 case 收到 channel 傳來的資料,那將會從所有這些 non-blocking 的 cases 中隨機挑選一個,接著才繼續執行

在下面的例子中,由於使用的是 unbuffered channel,因此 channel 只要有 send 或 receive 的動作時都會 block:

// 程式來源:https://medium.com/rungo/anatomy-of-channels-in-go-concurrency-in-go-1ec336086adb
var start time.Time

func init() {
start = time.Now()
}

func service1(c chan string) {
time.Sleep(1 * time.Second)
c <- "Hello from service 1"
}

func service2(c chan string) {
time.Sleep(3 * time.Second)
c <- "Hello from service 2"
}

func main() {
fmt.Println("main() started", time.Since(start))

chan1 := make(chan string)
chan2 := make(chan string)

go service1(chan1)
go service2(chan2)

fmt.Println("[main] select(blocking)")
select {
case res := <-chan1:
fmt.Println("[main] get response from service 1", res, time.Since(start))
case res := <-chan2:
fmt.Println("[main] get response from service 2", res, time.Since(start))
}

fmt.Println("main() stopped", time.Since(start))
}

// main() started 585ns
// [main] select(blocking)
// [main] get response from service 1 Hello from service 1 1.001434164s
// main() stopped 1.001481394s

隨機選取

上面的程式中使用的是 unbuffered channel,所以對該 channel 任何的 send 或 receive 都會出現阻塞。

我們可以使用 buffered channel 來模擬實際上 web service 處理回應的情況:

  • 由於 buffered channel 的 capacity 是 2,但傳入 channel 的 size 並沒有超過 2(沒有 overflow),因此程式會繼續執行而不會發生阻塞(non-blocking)
  • 當 buffered channel 中有資料時,直到整個 buffer 都被清空為止前,從 buffered channel 讀取資料的動作都是 non-blocking 的,而且在下面的程式碼中又只讀取了一個值出來,因此整個 case 的操作都會是 non-blocking 的
  • 由於 select 中的所有 case 都是 non-blocking 的,因此 select 會從所有的 case 中隨機挑一個加以執行
// 程式碼來源:https://medium.com/rungo/anatomy-of-channels-in-go-concurrency-in-go-1ec336086adb
func main() {
fmt.Println("main() started", time.Since(start))

chan1 := make(chan string, 2)
chan2 := make(chan string, 2)

// buffered channel:因為 channel 中的資料沒有 overflow (> 2),所以不會阻塞
chan1 <- "Value 1"
chan1 <- "Value 2"

chan2 <- "Value 1"
chan2 <- "Value 2"

// buffered channel 中有資料時,讀取資料會是 non-blocking 的
// 由於 select 中的 case 都是 non-blocking 的,因此會隨機挑選一個執行
select {
case res := <-chan1:
fmt.Println("[main] get response from service 1", res, time.Since(start))
case res := <-chan2:
fmt.Println("[main] get response from service 2", res, time.Since(start))
}

fmt.Println("main() stopped", time.Since(start))
}

當 select 中的 case 同時收到 channel 的資料時,會隨機選取一個 channel:

/* Go 語言使用 Select 四大用法:https://blog.wu-boy.com/2019/11/four-tips-with-select-in-golang/ */
func main() {
ch := make(chan int, 1)

ch <- 1
select {
case <-ch:
fmt.Println("random 01")
case <-ch:
fmt.Println("random 02")
case <-ch:
fmt.Println("random 03")
}
}

default:建立一個 unblocking 的 select

default case 本身是非阻塞的(non-blocking),同時它也會使得 select statement 總是變成 non-blocking,也就是說,不論是 buffered 或 unbuffered channel 都會變成非阻塞的

當有任何資料可以從 channel 中取出時,select 就會執行該 case,但若沒有,就會直接進到 default case。簡單的說,當 channel 本身就有值時,就不會走到 default,但如果 channel 執行的當下沒有值,還需要等其他 goroutine 設值到 channel 的話,就會直接走到 default

提示

簡單的說,當 channel 本身就有值時,就不會走到 default,但如果 channel 執行的當下沒有值,還需要等其他 goroutine 設值到 channel 的話,就會直接走到 default

如果沒有資料送進 channel,也就是註解掉 ch <- 1 的話,程式會出現 panic(fatal error: all goroutines are asleep - deadlock!),這是因為它認為應該要從 channel 取到值,但卻沒有得到任何東西,雖然加上 default 後可以解決,但會使得 select case 不會被阻塞住,導致還沒收到 channel 的訊息前,main goroutine 就執行完畢了

/* Go 語言使用 Select 四大用法:https://blog.wu-boy.com/2019/11/four-tips-with-select-in-golang/ */

func main() {
ch := make(chan int, 1)

//ch <- 1
select {
case <-ch:
fmt.Println("random 01")
case <-ch:
fmt.Println("random 02")
default: // 當沒有 value 從 channel 送出的話,會走 default
fmt.Println("exit: no value from channel")
}
}

Timeout 超時機制:使用 time.After

單純使用 default case 並不是非常有用,有時我們希望的是有 timeout 的機制,也就是超過一定時間後,沒有收到任何回應時,才做預設的行為,這時候我們可以使用 time.After 來完成:

time.After()time.Tick() 都是會回傳 time.Time 型別的 receive channel(<- channel

/* Go 語言使用 Select 四大用法:https://blog.wu-boy.com/2019/11/four-tips-with-select-in-golang/ */

func main() {
ch := make(chan int)

select {
case <-ch:
fmt.Println("receive value from channel")

// 超過一秒沒有收到主要 channel 的 value,就會收到 time.After 送來的訊息
case <-time.After(1 * time.Second):
fmt.Println("timeout after 1 second")
}
}

以原本的例子來說:

  • service1 會需要花 3 秒
  • service2 會需要花 5 秒
  • time.After() 設定 2 秒,因此在 service1 和 service2 還沒完成前就會觸發 timeout
var start time.Time

func init() {
start = time.Now()
}

func service1(c chan string) {
fmt.Println("service1() started", time.Since(start))
time.Sleep(3 * time.Second)
c <- "Hello from service1"
}

func service2(c chan string) {
fmt.Println("service2() started", time.Since(start))
time.Sleep(5 * time.Second)
c <- "Hello from service2"
}

func main() {
fmt.Println("main() started", time.Since(start))

chan1 := make(chan string, 1)
chan2 := make(chan string, 1)

go service1(chan1)
go service2(chan2)

select {
case res := <-chan1:
fmt.Println("get response from service 1", res, time.Since(start))
case res := <-chan2:
fmt.Println("get response from service 2", res, time.Since(start))
case <-time.After(2 * time.Second):
fmt.Println("No response received", time.Since(start))
}
}

Empty Select

如同 for{} 迴圈可以不帶任何條件一樣,select {} 也可以不搭配 case 使用(稱作,empty select)。

從前面的例子中可以看到,因為 select statement 會一直阻塞(blocking),直到其中一個 case unblocks 時,才會繼續往後執行,但因為 empty select 中並沒有任何的 case statement,因此 main goroutine 將會永遠阻塞在那,如果沒有其他 goroutine 可以持續運行的話,最終導致 deadlock。

func service() {
fmt.Println("Hello from service ")
}

func main() {
fmt.Println("main() started")

go service()

// 這個 select 會永遠 block 在這
select {}

fmt.Println("main() stopped")
}

// main() started
// Hello from service
// fatal error: all goroutines are asleep - deadlock!

如果在 main goroutine 使用 empty select 後,main goroutine 將會完全阻塞,需要靠其他的 goroutine 持續運作才不至於進入 deadlock:

// main goroutine 持續 block 的情況下
// 需要靠其他 goroutine 持續運行才不會進入 deadlock
func service() {

for {
fmt.Println("Hello from service ")
time.Sleep(time.Millisecond * 400)
}
}

func main() {
fmt.Println("main() started")

go service()

// 這個 select 會永遠 block 在這
select {}

fmt.Println("main() stopped")
}

// main() started
// Hello from service
// Hello from service
// ...

另外透過 empty select 導致 main goroutine 阻塞的這種方式,可以在 server 啟動兩個不同的 service:

var start time.Time

func init() {
start = time.Now()
}

func service1() {
for {
fmt.Println("Hello from service1 ", time.Since((start)))
time.Sleep(time.Millisecond * 500)
}
}

func service2() {
for {
fmt.Println("Hello from service2 ", time.Since((start)))
time.Sleep(time.Millisecond * 700)
}
}

func main() {
fmt.Println("main() started")

go service1()
go service2()

// 這個 select 會永遠 block 在這,service1 和 service2 輪流輸出訊息
select {}

fmt.Println("main() stopped") // 這行不會被執行到
}

判斷是否超過 channel 的 buffer size

/* Go 語言使用 Select 四大用法:https://blog.wu-boy.com/2019/11/four-tips-with-select-in-golang/ */

func main() {
// STEP 1:建立一個只能裝 buffer size 為 1 資料
ch := make(chan int, 1)
ch <- 1

select {
case ch <- 2:
fmt.Println("channel value is", <-ch)
fmt.Println("channel value is", <-ch)
default:
// ch 中的內容超過 1 時,但若把 channel buffer size 的容量改成 2,就不會走到 default
fmt.Println("channel blocking")
}
}

for-select loop:讀取多個 channel 的 value

// A Tour of Go: https://tour.golang.org/concurrency/6
func main() {
tick := time.Tick(100 * time.Millisecond)
boom := time.After(500 * time.Millisecond)

for {
select {
case <-tick:
fmt.Println("tick.")
case <-boom:
fmt.Println("BOOM!")
return // 如果沒有 return 的話程式將不會結束,一直卡在 for loop 中
default:
fmt.Println(" .")
time.Sleep(50 * time.Millisecond)
}
}
}
警告

for-select loop 裡如果有 default case,一般來說可能是程式寫錯,因為一旦其他 case 沒有辦法 READ 或 WRITE,則這個 default 就會一直被重複執行到,除非有在某個 case 中呼叫 return 來跳出這個 function,或用 break <LOOP_NAME> 來跳出這個 for loop,否則將導致無窮迴圈,或造成 CPU overhead。

使用範例:

/* Go 語言使用 Select 四大用法:https://blog.wu-boy.com/2019/11/four-tips-with-select-in-golang/ */

func main() {
ch1 := make(chan string)
ch2 := make(chan int, 1)

defer func() {
fmt.Println("------ In defer ------")
close(ch1)
close(ch2)
}()

i := 0

//STEP 1:建立一個 Go Routine
go func() {
fmt.Println("In Go Routine")

// STEP 2:透過 for loop 來不停來監控不同 channels 傳回來的資料
LOOP:
for {
// STEP 3:透過 sleep 讓它每 500 毫秒檢查有無 channel 傳訊息進來
time.Sleep(500 * time.Millisecond)
i++
fmt.Printf("In Go Routine, i: %v, time: %v \n", i, time.Now().Unix())

// STEP 4:透過 select 判斷不同 channel 傳入的資料
select {
case m := <-ch1: // STEP 6:當收到 channel 1 傳入的資料時,就 break
fmt.Printf("In Go Routine, get message from channel 1: %v \n", m)

break LOOP
case m := <-ch2: // STEP 7:當收到 channel 2 傳入的資料時,單純輸出訊息
fmt.Printf("In Go Routine, get message from channel 2: %v\n", m)

default: // STEP 5:檢查時,如果沒有 channel 丟資料進 channel 則走 default
fmt.Println("In Go Routine to DEFAULT")
}
}
}()

ch2 <- 666 // STEP 8:在 sleep 前將訊息丟入 channel2

fmt.Println("Start Sleep")

// STEP 9:雖然這裡 sleep,但 go routine 中的 for 迴圈仍然不斷在檢查有無收到訊息
time.Sleep(4 * time.Second)

fmt.Println("After Sleep: send value to channel")

// STEP 10:四秒後把 "stop" 傳進 channel 1,for 迴圈收到訊息後 break
ch1 <- "stop"

fmt.Println("------ End ------")
}

範例程式碼

範例

// A Tour of Go: https://tour.golang.org/concurrency/2

func sum(s []int, c chan int) {
sum := 0
for _, v := range s {
sum += v
}

// STEP 3:把加總後的值丟回 channel
c <- sum // send sum to c
}

func main() {
s := []int{7, 2, 8, -9, 4, 0}

// STEP 1:建立一個 channel,該 channel 會傳出 int
c := make(chan int)

// STEP 2:使用 goroutine,並把 channel 當成參數傳入
go sum(s[:len(s)/2], c)
go sum(s[len(s)/2:], c)

// STEP 4:從 channel 取得計算好的結果
x, y := <-c, <-c

// ⚠️ 寫在這裡的內容會在 channel 傳回結果後才會被執行...

fmt.Println(x, y, x+y)
}

範例

package main

import (
"fmt"
"net/http"
)

func main() {
links := []string{
"http://google.com",
"http://facebook.com",
"http://stackoverflow.com",
"http://golang.com",
"http://amazon.com",
}

for _, link := range links {
checkLink(link) // 這裡會被阻塞
}
}

func checkLink(link string) {
_, err := http.Get(link)
if err != nil {
fmt.Println(link, "might be down!")
return
}
fmt.Println(link, "is up!")
}

參考