Apache Flink на K8s и Kafka: PyFlink, Go, эксплуатация и управляемое ценообразование

Stateful-стриминг, контрольные точки, K8s, PyFlink, Go.

Содержимое страницы

Apache Flink — это фреймворк для вычислений с сохранением состояния над ограниченным и неограниченным потоками данных.

Команды выбирают его для корректной потоковой обработки с низкой задержкой с семантикой времени события (маркеры воды), отказоустойчивостью (контрольные точки), управляемыми обновлениями (точки сохранения) и поверхностью для эксплуатации (метрики и REST).

Обработка потоков данных Apache Flink

Это руководство предназначено для DevOps-инженеров и разработчиков на Go/Python. В нем сравниваются модели развертывания (самостоятельное управление и управляемые сервисы), объясняется основная архитектура, рассматриваются настройки Kubernetes (Helm и Operator) и автономные установки, проводится сравнение Flink со Spark, Kafka Streams, Beam и потоковыми базами данных, а также демонстрируются паттерны интеграции PyFlink и Go, включая конвейеры для LLM и AI.

Для более широкого контекста паттернов инфраструктуры данных, включая объектное хранилище, базы данных и системы обмена сообщениями, см. Инфраструктура данных для AI-систем: объектное хранилище, базы данных, поиск и архитектура данных для AI.

Apache Flink явно позиционируется как движок потоковой обработки с сохранением состояния: вы моделируете логику как конвейер операторов, а Flink выполняет его как распределенный поток данных с управляемым состоянием и семантикой времени. В современной документации Flink проект описывает себя как фреймворк и распределенный движок обработки для вычислений с сохранением состояния над неограниченными и ограниченными потоками данных.

С практической точки зрения DevOps и инженерии программного обеспечения, Flink хорошо подходит, если вам требуется хотя бы одно из следующих свойств:

Если вам нужна объединение/агрегация/обогащение с низкой задержкой и гарантиями корректности, вы обычно используете обработку времени событий (event-time) в Flink, где «время» — это момент, когда событие произошло (а не когда оно прибыло), а маркеры воды сообщают о прогрессе времени события через конвейер.

Если вам нужны вычисления с сохранением состояния в масштабе (скользящие счетчики, сессии, правила обнаружения мошенничества, конструирование признаков), Flink рассматривает состояние как первоклассную часть модели программирования и делает его отказоустойчивым через контрольные точки.

Если вам нужна эксплуатационно надежная потоковая обработка (сбои, обновляемые развертывания, перезапуски), Flink создает контрольные точки состояния и позиций потока, чтобы задача могла восстановиться и продолжить работу с той же семантикой «как будто сбои не происходили».

Типичные сценарии использования для команд DevOps, Go, Python и AI

Flink широко используется для «конвейеров данных и ETL», «потоковой аналитики» и «событийно-ориентированных приложений» (категории, используемые в документации Flink).

Для стека DevOps + Go/Python типичные паттерны выглядят следующим образом:

Сервис на Go генерирует события в Kafka; Flink потребляет эти события, выполняет обработку с сохранением состояния (например, удаление дубликатов, оконная агрегация, обогащение), а затем записывает производные факты обратно в Kafka или базу данных. Механизмы операторов и контрольных точек Flink существуют для того, чтобы сделать такие конвейеры с сохранением состояния безопасными для продакшена.

Для команд ML/LLM PyFlink явно указывает такие сценарии, как «предсказание машинного обучения» и загрузка моделей машинного обучения внутри Python UDF как мотивация управления зависимостями, что является прямым подтверждением паттернов «задача Flink как среда выполнения для онлайн-инференса / конструирования признаков».

Время выполнения Flink состоит из двух типов процессов: JobManager и TaskManagers. Документация подчеркивает, что клиенты отправляют поток данных в JobManager; затем клиент может отключиться (отсоединенный режим) или оставаться подключенным (присоединенный режим).

JobManager координирует распределенное выполнение: планирование, реакция на завершение/сбой задач, координация контрольных точек и координация восстановления. Внутренне он включает: ResourceManager (слоты/ресурсы), Dispatcher (REST + веб-интерфейс + создание JobMaster для каждой задачи) и JobMaster (управляет одной задачей).

TaskManagers выполняют операторы/задачи и обмениваются/буферизируют потоки данных. Наименьшей единицей планирования является слот задачи; несколько операторов могут выполняться в одном слоте (цепочка операторов и совместное использование слотов влияют на это).

