LayerX エンジニアブログ

LayerX の エンジニアブログです。

GCPのCloud Pub/Sub Push SubscriptionをAWSで実現する

この記事はLayerX Tech Advent Calendar 2022の20日目の記事です。


こんにちは。LayerXのバクラク事業部でOpsエンジニアをやっている@civitaspoです。みなさん、GCPのCloud Pub/Sub Push Subscriptionはご存じですか?便利ですよね。この記事では、あの便利さをAWSで実現する方法を紹介したいと思います。

GCPのCloud Pub/Sub Push Subscription とは?

GCPにはCloud Pub/Subというメッセージングサービスがあります。Pub/Subモデルを実現するGCPのマネージドサービスです。Cloud Pub/Subには3つのSubscription Typeがあり、その一つがPush Subscriptionです。

cloud.google.com cloud.google.com

引用したドキュメント内の図を見ると分かるようにPush SubscriptionPull Subscriptionと比較すると非常にシンプルな構成を取ることができます1。SubscriberはHTTPSでリクエストを受け付けられるエンドポイントを用意するだけで良く、そのエンドポイントへのリクエストはPush Subscriptionがメッセージ毎に自動で行ってくれます。メッセージの処理成否をHTTPのステータスコードで表現すれば良いだけなので実装者はメッセージのAck処理等を気にすることなく実装を行えます。プラットフォームに強く依存しないコードになるのも良い点です。

このPush Subscriptionを利用すると複数サービス間における非同期処理を容易に疎結合に実装できるようになります。便利ですよね2

AWSでCloud Pub/Sub Push Subscription相当の機能を実現する

LayerXではサービス提供を主にAWSで行っているため、AWSでCloud Pub/Sub Push Subscription相当の機能を実現する方法を模索しました。

docs.aws.amazon.com

AWSの公式ドキュメントによるとPub/Subモデルの実現にはAmazon SNSを利用するパターンとAmazon Eventbridgeを利用するパターンがあります3。どちらの選択肢もPush Subscription相当の機能を持ち合わせていたので本番導入に向けて調査しました。

Amazon SNSパターン

Amazon SNSはフルマネージドなメッセージングサービスです。Cloud Pub/SubはPub/Subモデルを実現するGCPのマネージドサービスでしたが、Amazon SNSはそのAWS版と言えます。Cloud Pub/Subと異なるのはPull Subscription相当の機能が無いことです。Subscriptionとして定義出来るのはPush Subscription相当の挙動をするもののみです。ただし、HTTPSエンドポイントだけでなくAWS LambdaやEメール送信など様々なサービスとの連携が可能です。公式ドキュメントの図が一番分かりやすいので載せておきます。

引用元:『What is Amazon SNS?』

当初、概念的にCloud Pub/Subとほぼ同様のAmazon SNSを採用する想定で検証を進めていました。しかし、主に可観測性の観点から本番運用が難しいと判断して採用を見送りました。採用を見送った大きな理由を載せておきます。

メトリクスがTopic単位でしか取得出来ない

Amazon SNSで取得可能なメトリクスは以下の公式ドキュメントにまとめられています。

docs.aws.amazon.com

特筆すべきなのはほぼ全てのメトリクスがTopicよりも詳細なDimensionで取得出来ないことです。SMSを送信する用途などであれば細かいメトリクスを取得出来るのですが、SubscriptionでHTTPSエンドポイントを指定している場合はTopicよりも細かい粒度でメトリクスが取得出来ませんでした。そのため単一のTopicに対して複数のSubscriptionを紐付ける構成で問題が発生した場合、どのSubscriptionで問題が発生したのか分からず原因究明が難航、または不可能になる未来が見えました。

配信遅延の検知が難しい

メトリクスがTopic毎にしか見れないことに加えて、配信遅延の検知も難しいことが分かりました。Amazon SNSのメトリクスにはMessage Oldest AgeのようなTopic内のメッセージの処理状況に関するものが存在しません。そのため配信遅延の検知がかなり難しいのが現状です。配信失敗についてはNumberOfNotificationsFailedというメトリクスに試行毎にカウントされるため、配信先の不調による配信遅延はある程度検知が可能です。しかし、直接的に配信遅延の検知を行うわけではないため障害検知が遅れる可能性は多分に残っていました。また障害を検知出来たとしても遅延状況の把握が難しいのは運用を困難する要因となると感じました。

HTTPステータスコード400番台が「成功」扱いになる

