Apache Flink su K8s e Kafka: PyFlink, Go, operazioni e prezzi gestiti
Streaming con stato, checkpoint, K8s, PyFlink, Go.
Apache Flink è un framework per elaborazioni con stato su flussi di dati limitati e illimitati.
I team lo adottano per lo streaming corretto e a bassa latenza con semantica del tempo degli eventi (watermark), tolleranza ai guasti (checkpoint), aggiornamenti controllati (savepoint) e superfici operative (metriche e REST).

Questa guida è rivolta agli ingegneri DevOps e agli sviluppatori Go/Python. Confronta i modelli di deployment (self-managed vs gestiti), spiega l’architettura di base, copre le configurazioni Kubernetes (Helm e Operator) e standalone, confronta Flink con Spark, Kafka Streams, Beam e database di streaming, e mostra i pattern di integrazione PyFlink e Go, inclusi i pipeline orientati a LLM e AI.
Per un contesto più ampio sui pattern di infrastruttura dati, tra cui storage a oggetti, database e messaggistica, consulta Data Infrastructure for AI Systems: Object Storage, Databases, Search & AI Data Architecture.
Cos’è Apache Flink e perché i team lo usano per l’elaborazione in tempo reale
Apache Flink è esplicitamente posizionato come motore di stream processing con stato: modelli la tua logica come un pipeline di operatori e Flink lo esegue come un dataflow distribuito con stato gestito e semantica del tempo. Nella documentazione moderna di Flink, il progetto si descrive come un framework e un motore di elaborazione distribuito per calcoli con stato su flussi di dati illimitati e limitati.
Da una prospettiva pratica di ingegneria DevOps/software, Flink è una buona scelta quando hai bisogno di almeno una di queste proprietà:
Se hai bisogno di join/aggregazione/arricchimento a bassa latenza con garanzie di correttezza, utilizzi tipicamente l’elaborazione del tempo degli eventi di Flink, dove il “tempo” è quando l’evento è accaduto (non quando è arrivato), e i watermark comunicano il progresso del tempo degli eventi attraverso il pipeline.
Se hai bisogno di calcolo con stato su larga scala (contatori rotolanti, sessioni, regole antifrode, feature engineering), Flink tratta lo stato come una parte di primo piano del modello di programmazione e lo rende tollerante ai guasti tramite il checkpointing.
Se hai bisogno di streaming operativamente robusto (guasti, aggiornamenti rotolanti, riavvii), Flink fa il checkpoint dello stato e delle posizioni del flusso in modo che il job possa riprendersi e continuare con le stesse semantica “come un’esecuzione senza guasti”.
Casi d’uso tipici per team DevOps, Go, Python e AI
Flink è ampiamente usato per “pipeline dati & ETL”, “streaming analytics” e “applicazioni event-driven” (le categorie usate dalla documentazione di Flink).
Per uno stack DevOps + Go/Python, i pattern tipici sono questi:
Un servizio Go produce eventi su Kafka; Flink consuma quegli eventi, esegue un’elaborazione con stato (ad es. deduplicazione, aggregazione a finestra, arricchimento) e scrive i fatti derivati di nuovo su Kafka o in un database. I meccanismi di operatori e checkpointing di Flink esistono per rendere questi pipeline con stato sicuri per la produzione.
Per i team ML/LLM, PyFlink cita esplicitamente scenari come “previsione machine learning” e il caricamento di modelli di machine learning all’interno di UDF Python come motivazione per la gestione delle dipendenze, che è un’endorsement diretto dei pattern “job Flink come runtime per inferenza online / feature engineering”.
Architettura e funzionalità principali di Apache Flink
Architettura del cluster Apache Flink per deployment in produzione
Il runtime di Flink consiste in due tipi di processi: JobManager e TaskManagers. La documentazione sottolinea che i client inviano il dataflow al JobManager; il client può poi disconnettersi (modalità detached) o rimanere connesso (modalità attached).
Il JobManager coordina l’esecuzione distribuita: scheduling, reazione al completamento/failure dei task, coordinamento dei checkpoint e coordinamento del recupero. Internamente, include: ResourceManager (slot/risorse), Dispatcher (REST + Web UI + creazione JobMaster per job) e JobMaster (gestisce un singolo job).
I TaskManagers eseguono gli operatori/task e scambiano/tamponano i flussi di dati. L’unità di scheduling più piccola è lo slot del task; più operatori possono essere eseguiti in un singolo slot (l’incatenamento degli operatori e la condivisione degli slot influenzano questo).
Incatenamento degli operatori e slot dei task per il controllo delle prestazioni e dei costi
Flink incatena i sottotask degli operatori in task, dove ogni task viene eseguito da un singolo thread. Questo è descritto come un’ottimizzazione delle prestazioni che riduce il sovraccarico di trasferimento thread e buffering, aumentando il throughput e diminuendo la latenza.
Gli slot sono importanti operativamente perché sono l’unità di scheduling/isolamento delle risorse. Flink nota che ogni TaskManager può avere uno o più slot del task; gli slot riservano memoria gestita per slot, ma non isolano la CPU.
Elaborazione del tempo degli eventi, watermark e dati in ritardo
Flink supporta più nozioni di tempo—tempo degli eventi, tempo di ingestione, tempo di elaborazione—e usa i watermark per modellare il progresso nel tempo degli eventi.
Per lavorare con il tempo degli eventi, Flink ha bisogno di timestamp assegnati agli eventi e di watermark generati; la documentazione ufficiale “Generating Watermarks” spiega l’assegnazione dei timestamp e la generazione dei watermark come i blocchi costitutivi fondamentali, con WatermarkStrategy come modo standard per configurare strategie comuni.
Tolleranza ai guasti: checkpoint versus savepoint nei sistemi reali
Il checkpointing esiste perché “ogni funzione e operatore in Flink può essere con stato”; lo stato deve essere checkpointato per diventare tollerante ai guasti. I checkpoint abilitano il recupero sia dello stato che delle posizioni del flusso in modo che l’esecuzione possa riprendere con semantica senza guasti.
Flink è molto esplicito che i savepoint sono “un’immagine coerente dello stato di esecuzione di un job di streaming, creata tramite il meccanismo di checkpointing di Flink”, utilizzata per fermare-riprendere, forcare o aggiornare job. I savepoint risiedono su storage stabile (ad es. HDFS, S3).
La pagina ufficiale “Checkpoints vs Savepoints” inquadra la differenza come backup vs log di recupero: i checkpoint sono frequenti, leggeri e gestiti da Flink per il recupero da guasti; i savepoint sono gestiti dall’utente e usati per operazioni controllate come aggiornamenti.
Opzioni di deployment e piani di prezzo di Apache Flink
Opzione Apache Flink gratuita/self-managed
Il runtime open-source Flink è “gratuito” in senso di licenza, ma in produzione paghi per l’infrastruttura e lo sforzo operativo.
Flink è progettato per integrarsi con i gestori di risorse comuni (ad es. YARN e Kubernetes) e può anche essere eseguito come cluster standalone o come libreria.
Driver dei costi self-managed per Apache Flink
I costi di calcolo e memoria sono guidati da JobManager e TaskManagers, e dal tuo layout di parallelismo/slot. La documentazione di configurazione di Flink cita esplicitamente jobmanager.memory.process.size, taskmanager.memory.process.size, taskmanager.numberOfTaskSlots e parallelism.default come manopole principali per setup distribuiti.
Il disco locale è un costo nascosto frequente per job con stato. Flink nota che io.tmp.dirs memorizza dati locali inclusi file RocksDB, risultati intermedi riversati e JAR cacheati; se questi dati vengono eliminati, possono forzare “un’operazione di recupero pesante”, quindi dovrebbero risiedere su storage che non viene periodicamente purgato.
Il costo di storage di oggetti/file durevoli è guidato dalle directory dei checkpoint/savepoint. Nella configurazione Flink 2.x, i checkpoint e i savepoint sono configurati tramite execution.checkpointing.dir e execution.checkpointing.savepoint-dir e accettano URI come s3://… o hdfs://….
Piani gestiti di Apache Flink e modelli di fatturazione tipici
I servizi gestiti riducono i costi operativi ma aggiungono costi della piattaforma e vincoli. I dettagli dipendono dal provider.
Amazon Managed Service for Apache Flink fattura per KPU (1 vCPU + 4 GB di memoria per KPU) e addebita per durata e numero di KPU in incrementi di un secondo. AWS addebita anche un KPU aggiuntivo per “orchestrazione” per applicazione e costi separati per storage/backup.
Confluent Cloud for Apache Flink è basato sull’uso e serverless: crei un pool di calcolo e vieni addebitato per i CFU consumati al minuto mentre le istruzioni sono in esecuzione. La pagina di fatturazione include un esempio di prezzo CFU di $0.21 per CFU-ora (dipendente dalla regione) e sottolinea che puoi limitare la spesa tramite massimi del pool di calcolo.
Aiven e Alibaba Cloud sono provider gestiti di Flink notevoli sul mercato, ma i loro prezzi e dettagli di fatturazione pubblici variano per piano/regione e possono richiedere calcolatori o contatto con le vendite; tratta i costi esatti come non specificati a meno che tu non richieda un preventivo per una regione+piano dalla loro documentazione attuale.
Ververica offre sia opzioni di deployment self-managed che gestite attorno a Flink; le pagine pubbliche enfatizzano le scelte di deployment e il posizionamento del servizio gestito, mentre i prezzi esatti sono tipicamente gestiti tramite flussi “contatto/dettagli prezzi” (quindi i numeri specifici sono spesso non specificati pubblicamente).
Tabella delle opzioni di deployment per Apache Flink in produzione
| Opzione di deployment | Ideale per | Complessità operativa | Benefici chiave | Rischi / compromessi chiave |
|---|---|---|---|---|
| Cluster standalone (VM/bare metal) | Team piccoli, capacità fissa | Media–Alta | Controllo completo; modello mentale più semplice | HA, autoscaling, aggiornamenti sono DIY (più lavoro manuale) |
| Kubernetes con Flink Kubernetes Operator | La maggior parte dei team di piattaforma moderni | Media | Deployment dichiarativi; gestione del ciclo di vita tramite control loop; l’operatore supporta deployment Application/Session/Job | Richiede competenze Kubernetes + operatore |
| Kubernetes nativo (senza operatore) | Team K8s che vogliono integrazione diretta | Media–Alta | Integrazione diretta delle risorse; allocazione/deallocazione dinamica dei TaskManager descritta nella documentazione Flink-on-K8s | Automazione più personalizzata rispetto all’operatore |
| YARN | Piattaforme centrate su Hadoop | Media | Si integra con la gestione risorse YARN | Complessità dello stack Hadoop |
| AWS Managed Service for Apache Flink | Stack dati nativi AWS | Bassa–Media | Orchestrazione gestita + opzioni di scaling; unità di fatturazione prevedibile (KPU) | Accoppiamento alla piattaforma; costi extra per app KPU + storage |
| Confluent Cloud for Apache Flink | Shop orientati a Kafka, app stream SQL-first | Bassa | Fatturazione serverless per uso; contabilità CFU-minuto; pool di calcolo per limitare la spesa | Costi CFU + costi di rete Kafka; API specifiche del servizio |
| Offerte gestite Ververica | Enterprise che hanno bisogno di esperti Flink per le operazioni | Bassa–Media | Posizionamento del servizio gestito da “esperti Flink” | Prezzi spesso non trasparenti (non specificati) |
Tabella dei provider gestiti e costi
I prezzi cambiano per regione e tempo; se hai bisogno di numeri esatti per la tua regione, tratta questo come un punto di partenza e verifica contro le pagine di prezzo attuali del provider (le regioni non citate sono non specificate).
| Provider | Forma del “Piano” | Unità di fatturazione | Esempio di prezzo calcolo | Driver di costo aggiuntivi notevoli |
|---|---|---|---|---|
| Amazon Managed Service for Apache Flink | Runtime gestito | KPU (1 vCPU + 4 GB) | Esempio mostrato: $0.11 per KPU-ora (US East N. Virginia) | +1 KPU orchestrazione per app; storage in esecuzione; backup durevoli opzionali |
| Confluent Cloud for Apache Flink | SQL/processing serverless | CFU-minuto/CFU-ora | Esempio mostrato: $0.21 per CFU-ora (la regione varia) | Tassi di rete Kafka ancora applicabili; massimo pool di calcolo per limitare la spesa |
| Ververica (gestito) | Piattaforma dati streaming unificata gestita | Non specificato (pagine pubbliche) | Non specificato | Funzionalità piattaforma/SLA; prezzi tipicamente tramite vendite (non specificato) |
| Aiven for Apache Flink | Servizio gestito | Modello di fatturazione uso orario (piattaforma-wide) | Non specificato senza piano/regione | Tier del piano + regione cloud + add-on (non specificato) |
| Alibaba Cloud Realtime Compute for Apache Flink | Gestito/serverless | Fatturazione ibrida (pay-as-you-go + mix abbonamento) | Non specificato senza dettagli regione/workspace | Limiti basati su CU e modello workspace (dettagli variano; non specificato qui) |
Confronto Apache Flink vs competitor
Flink si trova in un ecosistema affollato. La scelta “migliore” dipende dalla latenza, dallo stato, dalle preferenze operative e dal modello di autore.
Tabella di confronto competitor: Flink vs Spark vs Kafka Streams vs Beam e opzioni più recenti
| Strumento | Cos’è | Modello di esecuzione streaming | Storia stato & exactly-once | Dove brilla | Punti di dolore tipici |
|---|---|---|---|---|---|
| Apache Flink | Motore di elaborazione stream distribuito per calcoli con stato | Streaming continuo + tempo degli eventi tramite watermark | Tolleranza ai guasti basata su checkpoint; savepoint per aggiornamenti controllati | Pipeline con stato a bassa latenza; logica complessa del tempo degli eventi | Operare stato, checkpoint e aggiornamenti correttamente richiede disciplina |
| Apache Spark Structured Streaming | Motore streaming di Spark basato su DataFrames/Datasets | Modello micro-batch predefinito (con una modalità continua discussa separatamente) | Forte per pipeline analitiche; lo stato esiste ma spesso ha latenza più alta | API unificate batch+stream; ecosistema Spark | Latenza micro-batch e modello mentale “streaming come batch incrementali” |
| Kafka Streams | Libreria per costruire app stream-processing su Kafka | Elaborazione record-per-record | Supporta semantica di elaborazione exactly-once (EOS) | App native Kafka semplici; incorporare in servizio JVM | Solo JVM; meno flessibile per pattern di calcolo distribuito su larga scala |
| Apache Beam | Modello di programmazione unificato + SDK; eseguito tramite runner (Flink, Spark, Dataflow, ecc.) | Dipende dal runner; pipeline Beam si traducono in job runner | La semantica dipende dalla matrice di capacità del runner (specifico del runner) | Portabilità, pipeline multi-lingua; evitare lock-in del motore | Il tuning operativo finisce comunque per essere specifico del runner |
| Materialize | “Livello dati live” / DB SQL streaming; aggiorna incrementalmente i risultati man mano che i dati arrivano | Manutenzione continua di viste incrementali | Affermazioni di forte consistenza nella documentazione del prodotto (dettagli specifici del prodotto) | Servire viste derivate fresche a app/agenti AI | Modello operativo diverso rispetto ai job Flink; non un runtime API operatore generale |
| RisingWave | Database streaming dove l’elaborazione stream è espressa come viste materializzate | Manutenzione continua di viste materializzate | SQL-first; semantica specifica del motore | App streaming centrate su SQL senza costruire job Flink | Meno flessibile per pipeline pesanti in codice arbitrario |
Un’euristica utile: se vuoi un runtime per job di streaming con stato complessi con controllo profondo sul tempo degli eventi, logica degli operatori e deployment, Flink è un candidato primario. Se vuoi viste incrementali SQL-first per il servizio, i database di streaming possono essere alternative. Se vuoi una libreria incorporata in un servizio, Kafka Streams è competitivo. Se vuoi una definizione di pipeline portabile tra motori, Beam è convincente.
Per architetture event-driven native cloud che usano AWS, Building Event-Driven Microservices with AWS Kinesis copre i pattern Kinesis Data Streams per l’elaborazione in tempo reale e il disaccoppiamento dei servizi.
Come usare Apache Flink in sistemi personalizzati
Questa sezione è intenzionalmente pratica: configurazione, deployment e come i tuoi servizi Go/Python interagiscono tipicamente con Flink.
Pattern di architettura consigliato: servizi Go + Kafka + Flink + layer di servizio

