Skip to main content

[Golang] goroutines, channels, and concurrency

此篇為各筆記之整理,非原創內容,資料來源可見文後參考資料。

TL;DR#

  • 從一個 goroutine 切換到另一個 goroutine 的時機點是「當正在執行的 goroutine 阻塞時,就會交給其他 goroutine 做事」
  • unbuffered channel 指的是 buffer size 為 0 的 channel
  • 對於 unbuffered channel 來說,不論是從 channel 讀資料(需等到被寫入),或把資料寫入 channel 中時(需等到被讀出),都會阻塞該 goroutine
  • 對於 buffered channel 來說:
    • 從 channel 讀值時若是 empty buffer 時才會阻塞,否則都是 non-blocking
    • 把資料寫入 channel 中時,寫入 channel 中的 value 數目(n + 1)需要超過 buffer size(n),也就是溢出(overflow)時才會使得該 goroutine 被阻塞;而且一旦 buffer channel 中的值開始被讀取,就會被全部讀完
// go routine
go f(x, y, z)
// channels
// 和 maps, slices, channels 一樣需要在使用前被建立,這裡表示定義的 chan 會回傳 int
ch := make(chan int)
ch <- v // Send v to channel ch.
v, ok := <-ch // Receive from ch, and
// assign value to v.

PJCHENder Gist

goroutines vs threads#

  • goroutines 是由 Go runtime 所管理的輕量化的 thread
  • goroutines 會在相同的 address space 中執行,因此要存取共享的記憶體必須要是同步的(synchronized)。
  • 當我們在執行 Go 程式時,Go runtime 會建立許多 threads,當某一個 goroutine 的 thread 被阻塞時,它會切換去其它 thread 執行其他的 goroutine,這個過程很類似 thread scheduling,但它是由 go runtime 來處理,而且速度更快
  • 傳統的 Apache 伺服器來說,當每分鐘需要處理 1000 個請求時,每個請求如果都要 concurrently 的運作,將會需要建立 1000 個 threads 或者分派到不同的 process 去做,如果 OS 的每個 thread 都需要使用 1MB 的 stack size 的話,就會需要 1GB 的記憶體才能撐得住這樣的流量。但相對於 goroutine 來說,因為 stack size 可以動態增長,因此可以擴充到 1000 個 goroutines,每個 goroutine 只需要 2KB(Go 1.4 之後)的 stack size。
  • 在 Go 1.5 之後,Golang 預設會使用的 CPU 的數目(GOMAXPROCS)將會根據電腦實體 CPU 的數目來決定
  • 使用越多的 CPU 來執行不見得會有更好的效能,因為不同 CPU 之間需要更多時間來進行溝通和資料交換,透過 runtime.GOMAXPROCS(n) 可以改變 go runtime 使用的處理器數目
OS threadgoroutine
由 OS kernel 管理,相依於硬體goroutines 是由 go runtime 管理,不依賴於硬體
OS threads 一般有固定 1-2 MB 的 stack sizegoroutines 的 stack size 約 8KB(自從 Go 1.4 開始為 2KB)
在編譯的時候就決定了 stack 的大小,並且不能增長由於是在 run-time 管理 stack size,透過分配和釋放 heap storage 可以增長到 1GB
不同 thread 之間沒有簡易的溝通媒介,並且溝通時易有延遲goroutine 使用 channels 來和其它的 goroutine 溝通,且低延遲
thread 有 identity,透過 TID 可以辨別 process 中的不同 threadgoroutine 沒有 identity
Thread 有需要 setup 和 teardown cost,需要向 OS 請求資源並在完成時還回去goroutine 是在 go 的 runtime 中建立和摧毀,和 OS threads 相比非常容易,因為 go runtime 已經為 goroutines 建立了 thread pools,因此 OS 並不會留意到 coroutines
threads 需要先被 scheduled,在不同 thread 間切換時的消耗很高,因為 scheduler 需要儲存和還原

資料來源:threads vs goroutines @ gist

concurrency vs parallelism#

Achieving concurrency in Go @

  • Concurrency 指的是開啟很多的 threads 在執行程式碼,但它們並不是「同時」執行,而是透過快速切換來執行(只有一個 CPU 在負責)。
  • Parallelism 指的是開啟很多 threads 「同時」執行程式碼,需要倚靠多個 CPU。

"concurrency is dealing with multiple things at once, parallelism is doing multiple things at once"Achieving concurrency in Go

Goroutines#

Anatomy of goroutines in Go -Concurrency in Go @ rungo

  • 每個 Go 程式預設都會建立一個 goroutine,這被稱作是 main goroutine,也就是函式 main 中執行的內容
  • 所有的 goroutines 都是沒有名稱的(anonymous),因為 goroutine 並沒有 identity
  • 在下面這段程式中,當 main goroutine 開始執行時,go 排程器(scheduler)並不會將控制權交給 printHello 這個 goroutine,因此當 goroutine 執行完畢後,程式會立即中止,而排程器並沒有機會把 printHello 這個 goroutine 加入排程中。
func printHello() {
fmt.Println("Hello World")
}
func main() {
fmt.Println("main execution started")
// call function
go printHello()
fmt.Println("main execution stopped")
}

但我們知道,當 goroutine 被阻塞的時候,就會把控制權交給其他的 goroutine,因此這裡可以試著用 time.Sleep() 來把它阻塞:

func printHello() {
fmt.Println("Hello World")
}
func main() {
fmt.Println("main execution started")
// call function
go printHello()
// block here
time.Sleep(10 * time.Millisecond)
fmt.Println("main execution stopped")
}

anonymous goroutine#

func main() {
fmt.Println("main() started")
c := make(chan string)
// anonymous goroutine
go func(c chan string) {
fmt.Println("Hello " + <-c + "!")
}(c)
c <- "John"
fmt.Println("main() ended")
}

Channels#

var zeroC chan int // channel 的 zero value 是 nil
unbufferedC := make(chan int) // unbuffered channel 的 buffered size 是 0
bufferedC := make(chan int, 3) // capacity 為 3 的 buffered channel

Unbuffered Channels#

