Apache Flink su K8s e Kafka: PyFlink, Go, operazioni e prezzi gestiti

Streaming con stato, checkpoint, K8s, PyFlink, Go.

Indice

Apache Flink è un framework per elaborazioni con stato su flussi di dati limitati e illimitati.

I team lo adottano per lo streaming corretto e a bassa latenza con semantica del tempo degli eventi (watermark), tolleranza ai guasti (checkpoint), aggiornamenti controllati (savepoint) e superfici operative (metriche e REST).

Apache Flink stream processing

Questa guida è rivolta agli ingegneri DevOps e agli sviluppatori Go/Python. Confronta i modelli di deployment (self-managed vs gestiti), spiega l’architettura di base, copre le configurazioni Kubernetes (Helm e Operator) e standalone, confronta Flink con Spark, Kafka Streams, Beam e database di streaming, e mostra i pattern di integrazione PyFlink e Go, inclusi i pipeline orientati a LLM e AI.

Per un contesto più ampio sui pattern di infrastruttura dati, tra cui storage a oggetti, database e messaggistica, consulta Data Infrastructure for AI Systems: Object Storage, Databases, Search & AI Data Architecture.

Apache Flink è esplicitamente posizionato come motore di stream processing con stato: modelli la tua logica come un pipeline di operatori e Flink lo esegue come un dataflow distribuito con stato gestito e semantica del tempo. Nella documentazione moderna di Flink, il progetto si descrive come un framework e un motore di elaborazione distribuito per calcoli con stato su flussi di dati illimitati e limitati.

Da una prospettiva pratica di ingegneria DevOps/software, Flink è una buona scelta quando hai bisogno di almeno una di queste proprietà:

Se hai bisogno di join/aggregazione/arricchimento a bassa latenza con garanzie di correttezza, utilizzi tipicamente l’elaborazione del tempo degli eventi di Flink, dove il “tempo” è quando l’evento è accaduto (non quando è arrivato), e i watermark comunicano il progresso del tempo degli eventi attraverso il pipeline.

Se hai bisogno di calcolo con stato su larga scala (contatori rotolanti, sessioni, regole antifrode, feature engineering), Flink tratta lo stato come una parte di primo piano del modello di programmazione e lo rende tollerante ai guasti tramite il checkpointing.

Se hai bisogno di streaming operativamente robusto (guasti, aggiornamenti rotolanti, riavvii), Flink fa il checkpoint dello stato e delle posizioni del flusso in modo che il job possa riprendersi e continuare con le stesse semantica “come un’esecuzione senza guasti”.

Casi d’uso tipici per team DevOps, Go, Python e AI

Flink è ampiamente usato per “pipeline dati & ETL”, “streaming analytics” e “applicazioni event-driven” (le categorie usate dalla documentazione di Flink).

Per uno stack DevOps + Go/Python, i pattern tipici sono questi:

Un servizio Go produce eventi su Kafka; Flink consuma quegli eventi, esegue un’elaborazione con stato (ad es. deduplicazione, aggregazione a finestra, arricchimento) e scrive i fatti derivati di nuovo su Kafka o in un database. I meccanismi di operatori e checkpointing di Flink esistono per rendere questi pipeline con stato sicuri per la produzione.

Per i team ML/LLM, PyFlink cita esplicitamente scenari come “previsione machine learning” e il caricamento di modelli di machine learning all’interno di UDF Python come motivazione per la gestione delle dipendenze, che è un’endorsement diretto dei pattern “job Flink come runtime per inferenza online / feature engineering”.

Il runtime di Flink consiste in due tipi di processi: JobManager e TaskManagers. La documentazione sottolinea che i client inviano il dataflow al JobManager; il client può poi disconnettersi (modalità detached) o rimanere connesso (modalità attached).

Il JobManager coordina l’esecuzione distribuita: scheduling, reazione al completamento/failure dei task, coordinamento dei checkpoint e coordinamento del recupero. Internamente, include: ResourceManager (slot/risorse), Dispatcher (REST + Web UI + creazione JobMaster per job) e JobMaster (gestisce un singolo job).

I TaskManagers eseguono gli operatori/task e scambiano/tamponano i flussi di dati. L’unità di scheduling più piccola è lo slot del task; più operatori possono essere eseguiti in un singolo slot (l’incatenamento degli operatori e la condivisione degli slot influenzano questo).

Incatenamento degli operatori e slot dei task per il controllo delle prestazioni e dei costi

Flink incatena i sottotask degli operatori in task, dove ogni task viene eseguito da un singolo thread. Questo è descritto come un’ottimizzazione delle prestazioni che riduce il sovraccarico di trasferimento thread e buffering, aumentando il throughput e diminuendo la latenza.

Gli slot sono importanti operativamente perché sono l’unità di scheduling/isolamento delle risorse. Flink nota che ogni TaskManager può avere uno o più slot del task; gli slot riservano memoria gestita per slot, ma non isolano la CPU.

Elaborazione del tempo degli eventi, watermark e dati in ritardo

Flink supporta più nozioni di tempo—tempo degli eventi, tempo di ingestione, tempo di elaborazione—e usa i watermark per modellare il progresso nel tempo degli eventi.

Per lavorare con il tempo degli eventi, Flink ha bisogno di timestamp assegnati agli eventi e di watermark generati; la documentazione ufficiale “Generating Watermarks” spiega l’assegnazione dei timestamp e la generazione dei watermark come i blocchi costitutivi fondamentali, con WatermarkStrategy come modo standard per configurare strategie comuni.

Tolleranza ai guasti: checkpoint versus savepoint nei sistemi reali

Il checkpointing esiste perché “ogni funzione e operatore in Flink può essere con stato”; lo stato deve essere checkpointato per diventare tollerante ai guasti. I checkpoint abilitano il recupero sia dello stato che delle posizioni del flusso in modo che l’esecuzione possa riprendere con semantica senza guasti.

Flink è molto esplicito che i savepoint sono “un’immagine coerente dello stato di esecuzione di un job di streaming, creata tramite il meccanismo di checkpointing di Flink”, utilizzata per fermare-riprendere, forcare o aggiornare job. I savepoint risiedono su storage stabile (ad es. HDFS, S3).

La pagina ufficiale “Checkpoints vs Savepoints” inquadra la differenza come backup vs log di recupero: i checkpoint sono frequenti, leggeri e gestiti da Flink per il recupero da guasti; i savepoint sono gestiti dall’utente e usati per operazioni controllate come aggiornamenti.

Il runtime open-source Flink è “gratuito” in senso di licenza, ma in produzione paghi per l’infrastruttura e lo sforzo operativo.

Flink è progettato per integrarsi con i gestori di risorse comuni (ad es. YARN e Kubernetes) e può anche essere eseguito come cluster standalone o come libreria.

I costi di calcolo e memoria sono guidati da JobManager e TaskManagers, e dal tuo layout di parallelismo/slot. La documentazione di configurazione di Flink cita esplicitamente jobmanager.memory.process.size, taskmanager.memory.process.size, taskmanager.numberOfTaskSlots e parallelism.default come manopole principali per setup distribuiti.

Il disco locale è un costo nascosto frequente per job con stato. Flink nota che io.tmp.dirs memorizza dati locali inclusi file RocksDB, risultati intermedi riversati e JAR cacheati; se questi dati vengono eliminati, possono forzare “un’operazione di recupero pesante”, quindi dovrebbero risiedere su storage che non viene periodicamente purgato.

Il costo di storage di oggetti/file durevoli è guidato dalle directory dei checkpoint/savepoint. Nella configurazione Flink 2.x, i checkpoint e i savepoint sono configurati tramite execution.checkpointing.dir e execution.checkpointing.savepoint-dir e accettano URI come s3://… o hdfs://….

I servizi gestiti riducono i costi operativi ma aggiungono costi della piattaforma e vincoli. I dettagli dipendono dal provider.

Amazon Managed Service for Apache Flink fattura per KPU (1 vCPU + 4 GB di memoria per KPU) e addebita per durata e numero di KPU in incrementi di un secondo. AWS addebita anche un KPU aggiuntivo per “orchestrazione” per applicazione e costi separati per storage/backup.

Confluent Cloud for Apache Flink è basato sull’uso e serverless: crei un pool di calcolo e vieni addebitato per i CFU consumati al minuto mentre le istruzioni sono in esecuzione. La pagina di fatturazione include un esempio di prezzo CFU di $0.21 per CFU-ora (dipendente dalla regione) e sottolinea che puoi limitare la spesa tramite massimi del pool di calcolo.

Aiven e Alibaba Cloud sono provider gestiti di Flink notevoli sul mercato, ma i loro prezzi e dettagli di fatturazione pubblici variano per piano/regione e possono richiedere calcolatori o contatto con le vendite; tratta i costi esatti come non specificati a meno che tu non richieda un preventivo per una regione+piano dalla loro documentazione attuale.