Flink è spesso il “mezzo con stato” che trasforma eventi ad alto volume in segnali durevoli (contatori, sessioni, anomalie, record arricchiti). I checkpoint e i backend dello stato sono ciò che rende quel mezzo affidabile in produzione.
Esempio di configurazione standalone per Apache Flink 2.x
Nota importante sulla versione: a partire da Flink 2.0, il file di configurazione supportato è conf/config.yaml; il precedente flink-conf.yaml non è “più supportato”.
Un conf/config.yaml minimale (illustrativo) per un piccolo cluster self-managed:
# conf/config.yaml (stile Flink 2.x)
rest:
address: flink-jobmanager.example.internal
port: 8081
jobmanager:
rpc:
address: flink-jobmanager.example.internal
port: 6123
memory:
process:
size: 2048m
taskmanager:
memory:
process:
size: 4096m
numberOfTaskSlots: 2
parallelism:
default: 2
# Impostazioni predefinite di checkpointing (i job possono ancora sovrascrivere nel codice)
state:
backend:
type: rocksdb
execution:
checkpointing:
dir: s3://my-bucket/flink/checkpoints
savepoint-dir: s3://my-bucket/flink/savepoints
interval: 60 s
# Evita tmp dirs che vengono purgati (file RocksDB, JAR cacheati, ecc.)
io:
tmp:
dirs: ["/var/lib/flink/tmp"]
Perché queste chiavi: il riferimento alla configurazione di Flink documenta esplicitamente i dettagli di scoperta rest.* e jobmanager.rpc.*, le chiavi della memoria del processo, le chiavi slot/parallelismo e le impostazioni predefinite di checkpoint, inclusi state.backend.type, execution.checkpointing.dir, execution.checkpointing.savepoint-dir e execution.checkpointing.interval.
La scelta di io.tmp.dirs è operativamente importante perché Flink la usa per file locali RocksDB e artefatti cacheati; eliminare può causare un recupero pesante.
Esempio di configurazione legacy standalone per Flink 1.x
Se sei su Flink 1.x (ancora comune in alcuni ambienti gestiti), troverai flink-conf.yaml in giro. Questo è legacy per gli utenti Flink 2.x.
# conf/flink-conf.yaml (stile legacy 1.x; NON supportato in Flink 2.x)
jobmanager.rpc.address: flink-jobmanager
rest.port: 8081
taskmanager.numberOfTaskSlots: 2
parallelism.default: 2
# Le chiavi di checkpoint legacy variano per versione; trattale come illustrative.
state.backend.type: rocksdb
state.checkpoints.dir: s3://my-bucket/flink/checkpoints
state.savepoints.dir: s3://my-bucket/flink/savepoints
Se stai migrando, Flink fornisce uno script di migrazione (bin/migrate-config-file.sh) per convertire flink-conf.yaml in config.yaml.
Deployment Kubernetes/Helm con l’operatore Kubernetes di Flink
L’operatore Kubernetes di Flink agisce come piano di controllo per la gestione del ciclo di vita delle applicazioni Flink e viene installato usando Helm.
Dalla documentazione Helm ufficiale dell’operatore, puoi installare sia dal chart nell’albero sorgente, sia dal repository chart ospitato da Apache:
# install from bundled chart in source tree
helm install flink-kubernetes-operator helm/flink-kubernetes-operator
# install from Apache downloads Helm repository (replace <OPERATOR-VERSION>)
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-<OPERATOR-VERSION>/
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator
Questi comandi esatti sono mostrati nella documentazione di installazione Helm dell’operatore.
Esempio CR FlinkDeployment (illustrativo)
Questo è un esempio semplificato per mostrare i punti di integrazione che personalizzerai tipicamente (immagine, risorse, posizioni dei checkpoint, logging/metriche). L’operatore riconcilia questo stato desiderato tramite il suo control loop.
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: realtime-sessions
namespace: flink
spec:
image: my-registry.example.com/flink/realtime-sessions:2026-03-06
flinkVersion: v2_2
serviceAccount: flink
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
state.backend.type: "rocksdb"
execution.checkpointing.dir: "s3://my-bucket/flink/checkpoints/realtime-sessions"
execution.checkpointing.savepoint-dir: "s3://my-bucket/flink/savepoints/realtime-sessions"
execution.checkpointing.interval: "60 s"
rest.port: "8081"
jobManager:
resource:
cpu: 1
memory: "2048m"
taskManager:
resource:
cpu: 2
memory: "4096m"
job:
jarURI: local:///opt/flink/usrlib/realtime-sessions.jar
parallelism: 4
upgradeMode: savepoint
state: running
Il pattern upgradeMode: savepoint è comune quando vuoi aggiornamenti con stato sicuri; i savepoint sono progettati per flussi di lavoro stop/riprendi/fork/aggiorna e puntano a posizioni di storage stabili.
Sviluppo PyFlink: job di streaming Kafka realistico con checkpoint e stato RocksDB
PyFlink è l’API Python per Apache Flink ed è esplicitamente proposta per carichi di lavoro batch/stream scalabili inclusi pipeline ML e ETL.
Packaging delle dipendenze per job Kafka PyFlink
Quando usi connettori JVM (Kafka, JDBC, ecc.) da PyFlink, devi assicurarti che i JAR pertinenti siano disponibili per il job. La documentazione Python “Dependency Management” di Flink mostra tre meccanismi standard:
Impostare pipeline.jars (Table API), chiamare add_jars() (DataStream API) o CLI --jarfile al momento della sottomissione.
Esempio di job Kafka PyFlink (API DataStream + tempo eventi + stato + checkpointing)
Questo esempio legge eventi JSON da Kafka, assegna timestamp del tempo degli eventi (con out-of-orderness limitato), mantiene un conteggio rotolante per utente nello stato chiave e scrive un evento arricchito in un topic di output.
Note:
- KafkaSource viene costruito tramite
KafkaSource.builder()e richiede bootstrap servers, topic e deserializzatore. - La configurazione del sink Kafka exactly-once in PyFlink richiede di impostare la garanzia di consegna e un prefisso ID transazionale.
- I valori predefiniti di checkpoint possono essere configurati nella configurazione Flink (
execution.checkpointing.*) e/o nel codice; le chiavi di configurazione sono documentate nel riferimento alla configurazione di Flink.
import json
from typing import Any
from pyflink.common import Duration, Types
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.watermark_strategy import WatermarkStrategy, TimestampAssigner
from pyflink.common.configuration import Configuration
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor
from pyflink.datastream.connectors import DeliveryGuarantee
from pyflink.datastream.connectors.kafka import (
KafkaSource,
KafkaOffsetsInitializer,
KafkaSink,
KafkaRecordSerializationSchema,
)
class EventTimeFromJson(TimestampAssigner):
"""
Estrae event_time_ms dal payload JSON.
Aspettativa: {"user_id":"u1","event_time_ms":1710000000000,"event":"click",...}
"""
def extract_timestamp(self, value: str, record_timestamp: int) -> int:
try:
obj = json.loads(value)
return int(obj["event_time_ms"])
except Exception:
# fallback: usa timestamp del record (ingestione) se malformato
return record_timestamp
class RollingCount(KeyedProcessFunction):
def open(self, runtime_context: RuntimeContext):
desc = ValueStateDescriptor("rolling_count", Types.LONG())
self.count_state = runtime_context.get_state(desc)
def process_element(self, value: str, ctx: 'KeyedProcessFunction.Context'):
obj = json.loads(value)
current = self.count_state.value()
if current is None:
current = 0
current += 1
self.count_state.update(current)
# emetti evento arricchito
obj["rolling_count"] = current
obj["event_time_ms"] = int(obj.get("event_time_ms", 0))
yield json.dumps(obj)
def build_env() -> StreamExecutionEnvironment:
# Default cluster/job (possono anche essere impostati in config.yaml)
cfg = Configuration()
cfg.set_string("state.backend.type", "rocksdb")
cfg.set_string("execution.checkpointing.dir", "s3://my-bucket/flink/checkpoints/realtime-sessions")
cfg.set_string("execution.checkpointing.interval", "60 s")
env = StreamExecutionEnvironment.get_execution_environment(cfg)
# In PyFlink, i jar del connettore devono essere disponibili; usa env.add_jars(...) se necessario.
# env.add_jars("file:///opt/flink/lib/flink-connector-kafka-<VERSION>.jar")
# Abilita checkpointing esplicitamente anche (i job possono sovrascrivere i default)
env.enable_checkpointing(60_000)
env.set_parallelism(4)
return env
def main():
env = build_env()
source = (
KafkaSource.builder()
.set_bootstrap_servers("kafka:9092")
.set_topics("events.raw")
.set_group_id("realtime-sessions-v1")
.set_value_only_deserializer(SimpleStringSchema())
.set_starting_offsets(KafkaOffsetsInitializer.earliest())
.build()
)
watermark_strategy = (
WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(10))
.with_timestamp_assigner(EventTimeFromJson())
)
stream = (
env.from_source(source, watermark_strategy=watermark_strategy, source_name="kafka-events-raw")
.key_by(lambda s: json.loads(s)["user_id"])
.process(RollingCount(), output_type=Types.STRING())
)
record_serializer = (
KafkaRecordSerializationSchema.builder()
.set_topic("events.enriched")
.set_value_serialization_schema(SimpleStringSchema())
.build()
)
sink = (
KafkaSink.builder()
.set_bootstrap_servers("kafka:9092")
.set_record_serializer(record_serializer)
.set_delivery_guarantee(DeliveryGuarantee.EXACTLY_ONCE)
.set_transactional_id_prefix("realtime-sessions-txn")
.build()
)
stream.sink_to(sink)
env.execute("realtime-sessions-pyflink")
if __name__ == "__main__":
main()
Le chiamate API sopra si allineano con il modello di utilizzo del builder KafkaSource di PyFlink e i campi richiesti.
Per le garanzie di consegna, la documentazione KafkaSinkBuilder di PyFlink dice esplicitamente che per DeliveryGuarantee.EXACTLY_ONCE devi impostare il prefisso ID transazionale.
Per timestamping/watermarking, la documentazione watermark di Flink spiega l’assegnazione dei timestamp e la generazione dei watermark come il meccanismo per elaborare il tempo degli eventi, e PyFlink fornisce un’API WatermarkStrategy che specchia questo modello.
Integrazione Go: produttore/consumatore Kafka + sottomissione job Flink REST
Go non ha un’API di autore job Flink nativa come Java/Python, quindi i sistemi Go tipicamente si integrano con Flink tramite:
- Kafka (o altri broker) come ingestione/egress.
- L’API REST di Flink per azioni operative (upload JAR, avvio job, interrogazione stato job, attivazione savepoint, ridimensionamento).
Per la configurazione Kafka e pattern di sviluppo locale, consulta Apache Kafka Quickstart - Install Kafka 4.2 with CLI and Local Examples.
Esempio produttore/consumatore Kafka Go (kafka-go)
package main
import (
"context"
"log"
"time"
"github.com/segmentio/kafka-go"
)
func main() {
ctx := context.Background()
// Produttore: scrivi eventi grezzi
writer := &kafka.Writer{
Addr: kafka.TCP("kafka:9092"),
Topic: "events.raw",
RequiredAcks: kafka.RequireAll,
}
defer writer.Close()
err := writer.WriteMessages(ctx, kafka.Message{
Key: []byte("user:u1"),
Value: []byte(`{"user_id":"u1","event_time_ms":1710000000000,"event":"click"}`),
Time: time.Now(),
})
if err != nil {
log.Fatal(err)
}
// Consumatore: leggi eventi arricchiti
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"kafka:9092"},
Topic: "events.enriched",
GroupID: "go-debug-consumer",
MinBytes: 1e3,
MaxBytes: 10e6,
})
defer reader.Close()
msg, err := reader.ReadMessage(ctx)
if err != nil {
log.Fatal(err)
}
log.Printf("enriched key=%s value=%s\n", string(msg.Key), string(msg.Value))
}
Questo è codice “di collegamento”, ma è la superficie di integrazione pratica più comune: i topic Kafka sono il confine tra Flink e servizi personalizzati.
API REST Flink: upload ed esecuzione job da Go
L’API REST di Flink fa parte del server web JobManager e ascolta sulla porta 8081 di default (configurabile tramite rest.port).
La specifica OpenAPI ufficiale per il dispatcher include /jars/upload e dichiara esplicitamente:
- L’upload JAR deve essere inviato come dati multi-part
- assicurati che l’header
Content-Typesia impostato suapplication/x-java-archive - fornisce un esempio curl usando
-F jarfile=@path/to/flink-job.jar
Uno snippet Go pratico per caricare un JAR:
package flink
import (
"bytes"
"context"
"fmt"
"io"
"mime/multipart"
"net/http"
"os"
)
func UploadJar(ctx context.Context, flinkBaseURL, jarPath string) (*http.Response, error) {
f, err := os.Open(jarPath)
if err != nil {
return nil, err
}
defer f.Close()
var body bytes.Buffer
w := multipart.NewWriter(&body)
part, err := w.CreateFormFile("jarfile", "job.jar")
if err != nil {
return nil, err
}
if _, err := io.Copy(part, f); err != nil {
return nil, err
}
if err := w.Close(); err != nil {
return nil, err
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, flinkBaseURL+"/jars/upload", &body)
if err != nil {
return nil, err
}
// Importante: boundary multipart
req.Header.Set("Content-Type", w.FormDataContentType())
// Alcuni client impostano anche "Expect:" simile all'esempio curl nella spec.
req.Header.Set("Expect", "")
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return nil, err
}
if resp.StatusCode >= 300 {
return resp, fmt.Errorf("upload failed: %s", resp.Status)
}
return resp, nil
}
Questo codice è guidato dalla descrizione OpenAPI REST API per /jars/upload inclusi i requisiti multipart e il riferimento curl.
Per eseguire un JAR precedentemente caricato, Flink espone /jars/{jarid}/run e supporta il passaggio di argomenti del programma tramite parametri di query (e/o corpo JSON).
Endpoint operativamente preziosi che probabilmente automatizzerai:
/jobse/jobs/{jobid}per elencare e ispezionare lo stato del job/jobs/{jobid}/savepointsper attivare savepoint (trigger asincrono + polling)/jobs/{jobid}/rescalingper attivare ridimensionamento
Tabella di confronto snippet di codice: PyFlink vs Go in una piattaforma basata su Flink
| Preoccupazione | PyFlink (job Python) | Go (servizi attorno a Flink) |
|---|---|---|
| Autore della logica Flink | Autore nativo tramite API DataStream/Table; supporta stato + timer | Nessuna API Flink nativa; implementa la logica in Flink (Java/Python) e integra esternamente |
| Connettori/dipendenze | Devi inviare JAR connettore tramite pipeline.jars, add_jars o --jarfile |
Non applicabile (non stai eseguendo dentro Flink), ma gestisci client Kafka/DB |
| Ingestione/egress | Costruttori KafkaSource/KafkaSink in PyFlink | Librerie produttore/consumatore Kafka; pattern microservizi standard |
| Automazione Ops | Può chiamare anche endpoint REST Flink | Spesso possiede l’automazione: upload JAR, deployment, ridimensionamento, trigger savepoint via REST |
Guida DevOps: monitoraggio, scaling, backup e CI/CD per Apache Flink
Monitoraggio Apache Flink in Kubernetes e su VM
Flink supporta l’esportazione delle metriche configurando metric reporter nel file di configurazione di Flink; questi reporter vengono istanziati su JobManager e TaskManagers.
Per Prometheus, Flink espone metriche in formato Prometheus quando configurato con metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory in un ambiente Flink supportato.
Generalmente combini questo con Kubernetes ServiceMonitors (Prometheus Operator) o con il tuo stack di monitoraggio gestito.
Scaling: parallelismo, slot e autoscaling basato su operatori
Il modello di scheduling di Flink definisce le risorse di esecuzione tramite slot del task, e ogni slot può eseguire una pipeline di task paralleli.
Per il scaling manuale, l’API REST fornisce un endpoint di ridimensionamento per un job (/jobs/{jobid}/rescaling) come operazione asincrona.
Se sei su Kubernetes con l’operatore Kubernetes di Flink, il progetto operatore annuncia un “Flink Job Autoscaler” come parte del suo set di funzionalità, che vale la pena valutare se i tuoi carichi di lavoro variano sostanzialmente.
Backup e aggiornamenti sicuri: checkpoint e savepoint
I checkpoint sono per il recupero automatizzato e sono gestiti da Flink; i savepoint sono per operazioni del ciclo di vita guidate dall’utente (stop/riprendi/fork/aggiorna).
Da una prospettiva SRE:
- Usa i checkpoint per “mantenere il pipeline in esecuzione attraverso i guasti”.
- Usa i savepoint per “distribuire una nuova versione senza perdere lo stato”.
L’API REST di Flink supporta anche l’attivazione di savepoint in modo asincrono, utile per flussi di lavoro “deploy → trigger savepoint → aggiorna” stile GitOps.
CI/CD: GitOps + Helm + sottomissione job REST
Per Kubernetes:
- Mantieni l’installazione dell’operatore e i tuoi CR FlinkDeployment in Git, distribuisci tramite Argo CD/Flux e versiona le immagini dei container per build. La documentazione Helm dell’operatore discute esplicitamente “Working with Argo CD”.
Per cluster standalone/session:
- Usa gli endpoint di upload e run JAR dell’API REST Flink per deployment di artefatti immutabili.
Nota anche un interruttore di sicurezza/ops sottile ma prezioso: web.submit.enable governa gli upload tramite la Web UI, ma la documentazione nota che anche quando disabilitato, i cluster session accettano ancora le sottomissioni dei job tramite richieste REST; questo è rilevante quando si rafforzano le superfici UI mantenendo l’automazione CI/CD.
Pattern di integrazione LLM/AI con Apache Flink per pipeline in tempo reale
I sistemi LLM sono spesso buoni solo quanto il loro contesto in tempo reale. Flink si inserisce negli stack LLM/AI come il componente che produce feature, embedding e aggregati comportamentali “sempre freschi”.
Pipeline di embedding in tempo reale con Flink
Un pattern comune è:
- ingerire azioni/eventi utente,
- aggregare sessioni e preferenze,
- produrre task di generazione embedding,
- scrivere embedding in un vector store e/o feature store.
La documentazione sulla gestione delle dipendenze di PyFlink cita esplicitamente “previsione machine learning” e il caricamento di modelli ML all’interno di UDF Python (per esecuzione su cluster remoti), che si mappa direttamente agli approcci “inferenza online all’interno degli operatori Flink”.
Aggiornamenti feature store online per raccomandazione e ranking
Il modello di stato chiave e checkpointing di Flink è costruito per mantenere lo stato dell’operatore attraverso gli eventi e recuperarlo in modo affidabile. Questo è una corrispondenza naturale per il calcolo continuo delle feature (tassi rotolanti, conteggi, metriche decadute nel tempo) di cui i recommender a valle hanno bisogno.
Compromessi pratici di latenza/consistenza per pipeline AI
Se la tua architettura richiede semantica exactly-once end-to-end (ad es. evitare aggiornamenti feature duplicati o eventi di fatturazione duplicati), strutturerai sink e sorgenti attorno al checkpointing e garanzie transazionali.
Nello specifico degli stack basati su Kafka:
- Il connettore Kafka di Flink può fornire garanzie exactly-once quando il checkpointing è abilitato e le opzioni di garanzia di consegna sono configurate.
- Kafka Streams supporta anche semantica exactly-once (EOS), che è rilevante se il tuo “pipeline feature AI” è abbastanza piccolo da risiedere nel codice dell’applicazione piuttosto che in un cluster Flink.
Vista architetturale per “Flink come costruttore di contesto AI in tempo reale”

Questo diagramma si basa sui primitivi fondamentali di Flink: elaborazione del tempo degli eventi (watermark), backend dello stato (state.backend.type e stato locale gestito dal sistema) e meccanismi di checkpoint/savepoint per la tolleranza ai guasti e le operazioni.