DaJet Stream: RabbitMQ

17.03.24

Интеграция - WEB-интеграция

Команды CONSUME и PRODUCE языка запросов DaJet Stream

Основная статья про DaJet Stream

DaJet Stream: Apache Kafka

DaJet Stream: HTTP сервисы

Видео: основы DaJet Stream

Видео: знакомство с платформой DaJet

Дистрибутив DaJet 3.3.3 на GitHub

Команды CONSUME и PRODUCE в языке запросов DaJet Stream предназначены для организации потокового обмена данными (сообщениями), в том числе, например, с такими брокерами сообщений, как RabbitMQ.

Команда CONSUME работает с брокерами сообщений в режиме PUSH от сервера, то есть постоянно находится в режиме ожидания входящих сообщений от брокера. Однако, следует иметь в виду, что при работе с таблицами-очередями баз данных (смотри основную статью про DaJet Stream) процессор скриптов DaJet Stream, обрабатывая команду CONSUME, реализует паттерн pooling publisher, который используется совместно с паттерном transactional outbox. Это означает, что DaJet Stream работает с базой данных в режиме PULL на клиенте, реализуя так называемое деструктивное чтение. То есть сообщения удаляются из таблицы-очереди после их отправки получателю в транзакции. Таким образом, можно сказать, что команда CONSUME выполняет своего рода роль координатора распределённой транзакции, обеспечивая требования ACID для каждого пакета данных.

В любом случае, как для брокеров сообщений, так и для баз данных, DaJet Stream гарантирует доставку сообщений на уровне at-least-once-in-order (минимум один раз строго по порядку). Возможную дупликацию сообщений DaJet Stream не контроллирует. Для этого следует использовать, либо логику приложения, либо возможности брокера.

Важно! Все команды CONSUME, как для баз данных, так и для брокеров сообщений, подразумевают "бесконечное" потребление потока данных с автоматическим восстановлением подключений к своим источникам в случае их разрыва.

Ниже приведённые скрипты DaJet Stream демонстрируют основные сценарии использования адаптера RabbitMQ при помощи команд CONSUME и PRODUCE, используя возможные, но не обязательные для всех случаев, варианты настройки. Для вашего конкретного сценария лучше всего предварительно изучить официальную документацию разработчиков RabbitMQ.

Важно! DaJet Stream ориентируется на использование текстовых сообщений в кодировке UTF-8.

Выполнение скрипта в среде Windows Выполнение скрипта в среде Linux

(из корневого каталога установки)

dajet stream --file ./stream/script.sql

(из корневого каталога установки)

dotnet ./dajet.dll stream --file ./stream/script.sql

1. Простая отправка сообщения в RabbitMQ ("консольный" вариант)

-- ****************************************************************************
-- * Приёмник сообщений RabbitMQ, виртуальный хост dajet, топик test-exchange *
-- ****************************************************************************

PRODUCE 'amqp://guest:guest@localhost:5672/dajet'
   WITH Exchange   = 'test-exchange' -- Наименование топика
      , RoutingKey = 'AAA'           -- Ключ маршрутизации
      , CarbonCopy = 'BBB,CCC,DDD'   -- Дополнительные ключи маршрутизации (csv)
      , MessageId  = 'msg-no-1234'   -- Идентификатор сообщения
 SELECT AppId       = 'test sender'            -- Отправитель
      , Type        = 'Тестовый тип сообщения' -- Тип сообщения
      , Body        = 'Тестовое сообщение'     -- Тело сообщения
      , ContentType = 'text/plain'

Доступные свойства исходящего сообщения для RabbitMQ (команда PRODUCE)

Свойство Тип данных Описание
AppId string Наименование отправителя
Exchange string Наименование топика
RoutingKey string

Ключ маршрутизации, если свойство Exchange заполнено.

В противном случае - наименование очереди для прямой отправки в очередь без маршрутизации.

Mandatory number

Признак обязательности наличия очереди назначения. Подробнее лучше почитать в документации RabbitMQ.

По умолчанию DaJet Stream не использует этот флаг.

MessageId string Идентификатор сообщения
BlindCopy string (csv) Дополнительные ключи маршрутизации. Значения ключей не доставляются получателю.
CarbonCopy string (csv) Дополнительные ключи маршрутизации. Значения ключей доставляются получателю. Использовать не рекомендуется.
Type string Тип сообщения
Body string Тело сообщения. DaJet Stream ориентируется на текстовые сообщения в кодировке UTF-8.
ContentType string

Тип содержимого тела сообщения.

Значение по умолчанию: application/json

ContentEncoding string

Формат (кодировка) тела сообщения.

Значение по умолчанию: UTF-8

DeliveryMode number

Вид доставки: 1 - in-memory, 2 - persistent.

Значение по умолчанию: 2 (сохранение на диск)

Priority number

Приоритет доставки сообщения от 0 до 9.

Значение по умолчанию: 0

ReplyTo string Адресат для обратной связи, определяемый логикой приложения.
CorrelationId string Идентификатор корреляции сообщений между собой, определяемый логикой приложения.
Expiration string Спецификация устаревания сообщения. Подробнее лучше почитать в документации RabbitMQ. По умолчанию не используется.

