В рамках демонстрации мы будем использовать open-source компоненту для работы с Kafka от компании ДНС.
https://github.com/skalkindv/onec_librdkafka
Для хранения Объектных типов данных в Kafka существует очень удобный функционал в Kafka, который обеспечивает отсутствие необходимости держать дубли данных в Топике.
Чтобы проверить это на практике, вы можете развернуть Kafka в докере, файл в приложенных.
Вот его листинг:
version: "3.9"
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
hostname: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 2181:2181
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- 29092:29092
hostname: kafka
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
kafka-ui:
image: provectuslabs/kafka-ui
container_name: kafka-ui
ports:
- 8090:8080
restart: always
environment:
- KAFKA_CLUSTERS_0_NAME=local
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:29092
- KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:2181
links:
- kafka
- zookeeper
Далее с помощью инструмента Offset Explorer 2.0 можно создать свой топик и выставить у него нужные настройки.
Самая важная настройка - это cleanup.policy, она вместо стандартной delete, должна быть compact.
Теперь Kafka с этим Топиком будет работать по другому, она будет удалять сообщения не только стандартным механизмом, но еще и одинаковые по одному и тому же ключу. И будет она это делать особым способом.
Так как в Kafka топике сообщения хранятся не в виде одного отдельного файла привязанного к топику, а во многих файлах - сегментах (Segment).
Kafka пишет новые сообщения только в Активный сегмент, а чистить сообщения будет только в неактивных сегментах.
По умолчанию в настройках топика сегмент становится неактивным когда он вырастает до 1 Гигабайта или сообщения в сегменте старше 7 дней.
Еще одна важная настройка min.cleanable.dirty.ratio - она говорит о том, какое соотношение должно быть очищенных сегментов к грязным, чтобы запустился процесс чистки грязного сегмента.
Т.е. после того как сегмент становится неактивным, он автоматически становится "Грязным" (процедура очистки дублей по нему не срабатывала). После того как она сработает сегмент станет "Чистым" (В чистых сегментах сообщений с одинаковыми ключами быть не может). Из этого можно сделать вывод, что в топике могут быть дубли только из-за того что сообщения с одинаковыми ключами или в Активном или в Грязных сегментах. И следующий вывод - Активный сегмент всегда грязный, по нему очистка не работает.
Мы можем уменьшить размер сегмента и установить его равным, например, 10 МБ, и записать туда много сообщений, где ключи будут дублироваться.
После первой отправки у нас будут сообщения начинаться со смещения 0.
После нескольких отправок одинаковых данных, мы увидим другую картину.
В этом году я подался как докладчик на Инфостарт с темой о Kafka, прошу проголосовать за меня: