LayerX エンジニアブログ

LayerX の エンジニアブログです。

Go MachineryでGraceful shutdownできてないバグを直した話

本ブログはLayerX Tech Advent Calendar 2022 の 12/13 の記事です。

こんにちは、バクラクのOCRなどを開発しているTomoakiです。

コーヒーはラテよりカプチーノ派です。

Machineryとは

MachineryはGo製の非同期タスク管理フレームワークです。

github.com

バクラクのバックエンドでは主にGo言語を採用しており、非同期で実行したい処理はメッセージキューとしてAmazon SQS、メッセージの送受信やジョブワーカーの管理にはMachinery(v1)を使用しています。

OCRでも採用している技術スタックは同じで、突然大量のリクエストが来ても対応できるように非同期処理を多用しております。 以下の図のようにファイルがアップロードされてから、OCRの実行結果を返すまでの過程で非同期処理がたくさん実行されています。

https://cdn-ak.f.st-hatena.com/images/fotolife/s/shun_tak/20210517/20210517002850.png

OCRの非同期タスクのパイプラインの詳細のついてはこちらを参照ください。 tech.layerx.co.jp

今回起きていた事象

さて本題に入りますが、今回デプロイに伴い古いジョブワーカーをシャットダウンさせる時に、graceful shutdown しているにも関わらず、毎回数件のタスクがなぜか強制終了されてしまうという現象が観測されました。

スピードが求められる経理業務においてこれは望ましくありません。特に月初では1分1秒を争って作業しており、OCRのせいで作業を遅れさせるわけにはいきません。

調査

まずは情報収集です。ログを見てみましょう。

ジョブワーカーのシャットダウンのログ

ジョブワーカーがシャットダウンする時のログを見てみると、terminateのシグナルを受信してから0.2秒ほどでGracefull shutdownしています。

タイムアウトになったリクエストを追ってみるとタスクがAWS SQSの可視性タイムアウトと同じ時間後再実行されていました。

SQSのメッセージが可視性タイムアウトになっているとするとメッセージの受信自体はされていそうなので、どうやらタスクの処理が完了しないままなぜかジョブワーカーがシャットダウンしていそうです。

Machineryのコードを見てみましょう。 github.com

terminateのシグナルを受信時の処理は以下のようになっています。

// Goroutine Handle SIGINT and SIGTERM signals
        go func() {
            for s := range sig {
                log.WARNING.Printf("Signal received: %v", s)
                signalsReceived++

                if signalsReceived < 2 {
                    // After first Ctrl+C start quitting the worker gracefully
                    log.WARNING.Print("Waiting for running tasks to finish before shutting down")
                    signalWG.Add(1)
                    go func() {
                        worker.Quit()
                        errorsChan <- ErrWorkerQuitGracefully
                        signalWG.Done()
                    }()
                } else {
                    // Abort the program when user hits Ctrl+C second time in a row
                    errorsChan <- ErrWorkerQuitAbruptly
                }
            }
        }()

シャットダウンの具体的な処理はworker.Quit()の中身なので、中を見に行くと以下の関数に辿り着きます。

// StopConsuming quits the loop
func (b *Broker) StopConsuming() {
    b.Broker.StopConsuming()

    b.stopReceiving()

    // Waiting for any tasks being processed to finish
    b.processingWG.Wait()

    // Waiting for the receiving goroutine to have stopped
    b.receivingWG.Wait()
}

少々乱暴に説明すると、Machineryではキューからメッセージを受信するためのgoroutine(Receive)と、受信したメッセージのタスクを処理するgoroutine(Consume)の二つがあります。

上記のStopConsuming()関数を見るとConsumeを止めた後に、Receiveを止めています。

ここで仮説として、メッセージの受信中の間に、terminateのシグナルを受けると、先にメッセージを処理するgoroutine(Consume)を終了させてしまうため、受信中だったメッセージが受信完了しても処理できていないのではないかと考えてみます。

仮説のイメージ図

仮説をたてたら検証です。メッセージの受信中にジョブワーカーにterminateのシグナルを送ると実際にどうなるかローカルで検証してみます。

ローカルでデバッグ

デバッグはMachineryをcloneしたものを用意しそれをローカルで動かして利用しました。 go mod edit -replaceコマンドを使えば、レポジトリで使われているMachineryをデバッグ用にcloneしたMachineryに向き先を一括で変えることができるので便利です。

