K8s と Kafka 環境における Apache Flink:PyFlink、Go、運用、およびマネージド価格設定

ステートフルストリーミング、チェックポイント、K8s、PyFlink、Go。

目次

Apache Flink は、有界および無界のデータストリームに対して状態付きの計算を行うためのフレームワークです。

チームは、イベントタイムセマンティクス(ウォーターマーク)による正確低遅延なストリーミング、フォールトトレランス(チェックポイント)、制御されたアップグレード(セーブポイント)、および運用面(メトリクスと REST)のためにそれを採用します。

Apache Flink ストリーム処理

本ガイドは、DevOps および Go/Python の開発者を対象としています。デプロイモデル(セルフマネージドとマネージド)を比較し、コアアーキテクチャを説明し、Kubernetes(Helm と Operator)およびスタンドアロンセットアップをカバーし、Flink と Spark、Kafka Streams、Beam、ストリーミングデータベースを対比し、LLM と AI 向けのパイプラインを含む PyFlink と Go の統合パターンを示します。

オブジェクトストレージ、データベース、メッセージングを含むデータインフラストラクチャのパターンに関するより広い文脈については、「AI システムのためのデータインフラストラクチャ:オブジェクトストレージ、データベース、検索、AI データアーキテクチャ」を参照してください。

Apache Flink は明確に状態付きストリーム処理エンジンとして位置づけられています。ロジックをオペレータのパイプラインとしてモデル化し、Flink はそれを管理された状態と時間セマンティクスを持つ分散データフローとして実行します。最新の Flink ドキュメントでは、プロジェクトは「有界および無界のデータストリーム上の状態付き計算のためのフレームワークおよび分散処理エンジン」として自身を説明しています。

実践的な DevOps/ソフトウェアエンジニアリングの観点から、Flink は以下の性質の少なくとも 1 つが必要である場合に適しています。

低遅延で正確性保証付きの結合/集約/ enrichが必要な場合、通常は Flink のイベントタイム処理を使用します。ここで「時間」とは、イベントが発生した時間(到着した時間ではない)であり、ウォーターマークがパイプライン内でのイベントタイムの進捗を伝達します。

大規模な状態付き計算(ローリングカウンター、セッション、不正検知ルール、特徴量エンジニアリング)が必要な場合、Flink は状態をプログラミングモデルの第一級市民として扱い、チェックポイントングを通じてフォールトトレラントにします。

運用上堅牢なストリーミング(障害、ローリングアップグレード、再起動)が必要な場合、Flink は状態とストリームの位置をチェックポイントし、ジョブが「障害が発生しない実行」として同じセマンティクスで回復し、継続できるようにします。

DevOps、Go、Python、AI チームにおける典型的なユースケース

Flink は「データパイプライン & ETL」、「ストリーミング分析」、「イベント駆動型アプリケーション」で広く使用されています(Flink ドキュメントで使用されているカテゴリ)。

DevOps + Go/Python スタックの場合、典型的なパターンは以下のようになります。

Go サービスがイベントを Kafka に生成します。Flink はそれらのイベントを消費し、状態付き処理(重複排除、ウィンドウ集約、enrich など)を実行し、派生した事実を Kafka またはデータベースに書き込みます。Flink のオペレータとチェックポイントメカニズムは、これらの状態付きパイプラインをプロダクションセーフにするために存在します。

ML/LLM チームの場合、PyFlink は明確に「機械学習予測」や Python UDF 内への機械学習モデルの読み込みなどのシナリオを呼び出しており、これは依存関係管理の動機として、「Flink ジョブをオンライン推論/特徴量エンジニアリングランタイム」とするパターンの直接的な推奨です。

Flink のランタイムは 2 つのプロセスタイプで構成されます:JobManagerTaskManagers。ドキュメントでは、クライアントがデータフローを JobManager に提出し、その後、クライアントは接続を切断(デタッチモード)するか、接続を維持(アタッチモード)できることを強調しています。

JobManagerは分散実行を調整します:スケジューリング、タスク完了/障害への対応、チェックポイントの調整、リカバリの調整。内部では、ResourceManager(スロット/リソース)、Dispatcher(REST + Web UI + ジョブごとの JobMaster 作成)、JobMaster(1 つのジョブを管理)を含みます。

TaskManagersはオペレータ/タスクを実行し、データストリームの交換/バッファリングを行います。最小スケジューリング単位はタスクスロットであり、複数のオペレータが 1 つのスロット内で実行できます(オペレータチェーンとスロット共有がこれに影響します)。