func main() {
// channel 的 zero value 是 nil
var zeroC chan int
fmt.Println(zeroC)
// 一般建立 channel 的方式
c := make(chan int) // unbuffered channel
fmt.Printf("type of c is %T\n", c) // type of c is chan int
fmt.Printf("value of c is %v\n", c) // value of c is 0xc000062060
}
  • 所有的 unbuffered channel 操作預設都是 blocking 的
    • 當有資料要寫入 channel 時,goroutine 會阻塞住,直到有其他的 goroutine 從該 channel 把值讀出來
    • 當有資料要讀取 channel 中的值時,goroutine 也會阻塞,直到其他 goroutine 把值寫入 channel 中
  • 也就是說,當我們是的把資料寫入 channel 或從 channel 中取出資料時,該 goroutine 都會阻塞住,並且將控制權交給其他可以運行的 goroutines
// 程式來源:https://medium.com/rungo/anatomy-of-channels-in-go-concurrency-in-go-1ec336086adb
func greet(c chan string) {
fmt.Println("Hello " + <-c + "!")
}
func main() {
fmt.Println("main() started")
c := make(chan string)
go greet(c)
// block here (把控制權交給其他 goroutine,這裡也就是 greet)
c <- "John"
fmt.Println("main() stopped")
}

⚠️ 雖然讀取值的時候會 blocking 等到有值出來,但並不表示來得及被 print 出來,以下面的程式為例:

func greet(c chan string) {
fmt.Println(<-c)
fmt.Println(<-c)
}
func main() {
fmt.Println("main() started")
c := make(chan string)
go greet(c)
c <- "John"
c <- "Mike"
fmt.Println("main() stopped")
}
// main() started
// John
// main() stopped