Ververica offre sia opzioni di deployment self-managed che gestite attorno a Flink; le pagine pubbliche enfatizzano le scelte di deployment e il posizionamento del servizio gestito, mentre i prezzi esatti sono tipicamente gestiti tramite flussi “contatto/dettagli prezzi” (quindi i numeri specifici sono spesso non specificati pubblicamente).

Opzione di deployment Ideale per Complessità operativa Benefici chiave Rischi / compromessi chiave
Cluster standalone (VM/bare metal) Team piccoli, capacità fissa Media–Alta Controllo completo; modello mentale più semplice HA, autoscaling, aggiornamenti sono DIY (più lavoro manuale)
Kubernetes con Flink Kubernetes Operator La maggior parte dei team di piattaforma moderni Media Deployment dichiarativi; gestione del ciclo di vita tramite control loop; l’operatore supporta deployment Application/Session/Job Richiede competenze Kubernetes + operatore
Kubernetes nativo (senza operatore) Team K8s che vogliono integrazione diretta Media–Alta Integrazione diretta delle risorse; allocazione/deallocazione dinamica dei TaskManager descritta nella documentazione Flink-on-K8s Automazione più personalizzata rispetto all’operatore
YARN Piattaforme centrate su Hadoop Media Si integra con la gestione risorse YARN Complessità dello stack Hadoop
AWS Managed Service for Apache Flink Stack dati nativi AWS Bassa–Media Orchestrazione gestita + opzioni di scaling; unità di fatturazione prevedibile (KPU) Accoppiamento alla piattaforma; costi extra per app KPU + storage
Confluent Cloud for Apache Flink Shop orientati a Kafka, app stream SQL-first Bassa Fatturazione serverless per uso; contabilità CFU-minuto; pool di calcolo per limitare la spesa Costi CFU + costi di rete Kafka; API specifiche del servizio
Offerte gestite Ververica Enterprise che hanno bisogno di esperti Flink per le operazioni Bassa–Media Posizionamento del servizio gestito da “esperti Flink” Prezzi spesso non trasparenti (non specificati)

Tabella dei provider gestiti e costi

I prezzi cambiano per regione e tempo; se hai bisogno di numeri esatti per la tua regione, tratta questo come un punto di partenza e verifica contro le pagine di prezzo attuali del provider (le regioni non citate sono non specificate).

Provider Forma del “Piano” Unità di fatturazione Esempio di prezzo calcolo Driver di costo aggiuntivi notevoli
Amazon Managed Service for Apache Flink Runtime gestito KPU (1 vCPU + 4 GB) Esempio mostrato: $0.11 per KPU-ora (US East N. Virginia) +1 KPU orchestrazione per app; storage in esecuzione; backup durevoli opzionali
Confluent Cloud for Apache Flink SQL/processing serverless CFU-minuto/CFU-ora Esempio mostrato: $0.21 per CFU-ora (la regione varia) Tassi di rete Kafka ancora applicabili; massimo pool di calcolo per limitare la spesa
Ververica (gestito) Piattaforma dati streaming unificata gestita Non specificato (pagine pubbliche) Non specificato Funzionalità piattaforma/SLA; prezzi tipicamente tramite vendite (non specificato)
Aiven for Apache Flink Servizio gestito Modello di fatturazione uso orario (piattaforma-wide) Non specificato senza piano/regione Tier del piano + regione cloud + add-on (non specificato)
Alibaba Cloud Realtime Compute for Apache Flink Gestito/serverless Fatturazione ibrida (pay-as-you-go + mix abbonamento) Non specificato senza dettagli regione/workspace Limiti basati su CU e modello workspace (dettagli variano; non specificato qui)

Flink si trova in un ecosistema affollato. La scelta “migliore” dipende dalla latenza, dallo stato, dalle preferenze operative e dal modello di autore.

