長期間のエージェントワークフローにおけるA2Aストリーミングと非同期タスク

長期実行されるA2Aタスクは、チャットセッションの終了後も継続して実行されます。

目次

ほとんどのAIエージェントのデモは、追加のステップを伴うチャット補完のように振る舞います。プロンプトを送信し、数秒待ってから、1つのレスポンスとして回答を受け取ります。

実際のアジェント(エージェント)の作業は、このパターンに適合しないことがよくあります。リサーチ、コードレビュー、調達分析、インシデント調査、およびマルチステップのプランニングは、数分から数時間かかることもあり、途中での明確化が必要になったり、部分的な結果をストリーミング配信したり、他のエージェントに委任したり、単一のテキスト返信ではなくファイルを作成したりすることがあります。これが、AIシステム クラスタ全体におけるA2Aプロトコルの非同期モデルの重要性が現れるところです。A2Aでは、一発のHTTPレスポンスではなく、ライフサイクルを持つタスクとして長時間実行される作業を扱います。クライアントはServer-Sent Events (SSE) を介して接続を維持したり、タスクの状態をポーリングしたり、接続を開いたまま維持できない場合はプッシュWebhookを登録したりすることができます。

長時間実行されるエージェントワークフローのためのA2Aストリーミングと非同期タスクライフサイクル

この記事では、これらのワークフローの運用設計について説明します。ストリーミング、ポーリング、プッシュの使い分け、input_requiredがヒューマンインザループ(人間が関与する)フローにどのように適合するか、障害処理、および本番環境で計測すべき項目などをカバーします。エージェントカード、メッセージ、パーツ、およびタスクモデルの詳細については、A2Aプロトコルとは? エージェントカードとタスクの説明 を参照してください。

なぜ長時間実行されるA2Aエージェントタスクには非同期設計が必要なのか

エージェントの作業がツール、委任、承認、および大規模なアーティファクトにまたがるようになると、同期的なリクエスト/レスポンスの思考モデルはすぐに崩壊します。エージェントタスクは内部で複数のMCPサーバーを呼び出し、A2A経由でサブタスクを他のエージェントに委任し、人間の承認を待ち、チャンク単位で大規模なアーティファクトを生成し、途中で失敗して部分的な回復が必要になり、複数のホップにわたってトークンコストが累積する場合もあります。HTTP APIはタイムアウト、バックグラウンドジョブ、およびアドホックなステータスエンドポイントでこれを近似することはできますが、A2Aはタスクのアイデンティティと状態をプロトコルに組み込むため、クライアントとゲートウェイが作業を一貫して処理することができます。非同期A2A境界を追加する前に、それらのレイヤーが本番環境のアシスタント内でどのように適合するかについては、AIアシスタントアーキテクチャ:LLM、メモリ、ツール、ルーティング、観測可能性 を参照してください。

私の偏見は実用的なものです:すべてのものに対してタスクを作成しないでください。1行のサマリーにはライフサイクルは必要ありません。作業がステートフルで、監査可能で、長時間実行され、アーティファクトを生成し、または飛行中(処理中)に入力が必要になる場合にタスクを使用します。説明記事からの経験則は依然として有効です:単純な対話はメッセージを返すことができ、複雑な作業はタスクを返す必要があります。

A2Aタスクのライフサイクルと状態遷移

A2Aタスクは、クライアントがいつでもクエリできる状態を通過します。正確な命名は実装によってわずかに異なりますが、プロトコルに従うサーバー間でモデルは安定しています。

stateDiagram-v2 [*] --> submitted submitted --> working working --> input_required input_required --> working working --> completed working --> failed working --> canceled working --> rejected submitted --> rejected input_required --> failed input_required --> canceled completed --> [*] failed --> [*] canceled --> [*] rejected --> [*]

submitted(提出済み)状態は、クライアントが作業を送信し、エージェントが受け付け、またはキューに追加したことを意味します。working(処理中)では、エージェントはアクティブに処理しており、これにはツール呼び出し、委任、または部分的な出力のストリーミングが含まれる場合があります。input_required(入力が必須)状態は、エージェントが追加の入力、明確化、または人間の承認を必要とするために一時停止していることを示しており、失敗状態ではありません。completed(完了)はアーティファクトが利用可能な終端成功であり、failed(失敗)は詳細と部分的なアーティファクトが実装に依存する終端エラーです。canceled(キャンセル済み)はクライアント、ゲートウェイ、または権限を持つ呼び出し元がタスクを停止したことを意味し、rejected(拒否済み)はポリシー、機能の不一致、または認証のためにエージェントがタスクを拒否したことを意味します。

