Apache Flink auf K8s und Kafka: PyFlink, Go, Betrieb und verwaltetes Preismodell

Zustandsbehaftetes Streaming, Checkpoints, K8s, PyFlink, Go.

Inhaltsverzeichnis

Apache Flink ist ein Framework für zustandsbehaftete Berechnungen über unendliche und endliche Datenströme.

Teams setzen es für korrekte, latenzarme Stream-Verarbeitung mit Event-Time-Semantik (Watermarks), Fehlertoleranz (Checkpoints), kontrollierte Upgrades (Savepoints) und operative Schnittstellen (Metriken und REST) ein.

Apache Flink Stream-Verarbeitung

Dieser Leitfaden richtet sich an DevOps- und Go/Python-Entwickler. Er vergleicht Bereitstellungsmodelle (selbstverwaltet vs. verwaltet), erklärt die Kernarchitektur, behandelt Kubernetes (Helm und Operator) sowie Standalone-Setups, stellt Flink im Vergleich zu Spark, Kafka Streams, Beam und Streaming-Datenbanken dar und zeigt Integrationsmuster für PyFlink und Go, einschließlich von LLM- und KI-orientierten Pipelines.

Für einen breiteren Kontext zu Dateninfrastruktur-Mustern, einschließlich Objektspeicherung, Datenbanken und Messaging, siehe Dateninfrastruktur für KI-Systeme: Objektspeicher, Datenbanken, Suche & KI-Datenarchitektur.

Apache Flink ist explizit als zustandsbehafteter Stream-Processing-Engine positioniert: Sie modellieren Ihre Logik als eine Pipeline von Operatoren, und Flink führt sie als verteilten Datenfluss mit verwaltetem Zustand und Zeitsemantik aus. In der modernen Flink-Dokumentation beschreibt das Projekt sich selbst als Framework und verteilte Verarbeitungsengine für zustandsbehaftete Berechnungen über unendliche und endliche Datenströme.

Aus der praktischen Perspektive von DevOps und Softwareentwicklung ist Flink eine gute Wahl, wenn Sie mindestens eine der folgenden Eigenschaften benötigen:

Wenn Sie Join/Aggregation/Anreicherung mit niedriger Latenz und Korrektheitsgarantien benötigen, verwenden Sie typischerweise die Event-Time-Verarbeitung von Flink, bei der „Zeit" den Zeitpunkt des Ereignisses bezeichnet (nicht den Zeitpunkt des Eintreffens) und Watermarks den Fortschritt der Event-Time durch die Pipeline kommunizieren.

Wenn Sie zustandsbehaftete Berechnungen im großen Maßstab benötigen (laufende Zähler, Sessions, Betrugsregeln, Feature-Engineering), behandelt Flink den Zustand als ersten Klassenteil des Programmiermodells und macht ihn fehlertolerant durch Checkpointing.

Wenn Sie operativ robuste Stream-Verarbeitung benötigen (Fehler, Rolling Upgrades, Neustarts), sichert Flink den Zustand und die Stream-Positionen, sodass der Job wiederhergestellt werden kann und mit der gleichen Semantik „als fehlerfreie Ausführung" fortgesetzt werden kann.

Typische Anwendungsfälle für DevOps-, Go-, Python- und KI-Teams

Flink wird häufig für „Datenpipelines & ETL", „Streaming-Analysen" und „ereignisgesteuerte Anwendungen" verwendet (die Kategorien, die in der Flink-Dokumentation verwendet werden).

Für einen DevOps + Go/Python-Stack sehen typische Muster wie folgt aus:

Ein Go-Dienst erzeugt Ereignisse für Kafka; Flink konsumiert diese Ereignisse, führt zustandsbehaftete Verarbeitung durch (z. B. Deduplizierung, Fenster-Aggregation, Anreicherung) und schreibt abgeleitete Fakten zurück nach Kafka oder in eine Datenbank. Die Operator- und Checkpointing-Mechanismen von Flink existieren, um diese zustandsbehafteten Pipelines produktionsfähig zu machen.

Für ML/LLM-Teams nennt PyFlink explizit Szenarien wie „Maschinelles Lernen-Vorhersagen" und das Laden von Machine-Learning-Modellen innerhalb von Python-UDFs als Motivation für das Abhängigkeitsmanagement, was eine direkte Unterstützung von „Flink-Job als Online-Inferenz-/Feature-Engineering-Laufzeit"-Mustern darstellt.

Die Laufzeit von Flink besteht aus zwei Prozesstypen: JobManager und TaskManagers. Die Dokumentation betont, dass Clients den Datenfluss an den JobManager übermitteln; der Client kann dann die Verbindung trennen (abgelöster Modus) oder verbunden bleiben (angefügte Modus).

Der JobManager koordiniert die verteilte Ausführung: Planung, Reaktion auf Aufgabenabschluss/Fehler, Koordinierung von Checkpoints und Koordinierung der Wiederherstellung. Intern umfasst er: ResourceManager (Slots/Ressourcen), Dispatcher (REST + Web-UI + pro Job JobMaster-Erstellung) und JobMaster (verwaltet einen Job).