パフォーマンスとコスト制御のためのオペレータチェーンとタスクスロット

Flink はオペレータサブタスクをタスクにチェーンし、各タスクは単一スレッドで実行されます。これは、スレッドハンドオーバーとバッファリングのオーバーヘッドを減らし、スループットを増加させ、遅延を減少させるパフォーマンス最適化として説明されています。

スロットは運用上重要であり、リソーススケジューリング/分離の単位です。Flink は、各 TaskManager が 1 つ以上のタスクスロットを持つことができると述べており、スロットはスロットごとに管理メモリを予約しますが、CPU は分離しません

イベントタイム処理、ウォーターマーク、および遅延データ

Flink は複数の時間概念(イベントタイム、イグネスションタイム、プロセッシングタイム)をサポートし、ウォーターマークを使用してイベントタイムの進捗をモデル化します。

イベントタイムで動作するには、Flink はイベントにタイムスタンプを割り当て、ウォーターマークを生成する必要があります。公式の「ウォーターマークの生成」ドキュメントでは、タイムスタンプ割り当てとウォーターマーク生成がコアのビルディングブロックであり、WatermarkStrategy が一般的な戦略を構成するための標準的な方法であると説明しています。

本番システムにおけるフォールトトレランス:チェックポイントとセーブポイント

チェックポイントが存在する理由は、「Flink のすべての関数とオペレータが状態付きになり得る」ためです。状態はフォールトトレラントになるためにチェックポイントされなければなりません。チェックポイントは状態とストリームの位置の回復を可能にし、実行を「障害が発生しないセマンティクス」で再開できるようにします。

Flink は明確にセーブポイントを「Flink のチェックポイントメカニズムを通じて作成された、ストリーミングジョブの実行状態の一貫したイメージ」として定義し、停止/再開、フォーク、またはジョブの更新に使用されます。セーブポイントは安定したストレージ(例:HDFS、S3)上に保存されます。

公式の「チェックポイントとセーブポイント」ページは、バックアップとリカバリログの違いのように違いを枠組み立てます:チェックポイントは頻繁で軽量であり、Flink によって障害回復のために管理されます。セーブポイントはユーザー管理であり、アップグレードのような制御された操作に使用されます。

オープンソースの Flink ランタイムはライセンスの観点からは「無料」ですが、プロダクションではインフラと運用努力に対して支払います。

Flink は一般的なリソースマネージャ(例:YARN と Kubernetes)との統合を設計されており、スタンドアロンクラスタまたはライブラリとしても実行できます。

コンピューティングとメモリのコストは、JobManager と TaskManagers、および並列性/スロットレイアウトによって決定されます。Flink の構成ドキュメントは、分散セットアップのコアノブとして jobmanager.memory.process.sizetaskmanager.memory.process.sizetaskmanager.numberOfTaskSlotsparallelism.default を明確に呼び出しています。

ローカルディスクは、状態付きジョブの隠れたコスト要因としてよくあります。Flink は、io.tmp.dirs がローカルデータ(RocksDB ファイル、スピルされた中間結果、キャッシュされた JAR など)を保存し、このデータが削除されると「重いリカバリ操作」を強制する可能性があるため、定期的にパージされないストレージ上に置くべきであると述べています。

永続的なオブジェクト/ファイルストレージのコストは、チェックポイント/セーブポイントディレクトリによって決定されます。Flink 2.x 構成では、チェックポイントとセーブポイントは execution.checkpointing.direxecution.checkpointing.savepoint-dir を介して構成され、s3://…hdfs://… のような URI を受け付けます。

マネージドサービスは運用コストを削減しますが、プラットフォーム料金と制約を追加します。詳細はプロバイダによって異なります。

Amazon Managed Service for Apache Flink は、KPU(1 vCPU + 4 GB メモリ/KPU)で請求され、期間と KPU 数で 1 秒単位で課金されます。AWS はまた、アプリケーションごとに追加の「オーケストレーション」KPU と別々のストレージ/バックアップ料金を請求します。

Confluent Cloud for Apache Flink は使用量ベースでサーバーレスです:コンピューティングプールを作成し、ステートメントが実行されている間消費された CFU 分で請求されます。請求ページには、CFU 時間あたり $0.21(リージョンによる)の例が含まれており、コンピューティングプールの最大値を設定して支出を制限できることを強調しています。

