Apache Flink na K8s i Kafka: PyFlink, Go, operacje oraz cennik zarządzany.

Strumieniowanie z zachowaniem stanu, punkty kontrolne, K8s, PyFlink, Go.

Page content

Apache Flink to framework do obliczeń z pamięcią stanu (stateful computations) nad nieograniczonymi i ograniczonymi strumieniami danych.

Zespoły wdrażają go w celu poprawnego, niskopóźniowego przetwarzania strumieniowego z semantyką czasu zdarzenia (watermarks), tolerancją błędów (checkpoints), kontrolowanymi aktualizacjami (savepoints) oraz powierzchnią operacyjną (metryki i REST).

Apache Flink stream processing

Ten przewodnik skierowany jest do inżynierów DevOps oraz programistów Go/Python. Porównuje modele wdrażania (samodzielnie zarządzane vs. zarządzane), wyjaśnia podstawową architekturę, omawia konfiguracje Kubernetes (Helm i Operator) oraz samodzielne, zestawia Flink ze Sparkiem, Kafka Streams, Beam oraz bazami danych strumieniowych, a także przedstawia wzorce integracji PyFlink i Go, w tym rurociągi zorientowane na LLM i AI.

Wszelkie szersze konteksty dotyczące wzorców infrastruktury danych, w tym magazynowania obiektowego, baz danych i komunikacji, znaleźć można w Infrastruktura danych dla systemów AI: Magazynowanie obiektowe, bazy danych, wyszukiwanie i architektura danych AI.

Apache Flink jest wyraźnie pozycjonowany jako silnik do przetwarzania strumieni z pamięcią stanu: modelujesz swoją logikę jako rurociąg operatorów, a Flink wykonuje go jako rozproszone przepływy danych z zarządzanym stanem i semantyką czasu. W nowoczesnej dokumentacji Flink projekt opisuje się jako framework i rozproszone silniki przetwarzania dla obliczeń z pamięcią stanu nad nieograniczonymi i ograniczonymi strumieniami danych.

Z perspektywy praktycznej inżynierii DevOps/oprogramowania, Flink jest dobrym wyborem, gdy potrzebujesz co najmniej jednej z tych właściwości:

Jeśli potrzebujesz łączenia/agregacji/wzbogacania z niskim opóźnieniem i gwarancjami poprawności, zazwyczaj używasz przetwarzania czasu zdarzenia (event-time) w Flink, gdzie „czas" to moment wystąpienia zdarzenia (nie moment jego przyjścia), a watermarki komunikują postęp czasu zdarzenia przez rurociąg.

Jeśli potrzebujesz obliczeń z pamięcią stanu w skali (liczniki, sesje, reguły przeciwko oszustwom, inżynieria cech), Flink traktuje stan jako element pierwszorzędny modelu programistycznego i czyni go odpornym na błędy dzięki checkpointom.

Jeśli potrzebujesz operacyjnie odpornego strumieniowania (awarie, aktualizacje rolkowe, restarty), Flink checkpointuje stan i pozycje strumieni, dzięki czemu praca może zostać odzyskana i kontynuowana z tą samą semantyką „jakby nie doszło do błędu".

Typowe przypadki użycia dla zespołów DevOps, Go, Python i AI

Flink jest szeroko wykorzystywany do „przewodów danych i ETL", „analizy strumieniowej" oraz „aplikacji napędzanych zdarzeniami" (kategorie używane w dokumentacji Flink).

Dla stosu DevOps + Go/Python typowe wzorce wyglądają następująco:

Usługa Go generuje zdarzenia do Kafka; Flink pobiera te zdarzenia, wykonuje przetwarzanie z pamięcią stanu (np. usuwanie duplikatów, agregacja okienkowa, wzbogacenie), a następnie zapisuje wywodzone fakty z powrotem do Kafka lub bazy danych. Mechanizmy operatorów i checkpointów w Flink istnieją, aby sprawić, że te rurociągi z pamięcią stanu są bezpieczne w produkcji.

Dla zespołów ML/LLM, PyFlink wyraźnie wskazuje scenariusze takie jak „przewidywanie uczenia maszynowego" i ładowanie modeli uczenia maszynowego wewnątrz Python UDF jako motywację do zarządzania zależnościami, co jest bezpośrednim wsparciem dla wzorców „praca Flink jako środowisko inferencji online/inżynierii cech".

Czas trwania (runtime) Flink składa się z dwóch typów procesów: JobManager i TaskManagers. Dokumentacja podkreśla, że klienci przekazują przepływ danych do JobManager; klient może następnie odłączyć się (tryb odłączony) lub pozostać połączony (tryb przyłączony).

JobManager koordynuje rozproszone wykonanie: harmonogramowanie, reagowanie na zakończenie/zawalenie zadań, koordynowanie checkpointów i koordynowanie odzyskiwania. Wewnętrznie zawiera: ResourceManager (sloty/zasoby), Dispatcher (REST + Interfejs WWW + tworzenie JobMaster dla każdej pracy) oraz JobMaster (zarządza jedną pracą).

TaskManagers wykonują operatora/zadania i wymieniają/buforują strumienie danych. Najmniejszą jednostką harmonogramowania jest slot zadania; w jednym slotie mogą być wykonane wiele operatorów (łańcuchowanie operatorów i współdzielenie slotów mają na to wpływ).

Łańcuchowanie operatorów i sloty zadań dla kontroli wydajności i kosztów

Flink łączy podzadania operatorów w zadania, gdzie każde zadanie jest wykonywane przez pojedynczą wątk. Jest to opisane jako optymalizacja wydajności, która zmniejsza nakład na przekazanie wątków i buforowanie, zwiększając przepustowość i zmniejszając opóźnienia.

Sloty mają znaczenie operacyjne, ponieważ są jednostką harmonogramowania/izolacji zasobów. Flink zauważa, że każdy TaskManager może mieć jeden lub więcej slotów zadania; alokacja slotów rezerwuje zarządzaną pamięć na slot, ale nie izoluje CPU.

Przetwarzanie czasu zdarzenia, watermarki i spóźnione dane

Flink obsługuje wiele pojęć czasu – czas zdarzenia, czas pobrania, czas przetwarzania – i używa watermarków do modelowania postępu w czasie zdarzenia.

Aby pracować z czasem zdarzenia, Flink potrzebuje przypisanych znaczników czasu dla zdarzeń i generowanych watermarków; oficjalna dokumentacja „Generating Watermarks" wyjaśnia przypisywanie znaczników czasu i generowanie watermarków jako podstawowe bloki budulcowe, przy czym WatermarkStrategy jest standardowym sposobem konfigurowania typowych strategii.

Tolerancja błędów: checkpointy versus savepoints w systemach rzeczywistych

Checkpointing istnieje, ponieważ „każda funkcja i operator w Flink może mieć stan"; stan musi być checkpointowany, aby stać się odpornym na błędy. Checkpointy umożliwiają odzyskanie zarówno stanu, jak i pozycji strumienia, dzięki czemu wykonanie może zostać wznowione z semantyką bezbłędnej pracy.

Flink jest bardzo wyraźne, że savepointy to „spójny obraz stanu wykonania pracy strumieniowej, utworzony za pomocą mechanizmu checkpointingu Flink", używany do zatrzymywania/wznawiania, forkingu lub aktualizacji prac. Savepointy przechowywane są na trwałym magazynie (np. HDFS, S3).

Oficjalna strona „Checkpoints vs Savepoints" przedstawia różnicę jak kopie zapasowe vs. logi odzyskiwania: checkpointy są częste, lekkie i zarządzane przez Flink do odzyskiwania po błędach; savepointy są zarządzane przez użytkownika i używane do kontrolowanych operacji, takich jak aktualizacje.

Otwartoźródłowy runtime Flink jest „darmowy" w sensie licencyjnym, ale w produkcji płacisz za infrastrukturę i wysiłek operacyjny.

Flink został zaprojektowany do integracji z popularnymi menedżerami zasobów (np. YARN i Kubernetes) i może również działać jako samodzielny klastery lub jako biblioteka.

Koszty obliczeniowe i pamięciowe są napędzane przez JobManager i TaskManagers oraz układ równoległości/slotów. Dokumentacja konfiguracyjna Flink wyraźnie wskazuje jobmanager.memory.process.size, taskmanager.memory.process.size, taskmanager.numberOfTaskSlots i parallelism.default jako główne pokrętła dla konfiguracji rozproszonych.

Lokalny dysk jest częstym ukrytym kosztem dla prac z pamięcią stanu. Flink zauważa, że io.tmp.dirs przechowuje dane lokalne, w tym pliki RocksDB, rozlane wyniki pośrednie i zakasowane JARy; jeśli te dane zostaną usunięte, może to wymusić „ciężką operację odzyskiwania", więc powinny one znajdować się na magazynie, który nie jest okresowo czyszczony.

Koszt trwałego magazynowania obiektów/plików jest napędzany przez katalogi checkpointów/savepointów. W konfiguracji Flink 2.x checkpointy i savepointy są konfigurowane za pomocą execution.checkpointing.dir i execution.checkpointing.savepoint-dir i akceptują URI takie jak s3://… lub hdfs://….

Usługi zarządzane zmniejszają koszty operacyjne, ale dodają opłaty platformowe i ograniczenia. Szczegóły zależą od dostawcy.

Amazon Managed Service for Apache Flink rozlicza się według KPU (1 vCPU + 4 GB pamięci na KPU) i pobiera opłatę za czas trwania i liczbę KPU w przyrostach jednej sekundy. AWS pobiera również dodatkową opłatę za „orchestrację" KPU na aplikację oraz osobne opłaty za magazynowanie/kopie zapasowe.

Confluent Cloud for Apache Flink jest oparty na użyciu i serverless: tworzysz pulę obliczeniową i jesteś rozliczany za CFU zużyte na minutę, podczas gdy instrukcje są uruchamiane. Strona rozliczeń zawiera przykład ceny CFU wynoszący 0,21 USD za godzinę CFU (zależny od regionu) i podkreśla, że możesz ograniczyć wydatki poprzez maksymalne limity puli obliczeniowej.