2. Отправка сообщений: регистр сведений - RabbitMQ

DECLARE @message  object -- Запись (сообщение) таблицы-очереди
DECLARE @empty_uuid uuid = "00000000-0000-0000-0000-000000000000"

-- ***********************************************************************
-- * Источник сообщений SQL Server - регистр сведений "ИсходящаяОчередь" *
-- ***********************************************************************
USE "mssql://zhichkin/dajet-exchange"

DECLARE @Отправитель string = SELECT Код
                                FROM ПланОбмена.ПланОбменаРИБ
                               WHERE Предопределённый <> @empty_uuid

CONSUME TOP 1000
        НомерСообщения, ТипСообщения, ТелоСообщения
   INTO @message
   FROM РегистрСведений.ИсходящиеСообщения
  ORDER BY НомерСообщения ASC

-- ****************************************************************************
-- * Приёмник сообщений RabbitMQ, виртуальный хост dajet, топик test-exchange *
-- ****************************************************************************

PRODUCE 'amqp://guest:guest@localhost:5672/dajet'
   WITH AppId     = @Отправитель
 SELECT Type      = @message.ТипСообщения
      , Body      = @message.ТелоСообщения
      , Exchange  = 'test-exchange'
      , MessageId = @message.НомерСообщения

 

Доступные настройки для получения входящих сообщений от RabbitMQ (команда CONSUME)

Свойство Тип данных Описание
QueueName string Наименование очереди для получения сообщений
Heartbeat number Период проверки в секундах наличия подключения к RabbitMQ. В случае его потери - автоматическое восстановление. Кроме этого данное значение используется для периодического вывода количества обработанных (успешно полученных) за это время сообщений в консоль или журнал DaJet Stream.
PrefetchSize number

Размер клиентского буфера в байтах.

Значение по умолчанию: неограниченно.

Подробнее лучше почитать в документации RabbitMQ.

PrefetchCount number

Количество сообщений, которые могут быть отправлены сервером без подтверждения.

Значение по умолчанию: 1.

Значение 1 выбрано DaJet Stream для обеспечения гарантий доставки at-least-once-in-order.

Подробнее лучше почитать в документации RabbitMQ.

3. Получение сообщений: RabbitMQ - регистр сведений

DECLARE @source  string = 'guest:guest@localhost:5672/dajet'
DECLARE @target  string = 'postgres:postgres@127.0.0.1:5432/dajet-demo-pg'
DECLARE @message object -- Данные входящего сообщения RabbitMQ

-- ***************************************************************************
-- * Источник сообщений RabbitMQ, виртуальный хост dajet, очередь test-queue *
-- ***************************************************************************

CONSUME 'amqp://{@source}'
   WITH QueueName = 'test-queue', Heartbeat = 5
   INTO @message

-- ************************************************************************
-- * Приёмник сообщений Postgre SQL - регистр сведений "ВходящиеСообщения" *
-- ************************************************************************
USE 'pgsql://{@target}'

INSERT РегистрСведений.ВходящиеСообщения
SELECT НомерСообщения = VECTOR('so_incoming_queue')
     , ОтметкаВремени = NOW()
     , Отправитель    = @message.AppId
     , ТипСообщения   = @message.Type
     , ТелоСообщения  = @message.Body
     , Получатели     = @message.ContentType

 

4. DaJet Stream RabbitMQ Shovel

Ниже приводится пример скрипта получения сообщений из одной очереди RabbitMQ и их отправки в другую очередь или топик RabbitMQ средствами DaJet Stream. По сути своей это аналог плагина Shovel для RabbitMQ. При этом очереди могут находиться на разных серверах RabbitMQ - нужно только поменять соответствующие адреса подключений в командах CONSUME и PRODUCE скрипта DaJet Stream.

DECLARE @source  string = 'guest:guest@localhost:5672/dajet'
DECLARE @target  string = 'guest:guest@localhost:5672/dajet'
DECLARE @message object -- Объект сообщения RabbitMQ

-- ***************************************************************************
-- * Источник сообщений RabbitMQ, виртуальный хост dajet, очередь test-queue *
-- ***************************************************************************

CONSUME 'amqp://{@source}'
   WITH QueueName = 'test-queue', Heartbeat = 5
   INTO @message

-- ****************************************************************************
-- * Приёмник сообщений RabbitMQ, виртуальный хост dajet, очередь test-shovel *
-- ****************************************************************************
PRODUCE 'amqp://{@target}'
 SELECT AppId      = @message.AppId
      , Type       = @message.Type
      , Body       = @message.Body
      , RoutingKey = 'test-shovel'
      , MessageId  = @message.MessageId

 

Вступайте в нашу телеграмм-группу Инфостарт

DaJet Stream обмен данными интеграция RabbitMQ

См. также

Оптовая торговля Розничная торговля WEB-интеграция 1С:Управление торговлей 10 1С:Управление производственным предприятием 1С:Управление нашей фирмой 1.6 1С:Управление торговлей 11 1С:Комплексная автоматизация 2.х 1С:Управление нашей фирмой 3.0 Платные (руб)

