Apache Flink op K8s en Kafka: PyFlink, Go, beheer en geprijsde beheerde diensten

Stateful streaming, checkpoints, K8s, PyFlink, Go.

Inhoud

Apache Flink is een framework voor stateful computations (berekeningen met toestand) over oneindige en eindige datastromen.

Teams kiezen ervoor voor correcte, low-latency streaming met event-time semantiek (watermarks), fouttolerantie (checkpoints), gecontroleerde upgrades (savepoints) en operationele interfaces (metrieken en REST).

Apache Flink stream processing

Deze gids is gericht op DevOps- en Go/Python-ontwikkelaars. Het vergelijkt implementatiemodellen (zelfbeheerd versus beheerd), legt de kernarchitectuur uit, beslaat Kubernetes (Helm en Operator) en standalone-opstellingen, stelt Flink tegenover Spark, Kafka Streams, Beam en streaming-databases, en toont PyFlink- en Go-integratiepatronen, inclusief LLM- en AI-georiënteerde pijplijnen.

Voor bredere context over data-infrastructuurpatronen, waaronder objectopslag, databases en messaging, zie Data Infrastructure for AI Systems: Object Storage, Databases, Search & AI Data Architecture.

Apache Flink is expliciet gepositioneerd als een stateful stream processing engine: u modelleert uw logica als een pijplijn van operatoren en Flink voert deze uit als een gedistribueerde dataflow met beheerde toestand en tijdsemantiek. In de moderne Flink-documentatie beschrijft het project zichzelf als een framework en een gedistribueerde verwerkingsengine voor stateful computations over oneindige en eindige datastromen.

Vanuit een praktisch DevOps/software-engineeringperspectief is Flink een goede keuze als u ten minste één van de volgende eigenschappen nodig hebt:

Als u join/aggregate/enrich met lage latentie en correctiegaranties nodig hebt, gebruikt u doorgaans de event-time-verwerking van Flink, waarbij “tijd” het moment is waarop het gebeurtenis plaatsvond (niet wanneer het aankwam), en watermarks de vooruitgang van de event-time door de pijplijn communiceren.

Als u stateful computation op schaal nodig hebt (roltellers, sessies, fraude-regels, feature engineering), behandelt Flink toestand als een integraal onderdeel van het programmeringsmodel en maakt het fouttolerant via checkpointing.

Als u operationeel robuuste streaming nodig hebt (storingen, rollende upgrades, herstarts), controleert Flink de toestand en stroomposities zodat de job kan herstellen en doorgaan met dezelfde semantiek “alsof er geen fout was opgetreden”.

Typische use cases voor DevOps, Go, Python en AI-teams

Flink wordt veel gebruikt voor “data pipelines & ETL”, “streaming analytics” en “event-driven applications” (de categorieën die door de Flink-docs worden gebruikt).

Voor een DevOps + Go/Python-stack zien typische patronen er als volgt uit:

Een Go-service produceert gebeurtenissen naar Kafka; Flink consumeert deze gebeurtenissen, voert stateful verwerking uit (bijv. deduplicatie, gerolde aggregatie, verrijking) en schrijft dan afgeleide feiten terug naar Kafka of een database. De operator- en checkpointing-mechanismen van Flink bestaan om deze stateful pijplijnen productie-veilig te maken.

Voor ML/LLM-teams noemt PyFlink expliciet scenarios als “machine learning prediction” en het laden van machine learning-modellen binnen Python UDF’s als een reden voor dependency-beheer, wat een directe aanbeveling is voor “Flink job als online inference / feature engineering runtime”-patronen.

De runtime van Flink bestaat uit twee processoorten: JobManager en TaskManagers. De documentatie benadrukt dat clients de dataflow naar de JobManager sturen; de client kan dan verbinding verbreken (detached mode) of verbonden blijven (attached mode).

De JobManager coördineert gedistribueerde uitvoering: planning, reageren op voltooiing of falen van taken, coördinatie van checkpoints en coördinatie van herstel. Intern bevat deze: ResourceManager (slots/hulpbronnen), Dispatcher (REST + Web UI + per-job JobMaster creatie) en JobMaster (beheert één job).

De TaskManagers voeren de operatoren/taken uit en wisselen/bufferen datastromen. De kleinste eenheid voor planning is de task slot; meerdere operatoren kunnen in één slot worden uitgevoerd (operator chaining en slot sharing beïnvloeden dit).

Operator chaining en task slots voor prestatie- en kostencontrole