Strumento Cos’è Modello di esecuzione streaming Storia stato & exactly-once Dove brilla Punti di dolore tipici
Apache Flink Motore di elaborazione stream distribuito per calcoli con stato Streaming continuo + tempo degli eventi tramite watermark Tolleranza ai guasti basata su checkpoint; savepoint per aggiornamenti controllati Pipeline con stato a bassa latenza; logica complessa del tempo degli eventi Operare stato, checkpoint e aggiornamenti correttamente richiede disciplina
Apache Spark Structured Streaming Motore streaming di Spark basato su DataFrames/Datasets Modello micro-batch predefinito (con una modalità continua discussa separatamente) Forte per pipeline analitiche; lo stato esiste ma spesso ha latenza più alta API unificate batch+stream; ecosistema Spark Latenza micro-batch e modello mentale “streaming come batch incrementali”
Kafka Streams Libreria per costruire app stream-processing su Kafka Elaborazione record-per-record Supporta semantica di elaborazione exactly-once (EOS) App native Kafka semplici; incorporare in servizio JVM Solo JVM; meno flessibile per pattern di calcolo distribuito su larga scala
Apache Beam Modello di programmazione unificato + SDK; eseguito tramite runner (Flink, Spark, Dataflow, ecc.) Dipende dal runner; pipeline Beam si traducono in job runner La semantica dipende dalla matrice di capacità del runner (specifico del runner) Portabilità, pipeline multi-lingua; evitare lock-in del motore Il tuning operativo finisce comunque per essere specifico del runner
Materialize “Livello dati live” / DB SQL streaming; aggiorna incrementalmente i risultati man mano che i dati arrivano Manutenzione continua di viste incrementali Affermazioni di forte consistenza nella documentazione del prodotto (dettagli specifici del prodotto) Servire viste derivate fresche a app/agenti AI Modello operativo diverso rispetto ai job Flink; non un runtime API operatore generale
RisingWave Database streaming dove l’elaborazione stream è espressa come viste materializzate Manutenzione continua di viste materializzate SQL-first; semantica specifica del motore App streaming centrate su SQL senza costruire job Flink Meno flessibile per pipeline pesanti in codice arbitrario

Un’euristica utile: se vuoi un runtime per job di streaming con stato complessi con controllo profondo sul tempo degli eventi, logica degli operatori e deployment, Flink è un candidato primario. Se vuoi viste incrementali SQL-first per il servizio, i database di streaming possono essere alternative. Se vuoi una libreria incorporata in un servizio, Kafka Streams è competitivo. Se vuoi una definizione di pipeline portabile tra motori, Beam è convincente.

Per architetture event-driven native cloud che usano AWS, Building Event-Driven Microservices with AWS Kinesis copre i pattern Kinesis Data Streams per l’elaborazione in tempo reale e il disaccoppiamento dei servizi.

Questa sezione è intenzionalmente pratica: configurazione, deployment e come i tuoi servizi Go/Python interagiscono tipicamente con Flink.

Go services + Kafka + Flink + serving layer

Flink è spesso il “mezzo con stato” che trasforma eventi ad alto volume in segnali durevoli (contatori, sessioni, anomalie, record arricchiti). I checkpoint e i backend dello stato sono ciò che rende quel mezzo affidabile in produzione.

Nota importante sulla versione: a partire da Flink 2.0, il file di configurazione supportato è conf/config.yaml; il precedente flink-conf.yaml non è “più supportato”.

Un conf/config.yaml minimale (illustrativo) per un piccolo cluster self-managed:

# conf/config.yaml (stile Flink 2.x)
rest:
  address: flink-jobmanager.example.internal
  port: 8081

jobmanager:
  rpc:
    address: flink-jobmanager.example.internal
    port: 6123
  memory:
    process:
      size: 2048m

taskmanager:
  memory:
    process:
      size: 4096m
  numberOfTaskSlots: 2

parallelism:
  default: 2

# Impostazioni predefinite di checkpointing (i job possono ancora sovrascrivere nel codice)
state:
  backend:
    type: rocksdb
execution:
  checkpointing:
    dir: s3://my-bucket/flink/checkpoints
    savepoint-dir: s3://my-bucket/flink/savepoints
    interval: 60 s

# Evita tmp dirs che vengono purgati (file RocksDB, JAR cacheati, ecc.)
io:
  tmp:
    dirs: ["/var/lib/flink/tmp"]

Perché queste chiavi: il riferimento alla configurazione di Flink documenta esplicitamente i dettagli di scoperta rest.* e jobmanager.rpc.*, le chiavi della memoria del processo, le chiavi slot/parallelismo e le impostazioni predefinite di checkpoint, inclusi state.backend.type, execution.checkpointing.dir, execution.checkpointing.savepoint-dir e execution.checkpointing.interval.

La scelta di io.tmp.dirs è operativamente importante perché Flink la usa per file locali RocksDB e artefatti cacheati; eliminare può causare un recupero pesante.