Die TaskManagers führen die Operatoren/Aufgaben aus und austauschen/puffern Datenströme. Die kleinste Planungseinheit ist der Task-Slot; mehrere Operatoren können in einem Slot ausgeführt werden (Operator-Ketten und Slot-Sharing beeinflussen dies).

Operator-Ketten und Task-Slots für Leistungs- und Kostenkontrolle

Flink verknüpft Operator-Subaufgaben zu Tasks, wobei jeder Task von einem einzelnen Thread ausgeführt wird. Dies wird als Leistungsoptimierung beschrieben, die den Thread-Handover und Puffer-Overhead reduziert, die Durchsatzrate erhöht und die Latenz verringert.

Slots sind operativ wichtig, da sie die Einheit der Ressourcenplanung/-isolation sind. Flink stellt fest, dass jeder TaskManager einen oder mehrere Task-Slots haben kann; Slotting reserviert verwalteten Speicher pro Slot, isoliert jedoch keine CPU.

Event-Time-Verarbeitung, Watermarks und verspätete Daten

Flink unterstützt mehrere Zeitbegriffe – Event-Time, Ingestions-Time, Verarbeitungs-Time – und verwendet Watermarks, um den Fortschritt in der Event-Time zu modellieren.

Um mit Event-Time zu arbeiten, benötigt Flink Zeitstempel für Ereignisse und generierte Watermarks; die offizielle Dokumentation zu „Erstellen von Watermarks" erklärt die Zeitstempelzuweisung und Watermark-Generierung als Kernbausteine, wobei WatermarkStrategy der Standardweg ist, um gängige Strategien zu konfigurieren.

Fehlertoleranz: Checkpoints gegenüber Savepoints in realen Systemen

Checkpointing existiert, weil „jede Funktion und jeder Operator in Flink zustandsbehaftet sein kann"; der Zustand muss checkpointed werden, um fehlertolerant zu sein. Checkpoints ermöglichen die Wiederherstellung sowohl des Zustands als auch der Stream-Positionen, sodass die Ausführung mit fehlerfreier Semantik fortgesetzt werden kann.

Flink ist sehr explizit, dass Savepoints „ein konsistentes Bild des Ausführungszustands eines Streaming-Jobs sind, das über den Checkpointing-Mechanismus von Flink erstellt wird" und zum Stop-and-Resume, Fork oder Update von Jobs verwendet werden. Savepoints befinden sich auf stabilem Speicher (z. B. HDFS, S3).

Die offizielle Seite „Checkpoints vs. Savepoints" rahmt den Unterschied wie Backups vs. Wiederherstellungsprotokolle ein: Checkpoints sind häufig, leichtgewichtig und von Flink für die Fehlerwiederherstellung verwaltet; Savepoints sind vom Benutzer verwaltet und für kontrollierte Operationen wie Upgrades verwendet.

Die Open-Source-Flink-Laufzeit ist im Sinne der Lizenzierung „kostenlos", aber in der Produktion zahlen Sie für Infrastruktur und operativen Aufwand.

Flink ist darauf ausgelegt, sich mit gängigen Ressourcenmanagern (z. B. YARN und Kubernetes) zu integrieren und kann auch als Standalone-Cluster oder als Bibliothek ausgeführt werden.

Rechen- und Speicherkosten werden durch JobManager und TaskManagers sowie durch Ihr Parallelitäts-/Slot-Layout bestimmt. Die Konfigurationsdokumentation von Flink nennt explizit jobmanager.memory.process.size, taskmanager.memory.process.size, taskmanager.numberOfTaskSlots und parallelism.default als Kernknöpfe für verteilte Setups.

Lokaler Datenträger ist ein häufiger versteckter Kostenfaktor für zustandsbehaftete Jobs. Flink stellt fest, dass io.tmp.dirs lokale Daten speichert, einschließlich RocksDB-Dateien, verschütteter Zwischenergebnisse und gecachter JARs; wenn diese Daten gelöscht werden, kann dies „eine schwerwiegende Wiederherstellungsoperation" erzwingen, sodass sie auf einem Speicher leben sollten, der nicht regelmäßig bereinigt wird.

Die Kosten für dauerhaften Objekt-/Dateispeicher werden durch Checkpoint-/Savepoint-Verzeichnisse bestimmt. In der Flink 2.x-Konfiguration werden Checkpoints und Savepoints über execution.checkpointing.dir und execution.checkpointing.savepoint-dir konfiguriert und akzeptieren URIs wie s3://… oder hdfs://….

Verwaltete Dienste reduzieren den operativen Aufwand, fügen aber Plattformgebühren und Einschränkungen hinzu. Die Details sind anbieterabhängig.

Amazon Managed Service for Apache Flink berechnet nach KPUs (1 vCPU + 4 GB Speicher pro KPU) und berechnet nach Dauer und Anzahl der KPUs in Sekundenschritten. AWS berechnet zusätzlich eine „Orchestrierungs"-KPU pro Anwendung und separate Speicher-/Backup-Gebühren.