Цепочка операторов и слоты задач для контроля производительности и затрат

Flink объединяет подзадачи операторов в задачи, где каждая задача выполняется одной потоком. Это описывается как оптимизация производительности, которая снижает накладные расходы на передачу потоков и буферизацию, увеличивая пропускную способность и снижая задержку.

Слоты важны с эксплуатационной точки зрения, так как они являются единицей планирования ресурсов/изоляции. Flink отмечает, что у каждого TaskManager может быть один или несколько слотов задач; распределение слотов резервирует управляемую память на слот, но не изолирует CPU.

Обработка времени событий, маркеры воды и поздние данные

Flink поддерживает несколько понятий времени — время события, время загрузки, время обработки — и использует маркеры воды для моделирования прогресса во времени события.

Для работы со временем события Flink требует назначения временных меток событиям и генерации маркеров воды; официальная документация «Генерация маркеров воды» объясняет назначение временных меток и генерацию маркеров воды как основные строительные блоки, где WatermarkStrategy является стандартным способом настройки распространенных стратегий.

Отказоустойчивость: контрольные точки против точек сохранения в реальных системах

Контрольные точки существуют потому, что «каждая функция и оператор в Flink могут иметь состояние»; состояние должно быть сохранено в контрольных точках, чтобы стать отказоустойчивым. Контрольные точки позволяют восстановить как состояние, так и позиции потока, чтобы выполнение могло возобновиться с семантикой без сбоев.

Flink очень четко указывает, что точки сохранения — это «согласованное изображение состояния выполнения потоковой задачи, созданное через механизм контрольных точек Flink», используемое для остановки и возобновления, форка или обновления задач. Точки сохранения хранятся на стабильном хранилище (например, HDFS, S3).

Официальная страница «Контрольные точки против точек сохранения» определяет разницу как резервное копирование против журналов восстановления: контрольные точки — частые, легкие, управляются Flink для восстановления после сбоев; точки сохранения управляются пользователем и используются для управляемых операций, таких как обновления.

Время выполнения Flink с открытым исходным кодом «бесплатно» в смысле лицензирования, но в продакшене вы платите за инфраструктуру и эксплуатационные усилия.

Flink разработан для интеграции с общими менеджерами ресурсов (например, YARN и Kubernetes) и может также работать как автономный кластер или как библиотека.

Затраты на вычисления и память определяются JobManager и TaskManagers, а также вашей конфигурацией параллелизма и слотов. Документация конфигурации Flink явно указывает jobmanager.memory.process.size, taskmanager.memory.process.size, taskmanager.numberOfTaskSlots и parallelism.default как основные параметры для распределенных наборов.

Локальный диск — частая скрытая стоимость для задач с сохранением состояния. Flink отмечает, что io.tmp.dirs хранит локальные данные, включая файлы RocksDB, переливаемые промежуточные результаты и кэшированные JAR; если эти данные удалены, это может вызвать «тяжелую операцию восстановления», поэтому они должны находиться на хранилище, которое не очищается периодически.

Затраты на надежное объектное/файловое хранилище определяются директориями контрольных точек и точек сохранения. В конфигурации Flink 2.x контрольные точки и точки сохранения настраиваются через execution.checkpointing.dir и execution.checkpointing.savepoint-dir и принимают URI, такие как s3://… или hdfs://….

Управляемые сервисы снижают эксплуатационные затраты, но добавляют плату за платформу и ограничения. Детали зависят от провайдера.

Amazon Managed Service for Apache Flink взимает плату за KPU (1 vCPU + 4 ГБ памяти на KPU) и тарифицируется по продолжительности и количеству KPU с шагом в одну секунду. AWS также взимает дополнительную плату за «оркестрацию» KPU на приложение и отдельные сборы за хранилище/резервное копирование.

Confluent Cloud for Apache Flink основан на использовании и является бессерверным: вы создаете пул вычислений, и вам начисляется плата за потребленные CFU в минуту, пока выполняются запросы. Страница биллинга включает пример цены $0.21 за CFU-час (зависит от региона) и подчеркивает, что вы можете ограничить расходы через максимумы пула вычислений.

Aiven и Alibaba Cloud — заметные управляемые провайдеры Flink на рынке, но их публичные цены и детали биллинга варьируются в зависимости от плана/региона и могут требовать калькуляторов или связи с отделом продаж; относитесь к точным затратам как к неуказанным, если вы не запросите цитату для региона+плана из их текущей документации.