input_requiredがワークフローを一時停止する時と失敗させる時

input_requiredを例外ではなく、意図的な一時停止として扱ってください。エージェントは、欠落したパラメータ、ポリシーの確認、または高リスクアクションへのマネージャーのサインオフなど、あなたからの何らかの要素なしには進められないことを伝えています。ワークフローは、タスクがfailedまたはrejectedに到達した場合、または呼び出し元が決して届かない入力を待ってタイムアウトを超えた場合に失敗します。したがって、承認が永遠に座り込まないように、人間のステップに対する明示的なタイムアウトを設計する必要があります。

エスカレーションなしで3日間待つ承認は、忍耐強いワークフローではなく、立ち往生したワークフローであり、立ち往生したワークフローはタスクストアを詰まらせ、観測可能性ダッシュボードの読み取りを困難にします。

A2Aタスクをキャンセルできるのは誰か

キャンセルの権限はインシデント中に議論されるのではなく、設計時に定義されるべきです。クライアントは通常、作成したタスクをキャンセルできます。ゲートウェイはテナント、ポリシー違反、または予算制限のために代理でキャンセルすることができます。また、プロトコルとポリシーが許可している場合、上流のエージェントはA2A上でオーケストレーションする際に委任された作業をキャンセルすることができます。誰が、そしてなぜキャンセルしたかをログに記録してください。マルチエージェントチェーンでは、孤立した作業が予期せぬトークン請求の原因となるためです。

input_requiredタスク状態によるヒューマンインザループ

input_requiredはA2Aの最も未活用された設計機能の1つであり、多くのチームはそれが実際にはファーストクラスのワークフロー状態であるにもかかわらず、エラーコードとして扱っています。本番環境では、エージェントが停止すべきケースに遭遇します。例えば、曖昧なリクエストに予算を費やすこと、不可逆的なアクションを実行すること、スコープの確認なしに機密データにアクセスすること、または明示的なユーザー意図を必要とする専門家に委任することなどです。これらをinput_requiredへの意図的な遷移としてモデル化し、何が必要かを説明する明確なメッセージを添えてください。

リスクのあるA2A委任のための承認フロー

エージェントAがA2A経由でエージェントBに委任し、エージェントBが人間の承認のためにinput_requiredに入った場合、次に何が起こるかに同意する必要があるシステムが3つあります。下流のエージェントは一時停止し、必要なものを公開し、オーケストレーターまたはゲートウェイはその一時停止をユーザーに表示し、ユーザーのレスポンスが新しいメッセージを介してタスクを再開します。A2A vs MCP の比較は、エージェント境界を越えた委任がツールアクセスとは異なる問題であり、承認セマンティクスが単一のMCP呼び出し内ではなくタスクレイヤーに属する理由を説明しています。UXが不便だからといって沈黙して自動承認しないでください。高価なミスは通常、モデルの欠如ではなく、利便性のショートカットから来るものです。

一時停止されたA2AタスクのためのUXパターン

ブロッキング待機は、タスクがinput_requiredから抜け出すまでUIがスピナーまたは承認カードを表示することを意味し、短い人間のステップに適しています。ノンブロッキング待機は、クライアントがタスクIDを記録し、ユーザーが他の場所での作業を続行できるようにし、入力が必要になったときにポーリングまたはプッシュを使用して通知することを意味し、モバイル、メールリンクされた承認、またはマルチタブアシスタントには必須です。人間が遅い場合のタイムアウトは、ステップごとにSLAを定義し、N時間後にfailedに遷移するか、別のキューにエスカレートすることを意味します。これは、無制限の待機がタスクストアを詰まらせ、観測可能性ダッシュボードを混乱させるためです。

A2Aゲートウェイがinput_requiredを処理する方法