Se sei su Flink 1.x (ancora comune in alcuni ambienti gestiti), troverai flink-conf.yaml in giro. Questo è legacy per gli utenti Flink 2.x.

# conf/flink-conf.yaml (stile legacy 1.x; NON supportato in Flink 2.x)
jobmanager.rpc.address: flink-jobmanager
rest.port: 8081
taskmanager.numberOfTaskSlots: 2
parallelism.default: 2

# Le chiavi di checkpoint legacy variano per versione; trattale come illustrative.
state.backend.type: rocksdb
state.checkpoints.dir: s3://my-bucket/flink/checkpoints
state.savepoints.dir: s3://my-bucket/flink/savepoints

Se stai migrando, Flink fornisce uno script di migrazione (bin/migrate-config-file.sh) per convertire flink-conf.yaml in config.yaml.

L’operatore Kubernetes di Flink agisce come piano di controllo per la gestione del ciclo di vita delle applicazioni Flink e viene installato usando Helm.

Dalla documentazione Helm ufficiale dell’operatore, puoi installare sia dal chart nell’albero sorgente, sia dal repository chart ospitato da Apache:

# install from bundled chart in source tree
helm install flink-kubernetes-operator helm/flink-kubernetes-operator

# install from Apache downloads Helm repository (replace <OPERATOR-VERSION>)
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-<OPERATOR-VERSION>/
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator

Questi comandi esatti sono mostrati nella documentazione di installazione Helm dell’operatore.

Esempio CR FlinkDeployment (illustrativo)

Questo è un esempio semplificato per mostrare i punti di integrazione che personalizzerai tipicamente (immagine, risorse, posizioni dei checkpoint, logging/metriche). L’operatore riconcilia questo stato desiderato tramite il suo control loop.

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: realtime-sessions
  namespace: flink
spec:
  image: my-registry.example.com/flink/realtime-sessions:2026-03-06
  flinkVersion: v2_2
  serviceAccount: flink
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.backend.type: "rocksdb"
    execution.checkpointing.dir: "s3://my-bucket/flink/checkpoints/realtime-sessions"
    execution.checkpointing.savepoint-dir: "s3://my-bucket/flink/savepoints/realtime-sessions"
    execution.checkpointing.interval: "60 s"
    rest.port: "8081"
  jobManager:
    resource:
      cpu: 1
      memory: "2048m"
  taskManager:
    resource:
      cpu: 2
      memory: "4096m"
  job:
    jarURI: local:///opt/flink/usrlib/realtime-sessions.jar
    parallelism: 4
    upgradeMode: savepoint
    state: running

Il pattern upgradeMode: savepoint è comune quando vuoi aggiornamenti con stato sicuri; i savepoint sono progettati per flussi di lavoro stop/riprendi/fork/aggiorna e puntano a posizioni di storage stabili.

PyFlink è l’API Python per Apache Flink ed è esplicitamente proposta per carichi di lavoro batch/stream scalabili inclusi pipeline ML e ETL.

Quando usi connettori JVM (Kafka, JDBC, ecc.) da PyFlink, devi assicurarti che i JAR pertinenti siano disponibili per il job. La documentazione Python “Dependency Management” di Flink mostra tre meccanismi standard:

Impostare pipeline.jars (Table API), chiamare add_jars() (DataStream API) o CLI --jarfile al momento della sottomissione.

Questo esempio legge eventi JSON da Kafka, assegna timestamp del tempo degli eventi (con out-of-orderness limitato), mantiene un conteggio rotolante per utente nello stato chiave e scrive un evento arricchito in un topic di output.

Note:

  • KafkaSource viene costruito tramite KafkaSource.builder() e richiede bootstrap servers, topic e deserializzatore.
  • La configurazione del sink Kafka exactly-once in PyFlink richiede di impostare la garanzia di consegna e un prefisso ID transazionale.
  • I valori predefiniti di checkpoint possono essere configurati nella configurazione Flink (execution.checkpointing.*) e/o nel codice; le chiavi di configurazione sono documentate nel riferimento alla configurazione di Flink.
import json
from typing import Any

from pyflink.common import Duration, Types
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.watermark_strategy import WatermarkStrategy, TimestampAssigner
from pyflink.common.configuration import Configuration

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor

from pyflink.datastream.connectors import DeliveryGuarantee
from pyflink.datastream.connectors.kafka import (
    KafkaSource,
    KafkaOffsetsInitializer,
    KafkaSink,
    KafkaRecordSerializationSchema,
)