Aiven i Alibaba Cloud to godne uwagi dostawcy zarządzanego Flink na rynku, ale ich publiczne ceny i szczegóły rozliczeń różnią się w zależności od planu/regionu i mogą wymagać kalkulatorów lub kontaktu ze sprzedażą; traktuj dokładne koszty jako nieokreślone, chyba że zacytujesz region+plan z ich bieżącej dokumentacji.

Ververica oferuje opcje wdrożenia samodzielnie zarządzanego i zarządzanego wokół Flink; strony publiczne podkreślają wybór wdrożenia i pozycjonowanie usługi zarządzanej, podczas gdy dokładne ceny są zazwyczaj obsługiwane poprzez przepływy „kontakt/szczegóły cenowe" (zatem konkretne liczby są często nieokreślone publicznie).

Opcja wdrożenia Najlepsze dla Skomplikowanie operacyjne Kluczowe korzyści Kluczowe ryzyka/kompromisy
Samodzielny klastery (VMs/bare metal) Małe zespoły, stała pojemność Średnie–Wysokie Pełna kontrola; najprostszy model mentalny HA, autoskalowanie, aktualizacje to DIY (więcej pracy)
Kubernetes z Flink Kubernetes Operator Większość nowoczesnych zespołów platformowych Średnie Deklaracyjne wdrożenia; zarządzanie cyklem życia poprzez pętlę sterowania; obsługa wdrożeń Application/Session/Job Wymagana znajomość Kubernetes + operatora
Natywny Kubernetes (bez operatora) Zespoły K8s chcące bezpośrednią integrację Średnie–Wysokie Bezpośrednia integracja zasobów; dynamiczna alokacja/dealokacja TaskManager opisana w dokumentacji Flink-on-K8s Bardziej niestandardowa automatyzacja niż z operatorem
YARN Platformy centrowane na Hadoop Średnie Integracja z zarządzaniem zasobami YARN Skomplikowanie stosu Hadoop
AWS Managed Service for Apache Flink Stosy danych natywne dla AWS Niskie–Średnie Zarządzana orkiestracja + opcje skalowania; przewidywalna jednostka rozliczeniowa (KPU) Powiązanie z platformą; dodatkowa opłata KPU za aplikację + opłaty za magazynowanie
Confluent Cloud for Apache Flink Firmy oparte na Kafka, aplikacje strumieniowe z priorytetem SQL Niskie Rozliczenie za użycie serverless; rozliczenie CFU-minuty; puli obliczeniowe do ograniczenia wydatków Koszty CFU + koszty sieciowe Kafka; specyficzne dla usługi API
Oferty zarządzane Ververica Przedsiębiorstwa potrzebujące eksperckiej obsługi Flink Niskie–Średnie Pozycjonowanie jako usługa zarządzana „ekspertami Flink" Ceny często nieprzejrzyste (nieokreślone)

Tabela dostawców i kosztów zarządzanych

Ceny zmieniają się w zależności od regionu i czasu; jeśli potrzebujesz dokładnych liczb dla Twojego regionu, traktuj to jako punkt wyjścia i zweryfikuj ze stronami cenowymi dostawcy (regiony niepodane są nieokreślone).

Dostawca Forma „planu" Jednostka rozliczeniowa Przykładowa cena obliczeniowa Godne uwagi dodatkowe czynniki kosztów
Amazon Managed Service for Apache Flink Runtime zarządzany KPU (1 vCPU + 4 GB) Przykład: 0,11 USD za godzinę KPU (US East N. Virginia) +1 KPU orkiestracji na aplikację; uruchomiony magazyn; opcjonalne trwałe kopie zapasowe
Confluent Cloud for Apache Flink SQL/Przetwarzanie serverless CFU-minuta/CFU-godzina Przykład: 0,21 USD za godzinę CFU (zależne od regionu) Stosują się stawki sieciowe Kafka; maksymalne limity puli obliczeniowej do ograniczenia wydatków
Ververica (zarządzany) Zarządzana „Unified Streaming Data Platform" Nieokreślone (strony publiczne) Nieokreślone Funkcje platformy/SLA; ceny zazwyczaj przez sprzedaż (nieokreślone)
Aiven for Apache Flink Usługa zarządzana Model rozliczeń godzinowego użycia (platformowy) Nieokreślone bez planu/regionu Poziom planu + region chmurowy + dodatki (nieokreślone)
Alibaba Cloud Realtime Compute for Apache Flink Zarządzany/serverless Hybrydowe rozliczenia (płać za użycie + subskrypcja) Nieokreślone bez szczegółów regionu/pracowni Limity oparte na CU i model przestrzeni roboczej (szczegóły różnią się; nieokreślone tutaj)

Flink znajduje się w gęstym ekosystemie. „Najlepszy" wybór zależy od opóźnienia, pamięci stanu, preferencji operacyjnych i modelu autorskiego.

Narzędzie Czym jest Model wykonania strumieniowego Stan i historia dokładnie raz Gdzie błyszczy Typowe problemy
Apache Flink Rozproszony silnik przetwarzania strumieni dla obliczeń z pamięcią stanu Ciągłe strumienie + czas zdarzenia poprzez watermarki Tolerancja błędów oparta na checkpointach; savepointy do kontrolowanych aktualizacji Rurociągi z pamięcią stanu o niskim opóźnieniu; skomplikowana logika czasu zdarzenia Prawidłowe operowanie stanem, checkpointami i aktualizacjami wymaga dyscypliny
Apache Spark Structured Streaming Silnik strumieniowy Sparka oparty na DataFrames/Datasets Domyślny model mikro-płatniczy (z trybem ciągłym omawianym osobno) Silny dla rurociągów analitycznych; stan istnieje, ale często wyższe opóźnienia Zjednoczone API batch+strumień; ekosystem Sparka Opóźnienia mikro-płatnicze i model mentalny „strumienie jako inkrementalne płatki"
Kafka Streams Biblioteka do budowania aplikacji przetwarzania strumieni na Kafce Przetwarzanie rekord po rekordzie Obsługuje semantykę przetwarzania dokładnie raz (EOS) Proste aplikacje natywne dla Kafka; wbudowanie w usługę JVM Tylko JVM; mniej elastyczny dla dużych wzorców obliczeń rozproszonych
Apache Beam Zjednoczony model programistyczny + SDK; wykonywany poprzez uruchamiacze (Flink, Spark, Dataflow itp.) Zależne od uruchamiacza; rurociągi Beam tłumaczą się na prace uruchamiacza Semantyka zależy od macierzy zdolności uruchamiacza (specyficzna dla uruchamiacza) Portowalność, rurociągi wielojęzyczne; unikanie zablokowania silnikiem Tuning operacyjny wciąż kończy się specyfiką uruchamiacza
Materialize „Warstwa danych na żywo" / baza danych SQL strumieniowa; inkrementalnie aktualizuje wyniki, gdy dane przychodzą Ciągła konserwacja widoków inkrementalnych Silne twierdzenia o spójności w dokumentacji produktu (szczegóły specyficzne dla produktu) Serwisowanie świeżych widoków wywodzonych dla aplikacji/agentów AI Inny model operacyjny niż prace Flink; nie jest ogólnym środowiskiem wykonania API operatorów
RisingWave Baza danych strumieniowa, gdzie przetwarzanie strumieni jest wyrażane jako widoki materializowane Ciągła konserwacja materializowanych widoków Priorytet SQL; semantyka specyficzna dla silnika Aplikacje strumieniowe centrowane na SQL bez budowania prac Flink Mniej elastyczny dla dowolnych rurociągów obciążonych kodem

Użyteczna heurystyka: jeśli chcesz środowisko wykonania dla skomplikowanych prac strumieniowych z pamięcią stanu z głęboką kontrolą nad czasem zdarzenia, logiką operatorów i wdrożeniami, Flink jest kandydatem pierwszego wyboru. Jeśli chcesz widoki inkrementalne z priorytetem SQL do serwowania, bazy danych strumieniowych mogą być alternatywą. Jeśli chcesz bibliotekę wbudowaną w usługę, Kafka Streams jest konkurencyjne. Jeśli chcesz jedną przenośną definicję rurociągu przez silniki, Beam jest przekonujące.

Dla natywnych dla chmury architektur napędzanych zdarzeniami używających AWS, Budowanie mikroserwisów napędzanych zdarzeniami z AWS Kinesis obejmuje wzorce Kinesis Data Streams dla przetwarzania w czasie rzeczywistym i odłączania usług.

Ta sekcja jest celowo praktyczna: konfiguracja, wdrożenie i jak Twoje usługi Go/Python zazwyczaj interagują z Flink.

Go services + Kafka + Flink + serving layer

Flink jest często „stanowym środkiem", który przekształca wydarzenia o wysokim wolumenie w trwałe sygnały (liczniki, sesje, anomalie, wzbogacone rekordy). Checkpointy i backendy stanu to, co czyni ten środek niezawodnym w produkcji.

Ważna uwaga dotycząca wersji: począwszy od Flink 2.0, obsługiwany plik konfiguracyjny to conf/config.yaml; poprzedni flink-conf.yaml jest „już nieobsługiwany".

Minimalny (ilustracyjny) conf/config.yaml dla małego samodzielnie zarządzanego klastra:

# conf/config.yaml (styl Flink 2.x)
rest:
  address: flink-jobmanager.example.internal
  port: 8081

jobmanager:
  rpc:
    address: flink-jobmanager.example.internal
    port: 6123
  memory:
    process:
      size: 2048m

taskmanager:
  memory:
    process:
      size: 4096m
  numberOfTaskSlots: 2

parallelism:
  default: 2

# Domyślne ustawienia checkpointowania (prace mogą nadpisać w kodzie)
state:
  backend:
    type: rocksdb
execution:
  checkpointing:
    dir: s3://my-bucket/flink/checkpoints
    savepoint-dir: s3://my-bucket/flink/savepoints
    interval: 60 s

# Unikaj katalogów tymczasowych, które są czyszczone (pliki RocksDB, zakasowane JARy itp.)
io:
  tmp:
    dirs: ["/var/lib/flink/tmp"]

Dlaczego te klucze: Referencja konfiguracyjna Flink wyraźnie dokumentuje szczegóły odkrywania rest.* i jobmanager.rpc.*, klucze pamięci procesu, klucze slotów/równoległości oraz domyślne ustawienia checkpointowania, w tym state.backend.type, execution.checkpointing.dir, execution.checkpointing.savepoint-dir oraz execution.checkpointing.interval.

Wybór io.tmp.dirs jest operacyjnie ważny, ponieważ Flink używa go do lokalnych plików RocksDB i zakasowanych artefaktów; usunięcie go może spowodować ciężkie odzyskiwanie.

Jeśli korzystasz z Flink 1.x (wciąż powszechny w niektórych środowiskach zarządzanych), zobaczysz flink-conf.yaml w obiegu. Jest to legenda dla użytkowników Flink 2.x.

# conf/flink-conf.yaml (styl legacy 1.x; NIE obsługiwany w Flink 2.x)
jobmanager.rpc.address: flink-jobmanager
rest.port: 8081
taskmanager.numberOfTaskSlots: 2
parallelism.default: 2

# Klucze checkpointowania legacy różnią się w zależności od wersji; traktuj jako ilustracyjne.
state.backend.type: rocksdb
state.checkpoints.dir: s3://my-bucket/flink/checkpoints
state.savepoints.dir: s3://my-bucket/flink/savepoints

Jeśli migrujesz, Flink dostarcza skrypt migracji (bin/migrate-config-file.sh) do konwersji flink-conf.yaml na config.yaml.

Flink Kubernetes Operator działa jako płaszcza sterowania dla zarządzania cyklem życia aplikacji Flink i jest instalowany za pomocą Helm.

Z oficjalnej dokumentacji Helm operatora możesz zainstalować albo z wykresu drzewa źródłowego, albo z repozytorium wykresów hostowanego przez Apache:

# zainstaluj z wykresu dołączonym w drzewie źródłowym
helm install flink-kubernetes-operator helm/flink-kubernetes-operator

# zainstaluj z repozytorium Helm Apache downloads (zastąp <OPERATOR-VERSION>)
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-<OPERATOR-VERSION>/
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator

Te dokładne polecenia są pokazane w dokumentacji instalacji Helm operatora.

Przykład CR FlinkDeployment (ilustracyjny)

To jest uproszczony przykład pokazujący punkty integracji, które zazwyczaj dostosowujesz (obraz, zasoby, lokalizacje checkpointów, logowanie/metryki). Operator dopasowuje ten pożądaną stan poprzez swoją pętlę sterowania.

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: realtime-sessions
  namespace: flink
spec:
  image: my-registry.example.com/flink/realtime-sessions:2026-03-06
  flinkVersion: v2_2
  serviceAccount: flink
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.backend.type: "rocksdb"
    execution.checkpointing.dir: "s3://my-bucket/flink/checkpoints/realtime-sessions"
    execution.checkpointing.savepoint-dir: "s3://my-bucket/flink/savepoints/realtime-sessions"
    execution.checkpointing.interval: "60 s"
    rest.port: "8081"
  jobManager:
    resource:
      cpu: 1
      memory: "2048m"
  taskManager:
    resource:
      cpu: 2
      memory: "4096m"
  job:
    jarURI: local:///opt/flink/usrlib/realtime-sessions.jar
    parallelism: 4
    upgradeMode: savepoint
    state: running

Wzorzec upgradeMode: savepoint jest powszechny, gdy chcesz bezpieczne aktualizacje z pamięcią stanu; savepointy są zaprojektowane dla przepływów pracy zatrzymuj/wznów/fork/aktualizuj i wskazują na lokalizacje stabilnego magazynu.

PyFlink to API Python dla Apache Flink i jest wyraźnie promowane dla skalowalnych obciążeń batch/stream, w tym rurociągów ML i ETL.

Kiedy używasz łączników JVM (Kafka, JDBC itp.) z PyFlink, musisz zapewnić, że odpowiednie JARy są dostępne dla pracy. Dokumentacja Python „Dependency Management" Flink pokazuje trzy standardowe mechanizmy:

Ustawianie pipeline.jars (Table API), wywoływanie add_jars() (DataStream API) lub CLI --jarfile w momencie przesłania.

Ten przykład odczytuje zdarzenia JSON z Kafka, przypisuje znaczniki czasu zdarzenia (z ograniczoną nieuporządkowaniem), utrzymuje zliczanie rolkowe na użytkownika w stanowym stanie kluczowanym i zapisuje wzbogacone zdarzenie do tematu wyjściowego.