Ververica предлагает как самостоятельные, так и управляемые варианты развертывания вокруг Flink; публичные страницы подчеркивают выбор развертывания и позиционирование управляемого сервиса, в то время как точные цены обычно обрабатываются через потоки «связь/детали цен» (поэтому конкретные цифры часто не указаны публично).

Вариант развертывания Лучше всего подходит для Эксплуатационная сложность Ключевые преимущества Ключевые риски / компромиссы
Автономный кластер (ВМ/голое железо) Малые команды, фиксированная емкость Средняя–Высокая Полный контроль; самая простая ментальная модель HA, автоскейлинг, обновления — DIY (больше ручной работы)
Kubernetes с оператором Flink Kubernetes Большинство современных платформенных команд Средняя Декларативные развертывания; управление жизненным циклом через цикл управления; оператор поддерживает развертывания Application/Session/Job Требуется опыт работы с Kubernetes и оператором
Нативный Kubernetes (без оператора) Команды K8s, желающие прямой интеграции Средняя–Высокая Прямая интеграция ресурсов; динамическое распределение/освобождение TaskManager, описанное в документации Flink-on-K8s Более индивидуальная автоматизация, чем с оператором
YARN Платформы на базе Hadoop Средняя Интеграция с управлением ресурсами YARN Сложность стека Hadoop
AWS Managed Service for Apache Flink Нативные для AWS стеки данных Низкая–Средняя Управляемая оркестрация + опции масштабирования; предсказуемая единица биллинга (KPU) Привязка к платформе; дополнительная плата за KPU оркестрации на приложение + сборы за хранилище
Confluent Cloud for Apache Flink Магазины с приоритетом Kafka, потоковые приложения на SQL Низкая Бессерверная тарификация по использованию; учет CFU-минут; пулы вычислений для ограничения расходов Затраты на CFU + затраты на сеть Kafka; специфичные для сервиса API
Управляемые предложения Ververica Предприятиям, нуждающимся в экспертной эксплуатации Flink Низкая–Средняя Позиционирование как управляемый сервис от «экспертов по Flink» Цены часто не прозрачны (не указаны)

Таблица управляемых провайдеров и затрат

Цены меняются в зависимости от региона и времени; если вам нужны точные цифры для вашего региона, используйте это как отправную точку и проверяйте текущие страницы цен провайдера (регионы без цитат не указаны).

Провайдер Форма «плана» Единица биллинга Пример цены за вычисления Заметные дополнительные факторы затрат
Amazon Managed Service for Apache Flink Управляемое время выполнения KPU (1 vCPU + 4 ГБ) Пример: $0.11 за KPU-час (США Восток, Северная Вирджиния) +1 KPU оркестрации на приложение; работающее хранилище; опциональные надежные резервные копии
Confluent Cloud for Apache Flink Бессерверная SQL/обработка CFU-минута/CFU-час Пример: $0.21 за CFU-час (зависит от региона) Все еще действуют тарифы сети Kafka; максимум пула вычислений для ограничения расходов
Ververica (управляемый) Управляемая «Единая платформа потоковых данных» Не указано (публичные страницы) Не указано Функции платформы/SLA; цены обычно через отдел продаж (не указаны)
Aiven for Apache Flink Управляемый сервис Модель почасовой оплаты использования (по платформе) Не указано без плана/региона Уровень плана + облачный регион + дополнительные опции (не указаны)
Alibaba Cloud Realtime Compute for Apache Flink Управляемый/бессерверный Смешанная оплата (оплата по факту использования + подписка) Не указано без деталей региона/рабочего пространства Ограничения на основе CU и модель рабочего пространства (детали варьируются; здесь не указаны)

Flink находится в занятой экосистеме. «Лучший» выбор зависит от задержки, способности сохранять состояние, эксплуатационных предпочтений и модели авторинга.