Flink koppelt operator-subtaken aan taken, waarbij elke taak door een enkele thread wordt uitgevoerd. Dit wordt omschreven als een prestatieoptimalisatie die overhead van thread-overdracht en buffering vermindert, waardoor het doorvoersnelheid toeneemt en de latentie daalt.

Slots zijn operationeel belangrijk omdat ze de eenheid zijn voor hulpbronnenplanning en isolatie. Flink merkt op dat elke TaskManager één of meer task slots kan hebben; slotting reserveert managed memory per slot, maar isoleert geen CPU.

Event-time verwerking, watermarks en late data

Flink ondersteunt meerdere begrippen van tijd—event time, ingestion time, processing time—en gebruikt watermarks om vooruitgang in event time te modelleren.

Om met event time te werken, heeft Flink tijdstempels nodig die aan gebeurtenissen worden toegewezen en moeten watermarks worden gegenereerd; de officiële “Generating Watermarks”-documentatie leg uit dat tijdstempeltoewijzing en watermarkgeneratie de kernblokken zijn, waarbij WatermarkStrategy de standaard manier is om veelvoorkomende strategieën te configureren.

Fouttolerantie: checkpoints versus savepoints in echte systemen

Checkpointing bestaat omdat “elke functie en operator in Flink stateful kan zijn”; toestand moet gecontroleerd worden om fouttolerant te worden. Checkpoints maken herstel van zowel toestand als stroomposities mogelijk, zodat de uitvoering kan hervatten met failure-free semantiek.

Flink is zeer expliciet dat savepoints “een consistente afbeelding van de uitvoeringstoestand van een streaming job zijn, gemaakt via het checkpointing-mechanisme van Flink”, gebruikt om jobs te stoppen en hervatten, te fork of te updaten. Savepoints leven op stabiele opslag (bijv. HDFS, S3).

De officiële “Checkpoints vs Savepoints”-pagina frame het verschil zoals back-ups versus herstellogs: checkpoints zijn frequent, lichtgewicht en door Flink beheerd voor herstel na storingen; savepoints zijn door de gebruiker beheerd en worden gebruikt voor gecontroleerde operaties zoals upgrades.

De open-source Flink-runtime is “gratis” in de zin van licenties, maar in productie betaalt u voor infrastructuur en operationele inspanning.

Flink is ontworpen om te integreren met veelvoorkomende resource managers (bijv. YARN en Kubernetes) en kan ook als standalone-cluster of als bibliotheek draaien.

Compute- en geheugenkosten worden gedreven door JobManager en TaskManagers, en door uw parallelisme/slot-indeling. De configuratiedocumentatie van Flink noemt expliciet jobmanager.memory.process.size, taskmanager.memory.process.size, taskmanager.numberOfTaskSlots en parallelism.default als kernknoppen voor gedistribueerde opstellingen.

Lokale schijf is een frequente verborgen kostenpost voor stateful jobs. Flink merkt op dat io.tmp.dirs lokale data opslaat, waaronder RocksDB-bestanden, overgelopen tussenresultaten en cached JAR’s; als deze data wordt verwijderd, kan dit leiden tot “een zware hersteloperatie”, dus het moet op opslag leven die niet periodiek wordt opgeruimd.

De kosten voor duurzame object/file-opslag worden gedreven door checkpoint/savepoint-mappen. In Flink 2.x-configuratie worden checkpoints en savepoints geconfigureerd via execution.checkpointing.dir en execution.checkpointing.savepoint-dir en accepteren ze URIs zoals s3://… of hdfs://….

Beheerde services verminderen operationele kosten maar voegen platformkosten en beperkingen toe. De details zijn afhankelijk van de provider.

Amazon Managed Service for Apache Flink factureert per KPU (1 vCPU + 4 GB geheugen per KPU) en factureert per duur en aantal KPUs in stappen van één seconde. AWS factureert ook een extra “orchestratie” KPU per applicatie en aparte opslag/back-upkosten.

Confluent Cloud for Apache Flink is gebruiksbasis en serverloos: u maakt een compute-pool aan en u wordt gefactureerd voor CFU’s die per minuut worden verbruikt terwijl statements worden uitgevoerd. De factuurpagina bevat een voorbeeld CFU-prijs van $0,21 per CFU-uur (afhankelijk van regio) en benadrukt dat u de uitgaven kunt beperken via compute-pool maxima.

Aiven en Alibaba Cloud zijn opmerkelijke beheerde Flink-providers op de markt, maar hun openbare prijzen en factuurgegevens variëren per plan/regio en kunnen rekenmachines of contact met sales vereisen; behandel exacte kosten als niet gespecificeerd tenzij u een regio+plan citeert uit hun huidige documenten.