Uwagi:

  • KafkaSource jest budowany poprzez KafkaSource.builder() i wymaga serwerów bootstrap, tematów i deserializatora.
  • Konfiguracja dokładnie raz (exactly-once) dla Kafka sink w PyFlink wymaga ustawienia gwarancji dostawy i prefiksu identyfikatora transakcyjnego.
  • Domyślne ustawienia checkpointowania można skonfigurować w konfiguracji Flink (execution.checkpointing.*) i/lub w kodzie; klucze konfiguracji są udokumentowane w referencji konfiguracyjnej Flink.
import json
from typing import Any

from pyflink.common import Duration, Types
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.watermark_strategy import WatermarkStrategy, TimestampAssigner
from pyflink.common.configuration import Configuration

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor

from pyflink.datastream.connectors import DeliveryGuarantee
from pyflink.datastream.connectors.kafka import (
    KafkaSource,
    KafkaOffsetsInitializer,
    KafkaSink,
    KafkaRecordSerializationSchema,
)


class EventTimeFromJson(TimestampAssigner):
    """
    Wydobycie event_time_ms z ładunku JSON.
    Oczekiwany format: {"user_id":"u1","event_time_ms":1710000000000,"event":"click",...}
    """
    def extract_timestamp(self, value: str, record_timestamp: int) -> int:
        try:
            obj = json.loads(value)
            return int(obj["event_time_ms"])
        except Exception:
            # awaryjnie: użyj znacznika czasu rekordu (pobrania), jeśli niepoprawny
            return record_timestamp


class RollingCount(KeyedProcessFunction):
    def open(self, runtime_context: RuntimeContext):
        desc = ValueStateDescriptor("rolling_count", Types.LONG())
        self.count_state = runtime_context.get_state(desc)

    def process_element(self, value: str, ctx: 'KeyedProcessFunction.Context'):
        obj = json.loads(value)
        current = self.count_state.value()
        if current is None:
            current = 0
        current += 1
        self.count_state.update(current)

        # emituj wzbogacone zdarzenie
        obj["rolling_count"] = current
        obj["event_time_ms"] = int(obj.get("event_time_ms", 0))
        yield json.dumps(obj)


def build_env() -> StreamExecutionEnvironment:
    # Domyślne ustawienia klastra/pracy (można też ustawić w config.yaml)
    cfg = Configuration()
    cfg.set_string("state.backend.type", "rocksdb")
    cfg.set_string("execution.checkpointing.dir", "s3://my-bucket/flink/checkpoints/realtime-sessions")
    cfg.set_string("execution.checkpointing.interval", "60 s")
    env = StreamExecutionEnvironment.get_execution_environment(cfg)

    # W PyFlink, JARy łączników muszą być dostępne; użyj env.add_jars(...) jeśli potrzebne.
    # env.add_jars("file:///opt/flink/lib/flink-connector-kafka-<VERSION>.jar")

    # Włącz checkpointowanie jawnie (prace mogą nadpisać domyślne wartości)
    env.enable_checkpointing(60_000)
    env.set_parallelism(4)
    return env


def main():
    env = build_env()

    source = (
        KafkaSource.builder()
        .set_bootstrap_servers("kafka:9092")
        .set_topics("events.raw")
        .set_group_id("realtime-sessions-v1")
        .set_value_only_deserializer(SimpleStringSchema())
        .set_starting_offsets(KafkaOffsetsInitializer.earliest())
        .build()
    )

    watermark_strategy = (
        WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(10))
        .with_timestamp_assigner(EventTimeFromJson())
    )

    stream = (
        env.from_source(source, watermark_strategy=watermark_strategy, source_name="kafka-events-raw")
        .key_by(lambda s: json.loads(s)["user_id"])
        .process(RollingCount(), output_type=Types.STRING())
    )

    record_serializer = (
        KafkaRecordSerializationSchema.builder()
        .set_topic("events.enriched")
        .set_value_serialization_schema(SimpleStringSchema())
        .build()
    )

    sink = (
        KafkaSink.builder()
        .set_bootstrap_servers("kafka:9092")
        .set_record_serializer(record_serializer)
        .set_delivery_guarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .set_transactional_id_prefix("realtime-sessions-txn")
        .build()
    )

    stream.sink_to(sink)

    env.execute("realtime-sessions-pyflink")


if __name__ == "__main__":
    main()

Powyższe wywołania API pasują do wzorca użycia budowni KafkaSource w PyFlink i wymaganych pól. Dla gwarancji dostawy, dokumentacja KafkaSinkBuilder w PyFlink wyraźnie mówi, że dla DeliveryGuarantee.EXACTLY_ONCE musisz ustawić prefiks identyfikatora transakcyjnego. Dla znaczników czasu/watermarków, dokumentacja watermarków Flink wyjaśnia przypisywanie znaczników czasu i generowanie watermarków jako mechanizm do przetwarzania czasu zdarzenia, a PyFlink dostarcza API WatermarkStrategy lustrujące ten model.

Go nie posiada natywnego API autorskiego prac Flink jak Java/Python, więc systemy Go zazwyczaj integrują się z Flink poprzez:

  • Kafka (lub inne brokerzy) jako źródło/odchodzenie.
  • API REST Flink dla działań operacyjnych (przesyłanie JARów, uruchamianie prac, zapytanie o status pracy, wyzwalanie savepointów, przeskalowanie).