Confluent Cloud for Apache Flink ist nutzungsabhängig und serverlos: Sie erstellen einen Compute-Pool, und Sie werden für die verbrauchten CFUs pro Minute berechnet, während Statements ausgeführt werden. Die Preisliste enthält ein Beispiel-CFU-Preis von 0,21 USD pro CFU-Stunde (regionabhängig) und betont, dass Sie die Ausgaben über Compute-Pool-Maximalwerte begrenzen können.

Aiven und Alibaba Cloud sind bemerkenswerte verwaltete Flink-Anbieter auf dem Markt, aber ihre öffentlichen Preis- und Abrechnungsdetails variieren je nach Plan/Region und können Rechner oder Vertriebskontakt erfordern; betrachten Sie genaue Kosten als nicht spezifiziert, es sei denn, Sie zitieren eine Region+Plan aus ihren aktuellen Dokumenten.

Ververica bietet sowohl selbstverwaltete als auch verwaltete Bereitstellungsoptionen rund um Flink; öffentliche Seiten betonen Bereitstellungsentscheidungen und die Positionierung des verwalteten Dienstes, während genaue Preise typischerweise über „Kontakt/Preisdetails"-Flows abgewickelt werden (sind also öffentlich oft nicht spezifiziert).

Bereitstellungsoption Am besten für Operative Komplexität Hauptvorteile Hauptrisiken / Kompromisse
Standalone-Cluster (VMs/Bare Metal) Kleine Teams, feste Kapazität Mittel–Hoch Volle Kontrolle; einfachstes mentales Modell HA, Autoscaling, Upgrades sind DIY (mehr Arbeit)
Kubernetes mit Flink Kubernetes Operator Die meisten modernen Plattform-Teams Mittel Deklarative Bereitstellungen; Lebenszyklusverwaltung über Regelkreis; Operator unterstützt Application/Session/Job-Bereitstellungen Kubernetes + Operator-Expertise erforderlich
Native Kubernetes (ohne Operator) K8s-Teams, die direkte Integration wünschen Mittel–Hoch Direkte Ressourcenintegration; dynamische TaskManager-Allokation/-Deallokation beschrieben in Flink-on-K8s-Docs Mehr maßgeschneiderte Automatisierung als Operator
YARN Hadoop-zentrierte Plattformen Mittel Integration mit YARN-Ressourcenverwaltung Hadoop-Stack-Komplexität
AWS Managed Service for Apache Flink AWS-native Datenstacks Niedrig–Mittel Verwaltete Orchestrierung + Skalierungsoptionen; vorhersehbare Abrechnungseinheit (KPU) Plattformkopplung; zusätzliche pro-App-Overhead-KPU + Speichergebühren
Confluent Cloud for Apache Flink Kafka-first-Shops, SQL-first-Stream-Apps Niedrig Serverlose Nutzungsabrechnung; CFU-Minuten-Accounting; Compute-Pools zur Begrenzung der Ausgaben CFU-Kosten + Kafka-Netzwerkkosten; dienstspezifische APIs
Ververica verwaltete Angebote Unternehmen, die Flink-Experten-Betrieb benötigen Niedrig–Mittel Positionierung als „Flink-Experten"-Verwaltungsdienst Preise oft nicht transparent (nicht spezifiziert)

Tabelle der verwalteten Anbieter und Kosten

Preise ändern sich je nach Region und Zeit; wenn Sie genaue Zahlen für Ihre Region benötigen, betrachten Sie dies als Ausgangspunkt und überprüfen Sie diese gegen die aktuellen Preislisten des Anbieters (nicht zitierte Regionen sind nicht spezifiziert).

Anbieter „Plan"-Form Abrechnungseinheit Beispielrechnungspreis Bemerkenswerte zusätzliche Kostentreiber
Amazon Managed Service for Apache Flink Verwaltete Laufzeit KPU (1 vCPU + 4 GB) Beispiel: 0,11 USD pro KPU-Stunde (US East N. Virginia) +1 Orchestrierungs-KPU pro App; laufender Speicher; optionale dauerhafte Backups
Confluent Cloud for Apache Flink Serverless SQL/Verarbeitung CFU-Minute/CFU-Stunde Beispiel: 0,21 USD pro CFU-Stunde (Region variiert) Kafka-Netzwerkraten gelten weiterhin; Compute-Pool-Max zur Begrenzung der Ausgaben
Ververica (verwaltet) Verwaltete „Unified Streaming Data Platform" Nicht spezifiziert (öffentliche Seiten) Nicht spezifiziert Plattformfunktionen/SLAs; Preise typischerweise über Vertrieb (nicht spezifiziert)
Aiven for Apache Flink Verwaltungsdienst Stundennutzungsabrechnungsmodell (plattformweit) Nicht spezifiziert ohne Plan/Region Plan-Ebene + Cloud-Region + Add-ons (nicht spezifiziert)
Alibaba Cloud Realtime Compute for Apache Flink Verwaltet/serverlos Hybridabrechnung (Pay-as-you-go + Abonnement-Mix) Nicht spezifiziert ohne Region/Workspace-Details CU-basierte Limits und Workspace-Modell (Details variieren; hier nicht spezifiziert)

Flink befindet sich in einem geschäftigen Ökosystem. Die „beste" Wahl hängt von Latenz, Zustandsbehaftetheit, operativen Präferenzen und dem Autorierungsmodell ab.