Ververica biedt zowel zelfbeheerde als beheerde implementatie-opties rondom Flink; openbare pagina’s benadrukken implementatiekeuzes en beheerde service-positionering, terwijl exacte prijzen doorgaans via “contact/prijsdetails”-stromen worden afhandeld (dus specifieke getallen zijn vaak niet gespecificeerd openbaar).

Implementatie-optie Best voor Operationele complexiteit Kernvoordelen Kernrisico’s / afwegingen
Standalone-cluster (VM’s/bare metal) Kleine teams, vaste capaciteit Medium–Hoog Volledige controle; eenvoudigste mentaal model HA, autoscaling, upgrades zijn DIY (meer werk)
Kubernetes met Flink Kubernetes Operator Meeste moderne platformteams Medium Declaratieve implementaties; levenscyclusbeheer via controlelus; operator ondersteunt Application/Session/Job-implementaties Kubernetes + operator-expertise vereist
Native Kubernetes (zonder operator) K8s-teams die directe integratie willen Medium–Hoog Directe hulpbronnenintegratie; dynamische TaskManager-toewijzing/de-allocatie beschreven in Flink-on-K8s-docs Meer maatwerkautomatisering dan operator
YARN Hadoop-gecentreerde platforms Medium Integreert met YARN-resource-beheer Hadoop-stack complexiteit
AWS Managed Service for Apache Flink AWS-native data-stacks Laag–Medium Beheerde orchestratie + schaalopties; voorspelbare factuur-eenheid (KPU) Platform-koppeling; extra per-app overhead KPU + opslagkosten
Confluent Cloud for Apache Flink Kafka-first shops, SQL-first stream apps Laag Serverloze gebruiksfacturatie; CFU-minutenrekening; compute-pools om uitgaven te beperken CFU-kosten + Kafka-netwerk Kosten; service-specifieke API’s
Ververica beheerde aanbiedingen Enterprise die Flink-expert ops nodig heeft Laag–Medium “Flink experts” beheerde service-positionering Prijzen vaak niet transparant (niet gespecificeerd)

Tabel met beheerde providers en kosten

Prijzen veranderen per regio en tijd; als u exacte getallen nodig heeft voor uw regio, behandel dit als een startpunt en controleer tegen de huidige prijspagina’s van de provider (niet geciteerde regio’s zijn niet gespecificeerd).

Provider “Plan” vorm Factuur-eenheid Voorbeeld compute-prijs Opmerkelijke extra kostenstuurders
Amazon Managed Service for Apache Flink Beheerde runtime KPU (1 vCPU + 4 GB) Voorbeeld getoond: $0,11 per KPU-uur (US East N. Virginia) +1 orchestratie KPU per app; lopende opslag; optionele duurzame back-ups
Confluent Cloud for Apache Flink Serverloze SQL/verwerking CFU-minuut/CFU-uur Voorbeeld getoond: $0,21 per CFU-uur (regio varieert) Kafka-netwerktarieven gelden nog steeds; compute-pool max om uitgaven te beperken
Ververica (beheerd) Beheerde “Unified Streaming Data Platform” Niet gespecificeerd (openbare pagina’s) Niet gespecificeerd Platform-functies/SLA’s; prijzen doorgaans via sales (niet gespecificeerd)
Aiven voor Apache Flink Beheerde service Uurlijk gebruiksmodel (platform breed) Niet gespecificeerd zonder plan/regio Plan-niveau + cloud regio + add-ons (niet gespecificeerd)
Alibaba Cloud Realtime Compute for Apache Flink Beheerd/serverloos Hybride facturatie (pay-as-you-go + abonnement mix) Niet gespecificeerd zonder regio/workspace details CU-gebaseerde limieten en workspace-model (details variëren; hier niet gespecificeerd)

Flink zit in een druk ecosysteem. De “beste” keuze hangt af van latentie, statefulness, operationele voorkeuren en het auteuringsmodel.