Dla konfiguracji Kafka i wzorców lokalnego rozwoju zobacz Szybki start Apache Kafka - Instalacja Kafka 4.2 z CLI i przykładami lokalnymi.

Przykład producenta/konsumera Go Kafka (kafka-go)

package main

import (
	"context"
	"log"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	ctx := context.Background()

	// Producent: zapisz surowe zdarzenia
	writer := &kafka.Writer{
		Addr:         kafka.TCP("kafka:9092"),
		Topic:        "events.raw",
		RequiredAcks: kafka.RequireAll,
	}
	defer writer.Close()

	err := writer.WriteMessages(ctx, kafka.Message{
		Key:   []byte("user:u1"),
		Value: []byte(`{"user_id":"u1","event_time_ms":1710000000000,"event":"click"}`),
		Time:  time.Now(),
	})
	if err != nil {
		log.Fatal(err)
	}

	// Konsumer: odczytaj wzbogacone zdarzenia
	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:  []string{"kafka:9092"},
		Topic:    "events.enriched",
		GroupID:  "go-debug-consumer",
		MinBytes: 1e3,
		MaxBytes: 10e6,
	})
	defer reader.Close()

	msg, err := reader.ReadMessage(ctx)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("enriched key=%s value=%s\n", string(msg.Key), string(msg.Value))
}

To jest kod „okablowania", ale to jest najczęstsza praktyczna powierzchnia integracji: tematy Kafka to granica między Flink a usługami niestandardowymi.

API REST Flink jest częścią serwera WWW JobManager i nasłuchuje na porcie 8081 domyślnie (konfigurowalne poprzez rest.port).

Oficjalna specyfikacja OpenAPI dla dispatcher zawiera /jars/upload i wyraźnie stwierdza:

  • Przesłanie JAR musi być wysłane jako dane multi-part
  • upewnij się, że nagłówek Content-Type jest ustawiony na application/x-java-archive
  • dostarcza przykład curl używający -F jarfile=@path/to/flink-job.jar

Praktyczny fragment kodu Go do przesłania JAR:

package flink

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"mime/multipart"
	"net/http"
	"os"
)

func UploadJar(ctx context.Context, flinkBaseURL, jarPath string) (*http.Response, error) {
	f, err := os.Open(jarPath)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	var body bytes.Buffer
	w := multipart.NewWriter(&body)

	part, err := w.CreateFormFile("jarfile", "job.jar")
	if err != nil {
		return nil, err
	}
	if _, err := io.Copy(part, f); err != nil {
		return nil, err
	}
	if err := w.Close(); err != nil {
		return nil, err
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, flinkBaseURL+"/jars/upload", &body)
	if err != nil {
		return nil, err
	}

	// Ważne: granica multi-part
	req.Header.Set("Content-Type", w.FormDataContentType())

	// Niektóre klienci również ustawiają "Expect:" podobnie jak przykład curl w specyfikacji.
	req.Header.Set("Expect", "")

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	if resp.StatusCode >= 300 {
		return resp, fmt.Errorf("upload failed: %s", resp.Status)
	}
	return resp, nil
}

Ten kod jest prowadzony przez opis OpenAPI API REST dla /jars/upload, w tym jego wymóg multi-part i referencję curl.

Aby uruchomić wcześniej przesłane JAR, Flink udostępnia /jars/{jarid}/run i obsługuje przekazywanie argumentów programu poprzez parametry zapytania (i/lub ciało JSON).

Wartościowe operacyjnie punkty końcowe, które prawdopodobnie zautomatyzujesz:

  • /jobs i /jobs/{jobid} do listowania i inspekcji stanu pracy
  • /jobs/{jobid}/savepoints do wyzwalania savepointów (asynchroniczne wyzwalanie + polling)
  • /jobs/{jobid}/rescaling do wyzwalania przeskalowania
Kwestia PyFlink (prace Python) Go (usługi wokół Flink)
Tworzenie logiki Flink Natywne tworzenie poprzez API DataStream/Table; obsługuje stan + timery Brak natywnego API Flink; implementuj logikę w Flink (Java/Python) i integruj z zewnątrz
Łączniki/zależności Musisz dostarczyć JARy łączników poprzez pipeline.jars, add_jars lub --jarfile Nie dotyczy (nie uruchamiasz wewnątrz Flink), ale zarządzasz klientami Kafka/DB
Pobranie/egres Budowni KafkaSource/KafkaSink w PyFlink Biblioteki producenta/konsumera Kafka; standardowe wzorce mikroserwisów
Automatyzacja operacyjna Możesz też wywoływać punkty końcowe REST Flink Często posiada automatyzację: przesłanie JAR, wdrożenie, przeskalowanie, wyzwalanie savepointa poprzez REST

Flink obsługuje eksportowanie metryk poprzez konfigurowanie raportów metryk w pliku konfiguracyjnym Flink; te raporty są instancjonowane na JobManager i TaskManagers.