Werkzeug Was es ist Streaming-Ausführungsmodell Zustand & genau-einmal-Geschichte Wo es glänzt Typische Schmerzpunkte
Apache Flink Verteilter Stream-Processing-Engine für zustandsbehaftete Berechnungen Kontinuierliches Streaming + Event-Time über Watermarks Checkpoint-basierte Fehlertoleranz; Savepoints für kontrollierte Upgrades Latenzarme zustandsbehaftete Pipelines; komplexe Event-Time-Logik Korrektes Betreiben von Zustand, Checkpoints, Upgrades erfordert Disziplin
Apache Spark Structured Streaming Spark-Streaming-Engine, die um DataFrames/Datasets herum gebaut ist Standard Micro-Batch-Modell (mit einem separaten kontinuierlichen Modus diskutiert) Stark für analytische Pipelines; Zustand existiert, aber oft höhere Latenz Unified Batch+Stream-APIs; Spark-Ökosystem Micro-Batch-Latenz und „Streaming als inkrementelle Batches"-mentales Modell
Kafka Streams Bibliothek zum Aufbau von Stream-Processing-Apps auf Kafka Record-at-a-time-Verarbeitung Unterstützt genau-einmal-Verarbeitungssemantik (EOS) Einfache Kafka-native Apps; Einbettung in JVM-Dienst JVM-only; weniger flexibel für große verteilte Rechenmuster
Apache Beam Unified-Programmierungsmodell + SDKs; ausgeführt über Runner (Flink, Spark, Dataflow, etc.) Hängt vom Runner ab; Beam-Pipelines werden in Runner-Jobs übersetzt Semantiken hängen von der Runner-Fähigkeitsmatrix ab (runner-spezifisch) Portabilität, mehrsprachige Pipelines; Vermeidung von Engine-Lock-in Operatives Tuning endet immer noch als runner-spezifisch
Materialize „Live Data Layer" / Streaming SQL DB; aktualisiert Ergebnisse inkrementell, wenn Daten eintreffen Kontinuierliche Wartung inkrementeller Views Starke Konsistenzansprüche in Produktdocs (Details sind produktspezifisch) Bereitstellung frischer abgeleiteter Views für Apps/KI-Agenten Unterschiedliches operatives Modell als Flink-Jobs; keine allgemeine Operator-API-Laufzeit
RisingWave Streaming-Datenbank, in der Stream-Verarbeitung als materialisierte Views ausgedrückt wird Kontinuierliche Wartung materialisierter Views SQL-first; enginespezifische Semantiken SQL-zentrierte Streaming-Apps ohne Aufbau von Flink-Jobs Weniger flexibel für willkürliche code-lastige Pipelines

Eine nützliche Heuristik: Wenn Sie eine Laufzeit für komplexe zustandsbehaftete Streaming-Jobs mit tiefgehender Kontrolle über Event-Time, Operator-Logik und Bereitstellungen wünschen, ist Flink ein primärer Kandidat. Wenn Sie SQL-first-inkrementelle Views für die Bereitstellung wünschen, können Streaming-Datenbanken Alternativen sein. Wenn Sie eine Bibliothek eingebettet in einen Dienst wünschen, ist Kafka Streams konkurrenzfähig. Wenn Sie eine portable Pipeline-Definition über Engines hinweg wünschen, ist Beam überzeugend.

Für cloudbasierte ereignisgesteuerte Architekturen mit AWS behandelt Aufbau ereignisgesteuerter Microservices mit AWS Kinesis Kinesis Data Streams-Muster für Echtzeitverarbeitung und Dienstentkopplung.

Dieser Abschnitt ist bewusst praktisch: Konfiguration, Bereitstellung und wie Ihre Go/Python-Dienste typischerweise mit Flink interagieren.

Go-Dienste + Kafka + Flink + Serving-Layer

Flink ist oft der „zustandsbehaftete Mittelteil", der hochvolumige Ereignisse in dauerhafte Signale (Zähler, Sessions, Anomalien, angereicherte Datensätze) verwandelt. Checkpoints und State-Backends sind es, die diesen Mittelteil in der Produktion zuverlässig machen.

Wichtiger Versionshinweis: Ab Flink 2.0 ist die unterstützte Konfigurationsdatei conf/config.yaml; das vorherige flink-conf.yaml ist „nicht mehr unterstützt".

Ein minimales (illustrierendes) conf/config.yaml für einen kleinen selbstverwalteten Cluster:

# conf/config.yaml (Flink 2.x Stil)
rest:
  address: flink-jobmanager.example.internal
  port: 8081

jobmanager:
  rpc:
    address: flink-jobmanager.example.internal
    port: 6123
  memory:
    process:
      size: 2048m

taskmanager:
  memory:
    process:
      size: 4096m
  numberOfTaskSlots: 2

parallelism:
  default: 2

# Checkpointing-Standardwerte (Jobs können im Code überschreiben)
state:
  backend:
    type: rocksdb
execution:
  checkpointing:
    dir: s3://my-bucket/flink/checkpoints
    savepoint-dir: s3://my-bucket/flink/savepoints
    interval: 60 s