Инструмент Что это такое Модель выполнения потоковой обработки История состояния и «точно один раз» Где он сияет Типичные проблемы
Apache Flink Распределенный движок потоковой обработки для вычислений с сохранением состояния Непрерывная потоковая обработка + время события через маркеры воды Отказоустойчивость на основе контрольных точек; точки сохранения для управляемых обновлений Потоки с низкой задержкой и сохранением состояния; сложная логика времени события Корректная эксплуатация состояния, контрольных точек и обновлений требует дисциплины
Apache Spark Structured Streaming Потоковый движок Spark, построенный вокруг DataFrames/Datasets По умолчанию модель микробатчей (с непрерывным режимом, обсуждаемым отдельно) Сильный для аналитических конвейеров; состояние существует, но часто с более высокой задержкой Объединенные API пакетной и потоковой обработки; экосистема Spark Задержка микробатчей и ментальная модель «потоковая обработка как инкрементальные батчи»
Kafka Streams Библиотека для создания приложений потоковой обработки на Kafka Обработка по одному записи Поддерживает семантику обработки «точно один раз» (EOS) Простые приложения на базе Kafka; встраивание в сервис JVM Только JVM; менее гибкий для крупных распределенных паттернов вычислений
Apache Beam Единая модель программирования + SDK; выполняется через раннеры (Flink, Spark, Dataflow и т.д.) Зависит от раннера; конвейеры Beam переводятся в задачи раннера Семантика зависит от матрицы возможностей раннера (специфично для раннера) Портативность, мультиязычные конвейеры; избегание привязки к движку Эксплуатационная настройка все равно оказывается специфичной для раннера
Materialize «Слой живых данных» / потоковая SQL БД; инкрементально обновляет результаты по мере поступления данных Непрерывное инкрементальное обслуживание представлений Сильные заявления о согласованности в документации продукта (детали специфичны для продукта) Предоставление свежих производных представлений для приложений/агентов AI Другая эксплуатационная модель, чем задачи Flink; не среда выполнения общего API операторов
RisingWave Потоковая база данных, где потоковая обработка выражается как материализованные представления Непрерывное обслуживание материализованных представлений SQL-ориентированная; семантика специфична для движка Потоковые приложения на SQL без создания задач Flink Меньшая гибкость для произвольных конвейеров с большим количеством кода

Полезное правило: если вам нужна среда выполнения для сложных потоковых задач с сохранением состояния с глубоким контролем времени события, логики операторов и развертывания, Flink — основной кандидат. Если вам нужны инкрементальные представления SQL-первого для обслуживания, потоковые базы данных могут быть альтернативой. Если вам нужна библиотека, встроенная в сервис, Kafka Streams конкурентоспособна. Если вам нужно одно портативное определение конвейера для всех движков, Beam привлекателен.

Для облачно-нативных событийно-ориентированных архитектур с использованием AWS, Построение событийно-ориентированных микросервисов с AWS Kinesis охватывает паттерны Kinesis Data Streams для обработки в реальном времени и развязки сервисов.

Этот раздел намеренно практичен: конфигурация, развертывание и то, как ваши сервисы на Go/Python обычно взаимодействуют с Flink.

Сервисы на Go + Kafka + Flink + слой обслуживания

Flink часто является «состоянием посередине», которое превращает высокообъемные события в надежные сигналы (счетчики, сессии, аномалии, обогащенные записи). Контрольные точки и бэкенды состояния делают эту середину надежной в продакшене.

Важное примечание о версии: начиная с Flink 2.0, поддерживаемым файлом конфигурации является conf/config.yaml; предыдущий flink-conf.yaml «больше не поддерживается».

Минимальный (иллюстративный) conf/config.yaml для небольшого самостоятельно управляемого кластера:

# conf/config.yaml (стиль Flink 2.x)
rest:
  address: flink-jobmanager.example.internal
  port: 8081

jobmanager:
  rpc:
    address: flink-jobmanager.example.internal
    port: 6123
  memory:
    process:
      size: 2048m

taskmanager:
  memory:
    process:
      size: 4096m
  numberOfTaskSlots: 2

parallelism:
  default: 2

# Значения по умолчанию для контрольных точек (задачи все еще могут переопределять в коде)
state:
  backend:
    type: rocksdb
execution:
  checkpointing:
    dir: s3://my-bucket/flink/checkpoints
    savepoint-dir: s3://my-bucket/flink/savepoints
    interval: 60 s

# Избегайте временных директорий, которые очищаются (файлы RocksDB, кэшированные JAR и т.д.)
io:
  tmp:
    dirs: ["/var/lib/flink/tmp"]

Почему эти ключи: справочник конфигурации Flink явно документирует детали обнаружения rest.* и jobmanager.rpc.*, ключи памяти процесса, ключи слотов/параллелизма и настройки контрольных точек по умолчанию, включая state.backend.type, execution.checkpointing.dir, execution.checkpointing.savepoint-dir и execution.checkpointing.interval.

Выбор io.tmp.dirs эксплуатационно важен, потому что Flink использует его для локальных файлов RocksDB и кэшируемых артефактов; удаление может вызвать тяжелое восстановление.

Если вы используете Flink 1.x (все еще распространенный в некоторых управляемых средах), вы встретите flink-conf.yaml в дикой природе. Это устарело для пользователей Flink 2.x.

# conf/flink-conf.yaml (устаревший стиль 1.x; НЕ поддерживается в Flink 2.x)
jobmanager.rpc.address: flink-jobmanager
rest.port: 8081
taskmanager.numberOfTaskSlots: 2
parallelism.default: 2

# Устаревшие ключи контрольных точек отличаются по версии; воспринимайте как иллюстративные.
state.backend.type: rocksdb
state.checkpoints.dir: s3://my-bucket/flink/checkpoints
state.savepoints.dir: s3://my-bucket/flink/savepoints

Если вы мигрируете, Flink предоставляет скрипт миграции (bin/migrate-config-file.sh) для конвертации flink-conf.yaml в config.yaml.

Оператор Flink Kubernetes действует как плоскость управления для управления жизненным циклом приложений Flink и устанавливается с помощью Helm.

Согласно официальной документации Helm оператора, вы можете установить как из диаграммы в исходном дереве, так и из репозитория диаграмм, размещенного Apache:

# установка из встроенной диаграммы в исходном дереве
helm install flink-kubernetes-operator helm/flink-kubernetes-operator

# установка из репозитория Helm загрузок Apache (замените <OPERATOR-VERSION>)
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-<OPERATOR-VERSION>/
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator

Эти точные команды показаны в документации по установке Helm оператора.

Пример CR FlinkDeployment (иллюстративный)

Это упрощенный пример, показывающий точки интеграции, которые вы обычно настраиваете (образ, ресурсы, места контрольных точек, логирование/метрики). Оператор согласует это желаемое состояние через свой цикл управления.

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: realtime-sessions
  namespace: flink
spec:
  image: my-registry.example.com/flink/realtime-sessions:2026-03-06
  flinkVersion: v2_2
  serviceAccount: flink
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.backend.type: "rocksdb"
    execution.checkpointing.dir: "s3://my-bucket/flink/checkpoints/realtime-sessions"
    execution.checkpointing.savepoint-dir: "s3://my-bucket/flink/savepoints/realtime-sessions"
    execution.checkpointing.interval: "60 s"
    rest.port: "8081"
  jobManager:
    resource:
      cpu: 1
      memory: "2048m"
  taskManager:
    resource:
      cpu: 2
      memory: "4096m"
  job:
    jarURI: local:///opt/flink/usrlib/realtime-sessions.jar
    parallelism: 4
    upgradeMode: savepoint
    state: running

Паттерн upgradeMode: savepoint распространен, когда вы хотите безопасные обновления с сохранением состояния; точки сохранения предназначены для рабочих процессов остановки/возобновления/форка/обновления и указывают на стабильные места хранения.

PyFlink — это API Python для Apache Flink, явно продвигаемый для масштабируемых пакетных/потоковых нагрузок, включая конвейеры ML и ETL.

Когда вы используете коннекторы JVM (Kafka, JDBC и т.д.) из PyFlink, вы должны убедиться, что соответствующие JAR доступны задаче. Документация Flink «Управление зависимостями» для Python показывает три стандартных механизма:

Установка pipeline.jars (Table API), вызов add_jars() (DataStream API) или CLI --jarfile при отправке.

Этот пример читает события JSON из Kafka, назначает временные метки времени события (с ограниченным беспорядком), поддерживает подсчет по пользователям в состоянии с ключом и записывает обогащенное событие в выходную тему.

Примечания:

  • KafkaSource строится через KafkaSource.builder() и требует серверов bootstrap, тем и десериализатора.
  • Конфигурация sink Kafka «точно один раз» в PyFlink требует установки гарантии доставки и префикса транзакционного ID.
  • Значения контрольных точек по умолчанию можно настроить в конфигурации Flink (execution.checkpointing.*) и/или в коде; ключи конфигурации задокументированы в справочнике конфигурации Flink.
import json
from typing import Any

from pyflink.common import Duration, Types
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.watermark_strategy import WatermarkStrategy, TimestampAssigner
from pyflink.common.configuration import Configuration

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor

from pyflink.datastream.connectors import DeliveryGuarantee
from pyflink.datastream.connectors.kafka import (
    KafkaSource,
    KafkaOffsetsInitializer,
    KafkaSink,
    KafkaRecordSerializationSchema,
)


class EventTimeFromJson(TimestampAssigner):
    """
    Извлекает event_time_ms из JSON-нагрузки.
    Ожидаемый формат: {"user_id":"u1","event_time_ms":1710000000000,"event":"click",...}
    """
    def extract_timestamp(self, value: str, record_timestamp: int) -> int:
        try:
            obj = json.loads(value)
            return int(obj["event_time_ms"])
        except Exception:
            # откат: использовать временную метку записи (загрузки) при ошибке
            return record_timestamp


class RollingCount(KeyedProcessFunction):
    def open(self, runtime_context: RuntimeContext):
        desc = ValueStateDescriptor("rolling_count", Types.LONG())
        self.count_state = runtime_context.get_state(desc)

    def process_element(self, value: str, ctx: 'KeyedProcessFunction.Context'):
        obj = json.loads(value)
        current = self.count_state.value()
        if current is None:
            current = 0
        current += 1
        self.count_state.update(current)

        # эмитировать обогащенное событие
        obj["rolling_count"] = current
        obj["event_time_ms"] = int(obj.get("event_time_ms", 0))
        yield json.dumps(obj)


def build_env() -> StreamExecutionEnvironment:
    # Значения по умолчанию для кластера/задачи (можно также установить в config.yaml)
    cfg = Configuration()
    cfg.set_string("state.backend.type", "rocksdb")
    cfg.set_string("execution.checkpointing.dir", "s3://my-bucket/flink/checkpoints/realtime-sessions")
    cfg.set_string("execution.checkpointing.interval", "60 s")
    env = StreamExecutionEnvironment.get_execution_environment(cfg)

    # В PyFlink JAR коннекторов должны быть доступны; используйте env.add_jars(...) при необходимости.
    # env.add_jars("file:///opt/flink/lib/flink-connector-kafka-<VERSION>.jar")

    # Включите контрольные точки явно (задачи могут переопределять значения по умолчанию)
    env.enable_checkpointing(60_000)
    env.set_parallelism(4)
    return env


def main():
    env = build_env()

    source = (
        KafkaSource.builder()
        .set_bootstrap_servers("kafka:9092")
        .set_topics("events.raw")
        .set_group_id("realtime-sessions-v1")
        .set_value_only_deserializer(SimpleStringSchema())
        .set_starting_offsets(KafkaOffsetsInitializer.earliest())
        .build()
    )

    watermark_strategy = (
        WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(10))
        .with_timestamp_assigner(EventTimeFromJson())
    )

    stream = (
        env.from_source(source, watermark_strategy=watermark_strategy, source_name="kafka-events-raw")
        .key_by(lambda s: json.loads(s)["user_id"])
        .process(RollingCount(), output_type=Types.STRING())
    )

    record_serializer = (
        KafkaRecordSerializationSchema.builder()
        .set_topic("events.enriched")
        .set_value_serialization_schema(SimpleStringSchema())
        .build()
    )

    sink = (
        KafkaSink.builder()
        .set_bootstrap_servers("kafka:9092")
        .set_record_serializer(record_serializer)
        .set_delivery_guarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .set_transactional_id_prefix("realtime-sessions-txn")
        .build()
    )

    stream.sink_to(sink)

    env.execute("realtime-sessions-pyflink")


if __name__ == "__main__":
    main()

Вызовы API выше соответствуют паттерну использования конструктора KafkaSource PyFlink и обязательным полям. Для гарантий доставки документация PyFlink KafkaSinkBuilder явно указывает, что для DeliveryGuarantee.EXACTLY_ONCE вы должны установить префикс транзакционного ID. Для временных меток/маркеров воды документация Flink объясняет назначение временных меток и генерацию маркеров воды как механизм обработки времени события, а PyFlink предоставляет API WatermarkStrategy, зеркально отражающий эту модель.

У Go нет нативного API авторинга задач Flink, как у Java/Python, поэтому системы на Go обычно интегрируются с Flink через:

  • Kafka (или другие брокеры) как точку входа/выхода.
  • REST API Flink для эксплуатационных действий (загрузка JAR, запуск задач, запрос состояния задачи, триггеринг точек сохранения, рескейлинг).