A2Aゲートウェイを実行している場合、input_requiredイベントを透明にフォワードするか、複数の下流エージェントからの一時停止を1つのユーザープロンプトに集約するか、または特定のスキルがinput_requiredを離れる前に常に承認が必要であることを強制するかを決定します。承認されたアクションの認証とポリシーは専用のセキュリティ記事に属しますが、今のところ、再開されたすべてのタスクが元のリクエストと同じユーザーアイデンティティとスコープを持つことを仮定してください。

同期、SSEストリーミング、ポーリング、またはプッシュ通知の選択

A2Aは複数のインタラクションモードをサポートしており、正しい選択はどのモードが最も現代的に聞こえるかではなく、クライアントの機能とレイテンシニーズに依存します。

モード 最も適した用途 クライアント要件 トレードオフ
同期 (SendMessage, 短いタスク) 素早い作業、即座のメッセージ 単純なHTTPクライアント 遅いエージェントでのタイムアウト
SSEストリーミング ライブ進捗、インクリメンタルなアーティファクト 長寿命な接続 プロキシ、モバイルのバックグラウンド制限
ポーリング (GetTask) バッチクライアント、単純な統合 タイマー + タスクID より高いレイテンシ、より多くのリクエスト
プッシュWebhook モバイル、サーバーレス、数時間かかるジョブ HTTPS受信機 + 検証 非同期の複雑さ、セキュリティ強化

まずエージェントカードの機能フラグを読み取る

モードを選択する前に、エージェントのエージェントカードを読み取ってください。ストリーミングにはcapabilities.streaming: trueが必要であり、プッシュ通知サポートは別途宣伝されます。すべてのエージェントがストリーミングすると仮定するクライアントは、最小限の実装に対して破損します。したがって、ネゴシエーションは儀式的なものではありません。専門家がポーリングベースのステータスチェックのみをサポートする場合のランタイム障害を防ぎます。

A2Aの周りでアシスタント側ポーリングを使用するタイミング

アシスタントランタイムは、ユーザーに生のプロトコル詳細を公開するのではなく、スケジューラーループでA2Aタスクポーリングをラップする場合があります。このパターンは、目覚めて、状態を確認し、動作するバックグラウンドプロセスである一般的なポーリングエージェントと重複します。A2Aに特化したものではない、耐久性のあるスケジューリング、冪等性、およびキューパターンについては、AIアシスタントにおけるポーリングエージェント:11の実装パターン を参照してください。単一の制御プレーンから多数のA2Aタスクをオーケストレーションする際にアシスタントポーリングを使用し、クライアントがエージェント境界に直接接続する場合はネイティブなA2Aストリーミングまたはプッシュを使用します。

A2A Server-Sent Events (SSE) ストリーミング

SSEはA2Aの主要なリアルタイムチャネルです。クライアントはSendStreamingMessageを呼び出し、HTTP接続を開き、タスクが終端または中断状態に達するまでtext/event-streamレスポンスを受信します。各イベントのペイロードはJSON-RPC形状であり、典型的な結果タイプにはタスクスナップショット、ライフサイクル遷移と中間エージェントメッセージのためのTaskStatusUpdateEvent、およびappendlastChunkヒントを備えたチャンク化されたアーティファクト配信のためのTaskArtifactUpdateEventが含まれます。

sequenceDiagram participant Client participant A2A Server Client->>A2A Server: SendStreamingMessage A2A Server-->>Client: HTTP 200 text/event-stream loop Until terminal or input_required A2A Server-->>Client: TaskStatusUpdateEvent A2A Server-->>Client: TaskArtifactUpdateEvent (optional) end A2A Server-->>Client: Close stream Note over Client,A2A Server: On disconnect before terminal state,
client may call SubscribeToTask

ストリーミング進捗更新と部分的なアーティファクト

ストリーミングは、ユーザーが作業が行われているのを見るべきときに輝きます。ステップカウンター(「7つのソース中3つをレビュー済み」)、部分的なテキスト生成、大規模レポートのインクリメンタルなファイルチャンク、またはポーリングなしでworkingからinput_requiredへの状態遷移を意味する場合です。単一の最終的なブロブの周りでではなく、イベントタイプの周りでUIを設計してください。completedが到着したときのみ出力を表示する場合は、ポーリングするのと同等です。

SSE接続のドロップと再購読