# Vermeiden Sie tmp-Verzeichnisse, die bereinigt werden (RocksDB-Dateien, gecachte JARs, etc.)
io:
  tmp:
    dirs: ["/var/lib/flink/tmp"]

Warum diese Schlüssel: Flinks Konfigurationsreferenz dokumentiert explizit die rest.*- und jobmanager.rpc.*-Entdeckungsdetails, die Prozessspeicherschlüssel, die Slot-/Parallelitätsschlüssel und die Standard-Checkpoint-Einstellungen, einschließlich state.backend.type, execution.checkpointing.dir, execution.checkpointing.savepoint-dir und execution.checkpointing.interval.

Die Wahl von io.tmp.dirs ist operativ wichtig, da Flink es für lokale RocksDB-Dateien und gecachte Artefakte verwendet; das Löschen kann eine schwerwiegende Wiederherstellung verursachen.

Wenn Sie sich auf Flink 1.x befinden (in einigen verwalteten Umgebungen immer noch üblich), werden Sie flink-conf.yaml in der Praxis sehen. Dies ist Legacy für Flink 2.x-Nutzer.

# conf/flink-conf.yaml (Legacy 1.x Stil; NICHT unterstützt in Flink 2.x)
jobmanager.rpc.address: flink-jobmanager
rest.port: 8081
taskmanager.numberOfTaskSlots: 2
parallelism.default: 2

# Legacy-Checkpoint-Schlüssel unterscheiden sich je nach Version; betrachten Sie dies als illustrierend.
state.backend.type: rocksdb
state.checkpoints.dir: s3://my-bucket/flink/checkpoints
state.savepoints.dir: s3://my-bucket/flink/savepoints

Wenn Sie migrieren, stellt Flink ein Migrationsskript (bin/migrate-config-file.sh) bereit, um flink-conf.yaml in config.yaml zu konvertieren.

Der Flink Kubernetes Operator fungiert als Steuerungsfläche für die Lebenszyklusverwaltung von Flink-Anwendungen und wird mit Helm installiert.

Aus der offiziellen Operator-Helm-Dokumentation können Sie entweder aus dem Quellbaum-Chart oder aus dem von Apache gehosteten Chart-Repository installieren:

# aus dem gebündelten Chart im Quellbaum installieren
helm install flink-kubernetes-operator helm/flink-kubernetes-operator

# aus dem Apache-Downloads-Helm-Repository installieren (ersetzen Sie <OPERATOR-VERSION>)
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-<OPERATOR-VERSION>/
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator

Diese genauen Befehle werden in der Helm-Installationsdokumentation des Operators gezeigt.

Beispiel FlinkDeployment CR (illustrierend)

Dies ist ein vereinfachtes Beispiel, um die Integrationspunkte zu zeigen, die Sie typischerweise anpassen (Bild, Ressourcen, Checkpoint-Standorte, Logging/Metriken). Der Operator bringt diesen gewünschten Zustand über seinen Regelkreis in Einklang.

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: realtime-sessions
  namespace: flink
spec:
  image: my-registry.example.com/flink/realtime-sessions:2026-03-06
  flinkVersion: v2_2
  serviceAccount: flink
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.backend.type: "rocksdb"
    execution.checkpointing.dir: "s3://my-bucket/flink/checkpoints/realtime-sessions"
    execution.checkpointing.savepoint-dir: "s3://my-bucket/flink/savepoints/realtime-sessions"
    execution.checkpointing.interval: "60 s"
    rest.port: "8081"
  jobManager:
    resource:
      cpu: 1
      memory: "2048m"
  taskManager:
    resource:
      cpu: 2
      memory: "4096m"
  job:
    jarURI: local:///opt/flink/usrlib/realtime-sessions.jar
    parallelism: 4
    upgradeMode: savepoint
    state: running

Das Muster upgradeMode: savepoint ist üblich, wenn Sie sichere zustandsbehaftete Upgrades wünschen; Savepoints sind für Stop/Resume/Fork/Update-Workflows konzipiert und verweisen auf stabile Speicherstandorte.

PyFlink ist die Python-API für Apache Flink und wird explizit für skalierbare Batch/Stream-Workloads einschließlich ML-Pipelines und ETL beworben.

Wenn Sie JVM-Connectors (Kafka, JDBC, etc.) aus PyFlink verwenden, müssen Sie sicherstellen, dass die relevanten JARs für den Job verfügbar sind. Flinks Python „Dependency Management"-Dokumente zeigen drei Standardmechanismen:

Einstellen von pipeline.jars (Table API), Aufrufen von add_jars() (DataStream API) oder CLI --jarfile zum Zeitpunkt der Einreichung.

Dieses Beispiel liest JSON-Ereignisse von Kafka, weist Event-Time-Zeitstempel zu (mit begrenzter Unordnung), pflegt einen pro Benutzer laufenden Zähler im keyed state und schreibt ein angereichertes Ereignis an ein Ausgabe-Topic.