Aiven と Alibaba Cloud は市場で注目すべきマネージド Flink プロバイダですが、公開価格と請求詳細はプラン/リージョンによって異なり、計算機または営業連絡が必要になる場合があります。正確なコストは、現在のドキュメントからリージョン+プランを引用しない限り未定として扱ってください。

Ververica は Flink 围绕のセルフマネージドとマネージドのデプロイオプションの両方を提供しており、公開ページはデプロイ選択とマネージドサービス位置づけを強調していますが、正確な価格設定は通常「連絡先/価格詳細」フローを介して処理されるため、特定の数字は通常公開では未定です。

デプロイオプション 最適化される状況 運用複雑性 主な利点 主なリスク/トレードオフ
スタンドアロンクラスタ(VM/ベアメタル) 小規模チーム、固定容量 中–高 完全制御;最もシンプルなメンタルモデル HA、オートスケーリング、アップグレードは DIY(より多くの手間)
Flink Kubernetes Operator を使用した Kubernetes ほとんどの現代のプラットフォームチーム 宣言的デプロイ;制御ループによるライフサイクル管理;Operator は Application/Session/Job デプロイをサポート Kubernetes + Operator 専門知識が必要
ネイティブ Kubernetes(Operator なし) 直接統合を望む K8s チーム 中–高 直接リソース統合;Flink-on-K8s ドキュメントで説明されている動的 TaskManager 割り当て/解放 Operator よりも独自自動化が必要
YARN Hadoop 中心のプラットフォーム YARN リソース管理と統合 Hadoop スタックの複雑さ
AWS Managed Service for Apache Flink AWS ネイティブデータスタック 低–中 マネージドオーケストレーション + スケーリングオプション;予測可能な請求単位(KPU) プラットフォーム結合;追加のアプリごとのオーバーヘッド KPU + ストレージ料金
Confluent Cloud for Apache Flink Kafka ファーストの企業、SQL ファーストのストリームアプリ サーバーレス使用量請求;CFU 分会計;支出を制限するためのコンピューティングプール CFU コスト + Kafka ネットワークコスト;サービス固有の API
Ververica マネージドオファリング Flink 専門運用を必要とする企業 低–中 「Flink 専門家」マネージドサービス位置づけ 価格設定は通常非透明(未定)

マネージドプロバイダとコスト表

価格はリージョンと時間によって変化します。あなたのリージョンの正確な数字が必要な場合は、これを起点として扱い、プロバイダの現在の価格ページで確認してください(引用されていないリージョンは未定)。

プロバイダ 「プラン」形状 請求単位 例としてのコンピューティング価格 注目すべき追加コスト要因
Amazon Managed Service for Apache Flink マネージドルランタイム KPU (1 vCPU + 4 GB) 例:KPU 時間あたり $0.11 (US East N. Virginia) アプリごとに +1 オーケストレーション KPU;実行中ストレージ;オプションの永続バックアップ
Confluent Cloud for Apache Flink サーバーレス SQL/処理 CFU 分/CFU 時間 例:CFU 時間あたり $0.21 (リージョンによる) Kafka ネットワークレートも適用;支出を制限するためのコンピューティングプール最大値
Ververica (マネージド) マネージド「Unified Streaming Data Platform」 未定 (公開ページ) 未定 プラットフォーム機能/SLA;価格設定は通常営業経由(未定)
Aiven for Apache Flink マネージドサービス 時間使用量請求モデル (プラットフォーム全体) プラン/リージョンなしでは未定 プランティア + クラウドリージョン + アドオン(未定)
Alibaba Cloud Realtime Compute for Apache Flink マネージド/サーバーレス ハイブリッド請求 (従量課金 + サブスクリプション混合) リージョン/ワークスペース詳細なしでは未定 CU ベースの制限とワークスペースモデル(詳細は様々;ここでは未定)

Flink は忙しいエコシステムに位置しています。「最良」の選択は、遅延、状態性、運用の好み、および作成モデルに依存します。