Dla Prometheus, Flink odsłania metryki w formacie Prometheus, gdy skonfigurowane są z metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory w obsługiwanej wersji Flink.

Zazwyczaj łączysz to z ServiceMonitors Kubernetes (Operator Prometheus) lub z Twoją zarządzaną stosem monitoringu.

Skalowanie: równoległość, sloty i autoskalowanie oparte na operatorach

Model harmonogramowania Flink definiuje zasoby wykonania poprzez sloty zadań, a każdy slot może uruchamiać rurociąg równoległych zadań.

Dla ręcznego skalowania, API REST dostarcza punkt końcowy do przeskalowania pracy (/jobs/{jobid}/rescaling) jako operację asynchroniczną.

Jeśli jesteś w Kubernetes z Operatorem Kubernetes Flink, projekt operatora reklamuje „Flink Job Autoscaler" jako część swojego zestawu funkcji, co jest warte oceny, jeśli Twoje obciążenia znacznie się różnią.

Kopie zapasowe i bezpieczne aktualizacje: checkpointy i savepointy

Checkpointy są do automatycznego odzyskiwania i są zarządzane przez Flink; savepointy są do operacji cyklu życia sterowanych przez użytkownika (zatrzymaj/wznów/fork/aktualizuj).

Z punktu widzenia SRE:

  • Użyj checkpointów do „utrzymywania rurociągu działającego przez awarie".
  • Użyj savepointów do „wdrożenia nowej wersji bez utraty stanu".

API REST Flink obsługuje również asynchroniczne wyzwalanie savepointów, co jest przydatne dla przepływów pracy stylu GitOps „deploy → trigger savepoint → upgrade".

CI/CD: GitOps + Helm + przesłanie pracy REST

Dla Kubernetes:

  • Utrzymuj instalację operatora i Twoje CR FlinkDeployment w Git, wdrażaj poprzez Argo CD/Flux i wersjonuj obrazy kontenerów na build. Dokumentacja Helm operatora wyraźnie omawia „Praca z Argo CD".

Dla klastrów samodzielnych/sesyjnych:

  • Użyj punktów końcowych API REST Flink do przesłania JAR i uruchomienia dla wdrożeń niemutable artefaktów.

Zauważ też subtelną, ale cenną przełączkę bezpieczeństwa/ops: web.submit.enable rządzi przesłaniami poprzez UI WWW, ale dokumentacja zauważa, że nawet gdy jest wyłączona, klastry sesyjne nadal akceptują przesłania prac poprzez żądania REST; to jest istotne przy utwardzaniu powierzchni UI przy zachowaniu automatyzacji CI/CD.

Systemy LLM są często tak dobre jak ich kontekst w czasie rzeczywistym. Flink pasuje do stosów LLM/AI jako komponent, który produkuje „zawsze świeże" cechy, wektory i agregaty zachowań.

Powszechny wzorzec to:

  • pobieranie działań/zdarzeń użytkowników,
  • agregacja sesji i preferencji,
  • tworzenie zadań generowania wektorów,
  • zapisywanie wektorów do magazynu wektorowego i/lub magazynu cech.

Dokumentacja zarządzania zależnościami PyFlink wyraźnie wskazuje „przewidywanie uczenia maszynowego" i ładowanie modeli ML wewnątrz Python UDF (do wykonania na zdalnym klastrze), co bezpośrednio mapuje się na podejścia „inferencji online wewnątrz operatorów Flink".

Aktualizacje magazynu cech online dla rekomendacji i rankingu

Model stanu kluczowanego i checkpointowania Flink jest zbudowany do utrzymania stanu operatora przez zdarzenia i niezawodnego odzyskiwania. To jest naturalne dopasowanie dla ciągłych obliczeń cech (stawki rolkowe, liczniki, metryki z czasem wygasania), których potrzebują dalsze rekomendatory.

Praktyczne kompromisy opóźnienia/spójności dla rurociągów AI

Jeśli Twoja architektura wymaga semantyki dokładnie raz end-to-end (np. unikaj podwójnych aktualizacji cech lub podwójnych zdarzeń rozliczeniowych), zbudujesz źródła i zlewnie wokół checkpointingu i gwarancji transakcyjnych.

W stosach opartych na Kafce konkretnie:

  • Łącznik Kafka Flink może dostarczyć gwarancje dokładnie raz, gdy checkpointing jest włączony i opcje gwarancji dostawy są skonfigurowane.
  • Kafka Streams również obsługuje semantykę dokładnie raz (EOS), co jest istotne, jeśli Twój „rurociąg cech AI" jest wystarczająco mały, aby mieszcząc się w kodzie aplikacji, a nie w klastrze Flink.

Flink as the real-time AI context builder

Ten diagram jest oparty na podstawowych primitive Flink: przetwarzanie czasu zdarzenia (watermarki), backendy stanu (state.backend.type i stan lokalny zarządzany przez system) oraz mechanizmy checkpoint/savepoint dla tolerancji błędów i operacji.