Hinweise:

  • KafkaSource wird über KafkaSource.builder() erstellt und benötigt Bootstrap-Server, Topics und einen Deserialisierer.
  • Die genau-einmal-Kafka-Sink-Konfiguration in PyFlink erfordert das Einstellen der Liefergarantie und eines transaktionalen ID-Präfixes.
  • Checkpoint-Standardwerte können in der Flink-Konfiguration (execution.checkpointing.*) und/oder im Code konfiguriert werden; die Konfigurationsschlüssel sind in der Flink-Konfigurationsreferenz dokumentiert.
import json
from typing import Any

from pyflink.common import Duration, Types
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.watermark_strategy import WatermarkStrategy, TimestampAssigner
from pyflink.common.configuration import Configuration

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor

from pyflink.datastream.connectors import DeliveryGuarantee
from pyflink.datastream.connectors.kafka import (
    KafkaSource,
    KafkaOffsetsInitializer,
    KafkaSink,
    KafkaRecordSerializationSchema,
)


class EventTimeFromJson(TimestampAssigner):
    """
    Extrahiert event_time_ms aus der JSON-Nutzlast.
    Erwartet: {"user_id":"u1","event_time_ms":1710000000000,"event":"click",...}
    """
    def extract_timestamp(self, value: str, record_timestamp: int) -> int:
        try:
            obj = json.loads(value)
            return int(obj["event_time_ms"])
        except Exception:
            # Fallback: Verwendet den Record-Zeitstempel (Ingestion), wenn fehlerhaft
            return record_timestamp


class RollingCount(KeyedProcessFunction):
    def open(self, runtime_context: RuntimeContext):
        desc = ValueStateDescriptor("rolling_count", Types.LONG())
        self.count_state = runtime_context.get_state(desc)

    def process_element(self, value: str, ctx: 'KeyedProcessFunction.Context'):
        obj = json.loads(value)
        current = self.count_state.value()
        if current is None:
            current = 0
        current += 1
        self.count_state.update(current)

        # emittiert angereichertes Ereignis
        obj["rolling_count"] = current
        obj["event_time_ms"] = int(obj.get("event_time_ms", 0))
        yield json.dumps(obj)


def build_env() -> StreamExecutionEnvironment:
    # Cluster/Job-Standardwerte (können auch in config.yaml gesetzt werden)
    cfg = Configuration()
    cfg.set_string("state.backend.type", "rocksdb")
    cfg.set_string("execution.checkpointing.dir", "s3://my-bucket/flink/checkpoints/realtime-sessions")
    cfg.set_string("execution.checkpointing.interval", "60 s")
    env = StreamExecutionEnvironment.get_execution_environment(cfg)

    # In PyFlink müssen Connector-JARs verfügbar sein; verwenden Sie env.add_jars(...) falls erforderlich.
    # env.add_jars("file:///opt/flink/lib/flink-connector-kafka-<VERSION>.jar")

    # Aktivieren Sie Checkpointing explizit (Jobs können Standardwerte überschreiben)
    env.enable_checkpointing(60_000)
    env.set_parallelism(4)
    return env


def main():
    env = build_env()

    source = (
        KafkaSource.builder()
        .set_bootstrap_servers("kafka:9092")
        .set_topics("events.raw")
        .set_group_id("realtime-sessions-v1")
        .set_value_only_deserializer(SimpleStringSchema())
        .set_starting_offsets(KafkaOffsetsInitializer.earliest())
        .build()
    )

    watermark_strategy = (
        WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(10))
        .with_timestamp_assigner(EventTimeFromJson())
    )

    stream = (
        env.from_source(source, watermark_strategy=watermark_strategy, source_name="kafka-events-raw")
        .key_by(lambda s: json.loads(s)["user_id"])
        .process(RollingCount(), output_type=Types.STRING())
    )

    record_serializer = (
        KafkaRecordSerializationSchema.builder()
        .set_topic("events.enriched")
        .set_value_serialization_schema(SimpleStringSchema())
        .build()
    )

    sink = (
        KafkaSink.builder()
        .set_bootstrap_servers("kafka:9092")
        .set_record_serializer(record_serializer)
        .set_delivery_guarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .set_transactional_id_prefix("realtime-sessions-txn")
        .build()
    )

    stream.sink_to(sink)

    env.execute("realtime-sessions-pyflink")


if __name__ == "__main__":
    main()

Die oben genannten API-Aufrufe entsprechen dem PyFlink-KafkaSource-Buildernutzungsmuster und den erforderlichen Feldern. Für Liefergarantien sagt die PyFlink-KafkaSinkBuilder-Dokumentation explizit, dass für DeliveryGuarantee.EXACTLY_ONCE das transaktionale ID-Präfix gesetzt werden muss. Für Zeitstempelung/Watermarking erklärt Flinks Watermark-Dokumentation die Zeitstempelzuweisung und Watermark-Generierung als Mechanismus zur Verarbeitung von Event-Time, und PyFlink bietet eine WatermarkStrategy-API, die dieses Modell spiegelt.

Go hat keine native Flink-Job-Autoring-API wie Java/Python, daher integrieren Go-Systeme typischerweise mit Flink durch:

  • Kafka (oder andere Broker) als Ingestion/Egress.
  • Die Flink REST-API für operative Aktionen (Upload von JARs, Starten von Jobs, Abfragen des Job-Status, Auslösen von Savepoints, Rescaling).

