本ブログはLayerX Tech Advent Calendar 2022 の 12/13 の記事です。
こんにちは、バクラクのOCRなどを開発しているTomoakiです。
コーヒーはラテよりカプチーノ派です。
Machineryとは
MachineryはGo製の非同期タスク管理フレームワークです。
バクラクのバックエンドでは主にGo言語を採用しており、非同期で実行したい処理はメッセージキューとしてAmazon SQS、メッセージの送受信やジョブワーカーの管理にはMachinery(v1)を使用しています。
OCRでも採用している技術スタックは同じで、突然大量のリクエストが来ても対応できるように非同期処理を多用しております。 以下の図のようにファイルがアップロードされてから、OCRの実行結果を返すまでの過程で非同期処理がたくさん実行されています。
OCRの非同期タスクのパイプラインの詳細のついてはこちらを参照ください。 tech.layerx.co.jp
今回起きていた事象
さて本題に入りますが、今回デプロイに伴い古いジョブワーカーをシャットダウンさせる時に、graceful shutdown しているにも関わらず、毎回数件のタスクがなぜか強制終了されてしまうという現象が観測されました。
スピードが求められる経理業務においてこれは望ましくありません。特に月初では1分1秒を争って作業しており、OCRのせいで作業を遅れさせるわけにはいきません。
調査
まずは情報収集です。ログを見てみましょう。
ジョブワーカーがシャットダウンする時のログを見てみると、terminateのシグナルを受信してから0.2秒ほどでGracefull shutdownしています。
タイムアウトになったリクエストを追ってみるとタスクがAWS SQSの可視性タイムアウトと同じ時間後再実行されていました。
SQSのメッセージが可視性タイムアウトになっているとするとメッセージの受信自体はされていそうなので、どうやらタスクの処理が完了しないままなぜかジョブワーカーがシャットダウンしていそうです。
Machineryのコードを見てみましょう。 github.com
terminateのシグナルを受信時の処理は以下のようになっています。
// Goroutine Handle SIGINT and SIGTERM signals go func() { for s := range sig { log.WARNING.Printf("Signal received: %v", s) signalsReceived++ if signalsReceived < 2 { // After first Ctrl+C start quitting the worker gracefully log.WARNING.Print("Waiting for running tasks to finish before shutting down") signalWG.Add(1) go func() { worker.Quit() errorsChan <- ErrWorkerQuitGracefully signalWG.Done() }() } else { // Abort the program when user hits Ctrl+C second time in a row errorsChan <- ErrWorkerQuitAbruptly } } }()
シャットダウンの具体的な処理はworker.Quit()の中身なので、中を見に行くと以下の関数に辿り着きます。
// StopConsuming quits the loop func (b *Broker) StopConsuming() { b.Broker.StopConsuming() b.stopReceiving() // Waiting for any tasks being processed to finish b.processingWG.Wait() // Waiting for the receiving goroutine to have stopped b.receivingWG.Wait() }
少々乱暴に説明すると、Machineryではキューからメッセージを受信するためのgoroutine(Receive)と、受信したメッセージのタスクを処理するgoroutine(Consume)の二つがあります。
上記のStopConsuming()関数を見るとConsumeを止めた後に、Receiveを止めています。
ここで仮説として、メッセージの受信中の間に、terminateのシグナルを受けると、先にメッセージを処理するgoroutine(Consume)を終了させてしまうため、受信中だったメッセージが受信完了しても処理できていないのではないかと考えてみます。
仮説をたてたら検証です。メッセージの受信中にジョブワーカーにterminateのシグナルを送ると実際にどうなるかローカルで検証してみます。
ローカルでデバッグ
デバッグはMachineryをcloneしたものを用意しそれをローカルで動かして利用しました。 go mod edit -replaceコマンドを使えば、レポジトリで使われているMachineryをデバッグ用にcloneしたMachineryに向き先を一括で変えることができるので便利です。
go mod edit -replace github.com/RichardKnop/machinery=<ローカルのMachineryへのパス>/machinery
cloneしたレポジトリにpushしてれば以下でも大丈夫です
go mod edit -replace github.com/RichardKnop/machinery=github.com/<githubのアカウント名>/machinery
ジョブワーカーにはデバッグ用のログを仕込みます。
- タスクを受信開始(start receiving)
- 受信のgoroutineの終了(stop receiving goroutine)
- 処理のgoroutineの終了(stop consuming goroutine)
にそれぞれデバッグ用にログを仕込みました。
また、上記のログに加え今回はメッセージの受信中の時にterminateのシグナルを送りたいので受信開始直後にsleep処理を入れることで受信時間を少し長くさせました。 こうすることで確実にタスクの受信中の間にterminateのシグナルを送れます。
AWS SQSはlocalstackを利用することで仮想的に本番環境を再現させました。デフォルトだと可視性タイムアウトは30秒です。
ローカルで2台(worker1、worker2と呼ぶ)のジョブワーカーを起動し、OCRのリクエストを投げます。 ジョブワーカー(worker1)がタスクの受信を開始したタイミングでそのジョブワーカー(worker1)をシャットダウンさせます。
worker1のログ
① worker1がメッセージの受信開始 INFO: 2022/11/04 09:33:08 sqs.go:91 start receiving WARNING: 2022/11/04 09:33:08 worker.go:101 Signal received: interrupt WARNING: 2022/11/04 09:33:08 worker.go:106 Waiting for running tasks to finish before shutting down INFO: 2022/11/04 09:33:08 sqs.go:121 stop consuming goroutine WARNING: 2022/11/04 09:33:08 broker.go:118 Stop channel INFO: 2022/11/04 09:33:08 sqs.go:124 stop receiving goroutine ② タスクを処理することなくworeker1が終了 {"level":"info","label":"machinery","worker":"main_worker","error":"Worker quit gracefully","id":"1c3c8599-0ad0-49a0-b808-8f20763b7f55","ts":"2022-11-04T09:33:10+09:00","caller":"cmd/worker/main.go:67"}
worker2のログ
③ worker2がメッセージの受信開始 INFO: 2022/11/04 09:33:38 sqs.go:91 start receiving
時系列で見ていきましょう
① worker1がメッセージの受信開始
まずworker1がSQSからメッセージの受信を開始したログが出ました。 このタイミングでworkerにterminiteのシグナルを送ります。 受信にはsleep処理を入れているので確実にメッセージの受信中にterminiteのシグナルが送られます。
② タスクを処理することなくworker1が終了
ログを見るとterminiteのシグナルを受けてすぐにWorker quit gracefullyというログが出ています。 タスクが正常に処理されていた場合は、処理内容のログが出るの出るのですが、それらがないことからタスクが処理されることなくジョブワーカーが終了していることがわかります。
③ worker2がメッセージの受信開始
①からちょうど1分を経過したのちworker2でメッセージの受信が開始しました。これは可視性タイムアウトの30秒と一致しています! 仮説があっている気がしてきました!
修正する
StopConsuming()でConsumeを止めた後に、Receiveを止めているのが問題なので、Receiveを先に止め、Receive中の処理が完全に消化されたらConsumeも止めてあげるようにします。
// StopConsuming quits the loop func (b *Broker) StopConsuming() { - b.Broker.StopConsuming() + b.Broker.StopRetrySQS() b.stopReceiving() - // Waiting for any tasks being processed to finish - b.processingWG.Wait() // Waiting for the receiving goroutine to have stopped b.receivingWG.Wait() + b.Broker.StopConsumingSQS() + // Waiting for any tasks being processed to finish + b.processingWG.Wait() }
修正に伴い周辺に微修正が必要な箇所もあったので詳細は以下のPull Requestをご覧ください。 MachineryはSQS以外のキューにも対応していますが、今回はSQSでしか検証を行っていないので新規でSQS専用の関数を作成しています。
再度メッセージ受信中にジョブワーカーをシャットダウンさせてみましたが可視性タイムアウトは発生しなくなりました!
めでたし!
まとめ
いかがだったでしょうか。 バクラクではGoを書きたいエンジニアを大募集しています!
興味がある方はぜひカジュアル面談しましょう! open.talentio.com