goroutine と並行処理の怖い話

             

概要

golang の goroutine と channel は非常に強力な機能で、並行処理へのハードルが下がり、手軽に使うことが出来ます。

本当にそうでしょうか。

ハードルは下がる… しかし、下手に扱うと panic, deadlock, race condition, goroutine leak… 怖いもの (嵌りポイント) が多いです。

panic

まず、 defer + recover していない goroutine の中で panic が起こると、メイン goroutine を巻き込んで process が死にます。

package main

import (
	"fmt"
)

func main() {
	defer recover()
	defer fmt.Println("finish")

	fmt.Println("main goroutine")

	ch := make(chan struct{})

	go func() {
		defer close(ch)
		panic("panic in new goroutine")
	}()

	<-ch
}

https://play.golang.org/p/egORpG92URt

上のコードを実行してみると分かりますが、 recover できずに異常終了します。 recover できるのは同じ goroutine の中だけで、別の goroutine からは制御不能ということです。

Tips

panic により process が死んでは困る software (API server など) を実装する際は、 goroutine の生成と defer + recover による panic ハンドリングはセット。 逆に CLI や batch など、異常終了を外部から検知して再度実行すれば良いものは、必ずしも panic をハンドリングする必要はありません。

deadlock

DB 周りでよく聞く話ではありますが goroutine 間でも発生します。 sync package を使う場合や channel によって引き起こされます。

package main

import (
	"fmt"
)

func main() {
	fmt.Println("main goroutine")

	result := make(chan string)

	go func() {
		result <- "done"
	}()

	for res := range result {
		fmt.Println(res)
	}
}

https://play.golang.org/p/0S3Dad6D6Sc

上のコードは何が問題でしょうか。 range で channel が扱えるのは便利ですが、 channel が close されるまで for ループを抜けることが無いため、 deadlock が発生します。

Tips

channel は使い終わったら close します。特に range に渡す時は気をつける必要があります。 また、これは DB でも共通する話ですが、実装する際に Lock する順番と Lock による影響範囲に気を付ける事で deadlock は避けられます。

race condition

複数の goroutine から同時に1つのメモリの値を読み書きすると、再現性の低くデバッグしにくい不具合を引き起こす事があり、 race condition と呼ばれます。他の言語でも thread などを用いた並行処理によっても同じ問題が起こると思います。

package main

import (
	"fmt"
)

var count int

func main() {
	done := make(chan struct{})
	go func() {
		defer close(done)
		count++
	}()

	count++

	<-done
	fmt.Println(count)
}

上のように、グローバル変数に対して同時に読み書きが発生するようなコードは書いてはいけません。 これは Go の Data Race Detector によって検出が可能です。

$ go run --race main.go
==================
WARNING: DATA RACE
Read at 0x0000005b73c0 by goroutine 6:
  main.main.func1()
      /go/src/example/main.go:13 +0x64

Previous write at 0x0000005b73c0 by main goroutine:
  main.main()
      /go/src/example/main.go:16 +0x95

Goroutine 6 (running) created at:
  main.main()
      /go/src/example/main.go:11 +0x69
==================
2
Found 1 data race(s)
exit status 66

Tips

goroutine を跨いだ値のやり取りをする際は sync.Mutex を使った Lock, Unlock による排他制御、もしくは channel を用いた通信を行います。 map は別々の key に対する読み書きでも race condition が起こるので特に注意が必要で、 go 1.9 以降では sync.Map が用意されています。

goroutine leak

生成した goroutine の中で sync.WaitGroup や channel によって待ち状態のまま役目を終えて放置されてしまうと leak します。

待ち状態の場合は CPU 資源をほとんど影響ありませんが、その goroutine から参照されている変数などは GC の対象とならず memory leak を引き起こします。

例えば、次のようなコードで goroutine leak が発生します。 A, B 2つに問い合わせて結果が早い方を採用する実装です。

package main

import (
	"bytes"
	"fmt"
	"math/rand"
	"runtime/pprof"
	"time"
)