Для настройки Kafka и паттернов локальной разработки см. Быстрый старт Apache Kafka - Установка Kafka 4.2 с CLI и локальными примерами.

Пример производителя/потребителя Go Kafka (kafka-go)

package main

import (
	"context"
	"log"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	ctx := context.Background()

	// Производитель: запись сырых событий
	writer := &kafka.Writer{
		Addr:         kafka.TCP("kafka:9092"),
		Topic:        "events.raw",
		RequiredAcks: kafka.RequireAll,
	}
	defer writer.Close()

	err := writer.WriteMessages(ctx, kafka.Message{
		Key:   []byte("user:u1"),
		Value: []byte(`{"user_id":"u1","event_time_ms":1710000000000,"event":"click"}`),
		Time:  time.Now(),
	})
	if err != nil {
		log.Fatal(err)
	}

	// Потребитель: чтение обогащенных событий
	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:  []string{"kafka:9092"},
		Topic:    "events.enriched",
		GroupID:  "go-debug-consumer",
		MinBytes: 1e3,
		MaxBytes: 10e6,
	})
	defer reader.Close()

	msg, err := reader.ReadMessage(ctx)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("enriched key=%s value=%s\n", string(msg.Key), string(msg.Value))
}

Это «трубопроводный» код, но это самая распространенная практическая поверхность интеграции: темы Kafka являются границей между Flink и кастомными сервисами.

REST API Flink является частью веб-сервера JobManager и по умолчанию слушает порт 8081 (настраивается через rest.port).

Официальная спецификация OpenAPI для диспетчера включает /jars/upload и явно указывает:

  • Загрузка JAR должна отправляться как multi-part данные
  • убедитесь, что заголовок Content-Type установлен в application/x-java-archive
  • предоставляет пример curl с использованием -F jarfile=@path/to/flink-job.jar

Практический фрагмент Go для загрузки JAR:

package flink

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"mime/multipart"
	"net/http"
	"os"
)

func UploadJar(ctx context.Context, flinkBaseURL, jarPath string) (*http.Response, error) {
	f, err := os.Open(jarPath)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	var body bytes.Buffer
	w := multipart.NewWriter(&body)

	part, err := w.CreateFormFile("jarfile", "job.jar")
	if err != nil {
		return nil, err
	}
	if _, err := io.Copy(part, f); err != nil {
		return nil, err
	}
	if err := w.Close(); err != nil {
		return nil, err
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, flinkBaseURL+"/jars/upload", &body)
	if err != nil {
		return nil, err
	}

	// Важно: граница multi-part
	req.Header.Set("Content-Type", w.FormDataContentType())

	// Некоторые клиенты также устанавливают "Expect:" аналогично примеру curl в спецификации.
	req.Header.Set("Expect", "")

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	if resp.StatusCode >= 300 {
		return resp, fmt.Errorf("upload failed: %s", resp.Status)
	}
	return resp, nil
}

Этот код основан на описании REST API OpenAPI для /jars/upload, включая требование multi-part и ссылку на curl.

Для запуска ранее загруженного JAR Flink предоставляет /jars/{jarid}/run и поддерживает передачу аргументов программы через параметры запроса (и/или тело JSON).

Эксплуатационно ценные конечные точки, которые вы, вероятно, автоматизируете:

  • /jobs и /jobs/{jobid} для списка и проверки состояния задачи
  • /jobs/{jobid}/savepoints для триггеринга точек сохранения (асинхронный триггер + опрос)
  • /jobs/{jobid}/rescaling для триггеринга рескейлинга
Аспект PyFlink (задачи Python) Go (сервисы вокруг Flink)
Авторинг логики Flink Нативное авторинг через API DataStream/Table; поддержка состояния + таймеров Нет нативного API Flink; реализуйте логику в Flink (Java/Python) и интегрируйте внешним образом
Коннекторы/зависимости Необходимо поставлять JAR коннекторов через pipeline.jars, add_jars или --jarfile Не применимо (вы не работаете внутри Flink), но вы управляете клиентами Kafka/DB
Ввод/вывод Строители KafkaSource/KafkaSink в PyFlink Библиотеки производителя/потребителя Kafka; стандартные паттерны микросервисов
Автоматизация эксплуатации Можно вызывать конечные точки REST Flink Часто владеет автоматизацией: загрузка JAR, развертывание, рескейлинг, триггеринг точки сохранения через REST