Amazon SNSはSubscriptionでHTTPSエンドポイントを指定した場合、ステータスコードが200~499の間であれば失敗と扱いません。これはドキュメントにも明記されています。

If your endpoint does not respond before the connection times out or if your endpoint returns a status code outside the range of 200–4xx, Amazon SNS will consider the delivery of the message as a failed attempt.

引用元:『Step 1: Make sure your endpoint is ready to process Amazon SNS messages』

全てのログを出力するようにして実際に試してみたところ以下のようなログが取得出来ました4

{
    "notification": {
        "messageMD5Sum": "c9931cbf093fe7469115eca0ad1ce36b",
        "messageId": "d3784a55-b4d4-5f2f-a847-11be199f3276",
        "topicArn": "arn:aws:sns:ap-northeast-1:123456789012:layerx-example-echo-webhook",
        "timestamp": "2022-11-14 02:42:19.692"
    },
    "delivery": {
        "deliveryId": "0dc68e35-2d69-5a1e-88b6-d5de80229e5a",
        "destination": "<https://api.example.com/webhook/echo.v1.EchoService/Echo>",
        "providerResponse": "Bad Request",
        "dwellTimeMs": 113,
        "statusCode": 400
    },
    "status": "SUCCESS"
}

配信先からのステータスコードは400が返っていますが配信ステータスはSUCCESSとなっています。またAmazon SNSの送信失敗メトリクスであるNumberOfNotificationsFailedにもカウントされませんでした。そのため400番台のステータスコードが返ってきていることを検知するにはCloudwatch Logsに全ての送信ログを残す必要があります。400番台のステータスコードが返却された場合はリトライ不要ではあるものの、設定不備を修正して再送信する必要があるケースが多いため、発生を検知できる必要がありました。しかし、そのために全ての送信ログを残してログ監視を行うのは少し運用上の無駄が多すぎるという判断をしました。

まとめ

Amazon SNSは非常に簡単にやりたいことが実装出来ましたが、可観測性の観点から本番導入を見送りました。今回の調査を経てAmazon SNSでHTTPSエンドポイントを利用するのはビジネス上あまりクリティカルではない部分5に留めた方が良いのかなという感想を持ちました。なお、Eメール送信やSMS送信などHTTPSエンドポイント以外の利用に関しては今回調査していないので、今回の調査結果はAmazon SNSに対する限定的な評価であることをご認識頂ければと思います。

Amazon Eventbridgeパターン

Amazon Eventbridge6はさまざまなソースからのデータを橋渡しするサーバーレスのイベントバスサービスです。Amazon Eventbridgeはイベントバスサービスなのでサービス内の用語にはPub/Subモデルに出てくるTopicやSubscriptionの概念が出てきません。しかし、公式ドキュメントにあるようにEventBusをTopic、RuleTargetをSubscriptionと考えればPub/Subモデルのように扱うことができます。

引用元:『Pub/sub pattern - Amazon EventBridge implementation』

最終的にAmazon Eventbridgeを利用する構成を採用しました。Amazon SNSの調査で発覚したような運用上の課題はなく、また今後の拡張性に大きな期待が持てたためです。

本記事ではここから章を変えてAmazon EventbridgeでCloud Pub/Sub Push Subscription相当の機能を実現する方法を紹介しようと思います。なお、本記事ではAmazon Eventbridgeの全ての機能は説明しません。Cloud Pub/Sub Push Subscription相当の機能を実現するのに必要な要素だけを説明します。Amazon Eventbridgeの全体像を把握したい方はServerlessDays Tokyo 2022 Virtualで最高の発表があったのでそちらの資料7をご覧ください。

Amazon EventbridgeでCloud Pub/Sub Push Subscription相当の機能を実現する

ここからAmazon EventbridgeでCloud Pub/Sub Push Subscription相当の機能を実現する方法を紹介します。なお、Amazon Eventbridgeの用語を使用して説明するので、ここまで「メッセージ」と呼んでいたものはここから「イベント」と表記します。

Amazon Eventbridgeで今回の構成を実現するには5つのリソースを定義する必要があります。