Tool Wat het is Streaming uitvoeringsmodel Toestand & exactly-once verhaal Waar het uitblinkt Typische pijnpunten
Apache Flink Gedistribueerde stream processing engine voor stateful computations Continue streaming + event time via watermarks Checkpoint-gebaseerde fouttolerantie; savepoints voor gecontroleerde upgrades Low-latency stateful pijplijnen; complexe event-time logica Correct opereren van toestand, checkpoints en upgrades vereist discipline
Apache Spark Structured Streaming Streaming-engine van Spark gebouwd rond DataFrames/Datasets Standaard micro-batch model (met een continue modus apart besproken) Sterk voor analytische pijplijnen; toestand bestaat maar vaak hogere latentie Geünificeerde batch+stream API’s; Spark ecosysteem Micro-batch latentie en “streaming als incrementele batches” mentaal model
Kafka Streams Bibliotheek om stream-processing-apps op Kafka te bouwen Record-at-a-time verwerking Ondersteunt exactly-once processing semantiek (EOS) Eenvoudige Kafka-native apps; inbedden in JVM-service Alleen JVM; minder flexibel voor grote gedistribueerde compute patronen
Apache Beam Geünificeerd programmeringsmodel + SDK’s; uitgevoerd via runners (Flink, Spark, Dataflow, etc.) Hangt af van runner; Beam-pijplijnen vertalen naar runner-jobs Semantiek hangen af van runner-capaciteit (runner-specifiek) Portabiliteit, multi-talige pijplijnen; vermijden van engine lock-in Operationele tuning eindigt toch runner-specifiek
Materialize “Live data layer” / streaming SQL DB; update resultaten incrementeel naarmate data aankomt Continue incrementele view-onderhoud Sterke consistentieclaims in productdocs (details zijn product-specifiek) Serveren van verse afgeleide views aan apps/AI-agents Anders operationeel model dan Flink jobs; geen algemene operator-API runtime
RisingWave Streaming database waar stream processing wordt uitgedrukt als materialized views Continue materialized view-onderhoud SQL-first; engine-specifieke semantiek SQL-gecentreerde streaming apps zonder Flink jobs te bouwen Minder flexibel voor arbitrare code-zware pijplijnen

Een nuttige heuristiek: als u een runtime wilt voor complexe stateful streaming jobs met diepe controle over event-time, operator-logica en implementaties, is Flink een primaire kandidaat. Als u SQL-first incrementele views wilt voor serveren, kunnen streaming databases alternatieven zijn. Als u een bibliotheek ingebed in een service wilt, is Kafka Streams concurrerend. Als u één portabele pijplijndefinitie over engines wilt, is Beam overtuigend.

Voor cloud-native event-driven architecturen die AWS gebruiken, behandelt Building Event-Driven Microservices with AWS Kinesis Kinesis Data Streams patronen voor real-time verwerking en service-ontkoppeling.

Dit deel is intentioneel praktisch: configuratie, implementatie en hoe uw Go/Python-services doorgaans met Flink interacteren.

Go services + Kafka + Flink + serving layer

Flink is vaak de “stateful middle” die hoogvolume-gebeurtenissen omzet in duurzame signalen (tellers, sessies, anomalieën, verrijkte records). Checkpoints en state backends maken die middle betrouwbaar in productie.

Belangrijke versie-opmerking: beginnend met Flink 2.0, is het ondersteunde configuratiebestand conf/config.yaml; het vorige flink-conf.yaml is “niet langer ondersteund”.

Een minimaal (illustratief) conf/config.yaml voor een klein zelfbeheerd cluster:

# conf/config.yaml (Flink 2.x stijl)
rest:
  address: flink-jobmanager.example.internal
  port: 8081

jobmanager:
  rpc:
    address: flink-jobmanager.example.internal
    port: 6123
  memory:
    process:
      size: 2048m

taskmanager:
  memory:
    process:
      size: 4096m
  numberOfTaskSlots: 2

parallelism:
  default: 2

# Checkpointing defaults (jobs kunnen dit nog steeds overschrijven in code)
state:
  backend:
    type: rocksdb
execution:
  checkpointing:
    dir: s3://my-bucket/flink/checkpoints
    savepoint-dir: s3://my-bucket/flink/savepoints
    interval: 60 s

# Vermijd tmp dirs die worden opgeruimd (RocksDB bestanden, cached jars, etc.)
io:
  tmp:
    dirs: ["/var/lib/flink/tmp"]

Waarom deze sleutels: De configuratiereferentie van Flink documenteert expliciet de rest.* en jobmanager.rpc.* ontdekkingsdetails, de process memory-sleutels, de slot/parallelisme-sleutels, en de standaard checkpoint-instellingen inclusief state.backend.type, execution.checkpointing.dir, execution.checkpointing.savepoint-dir, en execution.checkpointing.interval.

De io.tmp.dirs-keuze is operationeel belangrijk omdat Flink het gebruikt voor lokale RocksDB-bestanden en cached artefacten; het verwijderen hiervan kan zware hersteloperaties veroorzaken.