func main() {
	p := pprof.Lookup("goroutine")

	for i := 0; i < 5; i++ {
		fmt.Printf("goroutines: %d\n", p.Count())

		var res *bytes.Buffer

		select {
		case res = <-fetch("http://example.com/A"):
			fmt.Println("faster A than B")
		case res = <-fetch("http://example.com/B"):
			fmt.Println("faster B than A")
		}

		fmt.Printf("%s\n", res.Bytes())
	}
}

func fetch(uri string) <-chan *bytes.Buffer {
	ch := make(chan *bytes.Buffer)

	go func(ch chan<- *bytes.Buffer) {
		// fetch する時間が不規則に変わる場合を想定した仮実装で、深い意味は無いです
		time.Sleep(time.Millisecond * time.Duration(rand.Int63n(10)))
		ch <- bytes.NewBufferString("hello")
	}(ch)

	return ch
}

https://play.golang.org/p/9V1l8_o7Y55

今回は main 関数から呼び出しているため main 関数が終了することによって process が正常終了するので致命的な問題にはなりませんが、 process が何時間も何日も走り続け goroutine leak が積り積もっていくと無視できない規模の memory leak となり、最終的には out of memory であったり、 OOM Killer の餌食となり process は死ぬでしょう。

今回の場合の対処法は2つあり、1つは単純で channel の buffer を 1 に設定します。 fetch method は channel に対して buffer を1つしか送信しない事が明白なので、 1 で十分です。するとどうでしょう。 fetch method で生成されている goroutine の中で channel ch に送信する際に待ちが発生しなくなり goroutine は即座に終了します。 channel ch も参照を失うので GC される事でしょう。

もう1つの方法は、 channel に対して buffer がいくつ送信されるか分からない状態でも使える手法で、一般的にこちらを採用するほうが良いです。

package main

import (
	"bytes"
	"context"
	"fmt"
	"math/rand"
	"runtime/pprof"
	"time"
)

func main() {
	p := pprof.Lookup("goroutine")

	for i := 0; i < 5; i++ {
		fmt.Printf("goroutines: %d\n", p.Count())

		func() {
			var res *bytes.Buffer

			ctx, cancel := context.WithCancel(context.Background())
			defer cancel()

			select {
			case res = <-fetch(ctx, "http://example.com/A"):
				fmt.Println("faster A than B")
			case res = <-fetch(ctx, "http://example.com/B"):
				fmt.Println("faster B than A")
			}

			fmt.Printf("%s\n", res.Bytes())
		}()
	}
}

func fetch(ctx context.Context, uri string) <-chan *bytes.Buffer {
	ch := make(chan *bytes.Buffer)

	go func(ch chan<- *bytes.Buffer) {
		// fetch する時間が不規則に変わる場合を想定した仮実装で、深い意味は無いです
		time.Sleep(time.Millisecond * time.Duration(rand.Int63n(10)))

		select {
		case ch <- bytes.NewBufferString("hello"):
		case <-ctx.Done():
		}
	}(ch)

	return ch
}

https://play.golang.org/p/QZE7xCp8tgZ

context を渡して cancel を伝播させています。goroutine を終了させる処理は各 goroutine 内で実装する必要があります。今回は channel の送信待ちによって終了が妨げされていたので上のように実装することで cancel 時に終了するように実装しています。

Tips

中断される可能性のある処理には context を渡し、呼び出し元からの cancel を伝播して、生成された goroutine それぞれで停止する処理を実装します。

まとめ

いかがだったでしょうか。

method の呼び出しの前に go と付けるだけで並行処理ができる Go ですが、 goroutine だけでなく defer, recover, channel, select, context package (cancel の伝播), sync package (排他制御) といった複数の機能や概念を組み合わせて正しく実装しなければ、予期せぬ不具合を誘発します。

真面目に考えると意外と難しい事が分かるかと思います。 それでは Go で安全な並行処理を。

※本文中、サンプルコード内にミスがあった場合はご指摘いただけると幸いです🙏