Онлайн-заказ - это решение для автоматизации процесса оформления заказов на сайте в торговых организациях. Продукт обеспечивает легкое взаимодействие между компанией и клиентами через веб-интерфейс, интегрированный с 1С:Предприятие. Система позволяет снизить операционные расходы, повысить лояльность клиентов и оптимизировать работу отдела продаж.

57600 руб.

26.11.2024    5941    4    3    

7

WEB-интеграция Программист Бизнес-аналитик 1С v8.3 1С:ERP Управление предприятием 2 1С:Бухгалтерия 3.0 1С:Управление торговлей 11 1С:Комплексная автоматизация 2.х 1С:Управление нашей фирмой 3.0 1С:Розница 3.0 Оптовая торговля, дистрибуция, логистика ИТ-компания Платные (руб)

Модуль "Экспортер" — это расширение для 1С, предназначенное для автоматизации процессов выгрузки данных. Оно позволяет эффективно извлекать, преобразовывать и передавать данные из систем 1С в интеграционную платформу Spot2D. Подсистема упрощает настройку, снижает количество ручных операций и обеспечивает удобный контроль данных.

14400 руб.

20.12.2024    3313    17    2    

19

WEB-интеграция 1С v8.3 1С:ERP Управление предприятием 2 1С:Управление торговлей 11 1С:Комплексная автоматизация 2.х Оптовая торговля, дистрибуция, логистика Россия Платные (руб)

В расширении реализован механизм интеграции между системой поставщика и Личным кабинетом СДТ. Реализован обмен заказами и реализациями (накладными), предусмотрено отслеживание статусов документов. Расширение предназначено для 1С:УТ 11.4.

35856 руб.

27.11.2024    1776    1    0    

1

Обмен с ГосИС WEB-интеграция Бухгалтер Пользователь 1С v8.3 Управляемые формы 1С:Комплексная автоматизация 1.х 1С:Бухгалтерия 2.0 1С:Управление торговлей 10 1С:Управление производственным предприятием 1С:Управление нашей фирмой 1.6 1С:Бухгалтерия государственного учреждения 1С:Документооборот 1С:ERP Управление предприятием 2 1С:Бухгалтерия 3.0 1С:Управление торговлей 11 1С:Комплексная автоматизация 2.х Платные (руб)

Обработка является альтернативой механизму, разработанному фирмой 1С и заполняющему реквизиты контрагента по ИНН или наименованию. Не требуется действующей подписки ИТС. Вызывается как внешняя дополнительная обработка, т.е. используется, непосредственно, из карточки контрагента. Заполнение по ИНН или наименованию реквизитов контрагента по данным сайта ФНС (egrul.nalog.ru) для БП 2.0, БП 3.0, БГУ 1.0, БГУ 2.0, УТ 10.3, УТ 11.x, КА 1.1, КА 2.x, УПП 1.x, ERP 2.x, УНФ 1.5, УНФ 1.6, УНФ 3.0, ДО 2.1

5196 руб.

28.04.2016    97275    109    218    

359

Обмен с ГосИС Мастера заполнения WEB-интеграция Бухгалтер Пользователь 1С v8.3 Бухгалтерский учет Оперативный учет Управляемые формы 1С:Управление производственным предприятием 1С:ERP Управление предприятием 2 1С:Бухгалтерия 3.0 1С:Управление торговлей 11 1С:Комплексная автоматизация 2.х Россия Платные (руб)

Универсальное расширение конфигурации для автоматической загрузки и заполнения реквизитов контрагентов (партнеров) из ОГРН для 1С:ERP Управление предприятием 2 (1С:ERP Управление предприятием 2, редакция 2.4), 1С:ERP Управление предприятием 2 (1С:ERP Управление предприятием 2, редакция 2.2), 1С:Управление торговлей 8 (Управление торговлей, редакция 11.5), 1С:Управление торговлей 8 (Управление торговлей, редакция 11.4), 1С:Управление торговлей 8 (Управление торговлей, редакция 11.3), 1С:Управление торговлей 8 (Управление торговлей, редакция 11.2), 1С:Комплексная автоматизация 8 (1С:Комплексная автоматизация, редакция 2.4), 1С:Комплексная автоматизация 8 (1С:Комплексная автоматизация, редакция 2.2), 1С:Комплексная автоматизация 8 (1С:Комплексная автоматизация, редакция 2.0) и 1С:Бухгалтерия 8 (Бухгалтерия предприятия, редакция 3.0).

5000 руб.

08.11.2017    69357    415    298    

84

WEB-интеграция Программист 1С v8.3 Бухгалтерский учет 1С:Бухгалтерия 3.0 Бытовые услуги, сервис Платные (руб)

Внешняя обработка разработана для автоматизации передачи данных между сервисом Vetmanager с 1С: Бухгалтерия 3.0. Решение позволяет загружать документы и справочники из Ветменеджер в 1С:Бухгалтерию, сокращая время на ручной ввод данных и минимизируя ошибки.

12000 руб.

02.02.2021    20189    58    52    

36
Для отправки сообщения требуется регистрация/авторизация