ネットワークはドロップし、ラップトップはスリープし、ロードバランサーはSSE接続をアイドルタイムアウトするため、長いストリームは楽観的な仮定ではなく回復ロジックを必要とします。A2AはSubscribeToTaskを提供し、クライアントが進行中のタスクストリームに再接続できるようにします。クライアントSDKはtaskIdをローカルに永続化し、終端状態前のストリームクローズを検出し、バックオフを伴って再購読し、サーバーが重複する状態を再生する場合にイベントを重複排除するべきです。再購読ロジックなしでは、エージェントバックエンドが健全であっても、本番環境での長時間タスクは脆弱に感じられます。

A2Aプッシュ通知とWebhook

プッシュは、SSEが不適切なシナリオ、例えばバックグラウンドにあるモバイルアプリ、サーバーレスハンドラ、または数時間または数日かかるタスクに適合します。クライアントはurl(クライアント側のHTTPS Webhook)、着信POSTを検証するためのオプションのtoken、およびA2AサーバーがWebhookに認証する方法に関するオプションのauthentication詳細を備えたPushNotificationConfigを提供します。設定は初期のSendMessageまたはSendStreamingMessage呼び出しと一緒に乗り物に乗ることも、または既存のタスクに対してCreateTaskPushNotificationConfigを介して後で追加することもできます。

sequenceDiagram participant Client participant A2A Server participant Webhook Client->>A2A Server: SendMessage + PushNotificationConfig A2A Server-->>Client: taskId Note over A2A Server: Task runs asynchronously A2A Server->>Webhook: POST state change notification Webhook->>A2A Server: GetTask(taskId) A2A Server-->>Webhook: Updated Task + artifacts Webhook->>Client: Resume workflow / notify user

重要な更新が発生すると、A2AサーバーはWebhookにPOSTし、クライアントは通常、通知されたtaskIdGetTaskを呼び出して、完全な更新されたタスクとアーティファクトを取得します。プッシュは完全なペイロードトランスポートではなく、シグナルです。

開いたSSE接続よりプッシュが優れる時

クライアントがSSEを維持できない場合(モバイル、エッジ関数)、更新がトークンごとにではなく頻繁ではなくマイルストーンベースである場合、またはサーバーが切断されたワークフローエンジンを起こしたい場合にプッシュを優先してください。ユーザーがライブ進捗を見る場合、アーティファクトが多くの小さなチャンクでストリーミングする場合、または数秒以下のレイテンシが重要な場合にSSEを優先してください。

プッシュ通知をA2Aタスクに相関させる

すべてのプッシュハンドラは、taskId、元のリクエストからのトレースまたは相関ID、イベントタイプまたは状態遷移、および古いイベントを拒否するために通知からのタイムスタンプをログに記録し、伝播するべきです。再生攻撃と重複配信は本番環境で発生するため、冪等なハンドラは必須ではありません。

プッシュエンドポイントのセキュリティ概要

プッシュは、悪意のあるクライアントが内部URLを登録する際のサーバー上のSSRFリスクと、偽のPOSTがWebhookに到着する際のクライアント上のなりすましリスクを導入します。緩和策には、URLホワイトリスト、所有権検証、JWKS付き署名JWT、タイムスタンプチェック、および設定トークンの検証が含まれます。完全な脅威モデル、アイデンティティレイヤー、およびゲートウェイ制御はA2AおよびMCPエージェントセキュリティ:アイデンティティ、委任、および監査証跡 にあります。それを読むまで、Webhook検証を支払いコールバックと同じ深刻さで扱ってください。

非同期A2Aワークフローパターン

ファイアアンドフォロータスク提出

クライアントはタスクを提出し、タスクIDを即座に受信し、切断し、その後でGetTaskをポーリングするか、プッシュを待機します。これはサーバーレスおよびバッチパイプラインのデフォルトパターンですが、サーバーレス呼び出しがIDを忘れると作業を失うため、ユーザーに確認する前にタスクIDを耐久性のあるストレージに永続化するべきです。

input_required後のタスクの再開

input_requiredの後、ユーザーは同じタスクに対して新しいメッセージを送信し、エージェントはworkingに戻ります。6時間後に何が承認されたかを監査する必要があるとき、「承認済み:ベンダーXで進行」は単なる「はい」より優れているため、再開コンテキストが明示的になるようにメッセージを設計してください。

中間アーティファクトを伴う連鎖されたA2A委任