リソース名 説明
EventBus イベントを受け取るためのパイプラインです。先ほど紹介したようにPub/Subモデルで言うところのTopicと考えると理解しやすいと思います。
Rule 処理したいイベントの条件を定義するリソースです。Pub/Subモデルで言うところのSubscription Filterです。条件の定義8にはJSONを使用します。
Target Ruleと送信先の組み合わせを定義するリソースです。今回は送信先にHTTPSエンドポイントを使用するので次に説明するAPI Destinationを指定します。
API Destination HTTPSエンドポイントに投げるリクエストを定義します。PathやQuery Paramsに*を使用することができ、Target定義時に動的に値を与えることが可能です。
API Connection HTTPSエンドポイントの接続情報を定義します。Basic 認証・OAuth認証・API Key認証から認証方法を選択します。認証に使用する秘密情報は自動でAWS Secrets Managerに格納されます9

これら5つのリソースを定義することでCloud Pub/SubのPush Subscription相当の機能を実現出来ます。イベントをEventBusに送付し、Targetに定義されたRuleにマッチするイベントがあれば、API DestinationやAPI Connectionの設定に基づいてHTTPSエンドポイントにリクエストを送付される、という流れです。

Cloud Pub/SubでPush Subscriptionを定義する場合にはSubscriptionという一つのリソースとして定義するリソースを、Amazon Eventbridgeで定義する場合はRule/Target/API Destination/API Connectionという4つのリソースに分解して定義します。参考までにterraformで定義する場合のコードを載せておきます。