Für Kafka-Setup und lokale Entwicklungsmuster siehe Apache Kafka Quickstart - Installieren von Kafka 4.2 mit CLI und lokalen Beispielen.

Go Kafka Producer/Consumer-Beispiel (kafka-go)

package main

import (
	"context"
	"log"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	ctx := context.Background()

	// Producer: schreibt rohe Ereignisse
	writer := &kafka.Writer{
		Addr:         kafka.TCP("kafka:9092"),
		Topic:        "events.raw",
		RequiredAcks: kafka.RequireAll,
	}
	defer writer.Close()

	err := writer.WriteMessages(ctx, kafka.Message{
		Key:   []byte("user:u1"),
		Value: []byte(`{"user_id":"u1","event_time_ms":1710000000000,"event":"click"}`),
		Time:  time.Now(),
	})
	if err != nil {
		log.Fatal(err)
	}

	// Consumer: liest angereicherte Ereignisse
	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:  []string{"kafka:9092"},
		Topic:    "events.enriched",
		GroupID:  "go-debug-consumer",
		MinBytes: 1e3,
		MaxBytes: 10e6,
	})
	defer reader.Close()

	msg, err := reader.ReadMessage(ctx)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("enriched key=%s value=%s\n", string(msg.Key), string(msg.Value))
}

Dies ist „Rohrleitungs"-Code, aber es ist die häufigste praktische Integrationsschnittstelle: Kafka-Topics sind die Grenze zwischen Flink und benutzerdefinierten Diensten.

Flinks REST-API ist Teil des JobManager-Webservers und hört standardmäßig auf Port 8081 (konfigurierbar über rest.port).

Die offizielle OpenAPI-Spezifikation für den Dispatcher umfasst /jars/upload und stellt explizit fest:

  • JAR-Upload muss als Multi-Part-Daten gesendet werden
  • stellen Sie sicher, dass der Content-Type-Header auf application/x-java-archive gesetzt ist
  • bietet ein curl-Beispiel mit -F jarfile=@path/to/flink-job.jar

Ein praktisches Go-Snippet zum Hochladen einer JAR:

package flink

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"mime/multipart"
	"net/http"
	"os"
)

func UploadJar(ctx context.Context, flinkBaseURL, jarPath string) (*http.Response, error) {
	f, err := os.Open(jarPath)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	var body bytes.Buffer
	w := multipart.NewWriter(&body)

	part, err := w.CreateFormFile("jarfile", "job.jar")
	if err != nil {
		return nil, err
	}
	if _, err := io.Copy(part, f); err != nil {
		return nil, err
	}
	if err := w.Close(); err != nil {
		return nil, err
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, flinkBaseURL+"/jars/upload", &body)
	if err != nil {
		return nil, err
	}

	// Wichtig: Multi-Part-Grenze
	req.Header.Set("Content-Type", w.FormDataContentType())

	// Einige Clients setzen auch "Expect:" ähnlich wie das curl-Beispiel in der Spezifikation.
	req.Header.Set("Expect", "")

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	if resp.StatusCode >= 300 {
		return resp, fmt.Errorf("upload failed: %s", resp.Status)
	}
	return resp, nil
}

Dieser Code wird durch die REST-API-OpenAPI-Beschreibung für /jars/upload einschließlich seiner Multi-Part-Anforderung und des curl-Verweises geleitet.

Um eine zuvor hochgeladene JAR auszuführen, bietet Flink /jars/{jarid}/run an und unterstützt das Übergeben von Programmargumenten über Abfrageparameter (und/oder JSON-Körper).

Operativ wertvolle Endpunkte, die Sie wahrscheinlich automatisieren:

  • /jobs und /jobs/{jobid} zum Auflisten und Inspektieren des Job-Status
  • /jobs/{jobid}/savepoints zum Auslösen von Savepoints (asynchrones Auslösen + Polling)
  • /jobs/{jobid}/rescaling zum Auslösen von Rescaling
Aspekt PyFlink (Python-Jobs) Go (Dienste um Flink herum)
Autorisierung von Flink-Logik Native Autorisierung über DataStream/Table-APIs; unterstützt Zustand + Timer Keine native Flink-API; Implementieren Sie Logik in Flink (Java/Python) und integrieren Sie extern
Connectors/Abhängigkeiten Müssen Connector-JARs über pipeline.jars, add_jars oder --jarfile mitliefern Nicht anwendbar (Sie laufen nicht innerhalb von Flink), aber Sie verwalten Kafka/DB-Clients
Ingestion/Egress KafkaSource/KafkaSink-Builders in PyFlink Kafka-Producer/Consumer-Bibliotheken; Standard-Microservice-Muster
Ops-Automatisierung Kann auch Flink-REST-Endpunkte aufrufen Besitzt oft die Automatisierung: JAR hochladen, bereitstellen, reskalieren, Savepoint über REST auslösen

Flink unterstützt das Exportieren von Metriken durch Konfiguration von Metrik-Reportern in der Flink-Konfigurationsdatei; diese Reporter werden auf JobManagern und TaskManagern instanziiert.

Für Prometheus bietet Flink Prometheus-Format-Metriken an, wenn es mit metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory in einer unterstützten Flink-Version-Umgebung konfiguriert wird.