Als u op Flink 1.x zit (nog steeds veelvoorkomend in sommige beheerde omgevingen), zult u flink-conf.yaml in het veld zien. Dit is legacy voor Flink 2.x-gebruikers.

# conf/flink-conf.yaml (legacy 1.x stijl; NIET ondersteund in Flink 2.x)
jobmanager.rpc.address: flink-jobmanager
rest.port: 8081
taskmanager.numberOfTaskSlots: 2
parallelism.default: 2

# Legacy checkpoint-sleutels verschillen per versie; behandel als illustratief.
state.backend.type: rocksdb
state.checkpoints.dir: s3://my-bucket/flink/checkpoints
state.savepoints.dir: s3://my-bucket/flink/savepoints

Als u migreert, biedt Flink een migratiescript (bin/migrate-config-file.sh) om flink-conf.yaml naar config.yaml te converteren.

De Flink Kubernetes Operator fungeert als een controlevlak voor Flink applicatie levenscyclusbeheer en wordt geïnstalleerd via Helm.

Volgens de officiële operator Helm-docs kunt u installeren vanuit de source tree chart, of vanuit de Apache-gehoste chart-repository:

# installeren vanuit gebundelde chart in source tree
helm install flink-kubernetes-operator helm/flink-kubernetes-operator

# installeren vanuit Apache downloads Helm repository (vervang <OPERATOR-VERSION>)
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-<OPERATOR-VERSION>/
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator

Deze exacte commando’s worden getoond in de Helm-installatiedocumentatie van de operator.

Voorbeeld FlinkDeployment CR (illustratief)

Dit is een vereenvoudigd voorbeeld om de integratiepunten te tonen die u doorgaans aanpast (image, resources, checkpoint-locaties, logging/metrieken). De operator reconcileert deze gewenste toestand via zijn controlelus.

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: realtime-sessions
  namespace: flink
spec:
  image: my-registry.example.com/flink/realtime-sessions:2026-03-06
  flinkVersion: v2_2
  serviceAccount: flink
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.backend.type: "rocksdb"
    execution.checkpointing.dir: "s3://my-bucket/flink/checkpoints/realtime-sessions"
    execution.checkpointing.savepoint-dir: "s3://my-bucket/flink/savepoints/realtime-sessions"
    execution.checkpointing.interval: "60 s"
    rest.port: "8081"
  jobManager:
    resource:
      cpu: 1
      memory: "2048m"
  taskManager:
    resource:
      cpu: 2
      memory: "4096m"
  job:
    jarURI: local:///opt/flink/usrlib/realtime-sessions.jar
    parallelism: 4
    upgradeMode: savepoint
    state: running

Het upgradeMode: savepoint-patroon is gebruikelijk wanneer u veilige stateful upgrades wilt; savepoints zijn ontworpen voor stop/resume/fork/update workflows en wijzen naar stabiele opslaglocaties.

PyFlink is de Python-API voor Apache Flink en wordt expliciet gepresenteerd voor schaalbare batch/stream-werklasten, inclusief ML-pijplijnen en ETL.

Wanneer u JVM-connectors (Kafka, JDBC, etc.) gebruikt vanuit PyFlink, moet u ervoor zorgen dat de relevante JAR’s beschikbaar zijn voor de job. De Python “Dependency Management”-docs van Flink tonen drie standaardmechanismen:

Instellen van pipeline.jars (Table API), aanroepen van add_jars() (DataStream API), of CLI --jarfile op het moment van indienen.

Dit voorbeeld leest JSON-gebeurtenissen van Kafka, wijst event-time tijdstempels toe (met gebonden out-of-orderness), onderhoudt een per-user rollende telling in geklede toestand, en schrijft een verrijkte gebeurtenis naar een output-topic.

Opmerkingen:

  • KafkaSource wordt gebouwd via KafkaSource.builder() en vereist bootstrap servers, topics en een deserialiser.
  • Exactly-once Kafka sink-configuratie in PyFlink vereist het instellen van delivery guarantee en een transactional ID prefix.
  • Checkpoint-standaarden kunnen worden geconfigureerd in Flink config (execution.checkpointing.*) en/of in code; de config-sleutels zijn gedocumenteerd in de Flink configuratierferentie.
import json
from typing import Any

from pyflink.common import Duration, Types
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.watermark_strategy import WatermarkStrategy, TimestampAssigner
from pyflink.common.configuration import Configuration

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor

from pyflink.datastream.connectors import DeliveryGuarantee
from pyflink.datastream.connectors.kafka import (
    KafkaSource,
    KafkaOffsetsInitializer,
    KafkaSink,
    KafkaRecordSerializationSchema,
)


class EventTimeFromJson(TimestampAssigner):
    """
    Haal event_time_ms op uit de JSON payload.
    Verwacht: {"user_id":"u1","event_time_ms":1710000000000,"event":"click",...}
    """
    def extract_timestamp(self, value: str, record_timestamp: int) -> int:
        try:
            obj = json.loads(value)
            return int(obj["event_time_ms"])
        except Exception:
            # fallback: gebruik record timestamp (ingestion) als misvormd
            return record_timestamp


class RollingCount(KeyedProcessFunction):
    def open(self, runtime_context: RuntimeContext):
        desc = ValueStateDescriptor("rolling_count", Types.LONG())
        self.count_state = runtime_context.get_state(desc)

    def process_element(self, value: str, ctx: 'KeyedProcessFunction.Context'):
        obj = json.loads(value)
        current = self.count_state.value()
        if current is None:
            current = 0
        current += 1
        self.count_state.update(current)

        # stuur verrijkte gebeurtenis
        obj["rolling_count"] = current
        obj["event_time_ms"] = int(obj.get("event_time_ms", 0))
        yield json.dumps(obj)


def build_env() -> StreamExecutionEnvironment:
    # Cluster/job defaults (kan ook in config.yaml worden ingesteld)
    cfg = Configuration()
    cfg.set_string("state.backend.type", "rocksdb")
    cfg.set_string("execution.checkpointing.dir", "s3://my-bucket/flink/checkpoints/realtime-sessions")
    cfg.set_string("execution.checkpointing.interval", "60 s")
    env = StreamExecutionEnvironment.get_execution_environment(cfg)

    # In PyFlink moeten connector jars beschikbaar zijn; gebruik env.add_jars(...) indien nodig.
    # env.add_jars("file:///opt/flink/lib/flink-connector-kafka-<VERSION>.jar")

    # Schakel checkpointing expliciet in (jobs kunnen defaults overschrijven)
    env.enable_checkpointing(60_000)
    env.set_parallelism(4)
    return env


def main():
    env = build_env()

    source = (
        KafkaSource.builder()
        .set_bootstrap_servers("kafka:9092")
        .set_topics("events.raw")
        .set_group_id("realtime-sessions-v1")
        .set_value_only_deserializer(SimpleStringSchema())
        .set_starting_offsets(KafkaOffsetsInitializer.earliest())
        .build()
    )

    watermark_strategy = (
        WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(10))
        .with_timestamp_assigner(EventTimeFromJson())
    )

    stream = (
        env.from_source(source, watermark_strategy=watermark_strategy, source_name="kafka-events-raw")
        .key_by(lambda s: json.loads(s)["user_id"])
        .process(RollingCount(), output_type=Types.STRING())
    )

    record_serializer = (
        KafkaRecordSerializationSchema.builder()
        .set_topic("events.enriched")
        .set_value_serialization_schema(SimpleStringSchema())
        .build()
    )

    sink = (
        KafkaSink.builder()
        .set_bootstrap_servers("kafka:9092")
        .set_record_serializer(record_serializer)
        .set_delivery_guarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .set_transactional_id_prefix("realtime-sessions-txn")
        .build()
    )

    stream.sink_to(sink)

    env.execute("realtime-sessions-pyflink")


if __name__ == "__main__":
    main()

De API-oproepen hierboven lijnen op met PyFlink’s KafkaSource builder-gebruikspatroon en vereiste velden. Voor delivery guarantees zegt de PyFlink KafkaSinkBuilder-documentatie expliciet dat voor DeliveryGuarantee.EXACTLY_ONCE u de transactional ID prefix moet instellen. Voor timestamping/watermarking legt de Flink watermark-documentatie uit dat tijdstempeltoewijzing en watermarkgeneratie het mechanisme zijn om event time te verwerken, en PyFlink biedt een WatermarkStrategy-API die dit model weerspiegelt.

Go heeft geen native Flink job auteurings-API zoals Java/Python, dus Go-systemen integreren doorgaans met Flink via:

  • Kafka (of andere brokers) als ingestion/egress.
  • De Flink REST API voor operationele acties (uploaden van JAR’s, jobs starten, job-status opvragen, savepoints triggeren, rescaling).