class EventTimeFromJson(TimestampAssigner):
    """
    Estrae event_time_ms dal payload JSON.
    Aspettativa: {"user_id":"u1","event_time_ms":1710000000000,"event":"click",...}
    """
    def extract_timestamp(self, value: str, record_timestamp: int) -> int:
        try:
            obj = json.loads(value)
            return int(obj["event_time_ms"])
        except Exception:
            # fallback: usa timestamp del record (ingestione) se malformato
            return record_timestamp


class RollingCount(KeyedProcessFunction):
    def open(self, runtime_context: RuntimeContext):
        desc = ValueStateDescriptor("rolling_count", Types.LONG())
        self.count_state = runtime_context.get_state(desc)

    def process_element(self, value: str, ctx: 'KeyedProcessFunction.Context'):
        obj = json.loads(value)
        current = self.count_state.value()
        if current is None:
            current = 0
        current += 1
        self.count_state.update(current)

        # emetti evento arricchito
        obj["rolling_count"] = current
        obj["event_time_ms"] = int(obj.get("event_time_ms", 0))
        yield json.dumps(obj)


def build_env() -> StreamExecutionEnvironment:
    # Default cluster/job (possono anche essere impostati in config.yaml)
    cfg = Configuration()
    cfg.set_string("state.backend.type", "rocksdb")
    cfg.set_string("execution.checkpointing.dir", "s3://my-bucket/flink/checkpoints/realtime-sessions")
    cfg.set_string("execution.checkpointing.interval", "60 s")
    env = StreamExecutionEnvironment.get_execution_environment(cfg)

    # In PyFlink, i jar del connettore devono essere disponibili; usa env.add_jars(...) se necessario.
    # env.add_jars("file:///opt/flink/lib/flink-connector-kafka-<VERSION>.jar")

    # Abilita checkpointing esplicitamente anche (i job possono sovrascrivere i default)
    env.enable_checkpointing(60_000)
    env.set_parallelism(4)
    return env


def main():
    env = build_env()

    source = (
        KafkaSource.builder()
        .set_bootstrap_servers("kafka:9092")
        .set_topics("events.raw")
        .set_group_id("realtime-sessions-v1")
        .set_value_only_deserializer(SimpleStringSchema())
        .set_starting_offsets(KafkaOffsetsInitializer.earliest())
        .build()
    )

    watermark_strategy = (
        WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(10))
        .with_timestamp_assigner(EventTimeFromJson())
    )

    stream = (
        env.from_source(source, watermark_strategy=watermark_strategy, source_name="kafka-events-raw")
        .key_by(lambda s: json.loads(s)["user_id"])
        .process(RollingCount(), output_type=Types.STRING())
    )

    record_serializer = (
        KafkaRecordSerializationSchema.builder()
        .set_topic("events.enriched")
        .set_value_serialization_schema(SimpleStringSchema())
        .build()
    )

    sink = (
        KafkaSink.builder()
        .set_bootstrap_servers("kafka:9092")
        .set_record_serializer(record_serializer)
        .set_delivery_guarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .set_transactional_id_prefix("realtime-sessions-txn")
        .build()
    )

    stream.sink_to(sink)

    env.execute("realtime-sessions-pyflink")


if __name__ == "__main__":
    main()

Le chiamate API sopra si allineano con il modello di utilizzo del builder KafkaSource di PyFlink e i campi richiesti. Per le garanzie di consegna, la documentazione KafkaSinkBuilder di PyFlink dice esplicitamente che per DeliveryGuarantee.EXACTLY_ONCE devi impostare il prefisso ID transazionale. Per timestamping/watermarking, la documentazione watermark di Flink spiega l’assegnazione dei timestamp e la generazione dei watermark come il meccanismo per elaborare il tempo degli eventi, e PyFlink fornisce un’API WatermarkStrategy che specchia questo modello.

Go non ha un’API di autore job Flink nativa come Java/Python, quindi i sistemi Go tipicamente si integrano con Flink tramite:

  • Kafka (o altri broker) come ingestione/egress.
  • L’API REST di Flink per azioni operative (upload JAR, avvio job, interrogazione stato job, attivazione savepoint, ridimensionamento).

Per la configurazione Kafka e pattern di sviluppo locale, consulta Apache Kafka Quickstart - Install Kafka 4.2 with CLI and Local Examples.