ツール 何なのか ストリーミング実行モデル 状態と exactly-once の話 得意分野 典型的な痛点
Apache Flink 状態付き計算のための分散ストリーム処理エンジン ウォーターマークによる連続ストリーミング + イベントタイム チェックポイントベースのフォールトトレランス;制御されたアップグレードのためのセーブポイント 低遅延状態付きパイプライン;複雑なイベントタイムロジック 状態、チェックポイント、アップグレードの正しい運用には規律が必要
Apache Spark Structured Streaming DataFrames/Datasets を中心に構築された Spark のストリーミングエンジン デフォルトマイクロバッチモデル(連続モードは別で議論) 分析パイプラインに強い;状態は存在するが通常より高い遅延 統一されたバッチ+ストリーム API;Spark エコシステム マイクロバッチ遅延と「ストリーミングは増分バッチである」というメンタルモデル
Kafka Streams Kafka 上でストリーム処理アプリを構築するためのライブラリ 1 レコードずつの処理 exactly-once 処理セマンティクス (EOS) をサポート シンプルな Kafka ネイティブアプリ;JVM サービスへの埋め込み JVM のみ;大規模分散コンピューティングパターンには柔軟性不足
Apache Beam 統一プログラミングモデル + SDK;ランナ(Flink、Spark、Dataflow など)で実行 ランナに依存;Beam パイプラインはランナジョブに変換 セマンティクスはランナ能力マトリクスに依存(ランナ固有) 移植性、マルチ言語パイプライン;エンジンロックイン回避 運用調整は結局ランナ固有になる
Materialize 「ライブデータレイヤー」/ストリーミング SQL DB;データ到着時に結果を増分的に更新 連続的な増分ビュー保守 製品ドキュメントで強い一貫性主張(詳細は製品固有) アプリ/AI エージェントへの最新派生ビュー提供 Flink ジョブとは異なる運用モデル;一般的なオペレータ API ランタイムではない
RisingWave ストリーム処理がマテリアライズドビューとして表現されるストリーミングデータベース 連続マテリアライズドビュー保守 SQL ファースト;エンジン固有セマンティクス Flink ジョブ構築なしの SQL 中心ストリームアプリ 任意のコードヘビーパイプラインには柔軟性不足

有用なヒューリスティック:複雑な状態付きストリーミングジョブのランタイムで、イベントタイム、オペレータロジック、デプロイに対する深い制御を望む場合、Flink は主要な候補です。SQL ファーストの増分ビューでサービングを望む場合、ストリーミングデータベースが代替案かもしれません。サービスに埋め込まれたライブラリを望む場合、Kafka Streams は競争力があります。エンジン間の 1 つの移植可能なパイプライン定義を望む場合、Beam は魅力的です。

AWS を使用したクラウドネイティブイベント駆動アーキテクチャの場合、AWS Kinesis を使用したイベント駆動マイクロサービスの構築 は、リアルタイム処理とサービス分離のための Kinesis Data Streams パターンをカバーしています。

このセクションは意図的に実用的です:構成、デプロイ、および Go/Python サービスが通常どのように Flink と相互作用するか。

Go サービス + Kafka + Flink + サービングレイヤー

Flink は、高ボリュームのイベントを耐久性のあるシグナル(カウンター、セッション、異常、enriched レコード)に変換する「状態付きミドル」であることがよくあります。チェックポイントと状態バックエンドは、そのミドルをプロダクションで信頼性高くするものです。

重要なバージョン注意Flink 2.0 以降、サポートされている構成ファイルは conf/config.yaml であり、以前の flink-conf.yaml は「もはやサポートされていません」。

小さなセルフマネージドクラスタのための最小限(説明用)の conf/config.yaml

# conf/config.yaml (Flink 2.x スタイル)
rest:
  address: flink-jobmanager.example.internal
  port: 8081

jobmanager:
  rpc:
    address: flink-jobmanager.example.internal
    port: 6123
  memory:
    process:
      size: 2048m

taskmanager:
  memory:
    process:
      size: 4096m
  numberOfTaskSlots: 2

parallelism:
  default: 2

# チェックポイントデフォルト(ジョブはコードでオーバーライド可能)
state:
  backend:
    type: rocksdb
execution:
  checkpointing:
    dir: s3://my-bucket/flink/checkpoints
    savepoint-dir: s3://my-bucket/flink/savepoints
    interval: 60 s

# パージされる tmp ディレクトリを避ける(RocksDB ファイル、キャッシュされた JAR など)
io:
  tmp:
    dirs: ["/var/lib/flink/tmp"]

これらのキーの理由:Flink の構成参照は、rest.*jobmanager.rpc.* のディスカバリ詳細、プロセスメモリキー、スロット/並列性キー、および state.backend.typeexecution.checkpointing.direxecution.checkpointing.savepoint-direxecution.checkpointing.interval を含むデフォルトチェックポイント設定を明確にドキュメント化しています。

io.tmp.dirs の選択は運用上重要であり、Flink はローカル RocksDB ファイルとキャッシュされたアーティファクトにこれを使用します。これを削除すると、重いリカバリの原因になります。