輸出結果只會看到 John,這是因為雖然第二個 <-c 一樣會 blocking 並等待值送入 channel,但是當 Mike 的值送入 channel 中,而 greet goroutine 收到值要 print 出來時,main goroutine 已經執行結束了,因此最終我們看不到 Mike(若想看到 Mike 可以在 main goroutine 使用 time.Sleep(time.Millisecond)

Close Channel(關閉頻道)#

c := make(chan string)
close(c) // 關閉 channel
val, ok := c // ok 如果是 false 表示 channel 已經被關閉
  • 當 channel 已經被關閉時,ok 會是 falsevalue 則會是 zero value
  • ⚠️ 只有 sender 可以使用 close,receiver 使用的話會發生 panic。

Deadlock#

由於 channel 的資料在讀/寫時,goroutine 會阻塞,並且將控制權交給其他可以運行的 goroutines,因此若沒有其他可以運行的 goroutines 時,就會發生 deadlock 的情況,整個程式則會 crash。

也就是說,如果你試著從 channel 中讀資料,但 channel 中並沒有可以被讀取的值時,它會使得當前的 goroutine 阻塞,並期待其他 goroutine 會把值塞入這個 channel,此時「讀取資料」的操作會阻塞。相似地,如果你想要傳送資料到某一個 channel 中,它同樣會阻塞當前的 goroutine,並期待其他的 goroutine 有人去讀取這個值,這時候「寫入資料(send operation)」的操作會被阻塞。

  • 從 channel 中讀不到資料 -> 讀取資料的操作會阻塞 -> deadlock
  • 寫入資料到 channel -> 沒人讀取此 channel 的值 -> 寫入資料的操作會阻塞 -> deadlock
// 只是寫入資料但沒有 channel 讀取 -> deadlock
func main() {
fmt.Println("main() started")
// 只是寫入資料但沒有 channel 讀取
c := make(chan string)
c <- "John"
fmt.Println("main() stopped")
}

相似地:

// 有 goroutine 要讀取 channel 但沒 goroutine 寫入資料到 channel -> deadlock
func greet(c chan string) {
// 要讀取 channel 但沒人寫入資料
fmt.Println("Hello " + <-c + "!")
}
func main() {
fmt.Println("main() started")
c := make(chan string)
// c <- "John"
greet(c)
fmt.Println("main() stopped")
}

Buffered channel#

buffered channel 寫值時,需要在 overflow 時才會 block goroutine#

  • unbuffered channel 指的是 buffered size 為 0 的 channel。
  • unbuffered channel 不論是「從 channel 讀值」(需等到值被其他 goroutine 寫入),或「把值寫入 channel」(需等到值被其他 goroutine 讀出)都會阻塞當下的 goroutine。
  • 當 buffer size 不是 0 的話,就屬於 buffered channel
    • 「從 channel 讀值」時,只有在 buffered 是空的時才會 blocking
    • 「把值寫入 channel」時,該 goroutine 並不會被阻塞,除非該 buffer 已經填滿(full)且溢出(overflow)。當 buffer 已經填滿(full)時,再把新的一筆資料傳入 channel 時會造成溢出(overflow),此時 goroutine 才會被阻塞。
    • 讀值的動作一旦開始,就會一直到 buffer 變成 empty 為止才會結束。也就是說,讀取 channel 的那個 goroutine 需到等到 buffer 完全清空後才會阻塞。

舉例來說,在建立一個 buffered channel 後:

// 這個 chan 只能接收兩個長度的 buffer
// channel := make(chan, [type], [size])
ch := make(chan, int, 2)
  • 寫值:直到該 channel 寫入到 n+1 個值以前,它都不會阻塞當前的 goroutine。
  • 讀值:從該 channel 讀值時,若 buffer 是 empty 才會阻塞當前的 goroutine

以實際的例子來說:

  • 現在 buffered channel 的 size 是 3
  • 在 buffered channel 中寫入 3 個值
  • 由於寫入 channel 的值並沒有超出 buffered channel 的 size,因此 main goroutine 並不會被阻塞,使得 print goroutine 不會有機會取得控制權而被執行
// 程式來源:https://medium.com/rungo/anatomy-of-channels-in-go-concurrency-in-go-1ec336086adb
// 透過 squares goroutine 讀值
func print(c chan int) {
for i := 0; i <= 3; i++ {
fmt.Println(<-c)
}
}
// 在 main goroutine 寫值
func main() {
fmt.Println("main() started")
// 建立 buffered size 為 3 的 channel
c := make(chan int, 3)
go print(c)
// 寫入 3 個值
c <- 1
c <- 2
c <- 3
fmt.Println("main() close")
}
// Output:
// main() started
// main() close

但如果在 main goroutine 中多一個值寫入 channel 中(c <- 4),此時 main goroutine 就會在這裡被 block 住:

  • 在 main goroutine 中,使用了 c <- 4 後,因為超過 buffered channel 的 size,也就是溢出(overflow),因此在這裡會阻塞
  • main goroutine 阻塞後,print goroutine 便有機會執行,一旦 print goroutine 開始讀取 channel 的值後,它就會把該 buffer 中的所有值都讀全部讀完
// 由於 main goroutine 被 block,print goroutine 有機會被執行
// 一旦 receiver 開始讀值,就會把所有 buffer 中的值全部讀完直到清空
func print(c chan int) {
for i := 0; i <= 3; i++ {
fmt.Println(<-c)
}
}
// 在 main goroutine 寫值
func main() {
fmt.Println("main() started")
c := make(chan int, 3)
go print(c)
c <- 1
c <- 2
c <- 3
c <- 4 // 因為超過 buffered size,這裡會 block
fmt.Println("main() close")
}
// main() started
// 1
// 2
// 3
// 4
// main() close

另一個範例:

/* Buffered Channels 即使寫值後,不用等待值被讀取,主程式就會結束 */
func main() {
c := make(chan bool, 1)
go func() {
fmt.Println("GO GO GO") // 有可能因為主程式已經執行完而看不到
// 使用 Buffered Channel 的話,不會等到 channel 中的值讀完才結束主程式
fmt.Printf("Receive value from channel %v\n", <-c)
}()
fmt.Println("Before Receive")
// STEP 1:寫入 channel
c <- true
// c <- false // block here
fmt.Println("After Receive")
}
// Before Receive
// [GO GO GO]
// [Receive value from channel true]
// After Receive

buffered channel 也有 length 和 capacity#

  • 和 slice 很類似,buffered channel 也有 length 和 capacity
    • length 指的是在 channel buffer 中還有多少數量的值還沒被讀取(queued),可以使用 len(channel) 查看
    • capacity 則是指 buffer 實際的 size,可以使用 cap(channel) 查看
// 這段程式之所以不會產生 deadlock 是因為 channel 還沒有出現 overflow,所以不會 block
func main() {
c := make(chan int, 3)
c <- 1
c <- 2
fmt.Printf("Length of channel c is %v and capacity of channel c is %v \n", len(c), cap(c))
}
// Length of channel c is 2 and capacity of channel c is 3

使用 for range 可以讀取 close 後 buffered channel 中的值#

func main() {
c := make(chan int, 3)
c <- 1
c <- 2
c <- 3
close(c)
for val := range c {
fmt.Println(val)
}
}

unidirectional channels#

除了可以建立可讀(read)可寫(write)的 channel 之外,還可以建立「只可讀(receive-only)」或「只可寫(send-only)」的 channel

// 建立只可接收或只可發送的 unidirectional channels
func main() {
roc := make(<-chan int) // receive only channel
soc := make(chan<- int) // send only channel
fmt.Printf("receive only channel type is '%T' \n", roc)
fmt.Printf("send only channel type is '%T' \n", soc)
}
// receive only channel type is '<-chan int'
// send only channel type is 'chan<- int'
  • 透過 unidirectional channels 可以增加型別的安全性(type-safety)
  • 但如果我們希望在某一個 goroutine 中只能從 channel 讀取資料,但在 main goroutine 中可以對這個 channel 讀和寫資料時,可以透過 go 提供的語法來將 bi-directional channel 轉換成 unidirectional channel
// <-chan 表示 receive only channel type
func greet(roc <-chan string) {
fmt.Println("Hello " + <-roc + "!")
// receive only channel 不能傳送資料
// invalid operation: cannot send to receive-only type <-chan string
// roc <- "foo"
}
func main() {
fmt.Println("main() started")
c := make(chan string)
go greet(c)
c <- "John"
fmt.Println("main() stopped")
}

Multiple Channels#

當前的 goroutine 阻塞時,就會切換到其他 goroutine#

// 程式碼修改自:https://medium.com/rungo/anatomy-of-channels-in-go-concurrency-in-go-1ec336086adb
func square(c chan int) {
fmt.Println("[square] wait for testNum")
num := <-c
fmt.Println("[square] sent square to squareChan (blocking)")
c <- num * num // 3. blocking, switch to other go routine
}
func cube(c chan int) {
fmt.Println("[cube] wait for testNum (blocking)")
num := <-c
fmt.Println("[cube] sent square to cubeChan")
c <- num * num * num // blocking
}
func main() {
fmt.Println("[main] main() started")
squareChan := make(chan int)
cubeChan := make(chan int)
go square(squareChan)
go cube(cubeChan)
testNum := 3
fmt.Println("[main] sent testNum to squareChan (blocking)")
squareChan <- testNum // 1. block, switch to other goroutine
fmt.Println("[main] resuming")
fmt.Println("[main] sent testNum to cubeChan")
cubeChan <- testNum
fmt.Println("[main] resuming")
fmt.Println("[main] reading from channels (blocking)")
squareVal, cubVal := <-squareChan, <-cubeChan
fmt.Println(squareVal, cubVal)
fmt.Println("[main] main() stopped")
}

first-class channels#

在 golang 中 channel 是 first-class values,和其他型別一樣,可以被當成是 struct 中的值、function 的參數、回傳值等等。

以下面的例子來說,make(chan chan string) 表示這個 channel 可以傳送和接收另一個(可以傳送和接收 string 的)channel

func greeter(cc chan chan string) {
c := make(chan string)
cc <- c
}
func greet(c chan string) {
fmt.Println("Hello " + <-c + "!")
}
func main() {
fmt.Println("main() started")
// a channel of data type channel of data type string
// 建立一個 channel 可以讀寫另一個(可以讀寫 string)的 channel
cc := make(chan chan string)
go greeter(cc)
c := <-cc
go greet(c)
c <- "John"
fmt.Println("main() stopped")
}
// main() started
// Hello John!
// main() stopped

讀取 Channels 中的資料#

for loop 搭配 close:需要手動 break 迴圈#

使用 for{} 來顯示內容,但需要手動 break loop:

for {
val, ok := <-channel
if ok == false {
// break the loop
break
} else {
// do something with val
fmt.Println(val)
}
}

實際範例:

func squares(c chan int) {
// 把 0 ~ 9 寫入 channel 後便把 channel 關閉
for i := 0; i <= 9; i++ {
c <- i
}
close(c)
}
func main() {
fmt.Println("main() started")
c := make(chan int)
// 發動 squares goroutine
go squares(c)
// 監聽 channel 的值:週期性的 block/unblock main goroutine 直到 squares goroutine close
for {
val, ok := <-c
if ok == false {
fmt.Println(val, ok, "<-- loop broke case channel closed")
break // exit loop
} else {
fmt.Println(val, ok)
}
}
fmt.Println("main() close")
}

for range 搭配 close:會自動關閉迴圈#

  • 如同上一段的程式碼,單純透過 for 需要自行判斷 channel 是否已經 close,如果是的話需要自行使用 break 把 for loop 終止。在 Go 中則提供了 for range loop,只要該 channel 被關閉後,loop 則會自動終止。
  • 需要特別留意,如果是在 main goroutine 中使用 for val := range channel {} 的寫法時,最後 channel 沒有被 close 的話程式會 deadlock。但如果是在其他的 goroutine 中使用,即使沒有 close 也不會 deadlock,但為了不必要的 bug 產生,一般都還是將其關閉。
for val := range channel {
// 當 channel 關閉時會自動 break loop
}

⚠️ for val := range c 後面的 c 直接帶入 channel,不用再使用 <-c(箭頭)。

透過 for i := range c 可以重複取出 channel 的值,直到該 channel 被關閉。和上面一段的單純使用 for loop 的寫法相比,精簡許多:

func squares(c chan int) {
// 把 0 ~ 9 寫入 channel 後便把 channel 關閉
for i := 0; i <= 9; i++ {
c <- i
}
close(c)
}
func main() {
fmt.Println("main() started")
c := make(chan int)
// 發動 squares goroutine
go squares(c)
// 使用 for range 的寫法,一但 channel close,loop 會自動 break
for val := range c {
fmt.Println(val)
}
fmt.Println("main() close")
}

⚠️ 需要特別留意,使用 for val := range channel {} 的寫法時,如果最後 channel 沒有被 close 的話程式會 deadlock。

使用 for val := range c 的其他範例:

//STEP 2:建立 fibonacci 函式
func fibonacci(n int, c chan int) {
x, y := 0, 1
for i := 0; i < n; i++ {
c <- x
x, y = y, x+y
}
// ⚠️ 如果沒有 close 的話,在 STEP 4 的地方並不知道
close(c)
}
func main() {
// STEP 1:建立一個 length 為 10 的 buffered channel
c := make(chan int, 10)
//STEP 3:使用 go routine
go fibonacci(cap(c), c)
// STEP 4:只要 channel 還有資料就輸出
for i := range c {
fmt.Println(i)
}
// ⚠️ 這裡的內容會等到所有 c channel 資料都算出後才會執行到...
}

select case#

switch case 不同,select case 中 case 接收的是 channel(而不是 boolean)。當程式執行到 select 的位置時,會阻塞(blocking)在那,直到有任何一個 case 收到 channel 傳來的資料後(除非有用 default)才會 unblock,因此通常會有另一個 channel 用來實作 Timeout 機制

select {
case res := <-ch1: // 如果需要取用 channel 中的值
fmt.Println(res)
case <-ch1: // 如果不需要取用 channel 中的值
fmt.Println("receive value")
}

⚠️ 和 for value := range channel 不同,select case 中,case 後的 channel 要記得加上 <- 把資料取出來。

select case 運作的流程是:

  • 如果所有的 case 都沒有接收到 channel 傳來的資料,那麼 select 會一直阻塞(block)在那,直到有任何的 case 收到資料後(unblock)才會繼續執行
  • 如果同一時間有多個 case 收到 channel 傳來的資料(有多個 channel 同時 non-blocking),那個會從所有這些 non-blocking 的 cases 中隨機挑選一個,接著才繼續執行

在下面的例子中,由於使用的是 unbuffered channel,因此 channel 只有要 send 或 receive 的動作時都會 block:

// 程式來源:https://medium.com/rungo/anatomy-of-channels-in-go-concurrency-in-go-1ec336086adb
var start time.Time
func init() {
start = time.Now()
}
func service1(c chan string) {
time.Sleep(1 * time.Second)
c <- "Hello from service 1"
}
func service2(c chan string) {
time.Sleep(3 * time.Second)
c <- "Hello from service 2"
}
func main() {
fmt.Println("main() started", time.Since(start))
chan1 := make(chan string)
chan2 := make(chan string)
go service1(chan1)
go service2(chan2)
fmt.Println("[main] select(blocking)")
select {
case res := <-chan1:
fmt.Println("[main] get response from service 1", res, time.Since(start))
case res := <-chan2:
fmt.Println("[main] get response from service 2", res, time.Since(start))
}
fmt.Println("main() stopped", time.Since(start))
}
// main() started 585ns
// [main] select(blocking)
// [main] get response from service 1 Hello from service 1 1.001434164s
// main() stopped 1.001481394s

隨機選取#

上面的程式中使用的是 unbuffered channel,所以對該 channel 任何的 send 或 receive 都會出現阻塞。我們可以使用 buffered channel 來模擬實際上 web service 處理回應的情況:

  • 由於 buffered channel 的 capacity 是 2,但傳入 channel 的 size 並沒有超過 2(沒有 overflow),因此程式會繼續執行而不會發生阻塞(non-blocking)
  • 當 buffered channel 中的有資料時,直到整個 buffer 都被清空為止前,從 buffered channel 讀取資料的動作都是 non-blocking 的,而且在下面的程式碼中又只讀取了一個值出來,因此整個 case 的操作都會是 non-blocking 的
  • 由於 select 中的所有 case 都是 non-blocking 的,因此 select 會從所有的 case 中隨機挑一個加以執行
// 程式碼來源:https://medium.com/rungo/anatomy-of-channels-in-go-concurrency-in-go-1ec336086adb
func main() {
fmt.Println("main() started", time.Since(start))
chan1 := make(chan string, 2)
chan2 := make(chan string, 2)
// buffered channel:因為 channel 中的資料沒有 overflow (> 2),所以不會阻塞
chan1 <- "Value 1"
chan1 <- "Value 2"
chan2 <- "Value 1"
chan2 <- "Value 2"
// buffered channel 中有資料時,讀取資料會是 non-blocking 的
// 由於 select 中的 case 都是 non-blocking 的,因此會隨機挑選一個執行
select {
case res := <-chan1:
fmt.Println("[main] get response from service 1", res, time.Since(start))
case res := <-chan2:
fmt.Println("[main] get response from service 2", res, time.Since(start))
}
fmt.Println("main() stopped", time.Since(start))
}

當 select 中的 case 同時收到 channel 的資料時,會隨機選取一個 channel:

/* Go 語言使用 Select 四大用法:https://blog.wu-boy.com/2019/11/four-tips-with-select-in-golang/ */
func main() {
ch := make(chan int, 1)
ch <- 1
select {
case <-ch:
fmt.Println("random 01")
case <-ch:
fmt.Println("random 02")
}
}

default#

default case 本身是非阻塞的(non-blocking),同時它也會使得 select statement 總是變成 non-blocking,也就是說,不論是 buffered 或 unbuffered channel 都會變成非阻塞的

當有任何資料可以從 channel 中取出時,select 就會執行該 case,但若沒有,就會直接進到 default case。簡單的說,當 channel 本身就有值時,就不會走到 default,但如果 channel 執行的當下沒有值,還需要等其他 goroutine 設值到 channel 的話,就會直接走到 default

💡 簡單的說,當 channel 本身就有值時,就不會走到 default,但如果 channel 執行的當下沒有值,還需要等其他 goroutine 設值到 channel 的話,就會直接走到 default

如果沒有資料送進 channel,也就是註解掉 ch <- 1 的話,程式會出現 panic(fatal error: all goroutines are asleep - deadlock!),這是因為它認為應該要從 channel 取到值,但卻沒有得到任何東西,雖然加上 default 後可以解決,但會使得 select case 不會被阻塞住,導致還沒收到 channel 的訊息前,main goroutine 就執行完畢了

/* Go 語言使用 Select 四大用法:https://blog.wu-boy.com/2019/11/four-tips-with-select-in-golang/ */
func main() {
ch := make(chan int, 1)
//ch <- 1
select {
case <-ch:
fmt.Println("random 01")
case <-ch:
fmt.Println("random 02")
default: // 當沒有 value 從 channel 送出的話,會走 default
fmt.Println("exit: no value from channel")
}
}

Timeout 超時機制:使用 time.After#

單純使用 default case 並不是非常有用,有時我們希望的是有 timeout 的機制,也就是超過一定時間後,沒有收到任何回應時,才做預設的行為,這時候我們可以使用 time.After 來完成:

time.After()time.Tick() 都是會回傳 time.Time 型別的 receive channel(<- channel

/* Go 語言使用 Select 四大用法:https://blog.wu-boy.com/2019/11/four-tips-with-select-in-golang/ */
func main() {
ch := make(chan int)
select {
case <-ch:
fmt.Println("receive value from channel")
// 超過一秒沒有收到主要 channel 的 value,就會收到 time.After 送來的訊息
case <-time.After(1 * time.Second):
fmt.Println("timeout after 1 second")
}
}

以原本的例子來說:

  • service1 會需要花 3 秒
  • service2 會需要花 5 秒
  • time.After() 設定 2 秒,因此在 service1 和 service2 還沒完成前就會觸發 timeout
var start time.Time
func init() {
start = time.Now()
}
func service1(c chan string) {
fmt.Println("service1() started", time.Since(start))
time.Sleep(3 * time.Second)
c <- "Hello from service1"
}
func service2(c chan string) {
fmt.Println("service2() started", time.Since(start))
time.Sleep(5 * time.Second)
c <- "Hello from service2"
}
func main() {
fmt.Println("main() started", time.Since(start))
chan1 := make(chan string, 1)
chan2 := make(chan string, 1)
go service1(chan1)
go service2(chan2)
select {
case res := <-chan1:
fmt.Println("get response from service 1", res, time.Since(start))
case res := <-chan2:
fmt.Println("get response from service 2", res, time.Since(start))
case <-time.After(2 * time.Second):
fmt.Println("No response received", time.Since(start))
}
}

empty select#

如同 for{} 迴圈可以不帶任何條件一樣,select {} 也可以不搭配 case 使用(稱作,empty select)。

從前面的例子中可以看到,因為 select statement 會一直阻塞(blocking),直到其中一個 case unblocks 時,才會繼續往後執行,但因為 empty select 中並沒有任何的 case statement,因此 main goroutine 將會永遠阻塞在那,如果沒有其他 goroutine 可以持續運行的話,最終導致 deadlock。

func service() {
fmt.Println("Hello from service ")
}
func main() {
fmt.Println("main() started")
go service()
// 這個 select 會永遠 block 在這
select {}
fmt.Println("main() stopped")
}
// main() started
// Hello from service
// fatal error: all goroutines are asleep - deadlock!

如果在 main goroutine 使用 empty select 後,main goroutine 將會完全阻塞,需要靠其他的 goroutine 持續運作才不至於進入 deadlock:

// main goroutine 持續 block 的情況下
// 需要靠其他 goroutine 持續運行才不會進入 deadlock
func service() {
for {
fmt.Println("Hello from service ")
time.Sleep(time.Millisecond * 400)
}
}
func main() {
fmt.Println("main() started")
go service()
// 這個 select 會永遠 block 在這
select {}
fmt.Println("main() stopped")
}
// main() started
// Hello from service
// Hello from service
// ...

另外透過 empty select 導致 main goroutine 阻塞的這種方式,可以在 server 啟動兩個不同的 service:

var start time.Time
func init() {
start = time.Now()
}
func service1() {
for {
fmt.Println("Hello from service1 ", time.Since((start)))
time.Sleep(time.Millisecond * 500)
}
}
func service2() {
for {
fmt.Println("Hello from service2 ", time.Since((start)))
time.Sleep(time.Millisecond * 700)
}
}
func main() {
fmt.Println("main() started")
go service1()
go service2()
// 這個 select 會永遠 block 在這,service1 和 service2 輪流輸出訊息
select {}
fmt.Println("main() stopped") // 這行不會被執行到
}

判斷是否超過 channel 的 buffer size#

/* Go 語言使用 Select 四大用法:https://blog.wu-boy.com/2019/11/four-tips-with-select-in-golang/ */
func main() {
// STEP 1:建立一個只能裝 buffer size 為 1 資料
ch := make(chan int, 1)
ch <- 1
select {
case ch <- 2:
fmt.Println("channel value is", <-ch)
fmt.Println("channel value is", <-ch)
default:
// ch 中的內容超過 1 時,但若把 channel buffer size 的容量改成 2,就不會走到 default
fmt.Println("channel blocking")
}
}

使用 for + select 讀取多個 channel 的 value#

// A Tour of Go: https://tour.golang.org/concurrency/6
func main() {
tick := time.Tick(100 * time.Millisecond)
boom := time.After(500 * time.Millisecond)
for {
select {
case <-tick:
fmt.Println("tick.")
case <-boom:
fmt.Println("BOOM!")
return // 如果沒有 return 的話程式將不會結束,一直卡在 for loop 中
default:
fmt.Println(" .")
time.Sleep(50 * time.Millisecond)
}
}
}

使用範例:

/* Go 語言使用 Select 四大用法:https://blog.wu-boy.com/2019/11/four-tips-with-select-in-golang/ */
func main() {
ch1 := make(chan string)
ch2 := make(chan int, 1)
defer func() {
fmt.Println("------ In defer ------")
close(ch1)
close(ch2)
}()
i := 0
//STEP 1:建立一個 Go Routine
go func() {
fmt.Println("In Go Routine")
// STEP 2:透過 for loop 來不停來監控不同 channels 傳回來的資料
LOOP:
for {
// STEP 3:透過 sleep 讓它每 500 毫秒檢查有無 channel 傳訊息進來
time.Sleep(500 * time.Millisecond)
i++
fmt.Printf("In Go Routine, i: %v, time: %v \n", i, time.Now().Unix())
// STEP 4:透過 select 判斷不同 channel 傳入的資料
select {
case m := <-ch1: // STEP 6:當收到 channel 1 傳入的資料時,就 break
fmt.Printf("In Go Routine, get message from channel 1: %v \n", m)
break LOOP
case m := <-ch2: // STEP 7:當收到 channel 2 傳入的資料時,單純輸出訊息
fmt.Printf("In Go Routine, get message from channel 2: %v\n", m)
default: // STEP 5:檢查時,如果沒有 channel 丟資料進 channel 則走 default
fmt.Println("In Go Routine to DEFAULT")
}
}
}()
ch2 <- 666 // STEP 8:在 sleep 前將訊息丟入 channel2
fmt.Println("Start Sleep")
// STEP 9:雖然這裡 sleep,但 go routine 中的 for 迴圈仍然不斷在檢查有無收到訊息
time.Sleep(4 * time.Second)
fmt.Println("After Sleep: send value to channel")
// STEP 10:四秒後把 "stop" 傳進 channel 1,for 迴圈收到訊息後 break
ch1 <- "stop"
fmt.Println("------ End ------")
}

WaitGroup#

WaitGroup 的用法適合用在需要將單一任務拆成許多次任務,待所有任務完成後才繼續執行的情境。

💡 這種做法適合用在單純等待任務完成,而不需要從 goroutine 中取得所需資料的情況,如果會需要從 goroutine 中返回資料,那麼比較好的做法是使用 channel。

使用 sync.WaitGroup package 提供的:

  • var wg sync.WaitGroup 可以建立 waitgroup,預設 counter 是 0
  • wg.Add(delta int) 增加要等待的次數(increment counter),也可以是負值,通常就是要等待完成的 goroutine 數目
  • wg.Done() 會把要等待的次數減 1(decrement counter),可以使用 defer wg.Done()
  • wg.Wait() 會阻塞在這,直到 counter 歸零,也就是所有 WaitGroup 都呼叫過 done 後才往後執行
// 程式來源:https://medium.com/rungo/anatomy-of-channels-in-go-concurrency-in-go-1ec336086adb
var start time.Time
func init() {
start = time.Now()
}
func service(wg *sync.WaitGroup, instance int) {
time.Sleep(time.Duration(instance) * 500 * time.Millisecond)
fmt.Println("Service called on instance", instance, time.Since(start))
wg.Done() // 4. 減少 counter
}
func main() {
fmt.Println("main() started ", time.Since(start))
var wg sync.WaitGroup // 1. 建立 waitgroup(empty struct)
for i := 1; i <= 3; i++ {
wg.Add(1) // 2. 增加 counter
go service(&wg, i) // 一共啟動了 3 個 goroutine
}
wg.Wait() // 3. blocking 直到 counter 為 0
fmt.Println("main() stopped ", time.Since(start))
}

這裡的 wg 需要把 pointer 傳進去 goroutine 中,如果不是傳 pointer 進去而是傳 value 的話,將沒辦法有效把 main goroutine 中的 waitGroup 的 counter 減 1。

// 程式碼修改自 Concurrency Patterns in Go: sync.WaitGroup @ https://www.calhoun.io/
func main() {
notify("Service-1", "Service-2", "Service-3")
}
func notifying(wg *sync.WaitGroup, s string) {
fmt.Printf("Starting to notifying %s...\n", s)
time.Sleep(time.Duration(rand.Intn(3)) * time.Second)
fmt.Printf("Finish notifying %s\n", s)
wg.Done() // 執行 done
}
func notify(services ...string) {
var wg sync.WaitGroup // 建立 WaitGroup
for _, service := range services {
wg.Add(1) // 添加 counter 的次數
go notifying(&wg, service)
}
wg.Wait() // block 在這,直到 counter 歸零後才繼續往後執行
fmt.Println("All service notified!")
}

如果我們需要使用到 goroutine 中回傳的資料,那個應該要使用 channel 而不是 waitGroup,例如:

// 程式碼修改自 Concurrency Patterns in Go: sync.WaitGroup @ https://www.calhoun.io/
func main() {
notify("Service-1", "Service-2", "Service-3")
}
func notifying(res chan string, s string) {
fmt.Printf("Starting to notifying %s...\n", s)
time.Sleep(time.Duration(rand.Intn(3)) * time.Second)
res <- fmt.Sprintf("Finish notifying %s", s)
}
func notify(services ...string) {
res := make(chan string)
var count int = 0
for _, service := range services {
count++
go notifying(res, service)
}
for i := 0; i < count; i++ {
fmt.Println(<-res)
}
fmt.Println("All service notified!")
}

Worker Pool#

worker pool 指的是有許多的 goroutines 同步的進行一個工作。要建立 worker pool,會先建立許多的 worker goroutine,這些 goroutine 中會:

  • 進行相同的 job
  • 有兩個 channel,一個用來接受任務(task channel),一個用來回傳結果(result channel)
  • 都等待 task channel 傳來要進行的 tasks
  • 一但收到 tasks 就可以做事並透過 result channel 回傳結果
// 程式來源:https://medium.com/rungo/anatomy-of-channels-in-go-concurrency-in-go-1ec336086adb
// STEP 3:在 worker goroutines 中會做相同的工作
// tasks is receive only channel
// results is send only channel
func sqrWorker(tasks <-chan int, results chan<- int, instance int) {
// 一旦收到 tasks channel 傳來資料,就可以動工並回傳結果
for num := range tasks {
time.Sleep(500 * time.Millisecond) // 模擬會阻塞的任務
fmt.Printf("[worker %v] Sending result of task %v \n", instance, num)
results <- num * num
}
}
func main() {
fmt.Println("[main] main() started")
// STEP 1:建立兩個 channel,一個用來傳送 tasks,一個用來接收 results
tasks := make(chan int, 10)
results := make(chan int, 10)
// STEP 2 啟動三個不同的 worker goroutines
for i := 1; i <= 3; i++ {
go sqrWorker(tasks, results, i)
}
// STEP 4:發送 5 個不同的任務
for i := 1; i <= 5; i++ {
tasks <- i // non-blocking(因為 buffered channel 的 capacity 是 10)
}
fmt.Println("[main] Wrote 5 tasks")
// STEP 5:發送完任務後把 channel 關閉(非必要,但可減少 bug)
close(tasks)
// STEP 6:等待各個 worker 從 result channel 回傳結果
for i := 1; i <= 5; i++ {
result := <-results // blocking(因為 buffer 是空的)
fmt.Println("[main] Result", i, ":", result)
}
fmt.Println("[main] main() stopped")
}

輸出的結果可能是:

[main] main() started
[main] Wrote 5 tasks
[worker 3] Sending result of task 1
[main] Result 1 : 1
[worker 2] Sending result of task 3
[worker 1] Sending result of task 2
[main] Result 2 : 9
[main] Result 3 : 4
[worker 2] Sending result of task 5
[worker 3] Sending result of task 4
[main] Result 4 : 25
[main] Result 5 : 16
[main] main() stopped

從上面的例子中,可以看到當所有 worker 都剛好 blocking 的時候,控制權就會交回 main goroutine,這時候就可以立即看到計算好的結果。

WorkerGroup 搭配 WaitGroup#

但有些時候,我們希望所有的 tasks 都執行完後才讓 main goroutine 繼續往後做,這時候可以搭配 WaitGroup 使用:

func sqrWorker(wg *sync.WaitGroup, tasks <-chan int, results chan<- int, instance int) {
defer wg.Done()
// 一旦收到 tasks channel 傳來資料,就可以動工並回傳結果
for num := range tasks {
time.Sleep(500 * time.Millisecond) // 模擬會阻塞的任務
fmt.Printf("[worker %v] Sending result of task %v \n", instance, num)
results <- num * num
}
}
func main() {
fmt.Println("[main] main() started")
var wg sync.WaitGroup
tasks := make(chan int, 10)
results := make(chan int, 10)
for i := 1; i <= 3; i++ {
wg.Add(1)
go sqrWorker(&wg, tasks, results, i)
}
for i := 1; i <= 5; i++ {
tasks <- i // non-blocking(因為 buffered channel 的 capacity 是 10)
}
fmt.Println("[main] Wrote 5 tasks")
close(tasks) // 有用 waitGroup 的話這個 close 不能省略
// 直到所有的 worker goroutine 把所有 tasks 都做完後才繼續往後
wg.Wait()
for i := 1; i <= 5; i++ {
result := <-results // blocking(因為 buffer 是空的)
fmt.Println("[main] Result", i, ":", result)
}
fmt.Println("[main] main() stopped")
}

這時會等到所有的 worker 都完工下班後,才開始輸出計算好的結果。搭配 WaitGroup 的好處是可以等到所有 worker 都完工後還讓程式繼續,但相對的會需要花更長的時間在等待所有人完工:

[main] main() started
[main] Wrote 5 tasks
[worker 2] Sending result of task 3
[worker 1] Sending result of task 2
[worker 3] Sending result of task 1
[worker 1] Sending result of task 5
[worker 2] Sending result of task 4
[main] Result 1 : 9
[main] Result 2 : 4
[main] Result 3 : 1
[main] Result 4 : 25
[main] Result 5 : 16
[main] main() stopped

mutex#

在 goroutines 中,由於有獨立的 stack,因此並不會在彼此之間共享資料(也就是在 scope 中的變數);然而在 heap 中的資料是會在不同 goroutine 之間共享的(也就是 global 的變數),在這種情況下,許多的 goroutine 會試著操作相同記憶體位址的資料,導致未預期的結果發生。

從下面的例子中可以看到,我們預期 i 的結果會是 1000,但是因為 race condition 的情況,最終的結果並不會是 1000:

// 程式來源:https://medium.com/rungo/anatomy-of-channels-in-go-concurrency-in-go-1ec336086adb
var i int // i == 0
func worker(wg *sync.WaitGroup) {
i = i + 1
wg.Done()
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go worker(&wg)
}
wg.Wait()
fmt.Println(i) // value i should be 1000 but it did not
}

之所以會有這個情況產生,是因為多個 goroutine 在執行時,在為 i 賦值前(即,i = i + 1)拿到的是同一個值的 i,因此雖然跑了多次 goroutine,但對於 i 來說,值並沒有增加。

為了要避免多個 goroutine 同時取用到一個 heap 中的變數,第一原則是應該要盡可能避免在多個 goroutine 中使用共享的資源(變數)。

如果無法避免會需要操作共用的變數的話,則可以使用 Mutex(mutual exclusion),也就是說在一個時間內只有一個 goroutine(thread)可以對該變數進行操作,在對該變數進行操作前,會先把它「上鎖」,操作完後再進行「解鎖」的動作,當一個變數被上鎖的時候,其他的 goroutine 都不能對該變數進行讀取和寫入

mutex 是 map 型別的方法,被放在 sync package 中,使用 mutex.Lock() 可以上鎖,使用 mutex.Unlock() 可以解鎖:

var i int // i == 0
func worker(wg *sync.WaitGroup, m *sync.Mutex) {
m.Lock() // 上鎖
i = i + 1
m.Unlock() // 解鎖
wg.Done()
}
func main() {
var wg sync.WaitGroup
var m sync.Mutex
for i := 0; i < 1000; i++ {
wg.Add(1)
go worker(&wg, &m) // 把 mutex 的記憶體位址傳入
}
wg.Wait()
fmt.Println(i) // 在使用 mutex 對 heap 中的變數進行上鎖和解鎖後,即可以確保最終的值是 1000
}

⚠️ mutex 和 waitgroup 一樣,都是把「記憶體位址」傳入 goroutine 中使用。

如同前面所說,第一原則應該是要避免 race condition 的方法,也就是不要在 goroutine 中對共用的變數進行操作,在 go 的 CLI 中可以透過下面的指令檢測程式中是否有 race condition 的情況:

$ go run -race program.go

Concurrency Pattern#

Generator#

透過 goroutine 的方式,

// 程式來源:https://medium.com/rungo/anatomy-of-channels-in-go-concurrency-in-go-1ec336086adb
// fib 會回傳 read-only channel
func fib(length int) <-chan int {
c := make(chan int, length)
// run generation concurrently
go func() {
for i, j := 0, 1; i < length; i, j = i+j, i {
c <- i
}
close(c)
}()
// return channel
return c
}
func main() {
for fn := range fib(10) {
fmt.Println("Current fibonacci number is ", fn)
}
}

範例程式碼#

範例 WaitGroup 搭配 Channel#

⚠️ 邏輯上,可以單獨使用 channel 就好,不需要使用 WaitGroup。

func worker(wg *sync.WaitGroup, c chan<- int, i int) {
fmt.Println("[worker] start i:", i)
time.Sleep(time.Second * 1)
defer wg.Done()
c <- i
fmt.Println("[worker] finish i:", i)
}
func main() {
numOfFacilities := 6
var wg sync.WaitGroup
c := make(chan int, numOfFacilities)
for i := 0; i < numOfFacilities; i++ {
fmt.Println("[main] add i: ", i)
wg.Add(1)
go worker(&wg, c, i)
}
wg.Wait()
var numbers []int
for i := 0; i < numOfFacilities; i++ {
numbers = append(numbers, <-c)
}
fmt.Println("[main] ---all finish---", numbers)
defer close(c)
}

範例#

func controller(c chan string, wg *sync.WaitGroup) {
fmt.Println("controller() start and block")
wg.Wait()
fmt.Println("controller() unblock and close channel")
close(c)
fmt.Println("controller() end")
}
func printString(s string, c chan string, wg *sync.WaitGroup) {
fmt.Println(s)
wg.Done()
c <- "Done printing: " + s
}
func main() {
fmt.Println("main() start")
c := make(chan string)
var wg sync.WaitGroup
for i := 1; i <= 4; i++ {
go printString("Hello ~ "+strconv.Itoa(i), c, &wg)
wg.Add(1)
}
go controller(c, &wg)
for message := range c {
fmt.Println(message)
}
fmt.Println("main() end")
}

範例#

// A Tour of Go: https://tour.golang.org/concurrency/2
func sum(s []int, c chan int) {
sum := 0
for _, v := range s {
sum += v
}
// STEP 3:把加總後的值丟回 channel
c <- sum // send sum to c
}
func main() {
s := []int{7, 2, 8, -9, 4, 0}
// STEP 1:建立一個 channel,該 channel 會傳出 int
c := make(chan int)
// STEP 2:使用 goroutine,並把 channel 當成參數傳入
go sum(s[:len(s)/2], c)
go sum(s[len(s)/2:], c)
// STEP 4:從 channel 取得計算好的結果
x, y := <-c, <-c
// ⚠️ 寫在這裡的內容會在 channel 傳回結果後才會被執行...
fmt.Println(x, y, x+y)
}

範例#

package main
import (
"fmt"
"net/http"
)
func main() {
links := []string{
"http://google.com",
"http://facebook.com",
"http://stackoverflow.com",
"http://golang.com",
"http://amazon.com",
}
for _, link := range links {
checkLink(link) // 這裡會被阻塞
}
}
func checkLink(link string) {
_, err := http.Get(link)
if err != nil {
fmt.Println(link, "might be down!")
return
}
fmt.Println(link, "is up!")
}

參考#

Last updated on