resource "aws_cloudwatch_event_bus" "sample" {
  name = "sample"
}
resource "aws_cloudwatch_event_connection" "sample_webhook" {
  name               = "sample-webhook"
  authorization_type = "API_KEY"
  auth_parameters {
    api_key {
      key   = "x-sample-api-key"
      value = "dummy" # note: This value will be stored to Secrets Manager.
    }
  }
resource "aws_cloudwatch_event_api_destination" "sample_webhook" {
  name                = "sample-webhook"
  invocation_endpoint = "https://api.sample.com/webhook/*"
  http_method         = "POST"
  connection_arn      = aws_cloudwatch_event_connection.sample_webhook.arn
}
resource "aws_cloudwatch_event_rule" "sample" {
  for_each       = toset(["eventA", "eventB", "eventC"])
  name           = each.key
  event_bus_name = "sample" # note: `aws_cloudwatch_event_bus.sample` dose not have the `name` attribute.
  event_pattern  = jsonencode({ "detail-type" = [each.key]})
}
resource "aws_cloudwatch_event_target" "sample" {
  for_each = {
    for v in [
      {
        "event_name" = "eventA"
        "path"       = "echoA"
      },
      {
        "event_name" = "eventB"
        "path"       = "echoB"
      },
      {
        "event_name" = "eventB"
        "path"       = "echoB"
      },
    ]: "${v.event_name}--${v.path}" => v
  }
  rule           = aws_cloudwatch_event_rule.samle[each.value.event_name].name
  event_bus_name = "sample" # note: `aws_cloudwatch_event_bus.sample` dose not have the `name` attribute.
  target_id      = each.key
  arn            = aws_cloudwatch_event_api_destination.sample_webhook.arn
  role_arn       = aws_iam_role.eventbridge_sample_webhook_invoker.arn
  http_target {
    path_parameter_values = [each.value.path]
  }
}
resource "aws_iam_role" "eventbridge_sample_webhook_invoker" {
  name               = "eventbridge-sample-webhook-invoker"
  assume_role_policy = data.aws_iam_policy_document.eventbridge_trust_policy.json
}
resource "aws_iam_role_policy" "eventbridge_sample_webhook_invoker" {
  name   = "eventbridge-sample-webhook-invoker"
  role   = aws_iam_role.eventbridge_sample_webhook_invoker.id
  policy = data.aws_iam_policy_document.eventbridge_sample_webhook_invoker.json
}
data "aws_iam_policy_document" "eventbridge_trust_policy" {
  statement {
    effect  = "Allow"
    actions = ["sts:AssumeRole"]
    principals {
      type        = "Service"
      identifiers = ["events.amazonaws.com"]
    }
  }
}
data "aws_iam_policy_document" "eventbridge_sample_webhook_invoker" {
  statement {
    sid       = "APIDestinationAccess"
    effect    = "Allow"
    actions   = ["events:InvokeApiDestination"]
    resources = [aws_cloudwatch_event_api_destination.sample_webhook.arn]
  }
}

可観測性の担保

Amazon Eventbridgeでは以下のメトリクスが取得可能です。

docs.aws.amazon.com

メトリクスはRule毎に取得可能なのでCloud Pub/Subで言うところのSubscription毎にメトリクスが取得出来ると言えます。また、HTTPSエンドポイントが400番台のステータスコードを返した場合はFailedInvocationメトリクスにカウントされます10。配信遅延の検知に関してもIngestionToInvocationStartLatencyメトリクスとRuleごとのFailedInvocationメトリクスを駆使して実現出来ます11。ただし、Ruleごとの配信遅延状況を把握するのはまだ難しい状況なので、Ruleごとに処理できていない最古のイベントと現在時刻との差分がメトリクスとして提供されることを願っています。

Amazon EventbirdgeはAWS X-Rayを使用した分散トレーシングもサポートしています。

aws.amazon.com

弊社ではAWS X-RayではなくDatadog APMを使用して分散トレーシングを実現しているため、EventにDatadogのトレースIDを載せて送信することで分散トレーシングに参加させています。

イベントの保存と再生

Amazon EventbridgeはEventBusに送信されたイベントの保存(Archive)再生(Replay)をサポートしています。

docs.aws.amazon.com

イベントの保存に関しては設定次第で無期限に保存することも可能です。イベントの再生には保存されたイベントを使用します。Ruleと再生対象時間を指定することで過去の特定のイベントを何度でも再生することができます。Amazon Eventbridgeが機能としてイベントの再生を提供しているため、再実行に職人芸が不要になります。トラブルが発生してもコードを修正してから誰でも容易に再実行できるのは運用負荷を下げる一要因になるのでありがたい機能です。

「Amazon Eventbridge無限課金編!ガハハハ!」と言える余裕を。

Amazon Eventbridgeはトラブルシューティングに関する公式ドキュメントを提供しています。その中に「無限ループの防止」という項目があります。

docs.aws.amazon.com

Amazon EventbridgeではRuleとTargetの設定、及びイベントソースの設定ミスによりイベントの無限ループが発生する可能性があります。特に今回の利用で想定しているようなAPI間イベントを受け渡しするケースでは意図せず無限ループが発生することが想定されます。無限ループが発生すると無限に課金され続けるので「Amazon Eventbridge無限課金編!ガハハハ!」とか言っている余裕はありません。そのため、 Invocationメトリクスの異常増加検知やBillingの異常増加検知に加えて、コードを静的解析して無限ループが発生していることを検知する仕組みを導入しようと考えています。

その他

その他、LayerXでEventbirdgeを利用するにあたって行っている設定や想定について箇条書きしておきます。

  • Amazon Eventbrigdeからリクエストを投げるHTTPS Endpointは公開エンドポイントである必要があります。そのため、エンドポイントにはAWSのIP許可とAPI Key認証を入れてAmazon Eventbridgeからのリクエストのみを通す設定を行っています。
  • サービス間通信にはgRPCを使用しているため、Amazon Eventbridgeがリクエストを投げるHTTPS EndpointはリクエストをgRPCサービスへプロキシしています。その際処理を効率化するため送信するイベントにはprotocol buffersのバイナリをBase58でエンコードした文字列を格納しています。
  • 順序保証が必要な要件が出てきたらAmazon SQSFIFOキューAmazon Eventbridge Pipesを組み合わせて実現する想定しています。

おわりに

この記事ではGCPのCloud Pub/SubにおけるPush Subscription相当の機能をAWSで実現する方法について書きました。そして、Amazon Eventbridgeを用いることでそれが実現出来ることを書きました。この記事が似たようなことをやりたいと思っている方々へ届くことを願っています。


  1. BigQuery Subscriptionも大きく分けるとPush Subscriptionに分類されるでしょう。
  2. 個人の感想です。
  3. 他にもAmazon Kinesisを利用する方法がありますが実現したいことに対してコストがかかりすぎるので検討から外しました。
  4. topicArnとdestinationのURLはサンプル用のものに変更しています。
  5. たとえばCI/CDの通知など
  6. Amazon EventbridgeはかつてAmazon CloudWatch Eventsと呼ばれていたサービスでAWS内のリソース変更イベントを橋渡しするサービスでした。
  7. 神資料: 見せてやるよ、EventBridge の本気ってやつをな / The art of EventBridge - Speaker Deck
  8. Amazon EventBridge event patterns - Amazon EventBridge
  9. 逆にAWS Secrets ManagerのARNを指定したりすることはできません。
  10. 『API destinations - API destination error codes』にあるようにステータスコード400番台は429を除きリトライされません。
  11. 24時間処理されなかったイベントはDLQに格納されるのでDLQの監視も必須でしょう。