オーケストレーターがタスクT1を所有し、取得、要約、および検証を専門エージェントに委任するリサーチワークフローを検討してください。それぞれには独自のタスクIDとアーティファクトがあります。

flowchart TD U[User] --> O[Orchestrator Task T1] O -->|A2A| R[Retrieval agent T2] R --> A2[artifact: raw sources] O -->|A2A| S[Summarization agent T3] S --> A3[artifact: draft summary] O -->|A2A| V[Verification agent T4] V --> A4[artifact: fact-check report] O --> F[final artifact: recommendation memo]

各ホップには独自のタスクIDと状態機械があるため、オーケストレーターは下流のタスクを独立してストリーミングまたはポーリングし、次のホップを開始する前に中間アーティファクトを永続化し、T3が完了してもT4がドラフトを拒否する場合に優雅に失敗するべきです。マルチエージェントオーケストレーションパターン は、それらの専門家が1つのランタイム内ではなく個別のサービスとして実行される場合のトポロジー選択をカバーします。部分的進捗は価値があり、明確な理由なしに使用可能なドラフトを削除すべきではありません。

遅延完了のための耐久性のあるタスクストレージ

タスク状態とアーティファクトはプロセス再起動に耐えるべきです。エージェントがKubernetesで実行されている場合、ポッドがタスクの途中で死に、タスクレコードとアーティファクトブロブをエージェントコンテナが独占所有しないストアにバックアップすると仮定してください。

長時間実行されるA2Aワークフローのための障害処理

長時間実行されるワークフローは、タイムアウト、リトライ、部分的なアーティファクト、および安全でないキャンセルを通じて予測可能な方法で失敗し、それぞれにはクライアントコードでのアドホックな処理ではなく、明示的なポリシーが必要です。

ホップごとおよびエンドツーエンドのタイムアウト予算

2つのレベルでタイムアウトを設定してください:エスカレーションまたはキャンセル前の1つのエージェントタスクのためのホップごとの最大値、およびユーザーに表示されるワークフローのためのエンドツーエンドの最大値。ハングする取得エージェントは、ユーザーのブラウザがタイムアウトするまでオーケストレーター全体をブロックすべきではありません。

A2Aタスクのためのリトライと冪等性

冪等性なしのリトライは、二重請求、重複チケット、および繰り返しメールなどの副作用を複製します。プロトコルが許可する場所で安定したクライアントメッセージIDまたは冪等性キーを使用し、ビジネスミューテーションについては実際に機能する分散システムにおける冪等性 に沿ってください。ネットワークのブリップまたは503などの一時的な失敗のみをリトライし、rejectedまたはポリシー失敗を盲目的にリトライしないでください。コストを増幅し、下流のエージェントを煩わせることになります。

部分的なアーティファクト回復ポリシー

タスクが部分的なアーティファクトを生成した後に失敗した場合、明確な「不完全」ラベルを付けて部分的な出力をユーザーに公開するか、最後の良いチェックポイントから再開を許可するか、または医療、法律、または金融の文脈で誤解を招く可能性がある場合に部分的な出力を破棄するかを定義してください。

委任チェーン全体での安全なキャンセル

上流のユーザーが中止したときに下流のタスクをキャンセルし、キャンセルが伝播するように委任グラフを使用し、すでにコストが発生したキャンセルされたタスクをログに記録してください。財務チームはそれらに気づきます。

非同期A2Aワークフローのための観測可能性

境界をまたいで追跡できない限り、マルチエージェント非同期作業をデバッグすることはできません。これは、構造化されていないログに依存するのではなく、各ホップで識別子を相関させることを意味します。最小限の相関フィールドには、ユーザー開始ワークフローごとのトレースID、委任された子を含むエージェントタスクごとのタスクID、ホップを処理したエージェントカードまたはサービスのためのエージェントID、および委任チェーンをリンクする親タスクIDが含まれます。