Sie kombinieren dies in der Regel mit Kubernetes ServiceMonitors (Prometheus Operator) oder mit Ihrem verwalteten Überwachungsstack.

Skalierung: Parallelität, Slots und operatorbasiertes Autoscaling

Flinks Planungsmodell definiert Ausführungsressourcen über Task-Slots, und jeder Slot kann eine Pipeline paralleler Aufgaben ausführen.

Für manuelle Skalierung bietet die REST-API einen Rescaling-Endpunkt für einen Job (/jobs/{jobid}/rescaling) als asynchrone Operation.

Wenn Sie sich auf Kubernetes mit dem Flink Kubernetes Operator befinden, bewirbt das Operator-Projekt einen „Flink Job Autoscaler" als Teil seines Funktionsumfangs, was sich lohnt zu evaluieren, wenn sich Ihre Workloads erheblich unterscheiden.

Backups und sichere Upgrades: Checkpoints und Savepoints

Checkpoints sind für die automatisierte Wiederherstellung und werden von Flink verwaltet; Savepoints sind für benutzergesteuerte Lebenszyklusoperationen (Stop/Resume/Fork/Upgrade).

Aus einer SRE-Perspektive:

  • Verwenden Sie Checkpoints für „halte die Pipeline durch Fehler hinweg am Laufen".
  • Verwenden Sie Savepoints für „deploy eine neue Version ohne Verlust des Zustands".

Flinks REST-API unterstützt auch das asynchrone Auslösen von Savepoints, was für GitOps-artige „deploy → trigger savepoint → upgrade"-Workflows nützlich ist.

CI/CD: GitOps + Helm + REST-Job-Einreichung

Für Kubernetes:

  • Halten Sie die Operator-Installation und Ihre FlinkDeployment-CRs in Git, bereitstellen Sie über Argo CD/Flux und versionieren Sie Container-Images pro Build. Die Operator-Helm-Docs diskutieren explizit „Arbeiten mit Argo CD".

Für Standalone/Session-Cluster:

  • Verwenden Sie die Flink REST-API JAR-Upload- und Ausführen-Endpunkte für immutable Artefakt-Bereitstellungen.

Beachten Sie auch einen subtilen, aber wertvollen Sicherheits-/Ops-Schalter: web.submit.enable regelt Uploads über die Web-UI, aber die Docs stellen fest, dass Session-Cluster selbst bei Deaktivierung Job-Einreichungen über REST-Anfragen akzeptieren; dies ist relevant, wenn Sie UI-Oberflächen härten, während Sie CI/CD-Automatisierung beibehalten.

LLM-Systeme sind oft nur so gut wie ihr Echtzeit-Kontext. Flink passt in LLM/KI-Stacks als Komponente, die „immer frische" Features, Embeddings und verhaltensbezogene Aggregate produziert.

Ein übliches Muster ist:

  • Benutzeraktionen/Ereignisse ingestieren,
  • Sessions und Präferenzen aggregieren,
  • Embedding-Generierungsaufgaben produzieren,
  • Embeddings in einen Vektorspeicher und/oder Feature-Speicher schreiben.

PyFlinks Abhängigkeitsmanagement-Dokumentation nennt explizit „Maschinelles Lernen-Vorhersagen" und das Laden von ML-Modellen innerhalb von Python-UDFs (für Remote-Cluster-Ausführung), was direkt auf „Online-Inferenz innerhalb von Flink-Operatoren"-Ansätze abbildet.

Online-Feature-Speicher-Updates für Empfehlung und Ranking

Flinks keyed state und Checkpointing-Modell ist darauf ausgelegt, den Operator-Zustand über Ereignisse hinweg zu pflegen und ihn zuverlässig wiederherzustellen. Das ist eine natürliche Übereinstimmung für kontinuierliche Feature-Berechnungen (laufende Raten, Zähler, zeitlich abklingende Metriken), die nachgelagerte Empfehlungssysteme benötigen.

Praktische Latenz/Konsistenz-Kompromisse für KI-Pipelines

Wenn Ihre Architektur genau-einmal-Semantik von Ende zu Ende erfordert (z. B. um doppelte Feature-Updates oder doppelte Abrechnungsevents zu vermeiden), strukturieren Sie Sinks und Sources um Checkpointing und transaktionale Garantien herum.

In Kafka-basierten Stacks speziell:

  • Flinks Kafka-Connector kann genau-einmal-Garantien liefern, wenn Checkpointing aktiviert ist und Liefergarantie-Optionen konfiguriert sind.
  • Kafka Streams unterstützt ebenfalls genau-einmal-Semantik (EOS), was relevant ist, wenn Ihre „KI-Feature-Pipeline" klein genug ist, um innerhalb von Anwendungscode statt in einem Flink-Cluster zu leben.

Flink als Echtzeit-KI-Kontext-Baumeister

Dieses Diagramm basiert auf Flinks Kernprimitiven: Event-Time-Verarbeitung (Watermarks), State-Backends (state.backend.type und systemverwalteter lokaler Zustand) und Checkpoint/Savepoint-Mechanismen für Fehlertoleranz und Operationen.