Esempio produttore/consumatore Kafka Go (kafka-go)

package main

import (
	"context"
	"log"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	ctx := context.Background()

	// Produttore: scrivi eventi grezzi
	writer := &kafka.Writer{
		Addr:         kafka.TCP("kafka:9092"),
		Topic:        "events.raw",
		RequiredAcks: kafka.RequireAll,
	}
	defer writer.Close()

	err := writer.WriteMessages(ctx, kafka.Message{
		Key:   []byte("user:u1"),
		Value: []byte(`{"user_id":"u1","event_time_ms":1710000000000,"event":"click"}`),
		Time:  time.Now(),
	})
	if err != nil {
		log.Fatal(err)
	}

	// Consumatore: leggi eventi arricchiti
	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:  []string{"kafka:9092"},
		Topic:    "events.enriched",
		GroupID:  "go-debug-consumer",
		MinBytes: 1e3,
		MaxBytes: 10e6,
	})
	defer reader.Close()

	msg, err := reader.ReadMessage(ctx)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("enriched key=%s value=%s\n", string(msg.Key), string(msg.Value))
}

Questo è codice “di collegamento”, ma è la superficie di integrazione pratica più comune: i topic Kafka sono il confine tra Flink e servizi personalizzati.

L’API REST di Flink fa parte del server web JobManager e ascolta sulla porta 8081 di default (configurabile tramite rest.port).

La specifica OpenAPI ufficiale per il dispatcher include /jars/upload e dichiara esplicitamente:

  • L’upload JAR deve essere inviato come dati multi-part
  • assicurati che l’header Content-Type sia impostato su application/x-java-archive
  • fornisce un esempio curl usando -F jarfile=@path/to/flink-job.jar

Uno snippet Go pratico per caricare un JAR:

package flink

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"mime/multipart"
	"net/http"
	"os"
)

func UploadJar(ctx context.Context, flinkBaseURL, jarPath string) (*http.Response, error) {
	f, err := os.Open(jarPath)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	var body bytes.Buffer
	w := multipart.NewWriter(&body)

	part, err := w.CreateFormFile("jarfile", "job.jar")
	if err != nil {
		return nil, err
	}
	if _, err := io.Copy(part, f); err != nil {
		return nil, err
	}
	if err := w.Close(); err != nil {
		return nil, err
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, flinkBaseURL+"/jars/upload", &body)
	if err != nil {
		return nil, err
	}

	// Importante: boundary multipart
	req.Header.Set("Content-Type", w.FormDataContentType())

	// Alcuni client impostano anche "Expect:" simile all'esempio curl nella spec.
	req.Header.Set("Expect", "")

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	if resp.StatusCode >= 300 {
		return resp, fmt.Errorf("upload failed: %s", resp.Status)
	}
	return resp, nil
}

Questo codice è guidato dalla descrizione OpenAPI REST API per /jars/upload inclusi i requisiti multipart e il riferimento curl.

Per eseguire un JAR precedentemente caricato, Flink espone /jars/{jarid}/run e supporta il passaggio di argomenti del programma tramite parametri di query (e/o corpo JSON).

Endpoint operativamente preziosi che probabilmente automatizzerai:

  • /jobs e /jobs/{jobid} per elencare e ispezionare lo stato del job
  • /jobs/{jobid}/savepoints per attivare savepoint (trigger asincrono + polling)
  • /jobs/{jobid}/rescaling per attivare ridimensionamento
Preoccupazione PyFlink (job Python) Go (servizi attorno a Flink)
Autore della logica Flink Autore nativo tramite API DataStream/Table; supporta stato + timer Nessuna API Flink nativa; implementa la logica in Flink (Java/Python) e integra esternamente
Connettori/dipendenze Devi inviare JAR connettore tramite pipeline.jars, add_jars o --jarfile Non applicabile (non stai eseguendo dentro Flink), ma gestisci client Kafka/DB
Ingestione/egress Costruttori KafkaSource/KafkaSink in PyFlink Librerie produttore/consumatore Kafka; pattern microservizi standard
Automazione Ops Può chiamare anche endpoint REST Flink Spesso possiede l’automazione: upload JAR, deployment, ridimensionamento, trigger savepoint via REST

Flink supporta l’esportazione delle metriche configurando metric reporter nel file di configurazione di Flink; questi reporter vengono istanziati su JobManager e TaskManagers.

Per Prometheus, Flink espone metriche in formato Prometheus quando configurato con metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory in un ambiente Flink supportato.