タイムスタンプを伴ってすべての状態遷移をログに記録し、PIIポリシーが適用される場合、必ずしも完全なコンテンツではなく、サイズとハッシュを伴ってアーティファクト作成イベントをログに記録してください。ホップごとのコストとレイテンシを属性付けしてください。マルチエージェントワークフローは請求書が届くまでトークン支出を隠し、タスクごとのコストラベルは「どの専門家が費用のかかるのか?」という質問に答えられるようにします。メトリクス、トレーシングバックエンド、およびLLM固有の計装パターンについては、LLMシステムのための観測可能性 およびそれらのシグナルが本番テレメトリスタックにどのように適合するかを示すより広範な観測可能性 柱を参照してください。 ユーザーが「エージェントがなぜそれをしたのか?」と尋ねたとき、あなたの答えは、オーケストレーター、A2Aホップ、MCPツール呼び出し、およびすべてのinput_required一時停止にまたがるトレースであるべきであり、肩をすくめてログをgrepするものではありません。

A2Aストリーミングと非同期タスクのための本番チェックリスト

長時間実行されるA2Aパスを本番環境に出荷する前に、以下の領域を確認してください。

エージェントカードと機能

  • capabilities.streamingは実際のSSEサポートを反映している
  • 実装されている場合、プッシュ通知サポートが文書化されている
  • 人間の承認を必要とするスキルは、期待されるinput_required動作を文書化している

クライアントモード

  • SSEクライアントはSubscribeToTaskを介して再購読を処理する
  • ポーリング間隔は負荷の下でバックオフする
  • プッシュWebhookは真正性を検証し、古いイベントを拒否する

耐久性

  • タスク状態はエージェントプロセスの再起動に耐える
  • アーティファクトはエフェメラルなコンテナファイルシステムの外に保存されている
  • 中間アーティファクトは部分的な回復のために利用可能

障害とポリシー

  • ホップごとおよびエンドツーエンドのタイムアウト予算が定義されている
  • ミューティング操作のためのリトライが冪等である
  • キャンセルは委任エッジ全体に伝播する

観測可能性

  • 各ホップにトレースID + タスクID + エージェントID
  • 状態遷移がログに記録されている
  • タスクごとまたはエージェントごとのコスト属性

負荷テスト

  • リバースプロキシを介したSSE(バッファリングはストリームを壊す)
  • 開いた接続でメモリリークなしの並行長時間タスク
  • Webhookの過負荷なしのプッシュ洪水処理

結論

A2Aの価値は、作業が単一の同期的なAPI呼び出しに適合しない場合に最も明確に現れます。ストリーミング、非同期タスク、プッシュ通知、および明示的なタスク状態は、プロトコルがリサーチ、委任、承認、および大規模なアーティファクトなどの実際のエージェントワークロードを、すべてが1つのHTTPラウンドトリップで完了すると偽るのではなく、処理する方法です。機能する最も単純なモードから始め、ユーザーがライブ進捗を必要とするときにSSEを追加し、接続が開いたまま維持できないときにプッシュを追加し、input_requiredを失敗ではなくファーストクラスの設計ツールとして扱い、マルチエージェント非同期ワークフローがそれらを説明する能力を追い越さないように各ホープを計装してください。

よくある質問

A2Aストリーミングをポーリングの代わりにいつ使用するべきですか? クライアントが開いたHTTP接続を保持でき、低レイテンシ進捗更新またはインクリメンタルなアーティファクトを必要とする場合にストリーミングを使用してください。接続が信頼できない場合、クライアントがバッチ指向である場合、または長時間実行されるタスクの定期的なステータスチェックのみを必要とする場合にポーリングを使用してください。

A2Aタスクでinput_requiredは何を意味しますか? エージェントがより多くの情報または人間の承認を必要とする一時停止状態です。エラーとして扱うのではなく、それを中心にUXとタイムアウトを明示的に設計してください。

A2Aプッシュ通知はどのように機能しますか? HTTPS Webhookを備えたPushNotificationConfigを登録します。サーバーは重要な更新時にPOSTします。クライアントはGetTaskを呼び出して完全な状態とアーティファクトを取得します。

失敗したA2Aタスクをどのようにリトライすべきですか? 冪等性キーで一時的な失敗をリトライし、タイムアウト予算を尊重し、rejectedまたはポリシー失敗などの終端状態を盲目的にリトライしないでください。

長時間実行されるA2Aワークフローのために何をログに記録すべきですか? ホップ全体でトレースID、タスクID、およびエージェントIDを相関させます。状態遷移、アーティファクト、委任、承認、およびステップごとのコストをログに記録し、完全なワークフローを再構築できるようにしてください。

出典

購読する

システム、インフラ、AIエンジニアリングの新記事をお届けします。