Основная статья про DaJet Stream
Видео: знакомство с платформой DaJet
Дистрибутив DaJet 3.3.3 на GitHub
Команды CONSUME и PRODUCE в языке запросов DaJet Stream предназначены для организации потокового обмена данными (сообщениями), в том числе, например, с такими брокерами сообщений, как RabbitMQ.
Команда CONSUME работает с брокерами сообщений в режиме PUSH от сервера, то есть постоянно находится в режиме ожидания входящих сообщений от брокера. Однако, следует иметь в виду, что при работе с таблицами-очередями баз данных (смотри основную статью про DaJet Stream) процессор скриптов DaJet Stream, обрабатывая команду CONSUME, реализует паттерн pooling publisher, который используется совместно с паттерном transactional outbox. Это означает, что DaJet Stream работает с базой данных в режиме PULL на клиенте, реализуя так называемое деструктивное чтение. То есть сообщения удаляются из таблицы-очереди после их отправки получателю в транзакции. Таким образом, можно сказать, что команда CONSUME выполняет своего рода роль координатора распределённой транзакции, обеспечивая требования ACID для каждого пакета данных.
В любом случае, как для брокеров сообщений, так и для баз данных, DaJet Stream гарантирует доставку сообщений на уровне at-least-once-in-order (минимум один раз строго по порядку). Возможную дупликацию сообщений DaJet Stream не контроллирует. Для этого следует использовать, либо логику приложения, либо возможности брокера.
Важно! Все команды CONSUME, как для баз данных, так и для брокеров сообщений, подразумевают "бесконечное" потребление потока данных с автоматическим восстановлением подключений к своим источникам в случае их разрыва.
Ниже приведённые скрипты DaJet Stream демонстрируют основные сценарии использования адаптера Apache Kafka при помощи команд CONSUME и PRODUCE, используя возможные, но не обязательные для всех случаев, варианты настройки.
Для вашего конкретного сценария лучше всего предварительно изучить официальную документацию:
Важно! DaJet Stream ориентируется на использование текстовых сообщений в кодировке UTF-8.
Доступные свойства сообщения Apache Kafka
Свойство | Тип данных | Описание |
---|---|---|
Key | string | Ключ сообщения |
Value | string | Тело сообщения |
Topic | string | Наименование топика |
Выполнение скрипта в среде Windows | Выполнение скрипта в среде Linux |
---|---|
(из корневого каталога установки) dajet stream --file ./stream/script.sql |
(из корневого каталога установки) dotnet ./dajet.dll stream --file ./stream/script.sql |
1. Отправка сообщений: регистр сведений - топик Apache Kafka
DECLARE @message object -- Запись (сообщение) таблицы-очереди
DECLARE @empty_uuid uuid = "00000000-0000-0000-0000-000000000000"
-- ***********************************************************************
-- * Источник сообщений SQL Server - регистр сведений "ИсходящаяОчередь" *
-- ***********************************************************************
USE "mssql://zhichkin/dajet-exchange"
DECLARE @Отправитель string = SELECT Код
FROM ПланОбмена.ПланОбменаРИБ
WHERE Предопределённый <> @empty_uuid
CONSUME TOP 1000
НомерСообщения, ТипСообщения, ТелоСообщения
INTO @message
FROM РегистрСведений.ИсходящиеСообщения
ORDER BY НомерСообщения ASC
-- *****************************************************
-- * Приёмник сообщений Apache Kafka, топик test-topic *
-- *****************************************************
PRODUCE 'kafka'
WITH Acks = 'all' -- acks
, ClientId = 'dajet' -- client.id
, MaxInFlight = 1 -- max.in.flight
, BootstrapServers = '192.168.239.177:9092' -- bootstrap.servers (csv)
, MessageTimeoutMs = 30000 -- message.timeout.ms
, EnableIdempotence = false -- enable.idempotence
SELECT Key = @Отправитель -- Ключ сообщения
, Value = @message.ТелоСообщения -- Тело сообщения
, Topic = @message.ТипСообщения -- Топик Kafka
2. Отправка сообщений: план обмена - топик Apache Kafka
DECLARE @Источник string = 'mssql://zhichkin/dajet-exchange'
DECLARE @Приёмник string = 'test-topic' -- Топик Apache Kafka
DECLARE @changes object -- Запись таблицы регистрации регистрации изменений
DECLARE @message object -- Данные объекта базы данных для отправки
-- ********************************************************************************
-- * Источник сообщений, таблица регистрации изменений справочника "Номенклатура" *
-- ********************************************************************************
USE '{@Источник}'
DECLARE @Получатель entity = SELECT Ссылка
FROM ПланОбмена.ПланОбменаРИБ
WHERE Код = 'KAFKA'
AND ПометкаУдаления = false
CONSUME TOP 1000 Ссылка INTO @changes
FROM Справочник.Номенклатура.Изменения
WHERE УзелОбмена = @Получатель
ORDER BY Ссылка ASC
SELECT Ссылка = UUIDOF(Ссылка)
, Код = LTRIM(RTRIM(Данные.Код))
, Наименование = Данные.Наименование
, ПометкаУдаления = Данные.ПометкаУдаления
INTO @message
FROM Справочник.Номенклатура AS Данные
APPEND (SELECT Период, Цена
FROM РегистрСведений.ЦеныНоменклатуры
WHERE Номенклатура = @changes.Ссылка
ORDER BY Период ASC) AS Цены
WHERE Данные.Ссылка = @changes.Ссылка
-- *******************************************************
-- * Приёмник сообщений Apache Kafka, топик "test-topic" *
-- *******************************************************
PRODUCE 'kafka'
WITH Acks = 'all' -- acks
, ClientId = 'dajet' -- client.id
, MaxInFlight = 1 -- max.in.flight
, BootstrapServers = '192.168.237.77:9092' -- bootstrap.servers (csv)
, MessageTimeoutMs = 30000 -- message.timeout.ms
, EnableIdempotence = false -- enable.idempotence
SELECT Key = 'Справочник.Номенклатура' -- Ключ сообщения строкой
, Value = DaJet.Json(@message) -- Тело сообщения в формате JSON
, Topic = @Приёмник -- Топик Apache Kafka
3. Получение сообщений: топик Apache Kafka - регистр сведений
DECLARE @Источник string = 'test-topic'
DECLARE @Приёмник string = 'pgsql://postgres:postgres@127.0.0.1:5432/dajet-exchange'
DECLARE @message object -- Данные сообщения Apache Kafka
-- *******************************************************
-- * Источник сообщений Apache Kafka, топик "test-topic" *
-- *******************************************************
CONSUME 'kafka'
WITH Topic = @Источник -- Топик Apache Kafka
, GroupId = 'dajet' -- group.id
, ClientId = 'dajet' -- client.id
, BootstrapServers = '192.168.237.77:9092' -- bootstrap.servers (csv)
, EnableAutoCommit = false -- enable.auto.commit
, AutoOffsetReset = 'earliest' -- auto.offset.reset
, SessionTimeoutMs = 60000 -- session.timeout.ms
, HeartbeatIntervalMs = 20000 -- heartbeat.interval.ms
INTO @message
-- ************************************************************
-- * Приёмник сообщений, регистр сведений "ВходящиеСообщения" *
-- ************************************************************
USE '{@Приёмник}'
INSERT РегистрСведений.ВходящиеСообщения
SELECT НомерСообщения = VECTOR('so_incoming_queue')
, ОтметкаВремени = NOW()
, Отправитель = @message.Topic
, ТипСообщения = @message.Key
, ТелоСообщения = @message.Value