Voor Kafka-opstelling en lokale ontwikkelingspatronen, zie Apache Kafka Quickstart - Install Kafka 4.2 met CLI en Lokale Voorbeelden.

Go Kafka producer/consumer voorbeeld (kafka-go)

package main

import (
	"context"
	"log"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	ctx := context.Background()

	// Producer: schrijf ruwe gebeurtenissen
	writer := &kafka.Writer{
		Addr:         kafka.TCP("kafka:9092"),
		Topic:        "events.raw",
		RequiredAcks: kafka.RequireAll,
	}
	defer writer.Close()

	err := writer.WriteMessages(ctx, kafka.Message{
		Key:   []byte("user:u1"),
		Value: []byte(`{"user_id":"u1","event_time_ms":1710000000000,"event":"click"}`),
		Time:  time.Now(),
	})
	if err != nil {
		log.Fatal(err)
	}

	// Consumer: lees verrijkte gebeurtenissen
	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:  []string{"kafka:9092"},
		Topic:    "events.enriched",
		GroupID:  "go-debug-consumer",
		MinBytes: 1e3,
		MaxBytes: 10e6,
	})
	defer reader.Close()

	msg, err := reader.ReadMessage(ctx)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("enriched key=%s value=%s\n", string(msg.Key), string(msg.Value))
}

Dit is “plumbing” code, maar het is het meest voorkomende praktische integratievlak: Kafka topics zijn de grens tussen Flink en aangepaste services.

De REST API van Flink is onderdeel van de JobManager webserver en luistert standaard op poort 8081 (configureerbaar via rest.port).

De officiële OpenAPI-specificatie voor de dispatcher bevat /jars/upload en stelt expliciet:

  • JAR upload moet worden verzonden als multi-part data
  • zorg dat de Content-Type header is ingesteld op application/x-java-archive
  • biedt een curl-voorbeeld met -F jarfile=@path/to/flink-job.jar

Een praktisch Go-snippet om een JAR te uploaden:

package flink

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"mime/multipart"
	"net/http"
	"os"
)

func UploadJar(ctx context.Context, flinkBaseURL, jarPath string) (*http.Response, error) {
	f, err := os.Open(jarPath)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	var body bytes.Buffer
	w := multipart.NewWriter(&body)

	part, err := w.CreateFormFile("jarfile", "job.jar")
	if err != nil {
		return nil, err
	}
	if _, err := io.Copy(part, f); err != nil {
		return nil, err
	}
	if err := w.Close(); err != nil {
		return nil, err
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, flinkBaseURL+"/jars/upload", &body)
	if err != nil {
		return nil, err
	}

	// Belangrijk: multipart boundary
	req.Header.Set("Content-Type", w.FormDataContentType())

	// Sommige clients stellen ook "Expect:" in zoals in het curl-voorbeeld in de spec.
	req.Header.Set("Expect", "")

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	if resp.StatusCode >= 300 {
		return resp, fmt.Errorf("upload failed: %s", resp.Status)
	}
	return resp, nil
}

Deze code wordt geleid door de REST API OpenAPI-beschrijving voor /jars/upload, inclusief zijn multi-part vereiste en curl-referentie.

Om een eerder geüploade JAR uit te voeren, exposeert Flink /jars/{jarid}/run en ondersteunt het doorgeven van programma-argumenten via query parameters (en/of JSON body).

Operationeel waardevolle endpoints die u waarschijnlijk automatiseert:

  • /jobs en /jobs/{jobid} om job-status op te vragen en te inspecteren
  • /jobs/{jobid}/savepoints om savepoints te triggeren (async trigger + polling)
  • /jobs/{jobid}/rescaling om rescaling te triggeren
Aangelegenheid PyFlink (Python jobs) Go (services rond Flink)
Flink logica auteur Native auteur via DataStream/Table API’s; ondersteunt toestand + timers Geen native Flink API; implementeer logica in Flink (Java/Python) en integreer extern
Connectors/dependencies Moet connector JAR’s meesturen via pipeline.jars, add_jars, of --jarfile Niet van toepassing (u draait niet binnen Flink), maar u beheert Kafka/DB clients
Ingestion/egress KafkaSource/KafkaSink builders in PyFlink Kafka producer/consumer bibliotheken; standaard microservice patronen
Ops automatisering Kan ook Flink REST endpoints aanroepen Eigen vaak automatisering: upload JAR, implementeren, rescale, trigger savepoint via REST

Flink ondersteunt het exporteren van metrieken door metric reporters te configureren in het Flink configuratiebestand; deze reporters worden geïnstantieerd op JobManager en TaskManagers.