go mod edit -replace github.com/RichardKnop/machinery=<ローカルのMachineryへのパス>/machinery

cloneしたレポジトリにpushしてれば以下でも大丈夫です

go mod edit -replace github.com/RichardKnop/machinery=github.com/<githubのアカウント名>/machinery

ジョブワーカーにはデバッグ用のログを仕込みます。

  • タスクを受信開始(start receiving)
  • 受信のgoroutineの終了(stop receiving goroutine)
  • 処理のgoroutineの終了(stop consuming goroutine)

にそれぞれデバッグ用にログを仕込みました。

また、上記のログに加え今回はメッセージの受信中の時にterminateのシグナルを送りたいので受信開始直後にsleep処理を入れることで受信時間を少し長くさせました。 こうすることで確実にタスクの受信中の間にterminateのシグナルを送れます。

AWS SQSはlocalstackを利用することで仮想的に本番環境を再現させました。デフォルトだと可視性タイムアウトは30秒です。

ローカルで2台(worker1、worker2と呼ぶ)のジョブワーカーを起動し、OCRのリクエストを投げます。 ジョブワーカー(worker1)がタスクの受信を開始したタイミングでそのジョブワーカー(worker1)をシャットダウンさせます。

worker1のログ

① worker1がメッセージの受信開始
INFO: 2022/11/04 09:33:08 sqs.go:91 start receiving
WARNING: 2022/11/04 09:33:08 worker.go:101 Signal received: interrupt
WARNING: 2022/11/04 09:33:08 worker.go:106 Waiting for running tasks to finish before shutting down
INFO: 2022/11/04 09:33:08 sqs.go:121 stop consuming goroutine
WARNING: 2022/11/04 09:33:08 broker.go:118 Stop channel
INFO: 2022/11/04 09:33:08 sqs.go:124 stop receiving goroutine
② タスクを処理することなくworeker1が終了
{"level":"info","label":"machinery","worker":"main_worker","error":"Worker quit gracefully","id":"1c3c8599-0ad0-49a0-b808-8f20763b7f55","ts":"2022-11-04T09:33:10+09:00","caller":"cmd/worker/main.go:67"}

worker2のログ

③ worker2がメッセージの受信開始
INFO: 2022/11/04 09:33:38 sqs.go:91 start receiving

時系列で見ていきましょう

時系列でのworker1とworker2の処理

① worker1がメッセージの受信開始

まずworker1がSQSからメッセージの受信を開始したログが出ました。 このタイミングでworkerにterminiteのシグナルを送ります。 受信にはsleep処理を入れているので確実にメッセージの受信中にterminiteのシグナルが送られます。

② タスクを処理することなくworker1が終了

ログを見るとterminiteのシグナルを受けてすぐにWorker quit gracefullyというログが出ています。 タスクが正常に処理されていた場合は、処理内容のログが出るの出るのですが、それらがないことからタスクが処理されることなくジョブワーカーが終了していることがわかります。

③ worker2がメッセージの受信開始

①からちょうど1分を経過したのちworker2でメッセージの受信が開始しました。これは可視性タイムアウトの30秒と一致しています! 仮説があっている気がしてきました!

修正する

StopConsuming()でConsumeを止めた後に、Receiveを止めているのが問題なので、Receiveを先に止め、Receive中の処理が完全に消化されたらConsumeも止めてあげるようにします。

修正後のイメージ

// StopConsuming quits the loop
func (b *Broker) StopConsuming() {
-   b.Broker.StopConsuming()
+   b.Broker.StopRetrySQS()

    b.stopReceiving()

-   // Waiting for any tasks being processed to finish
-   b.processingWG.Wait()

    // Waiting for the receiving goroutine to have stopped
    b.receivingWG.Wait()

+   b.Broker.StopConsumingSQS()

+   // Waiting for any tasks being processed to finish
+   b.processingWG.Wait()
}

修正に伴い周辺に微修正が必要な箇所もあったので詳細は以下のPull Requestをご覧ください。 MachineryはSQS以外のキューにも対応していますが、今回はSQSでしか検証を行っていないので新規でSQS専用の関数を作成しています。

github.com

再度メッセージ受信中にジョブワーカーをシャットダウンさせてみましたが可視性タイムアウトは発生しなくなりました!

めでたし!

まとめ

いかがだったでしょうか。 バクラクではGoを書きたいエンジニアを大募集しています!

興味がある方はぜひカジュアル面談しましょう! open.talentio.com