Flink 1.x を使用している場合(一部のマネージド環境ではまだ一般的です)、flink-conf.yaml を見つけることがあります。これは Flink 2.x ユーザーにとってレガシーです。

# conf/flink-conf.yaml (レガシ 1.x スタイル;Flink 2.x ではサポートされていません)
jobmanager.rpc.address: flink-jobmanager
rest.port: 8081
taskmanager.numberOfTaskSlots: 2
parallelism.default: 2

# レガシチェックポイントキーはバージョンによって異なります;説明用として扱ってください。
state.backend.type: rocksdb
state.checkpoints.dir: s3://my-bucket/flink/checkpoints
state.savepoints.dir: s3://my-bucket/flink/savepoints

移行する場合、Flink は flink-conf.yamlconfig.yaml に変換するための移行スクリプト(bin/migrate-config-file.sh)を提供します。

Flink Kubernetes Operator は Flink アプリケーションのライフサイクル管理のためのコントロールプレーンとして機能し、Helm を使用してインストールされます。

公式 Operator Helm ドキュメントから、ソースツリーチャートから、または Apache ホストのチャートリポジトリからインストールできます:

# ソースツリーのバンドルされたチャートからインストール
helm install flink-kubernetes-operator helm/flink-kubernetes-operator

# Apache ダウンロード Helm リポジトリからインストール(<OPERATOR-VERSION> を置き換える)
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-<OPERATOR-VERSION>/
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator

これらの正確なコマンドは、Operator の Helm インストールドキュメントで示されています。

FlinkDeployment CR 例(説明用)

これは、通常カスタマイズする統合ポイント(イメージ、リソース、チェックポイント場所、ログ/メトリクス)を示す簡略化された例です。Operator は制御ループを通じてこの望ましい状態を調整します。

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: realtime-sessions
  namespace: flink
spec:
  image: my-registry.example.com/flink/realtime-sessions:2026-03-06
  flinkVersion: v2_2
  serviceAccount: flink
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.backend.type: "rocksdb"
    execution.checkpointing.dir: "s3://my-bucket/flink/checkpoints/realtime-sessions"
    execution.checkpointing.savepoint-dir: "s3://my-bucket/flink/savepoints/realtime-sessions"
    execution.checkpointing.interval: "60 s"
    rest.port: "8081"
  jobManager:
    resource:
      cpu: 1
      memory: "2048m"
  taskManager:
    resource:
      cpu: 2
      memory: "4096m"
  job:
    jarURI: local:///opt/flink/usrlib/realtime-sessions.jar
    parallelism: 4
    upgradeMode: savepoint
    state: running

upgradeMode: savepoint パターンは、安全な状態付きアップグレードを望む場合に一般的です。セーブポイントは停止/再開/フォーク/更新ワークフローのために設計されており、安定したストレージ場所を指します。

PyFlink は Apache Flink の Python API であり、ML パイプラインと ETL を含むスケーラブルなバッチ/ストリームワークロードのために明確に提案されています。

PyFlink から JVM コネクタ(Kafka、JDBC など)を使用する場合、関連する JAR がジョブに利用可能であることを確認する必要があります。Flink の Python「依存関係管理」ドキュメントは 3 つの標準メカニズムを示しています:

pipeline.jars(Table API)を設定するか、add_jars()(DataStream API)を呼び出すか、サブミット時に CLI --jarfile を使用します。

この例は、Kafka から JSON イベントを読み込み、イベントタイムタイムスタンプ(有界の順序外れを含む)を割り当て、キー付き状態にユーザーごとのローリングカウントを維持し、enriched イベントを出力トピックに書き込みます。

注記:

  • KafkaSource は KafkaSource.builder() を介して構築され、ブートストラップサーバー、トピック、シリアライザーが必要です。
  • PyFlink での exactly-once Kafka シンク構成は、配信保証トランザクション ID プレフィックスの設定を必要とします。
  • チェックポイントデフォルトは Flink 構成(execution.checkpointing.*)および/またはコードで構成できます;構成キーは Flink 構成参照でドキュメント化されています。
import json
from typing import Any

from pyflink.common import Duration, Types
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.watermark_strategy import WatermarkStrategy, TimestampAssigner
from pyflink.common.configuration import Configuration

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor

from pyflink.datastream.connectors import DeliveryGuarantee
from pyflink.datastream.connectors.kafka import (
    KafkaSource,
    KafkaOffsetsInitializer,
    KafkaSink,
    KafkaRecordSerializationSchema,
)