Voor Prometheus exposeert Flink Prometheus-formaat metrieken wanneer geconfigureerd met metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory in een ondersteunde Flink-versie-omgeving.

U combineert dit doorgaans met Kubernetes ServiceMonitors (Prometheus Operator) of met uw beheerde monitoringstack.

Schalen: parallelisme, slots en operator-gebaseerd autoscaling

Het planningsmodel van Flink definieert uitvoeringshulpbronnen via task slots, en elk slot kan een pijplijn van parallelle taken uitvoeren.

Voor handmatig schalen biedt de REST API een rescaling-endpoint voor een job (/jobs/{jobid}/rescaling) als een async operatie.

Als u op Kubernetes bent met de Flink Kubernetes Operator, adverteert het operator-project een “Flink Job Autoscaler” als onderdeel van zijn functieset, wat de moeite waard is om te evalueren als uw werklasten aanzienlijk variëren.

Back-ups en veilige upgrades: checkpoints en savepoints

Checkpoints zijn voor geautomatiseerd herstel en worden beheerd door Flink; savepoints zijn voor door de gebruiker gedreven levenscyclusoperaties (stop/resume/fork/upgrade).

Vanuit een SRE-perspectief:

  • Gebruik checkpoints voor “houd de pijplijn draaiende door storingen heen”.
  • Gebruik savepoints voor “implementeer een nieuwe versie zonder dat toestand verloren gaat”.

De REST API van Flink ondersteunt ook het asynchroon triggeren van savepoints, wat nuttig is voor GitOps-stijl “implementeren → trigger savepoint → upgrade” workflows.

CI/CD: GitOps + Helm + REST job indienen

Voor Kubernetes:

  • Bewaar de operator-installatie en uw FlinkDeployment CR’s in Git, implementeer via Argo CD/Flux, en versioneer container-images per build. De operator Helm-docs bespreken expliciet “Werken met Argo CD”.

Voor standalone/session clusters:

  • Gebruik de Flink REST API JAR upload en run endpoints voor immutable artefact-implementaties.

Merk ook op een subtiele maar waardevolle beveiligings/ops-schakelaar: web.submit.enable beheert uploads via de Web UI, maar de docs merken op dat zelfs wanneer dit uitgeschakeld is, session clusters toch job-indieningen accepteren via REST-oproepen; dit is relevant bij het versterken van UI-oppervlakken terwijl CI/CD-automatisering behouden blijft.

LLM-systemen zijn vaak niet beter dan hun real-time context. Flink past in LLM/AI-stacks als het component dat “altijd verse” features, embeddings en gedragsaggregaten produceert.

Een veelvoorkomend patroon is:

  • ingestie van gebruikersacties/gebeurtenissen,
  • aggregatie van sessies en voorkeuren,
  • productie van embedding-generatietaken,
  • schrijven van embeddings naar een vector store en/of feature store.

De dependency management-documentatie van PyFlink noemt expliciet “machine learning prediction” en het laden van ML-modellen binnen Python UDF’s (voor remote cluster uitvoering), wat direct correspondeert met “online inference binnen Flink operators”-aanpakken.

Online feature store updates voor aanbeveling en ranking

Het model van Flink’s geklede toestand en checkpointing is gebouwd om operator-toestand over gebeurtenissen te onderhouden en het betrouwbaar te herstellen. Dat is een natuurlijke match voor continue feature-berekening (roltellingen, tellingen, tijd-gedecayde metrieken) die downstream recommenders nodig hebben.

Praktische latentie/consistentie afwegingen voor AI-pijplijnen

Als uw architectuur end-to-end exactly-once semantiek vereist (bijv. duplicaten van feature-updates of duplicaten van facturatiegebeurtenissen vermijden), zult u sinks en sources structureren rondom checkpointing en transactieggaranties.

In Kafka-gebaseerde stacks specifiek:

  • De Kafka-connector van Flink kan exactly-once garanties leveren wanneer checkpointing is ingeschakeld en delivery guarantee-opties zijn geconfigureerd.
  • Kafka Streams ondersteunt ook exactly-once semantiek (EOS), wat relevant is als uw “AI feature pijplijn” klein genoeg is om binnen applicatiecode te leven in plaats van in een Flink-cluster.

Flink as the real-time AI context builder

Dit diagram is gebaseerd op de kernprimitieven van Flink: event-time verwerking (watermarks), state backends (state.backend.type en systeembeheerde lokale toestand), en checkpoint/savepoint-mechanismen voor fouttolerantie en operaties.