Generalmente combini questo con Kubernetes ServiceMonitors (Prometheus Operator) o con il tuo stack di monitoraggio gestito.

Scaling: parallelismo, slot e autoscaling basato su operatori

Il modello di scheduling di Flink definisce le risorse di esecuzione tramite slot del task, e ogni slot può eseguire una pipeline di task paralleli.

Per il scaling manuale, l’API REST fornisce un endpoint di ridimensionamento per un job (/jobs/{jobid}/rescaling) come operazione asincrona.

Se sei su Kubernetes con l’operatore Kubernetes di Flink, il progetto operatore annuncia un “Flink Job Autoscaler” come parte del suo set di funzionalità, che vale la pena valutare se i tuoi carichi di lavoro variano sostanzialmente.

Backup e aggiornamenti sicuri: checkpoint e savepoint

I checkpoint sono per il recupero automatizzato e sono gestiti da Flink; i savepoint sono per operazioni del ciclo di vita guidate dall’utente (stop/riprendi/fork/aggiorna).

Da una prospettiva SRE:

  • Usa i checkpoint per “mantenere il pipeline in esecuzione attraverso i guasti”.
  • Usa i savepoint per “distribuire una nuova versione senza perdere lo stato”.

L’API REST di Flink supporta anche l’attivazione di savepoint in modo asincrono, utile per flussi di lavoro “deploy → trigger savepoint → aggiorna” stile GitOps.

CI/CD: GitOps + Helm + sottomissione job REST

Per Kubernetes:

  • Mantieni l’installazione dell’operatore e i tuoi CR FlinkDeployment in Git, distribuisci tramite Argo CD/Flux e versiona le immagini dei container per build. La documentazione Helm dell’operatore discute esplicitamente “Working with Argo CD”.

Per cluster standalone/session:

  • Usa gli endpoint di upload e run JAR dell’API REST Flink per deployment di artefatti immutabili.

Nota anche un interruttore di sicurezza/ops sottile ma prezioso: web.submit.enable governa gli upload tramite la Web UI, ma la documentazione nota che anche quando disabilitato, i cluster session accettano ancora le sottomissioni dei job tramite richieste REST; questo è rilevante quando si rafforzano le superfici UI mantenendo l’automazione CI/CD.

I sistemi LLM sono spesso buoni solo quanto il loro contesto in tempo reale. Flink si inserisce negli stack LLM/AI come il componente che produce feature, embedding e aggregati comportamentali “sempre freschi”.

Un pattern comune è:

  • ingerire azioni/eventi utente,
  • aggregare sessioni e preferenze,
  • produrre task di generazione embedding,
  • scrivere embedding in un vector store e/o feature store.

La documentazione sulla gestione delle dipendenze di PyFlink cita esplicitamente “previsione machine learning” e il caricamento di modelli ML all’interno di UDF Python (per esecuzione su cluster remoti), che si mappa direttamente agli approcci “inferenza online all’interno degli operatori Flink”.

Aggiornamenti feature store online per raccomandazione e ranking

Il modello di stato chiave e checkpointing di Flink è costruito per mantenere lo stato dell’operatore attraverso gli eventi e recuperarlo in modo affidabile. Questo è una corrispondenza naturale per il calcolo continuo delle feature (tassi rotolanti, conteggi, metriche decadute nel tempo) di cui i recommender a valle hanno bisogno.

Compromessi pratici di latenza/consistenza per pipeline AI

Se la tua architettura richiede semantica exactly-once end-to-end (ad es. evitare aggiornamenti feature duplicati o eventi di fatturazione duplicati), strutturerai sink e sorgenti attorno al checkpointing e garanzie transazionali.

Nello specifico degli stack basati su Kafka:

  • Il connettore Kafka di Flink può fornire garanzie exactly-once quando il checkpointing è abilitato e le opzioni di garanzia di consegna sono configurate.
  • Kafka Streams supporta anche semantica exactly-once (EOS), che è rilevante se il tuo “pipeline feature AI” è abbastanza piccolo da risiedere nel codice dell’applicazione piuttosto che in un cluster Flink.

Flink as the real-time AI context builder

Questo diagramma si basa sui primitivi fondamentali di Flink: elaborazione del tempo degli eventi (watermark), backend dello stato (state.backend.type e stato locale gestito dal sistema) e meccanismi di checkpoint/savepoint per la tolleranza ai guasti e le operazioni.