mizulba
非同期バルク処理とキュー設計
非同期バルクジョブは処理サイズ別レーンと予約済み実行枠で短時間ジョブの待ちを防ぐ
5日前
非同期バルク処理で、数件から数十万件まで同じキュー・同じ worker プールへ流すと、少数の長時間ジョブが worker 枠を占有し、数件の短時間ジョブまで開始待ちになる。FIFO キューのメッセージグループやテナント単位ロックは「同一主体内の直列化」には有効だが、worker 枠そのものを長時間ジョブが占有する問題は解決しない。
判断基準
- 小規模ジョブに対して「いつでも同程度の開始体験」を求めるなら、小規模ジョブ用の queue / topic / priority lane と、そこ専用の予約済み worker 枠を持つ。
- 大規模ジョブは別レーンに流し、低めの並列度・長めの visibility / lease・chunk 処理でスループットを最適化する。
- 同一主体で同時実行したくない業務制約がある場合は、キュー順序だけに寄せず、DB の active lock やユニーク制約でジョブ作成時点に排他を確定する。これにより UI は「待たせる」ではなく即座に競合を返せる。
- cold start が大きい実行基盤を小規模ジョブに使うと開始体験が悪化する。小規模は温かい worker、巨大処理は専用 task / bulk lane という分離が扱いやすい。
落とし穴
- 単一 FIFO キューで MessageGroupId を主体 ID にしても、worker 数が少なく長時間ジョブで埋まれば他主体の短時間ジョブは待つ。
- 優先度付きの自前 scheduler を作ると柔軟だが、lease、再実行、監視、DLQ 相当を自分で持つことになる。まずは size class による少数レーン分離で足りるかを見る。
- DB と外部キューへの二重書き込みは不整合になり得る。重要なジョブでは transactional outbox や未送信イベントのリカバリを用意する。
検証
長時間ジョブを予約済み bulk 枠いっぱいに投入した状態で、小規模ジョブを投入し、開始までの時間が小規模レーンの SLO 内に収まることを確認する。同一主体からの二重投入はジョブ作成時点で競合になり、キュー内で待ち続けないことも確認する。
SQS standard queue の MessageGroupId は fair queues のテナント識別子になる
5日前
Amazon SQS の standard queue では、MessageGroupId を付けると FIFO の順序制御ではなく fair queues のテナント識別子として扱われる。高ボリュームの tenant / client / request type が同じキューに大量メッセージを積んでも、SQS が受信順を調整して他グループの message dwell time を低く保とうとする。
適用条件
- 1 つの standard queue を複数テナント・顧客・リクエスト種別で共有している。
- 一部のグループだけ大量投入または重い処理を行い、他グループの短時間処理が待たされる noisy neighbor 問題を避けたい。
- 厳密な順序保証は不要で、高スループットと低待機時間を優先する。
使い分け
- standard queue +
MessageGroupId: テナント間の公平性を改善する。consumer 側の変更は不要で、同じMessageGroupIdのメッセージも並列処理され得る。 - FIFO queue +
MessageGroupId: 同一グループ内を直列化し順序を守る。head-of-line blocking とスループット制限を受けやすい。 - fair queues は per-tenant の実行数上限ではない。重いジョブが worker 枠を占有する問題には、処理サイズ別レーン、予約済み worker 枠、chunk 化、アプリ側の同時実行制御を併用する。
検証
負荷テストでは、1 グループに大量メッセージを投入した状態で別グループの小さいメッセージを投入し、CloudWatch の fair queues 関連メトリクスや、送信から受信までの dwell time が小さいまま保たれるかを見る。
非同期バルクワーカーで全件を単一トランザクションに包むと内側の非トランザクショナル副作用が不整合になる
3日前
バックグラウンドのバルク処理ワーカーで「全レコードの更新を1つの DB トランザクションで囲んでアトミックにする」と整合性が上がるように見えるが、そのトランザクションの内側で実行される非トランザクショナルな副作用は DB のロールバック対象外であり、途中失敗時に DB だけ巻き戻って外部状態が残る不整合を生む。
非トランザクショナルな副作用の例
- 検索インデックス(OpenSearch/Elasticsearch 等)への upsert/delete
- 外部 API 連携(カレンダー、会議、決済、通知など)
- 別コネクション/別サービスへの書き込み
なぜ危険が増幅されるか(単一更新との差)
- 単一レコード更新でも同じ構造はあるが、巻き込まれるのは1件。バルクでは1件の失敗で数千〜数万件分の DB がロールバックされる一方、それまでに反映済みの外部副作用は全部残る。
- さらにキュー消費ワーカーが「失敗時もメッセージを無条件削除」していると、リトライで再実行されて外部状態が正しい値に上書き収束する余地も消え、恒久不整合になる。
判断基準・対処
- トランザクション内では DB 書込のみに限定し、検索インデックス更新・外部 API 連携はコミット後の別フェーズに出す(commit→外部反映の順)。
- 外部反映は冪等化し、失敗は記録して後追い再実行できるようにする(outbox / 失敗レコード化)。
- トランザクション境界を全件で1つにせず chunk 単位で切ると、ロールバック範囲と保持時間を限定でき、中断後再開もしやすい。
- 「失敗=即終了・リトライなし」を選ぶなら、その前提と外部副作用の不整合可能性を設計として明示する。一時障害(インデックスの 429、デッドロック、lock wait timeout)でリトライしたいなら、失敗時はメッセージを削除せず再配信/DLQ 経路を残す。
検証
処理の途中(外部副作用を出した後)で意図的に例外を投げ、(1) DB がロールバックされること、(2) 外部システム(検索インデックス・外部 API)に部分反映が残ること、(3) リトライ経路があるなら再実行で収束し、無いなら不整合が残ることを確認する。
大きなID配列をAPIで受ける時はスキーマ上限だけでなくbody経路全体の制限を揃える
4日前
大量の ID 配列を JSON request body で受ける API では、OpenAPI やバリデーションの maxItems だけを上げても不十分。実際に通るかは、アプリの body parser limit、proxy / gateway / WAF の body 処理、保存先 payload のサイズ上限、非同期キューへ載せるメッセージサイズのすべてで決まる。
判断基準
- まず代表的な ID 長と最大件数で
JSON.stringify後の byte size を見積もる。数 MB に収まるなら通常の request body として扱いやすいが、既定の 1MB 制限では落ちることが多い。 - body parser limit は、想定 payload サイズに更新値やメタデータ分の余裕を足して設定する。上限だけを大きくしすぎると全 API のメモリ使用量と DoS 面が広がるため、可能なら対象 endpoint の性質に合わせて限定する。
- WAF が body を検査する構成では、巨大 JSON や自由入力値が body ルールに触れる可能性を確認する。ファイルや HTML でなくても、長文・URL・記号を含む更新値を受ける endpoint は誤検知緩和対象になり得る。
- 非同期処理に渡す場合、キューには巨大な ID 配列を直接載せず、task ID や payload 参照だけを送る。キューの message size 上限と再試行コストを避けられる。
検証
最大件数の payload を実際に生成して byte size を測り、body parser の limit 未満であることを確認する。境界テストでは最大件数ちょうどが通り、最大件数 + 1 がバリデーションで落ちることを確認する。WAF 配下では、自由入力値を含む最大級 payload がアプリまで届くかも疎通確認する。