Основная статья про DaJet Stream
Видео: знакомство с платформой DaJet
Дистрибутив DaJet 3.3.3 на GitHub
Команды CONSUME и PRODUCE в языке запросов DaJet Stream предназначены для организации потокового обмена данными (сообщениями), в том числе, например, с такими брокерами сообщений, как RabbitMQ.
Команда CONSUME работает с брокерами сообщений в режиме PUSH от сервера, то есть постоянно находится в режиме ожидания входящих сообщений от брокера. Однако, следует иметь в виду, что при работе с таблицами-очередями баз данных (смотри основную статью про DaJet Stream) процессор скриптов DaJet Stream, обрабатывая команду CONSUME, реализует паттерн pooling publisher, который используется совместно с паттерном transactional outbox. Это означает, что DaJet Stream работает с базой данных в режиме PULL на клиенте, реализуя так называемое деструктивное чтение. То есть сообщения удаляются из таблицы-очереди после их отправки получателю в транзакции. Таким образом, можно сказать, что команда CONSUME выполняет своего рода роль координатора распределённой транзакции, обеспечивая требования ACID для каждого пакета данных.
В любом случае, как для брокеров сообщений, так и для баз данных, DaJet Stream гарантирует доставку сообщений на уровне at-least-once-in-order (минимум один раз строго по порядку). Возможную дупликацию сообщений DaJet Stream не контроллирует. Для этого следует использовать, либо логику приложения, либо возможности брокера.
Важно! Все команды CONSUME, как для баз данных, так и для брокеров сообщений, подразумевают "бесконечное" потребление потока данных с автоматическим восстановлением подключений к своим источникам в случае их разрыва.
Ниже приведённые скрипты DaJet Stream демонстрируют основные сценарии использования адаптера RabbitMQ при помощи команд CONSUME и PRODUCE, используя возможные, но не обязательные для всех случаев, варианты настройки. Для вашего конкретного сценария лучше всего предварительно изучить официальную документацию разработчиков RabbitMQ.
Важно! DaJet Stream ориентируется на использование текстовых сообщений в кодировке UTF-8.
Выполнение скрипта в среде Windows | Выполнение скрипта в среде Linux |
---|---|
(из корневого каталога установки) dajet stream --file ./stream/script.sql |
(из корневого каталога установки) dotnet ./dajet.dll stream --file ./stream/script.sql |
1. Простая отправка сообщения в RabbitMQ ("консольный" вариант)
-- ****************************************************************************
-- * Приёмник сообщений RabbitMQ, виртуальный хост dajet, топик test-exchange *
-- ****************************************************************************
PRODUCE 'amqp://guest:guest@localhost:5672/dajet'
WITH Exchange = 'test-exchange' -- Наименование топика
, RoutingKey = 'AAA' -- Ключ маршрутизации
, CarbonCopy = 'BBB,CCC,DDD' -- Дополнительные ключи маршрутизации (csv)
, MessageId = 'msg-no-1234' -- Идентификатор сообщения
SELECT AppId = 'test sender' -- Отправитель
, Type = 'Тестовый тип сообщения' -- Тип сообщения
, Body = 'Тестовое сообщение' -- Тело сообщения
, ContentType = 'text/plain'
Доступные свойства исходящего сообщения для RabbitMQ (команда PRODUCE)
Свойство | Тип данных | Описание |
---|---|---|
AppId | string | Наименование отправителя |
Exchange | string | Наименование топика |
RoutingKey | string |
Ключ маршрутизации, если свойство Exchange заполнено. В противном случае - наименование очереди для прямой отправки в очередь без маршрутизации. |
Mandatory | number |
Признак обязательности наличия очереди назначения. Подробнее лучше почитать в документации RabbitMQ. По умолчанию DaJet Stream не использует этот флаг. |
MessageId | string | Идентификатор сообщения |
BlindCopy | string (csv) | Дополнительные ключи маршрутизации. Значения ключей не доставляются получателю. |
CarbonCopy | string (csv) | Дополнительные ключи маршрутизации. Значения ключей доставляются получателю. Использовать не рекомендуется. |
Type | string | Тип сообщения |
Body | string | Тело сообщения. DaJet Stream ориентируется на текстовые сообщения в кодировке UTF-8. |
ContentType | string |
Тип содержимого тела сообщения. Значение по умолчанию: application/json |
ContentEncoding | string |
Формат (кодировка) тела сообщения. Значение по умолчанию: UTF-8 |
DeliveryMode | number |
Вид доставки: 1 - in-memory, 2 - persistent. Значение по умолчанию: 2 (сохранение на диск) |
Priority | number |
Приоритет доставки сообщения от 0 до 9. Значение по умолчанию: 0 |
ReplyTo | string | Адресат для обратной связи, определяемый логикой приложения. |
CorrelationId | string | Идентификатор корреляции сообщений между собой, определяемый логикой приложения. |
Expiration | string | Спецификация устаревания сообщения. Подробнее лучше почитать в документации RabbitMQ. По умолчанию не используется. |
2. Отправка сообщений: регистр сведений - RabbitMQ
DECLARE @message object -- Запись (сообщение) таблицы-очереди
DECLARE @empty_uuid uuid = "00000000-0000-0000-0000-000000000000"
-- ***********************************************************************
-- * Источник сообщений SQL Server - регистр сведений "ИсходящаяОчередь" *
-- ***********************************************************************
USE "mssql://zhichkin/dajet-exchange"
DECLARE @Отправитель string = SELECT Код
FROM ПланОбмена.ПланОбменаРИБ
WHERE Предопределённый <> @empty_uuid
CONSUME TOP 1000
НомерСообщения, ТипСообщения, ТелоСообщения
INTO @message
FROM РегистрСведений.ИсходящиеСообщения
ORDER BY НомерСообщения ASC
-- ****************************************************************************
-- * Приёмник сообщений RabbitMQ, виртуальный хост dajet, топик test-exchange *
-- ****************************************************************************
PRODUCE 'amqp://guest:guest@localhost:5672/dajet'
WITH AppId = @Отправитель
SELECT Type = @message.ТипСообщения
, Body = @message.ТелоСообщения
, Exchange = 'test-exchange'
, MessageId = @message.НомерСообщения
Доступные настройки для получения входящих сообщений от RabbitMQ (команда CONSUME)
Свойство | Тип данных | Описание |
---|---|---|
QueueName | string | Наименование очереди для получения сообщений |
Heartbeat | number | Период проверки в секундах наличия подключения к RabbitMQ. В случае его потери - автоматическое восстановление. Кроме этого данное значение используется для периодического вывода количества обработанных (успешно полученных) за это время сообщений в консоль или журнал DaJet Stream. |
PrefetchSize | number |
Размер клиентского буфера в байтах. Значение по умолчанию: неограниченно. Подробнее лучше почитать в документации RabbitMQ. |
PrefetchCount | number |
Количество сообщений, которые могут быть отправлены сервером без подтверждения. Значение по умолчанию: 1. Значение 1 выбрано DaJet Stream для обеспечения гарантий доставки at-least-once-in-order. Подробнее лучше почитать в документации RabbitMQ. |
3. Получение сообщений: RabbitMQ - регистр сведений
DECLARE @source string = 'guest:guest@localhost:5672/dajet'
DECLARE @target string = 'postgres:postgres@127.0.0.1:5432/dajet-demo-pg'
DECLARE @message object -- Данные входящего сообщения RabbitMQ
-- ***************************************************************************
-- * Источник сообщений RabbitMQ, виртуальный хост dajet, очередь test-queue *
-- ***************************************************************************
CONSUME 'amqp://{@source}'
WITH QueueName = 'test-queue', Heartbeat = 5
INTO @message
-- ************************************************************************
-- * Приёмник сообщений Postgre SQL - регистр сведений "ВходящиеСообщения" *
-- ************************************************************************
USE 'pgsql://{@target}'
INSERT РегистрСведений.ВходящиеСообщения
SELECT НомерСообщения = VECTOR('so_incoming_queue')
, ОтметкаВремени = NOW()
, Отправитель = @message.AppId
, ТипСообщения = @message.Type
, ТелоСообщения = @message.Body
, Получатели = @message.ContentType
4. DaJet Stream RabbitMQ Shovel
Ниже приводится пример скрипта получения сообщений из одной очереди RabbitMQ и их отправки в другую очередь или топик RabbitMQ средствами DaJet Stream. По сути своей это аналог плагина Shovel для RabbitMQ. При этом очереди могут находиться на разных серверах RabbitMQ - нужно только поменять соответствующие адреса подключений в командах CONSUME и PRODUCE скрипта DaJet Stream.
DECLARE @source string = 'guest:guest@localhost:5672/dajet'
DECLARE @target string = 'guest:guest@localhost:5672/dajet'
DECLARE @message object -- Объект сообщения RabbitMQ
-- ***************************************************************************
-- * Источник сообщений RabbitMQ, виртуальный хост dajet, очередь test-queue *
-- ***************************************************************************
CONSUME 'amqp://{@source}'
WITH QueueName = 'test-queue', Heartbeat = 5
INTO @message
-- ****************************************************************************
-- * Приёмник сообщений RabbitMQ, виртуальный хост dajet, очередь test-shovel *
-- ****************************************************************************
PRODUCE 'amqp://{@target}'
SELECT AppId = @message.AppId
, Type = @message.Type
, Body = @message.Body
, RoutingKey = 'test-shovel'
, MessageId = @message.MessageId