class EventTimeFromJson(TimestampAssigner):
    """
    JSON ペイロードから event_time_ms を抽出します。
    期待:{"user_id":"u1","event_time_ms":1710000000000,"event":"click",...}
    """
    def extract_timestamp(self, value: str, record_timestamp: int) -> int:
        try:
            obj = json.loads(value)
            return int(obj["event_time_ms"])
        except Exception:
            # フォールバック:破損している場合はレコードタイムスタンプ(イグネスション)を使用
            return record_timestamp


class RollingCount(KeyedProcessFunction):
    def open(self, runtime_context: RuntimeContext):
        desc = ValueStateDescriptor("rolling_count", Types.LONG())
        self.count_state = runtime_context.get_state(desc)

    def process_element(self, value: str, ctx: 'KeyedProcessFunction.Context'):
        obj = json.loads(value)
        current = self.count_state.value()
        if current is None:
            current = 0
        current += 1
        self.count_state.update(current)

        # enriched イベントをエミット
        obj["rolling_count"] = current
        obj["event_time_ms"] = int(obj.get("event_time_ms", 0))
        yield json.dumps(obj)


def build_env() -> StreamExecutionEnvironment:
    # クラスタ/ジョブデフォルト(config.yaml でも設定可能)
    cfg = Configuration()
    cfg.set_string("state.backend.type", "rocksdb")
    cfg.set_string("execution.checkpointing.dir", "s3://my-bucket/flink/checkpoints/realtime-sessions")
    cfg.set_string("execution.checkpointing.interval", "60 s")
    env = StreamExecutionEnvironment.get_execution_environment(cfg)

    # PyFlink では、コネクタ JAR は利用可能である必要があります;必要に応じて env.add_jars(...) を使用します。
    # env.add_jars("file:///opt/flink/lib/flink-connector-kafka-<VERSION>.jar")

    # チェックポイントを明示的に有効化(ジョブはデフォルトをオーバーライド可能)
    env.enable_checkpointing(60_000)
    env.set_parallelism(4)
    return env


def main():
    env = build_env()

    source = (
        KafkaSource.builder()
        .set_bootstrap_servers("kafka:9092")
        .set_topics("events.raw")
        .set_group_id("realtime-sessions-v1")
        .set_value_only_deserializer(SimpleStringSchema())
        .set_starting_offsets(KafkaOffsetsInitializer.earliest())
        .build()
    )

    watermark_strategy = (
        WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(10))
        .with_timestamp_assigner(EventTimeFromJson())
    )

    stream = (
        env.from_source(source, watermark_strategy=watermark_strategy, source_name="kafka-events-raw")
        .key_by(lambda s: json.loads(s)["user_id"])
        .process(RollingCount(), output_type=Types.STRING())
    )

    record_serializer = (
        KafkaRecordSerializationSchema.builder()
        .set_topic("events.enriched")
        .set_value_serialization_schema(SimpleStringSchema())
        .build()
    )

    sink = (
        KafkaSink.builder()
        .set_bootstrap_servers("kafka:9092")
        .set_record_serializer(record_serializer)
        .set_delivery_guarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .set_transactional_id_prefix("realtime-sessions-txn")
        .build()
    )

    stream.sink_to(sink)

    env.execute("realtime-sessions-pyflink")


if __name__ == "__main__":
    main()

上記の API 呼び出しは、PyFlink の KafkaSource ビルダー使用パターンと必須フィールドと一致します。 配信保証については、PyFlink の KafkaSinkBuilder ドキュメントは、DeliveryGuarantee.EXACTLY_ONCE に対してトランザクション ID プレフィックスを設定する必要があると明確に述べています。 タイムスタンプ/ウォーターマークについては、Flink のウォーターマークドキュメントはタイムスタンプ割り当てとウォーターマーク生成をイベントタイム処理のメカニズムとして説明し、PyFlink はこのモデルを反映する WatermarkStrategy API を提供します。

Go には Java/Python のようなネイティブ Flink ジョブ作成 API がないため、Go システムは通常以下を通じて Flink と統合します:

  • Kafka(または他のブローカー)をイグネスション/エグレスとして。
  • 運用アクション(JAR アップロード、ジョブ開始、ジョブステータス照会、セーブポイントトリガー、リスケール)のための Flink REST API。

Kafka セットアップとローカル開発パターンについては、「Apache Kafka クイックスタート - CLI とローカル例で Kafka 4.2 をインストール」を参照してください。

Go Kafka プロデューサ/コンシューマ例(kafka-go)

package main

import (
	"context"
	"log"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	ctx := context.Background()

	// プロデューサ:生イベントを記述
	writer := &kafka.Writer{
		Addr:         kafka.TCP("kafka:9092"),
		Topic:        "events.raw",
		RequiredAcks: kafka.RequireAll,
	}
	defer writer.Close()

	err := writer.WriteMessages(ctx, kafka.Message{
		Key:   []byte("user:u1"),
		Value: []byte(`{"user_id":"u1","event_time_ms":1710000000000,"event":"click"}`),
		Time:  time.Now(),
	})
	if err != nil {
		log.Fatal(err)
	}

	// コンシューマ:enriched イベントを読み取る
	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:  []string{"kafka:9092"},
		Topic:    "events.enriched",
		GroupID:  "go-debug-consumer",
		MinBytes: 1e3,
		MaxBytes: 10e6,
	})
	defer reader.Close()

	msg, err := reader.ReadMessage(ctx)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("enriched key=%s value=%s\n", string(msg.Key), string(msg.Value))
}

これは「配管」コードですが、最も一般的な実用的な統合面です:Kafka トピックは Flink とカスタムサービスの境界です。

Flink の REST API は JobManager Web サーバーの一部であり、デフォルトではポート 8081 でリッスンします(rest.port で構成可能)。

ディスパッチャーの公式 OpenAPI 仕様は /jars/upload を含み、明確に述べています:

  • JAR アップロードはマルチパートデータとして送信する必要があります
  • Content-Type ヘッダーを application/x-java-archive に設定する必要があります
  • -F jarfile=@path/to/flink-job.jar を使用した curl 例を提供します

JAR をアップロードするための実用的な Go スニペット:

package flink

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"mime/multipart"
	"net/http"
	"os"
)

func UploadJar(ctx context.Context, flinkBaseURL, jarPath string) (*http.Response, error) {
	f, err := os.Open(jarPath)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	var body bytes.Buffer
	w := multipart.NewWriter(&body)

	part, err := w.CreateFormFile("jarfile", "job.jar")
	if err != nil {
		return nil, err
	}
	if _, err := io.Copy(part, f); err != nil {
		return nil, err
	}
	if err := w.Close(); err != nil {
		return nil, err
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, flinkBaseURL+"/jars/upload", &body)
	if err != nil {
		return nil, err
	}

	// 重要:マルチパートバウンダリ
	req.Header.Set("Content-Type", w.FormDataContentType())

	// 一部のクライアントも仕様の curl 例と同様に "Expect:" を設定します。
	req.Header.Set("Expect", "")

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	if resp.StatusCode >= 300 {
		return resp, fmt.Errorf("upload failed: %s", resp.Status)
	}
	return resp, nil
}

このコードは、マルチパート要件と curl 参照を含む /jars/upload の REST API OpenAPI 説明に基づいています。

以前アップロードされた JAR を実行するには、Flink は /jars/{jarid}/run を公開し、クエリパラメータ(および/または JSON 本体)を介してプログラム引数を渡すことをサポートします。

運用上価値のあるエンドポイント(自動化される可能性が高い):

  • /jobs/jobs/{jobid}:ジョブ状態の一覧表示と検査
  • /jobs/{jobid}/savepoints:セーブポイントトリガー(非同期トリガー + ポーリング)
  • /jobs/{jobid}/rescaling:リスケールトリガー
懸念事項 PyFlink (Python ジョブ) Go (Flink 回りのサービス)
Flink ロジックの作成 DataStream/Table API を介したネイティブ作成;状態 + タイマーサポート ネイティブ Flink API なし;Flink (Java/Python) でロジックを実装し、外部から統合
コネクタ/依存関係 pipeline.jarsadd_jars、または --jarfile を介してコネクタ JAR を配信する必要があります 適用不可(Flink 内では実行していないが)、Kafka/DB クライアントを管理
イグネスション/エグレス PyFlink の KafkaSource/KafkaSink ビルダー Kafka プロデューサ/コンシューマライブラリ;標準マイクロサービスパターン
運用自動化 Flink REST エンドポイントを呼び出せる 自動化を所有することが多い:JAR アップロード、デプロイ、リスケール、REST 経由のセーブポイントトリガー

Flink は、Flink 構成ファイルでメトリクスレポーターを構成することでメトリクスエクスポートをサポートします。これらのレポーターは JobManager と TaskManagers でインスタンス化されます。