Flink поддерживает экспорт метрик путем настройки отчетчиков метрик в файле конфигурации Flink; эти отчетчики создаются на JobManager и TaskManagers.

Для Prometheus Flink экспонирует метрики в формате Prometheus при настройке metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory в поддерживаемой среде версии Flink.

Обычно вы комбинируете это с ServiceMonitors Kubernetes (Prometheus Operator) или с вашим управляемым стеком мониторинга.

Масштабирование: параллелизм, слоты и автоскейлинг на основе операторов

Модель планирования Flink определяет ресурсы выполнения через слоты задач, и каждый слот может выполнять конвейер параллельных задач.

Для ручного масштабирования REST API предоставляет конечную точку рескейлинга для задачи (/jobs/{jobid}/rescaling) как асинхронную операцию.

Если вы используете Kubernetes с оператором Flink Kubernetes, проект оператора рекламирует «Автоскейлер задач Flink» как часть своего набора функций, что стоит оценить, если ваши рабочие нагрузки существенно варьируются.

Резервное копирование и безопасные обновления: контрольные точки и точки сохранения

Контрольные точки предназначены для автоматического восстановления и управляются Flink; точки сохранения предназначены для операций жизненного цикла, управляемых пользователем (остановка/возобновление/форк/обновление).

С точки зрения SRE:

  • Используйте контрольные точки для «поддержания работы конвейера через сбои».
  • Используйте точки сохранения для «развертывания новой версии без потери состояния».

REST API Flink также поддерживает асинхронный триггеринг точек сохранения, что полезно для рабочих процессов в стиле GitOps «развертывание → триггер точки сохранения → обновление».

CI/CD: GitOps + Helm + отправка задач через REST

Для Kubernetes:

  • Храните установку оператора и ваши CR FlinkDeployment в Git, развертывайте через Argo CD/Flux и версионируйте образы контейнеров на сборку. Документация Helm оператора явно обсуждает «Работу с Argo CD».

Для автономных/сессийных кластеров:

  • Используйте конечные точки загрузки JAR и запуска REST API Flink для развертывания неизменяемых артефактов.

Также обратите внимание на тонкий, но ценный переключатель безопасности/эксплуатации: web.submit.enable управляет загрузками через веб-интерфейс, но документация отмечает, что даже при отключении сессийные кластеры все еще принимают отправки задач через REST-запросы; это актуально при усилении поверхностей UI с сохранением автоматизации CI/CD.

Системы LLM часто так же хороши, как и их контекст в реальном времени. Flink вписывается в стеки LLM/AI как компонент, который производит «всегда свежие» признаки, эмбеддинги и поведенческие агрегаты.

Распространенный паттерн:

  • ввод действий/событий пользователей,
  • агрегация сессий и предпочтений,
  • генерация задач создания эмбеддингов,
  • запись эмбеддингов в векторное хранилище и/или хранилище признаков.

Документация управления зависимостями PyFlink явно указывает «предсказание машинного обучения» и загрузку моделей ML внутри Python UDF (для выполнения на удаленном кластере), что напрямую соответствует подходам «онлайн-инференс внутри операторов Flink».

Обновления хранилища признаков онлайн для рекомендаций и ранжирования

Модель состояния с ключом и контрольных точек Flink построена для поддержания состояния оператора через события и надежного восстановления. Это естественное совпадение для непрерывного вычисления признаков (скользящие скорости, счетчики, метрики с временным затуханием), в которых нуждаются downstream-рекомендаторы.

Практические компромиссы задержки/согласованности для AI-конвейеров

Если ваша архитектура требует семантики «точно один раз» end-to-end (например, избегайте дубликатов обновлений признаков или дубликатов событий биллинга), вы структурируете sink и source вокруг контрольных точек и транзакционных гарантий.

В стеках на базе Kafka в частности:

  • Коннектор Kafka Flink может предоставлять гарантии «точно один раз», когда включены контрольные точки и настроены опции гарантии доставки.
  • Kafka Streams также поддерживает семантику «точно один раз» (EOS), что актуально, если ваш «конвейер признаков AI» достаточно мал, чтобы жить внутри кода приложения, а не в кластере Flink.

Flink как строитель контекста AI в реальном времени

Эта диаграмма основана на основных примитивах Flink: обработка времени события (маркеры воды), бэкенды состояния (state.backend.type и локальное состояние, управляемое системой) и механизмы контрольных точек/точек сохранения для отказоустойчивости и эксплуатации.