Prometheus については、Flink はサポートされている Flink バージョン環境で metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory で構成されたときに Prometheus 形式のメトリクスを公開します。

通常、これに Kubernetes ServiceMonitors(Prometheus Operator)またはマネージド監視スタックを組み合わせます。

スケーリング:並列性、スロット、オペレータベースのオートスケーリング

Flink のスケジューリングモデルはタスクスロットを介して実行リソースを定義し、各スロットは並列タスクのパイプラインを実行できます。

手動スケーリングの場合、REST API はジョブ(/jobs/{jobid}/rescaling)のリスケールエンドポイントを非同期操作として提供します。

Flink Kubernetes Operator を使用した Kubernetes 上の場合、Operator プロジェクトは機能セットの一部として「Flink ジョブオートスケーラ」を宣伝しており、ワークロードが大幅に変化する場合は評価価値があります。

バックアップと安全なアップグレード:チェックポイントとセーブポイント

チェックポイントは自動回復のためのものであり、Flink によって管理されます。セーブポイントはユーザー駆動のライフサイクル操作(停止/再開/フォーク/アップグレード)のためです。

SRE の観点から:

  • チェックポイントは「障害を通じてパイプラインを稼働させる」ために使用します。
  • セーブポイントは「状態を失わずに新しいバージョンをデプロイする」ために使用します。

Flink の REST API はセーブポイントの非同期トリガーもサポートしており、GitOps スタイルの「デプロイ → セーブポイントトリガー → アップグレード」ワークフローに有用です。

CI/CD:GitOps + Helm + REST ジョブサブミッション

Kubernetes の場合:

  • Operator インストールと FlinkDeployment CR を Git に保ち、Argo CD/Flux を介してデプロイし、ビルドごとにコンテナイメージをバージョン管理します。Operator Helm ドキュメントは明確に「Argo CD との作業」について議論しています。

スタンドアロン/セッションクラスタの場合:

  • 不変アーティファクトデプロイのために Flink REST API JAR アップロードと実行エンドポイントを使用します。

また、微妙だが価値のあるセキュリティ/運用トグルに注意してください:web.submit.enable は Web UI 経由のアップロードを管理しますが、ドキュメントは、無効化されていてもセッションクラスタは REST リクエストを介してジョブサブミッションを受け入れると述べています。これは、CI/CD 自動化を維持しながら UI 表面を強化する際に関連します。

LLM システムは、リアルタイムコンテキストの質に依存することがよくあります。Flink は、LLM/AI スタックにおいて「常に最新」な特徴量、埋め込み、行動集約を生成するコンポーネントとして適合します。

一般的なパターンは以下の通りです:

  • ユーザーアクション/イベントをイグネスト、
  • セッションと好みを集約、
  • 埋め込み生成タスクを生成、
  • ベクトルストアと/または特徴量ストアに埋め込みを書き込む。

PyFlink の依存関係管理ドキュメントは明確に「機械学習予測」と Python UDF 内への ML モデルの読み込み(リモートクラスタ実行のため)を呼び出し、これは「Flink オペレータ内のオンライン推論」アプローチに直接マッピングします。

推薦とランキングのためのオンライン特徴量ストア更新

Flink のキー付き状態とチェックポイントモデルは、イベント間でオペレータ状態を維持し、それを信頼性高く回復するために構築されています。これは、下流の推薦エンジンが必要とする連続特徴量計算(ローリングレート、カウント、時間減衰メトリクス)との自然な一致です。

AI パイプラインのための実践的な遅延/一貫性のトレードオフ

アーキテクチャがエンドツーエンドの exactly-once セマンティクスを必要とする場合(例:重複特徴量更新または重複課金イベントを回避)、チェックポイントとトランザクション保証を中心にシンクとソースを構築します。

Kafka ベースのスタックの場合特に:

  • Flink の Kafka コネクタは、チェックポイントが有効化され、配信保証オプションが構成されたときに exactly-once 保証を提供できます。
  • Kafka Streams も exactly-once セマンティクス (EOS) をサポートしており、「AI 特徴量パイプライン」が Flink クラスタではなくアプリケーションコード内に収まる場合に関連します。

Flink をリアルタイム AI コンテキストビルダーとして

この図は Flink のコアプリミティブに基づいています:イベントタイム処理(ウォーターマーク)、状態バックエンド(state.backend.type とシステム管理のローカル状態)、およびフォールトトレランスと運用のためのチェックポイント/セーブポイントメカニズム。