DaJet Stream: Apache Kafka

16.03.24

Интеграция - WEB-интеграция

Команды CONSUME и PRODUCE языка запросов DaJet Stream.

Основная статья про DaJet Stream

DaJet Stream: RabbitMQ

DaJet Stream: HTTP сервисы

Видео: основы DaJet Stream

Видео: знакомство с платформой DaJet

Дистрибутив DaJet 3.3.3 на GitHub

Команды CONSUME и PRODUCE в языке запросов DaJet Stream предназначены для организации потокового обмена данными (сообщениями), в том числе, например, с такими брокерами сообщений, как RabbitMQ.

Команда CONSUME работает с брокерами сообщений в режиме PUSH от сервера, то есть постоянно находится в режиме ожидания входящих сообщений от брокера. Однако, следует иметь в виду, что при работе с таблицами-очередями баз данных (смотри основную статью про DaJet Stream) процессор скриптов DaJet Stream, обрабатывая команду CONSUME, реализует паттерн pooling publisher, который используется совместно с паттерном transactional outbox. Это означает, что DaJet Stream работает с базой данных в режиме PULL на клиенте, реализуя так называемое деструктивное чтение. То есть сообщения удаляются из таблицы-очереди после их отправки получателю в транзакции. Таким образом, можно сказать, что команда CONSUME выполняет своего рода роль координатора распределённой транзакции, обеспечивая требования ACID для каждого пакета данных.

В любом случае, как для брокеров сообщений, так и для баз данных, DaJet Stream гарантирует доставку сообщений на уровне at-least-once-in-order (минимум один раз строго по порядку). Возможную дупликацию сообщений DaJet Stream не контроллирует. Для этого следует использовать, либо логику приложения, либо возможности брокера.

Важно! Все команды CONSUME, как для баз данных, так и для брокеров сообщений, подразумевают "бесконечное" потребление потока данных с автоматическим восстановлением подключений к своим источникам в случае их разрыва.

Ниже приведённые скрипты DaJet Stream демонстрируют основные сценарии использования адаптера Apache Kafka при помощи команд CONSUME и PRODUCE, используя возможные, но не обязательные для всех случаев, варианты настройки.

Для вашего конкретного сценария лучше всего предварительно изучить официальную документацию:

Важно! DaJet Stream ориентируется на использование текстовых сообщений в кодировке UTF-8.

Доступные свойства сообщения Apache Kafka

Свойство Тип данных Описание
Key string Ключ сообщения
Value string Тело сообщения
Topic string Наименование топика
Выполнение скрипта в среде Windows Выполнение скрипта в среде Linux

(из корневого каталога установки)

dajet stream --file ./stream/script.sql

(из корневого каталога установки)

dotnet ./dajet.dll stream --file ./stream/script.sql

 

1. Отправка сообщений: регистр сведений - топик Apache Kafka

DECLARE @message  object -- Запись (сообщение) таблицы-очереди
DECLARE @empty_uuid uuid = "00000000-0000-0000-0000-000000000000"

-- ***********************************************************************
-- * Источник сообщений SQL Server - регистр сведений "ИсходящаяОчередь" *
-- ***********************************************************************
USE "mssql://zhichkin/dajet-exchange"

DECLARE @Отправитель string = SELECT Код
                                FROM ПланОбмена.ПланОбменаРИБ
                               WHERE Предопределённый <> @empty_uuid

CONSUME TOP 1000
        НомерСообщения, ТипСообщения, ТелоСообщения
   INTO @message
   FROM РегистрСведений.ИсходящиеСообщения
  ORDER BY НомерСообщения ASC

-- *****************************************************
-- * Приёмник сообщений Apache Kafka, топик test-topic *
-- *****************************************************

PRODUCE 'kafka'
   WITH Acks             = 'all'                  -- acks
      , ClientId         = 'dajet'                -- client.id
      , MaxInFlight      = 1                      -- max.in.flight
      , BootstrapServers = '192.168.239.177:9092' -- bootstrap.servers (csv)
      , MessageTimeoutMs = 30000                  -- message.timeout.ms
      , EnableIdempotence = false                 -- enable.idempotence
 SELECT Key   = @Отправитель           -- Ключ сообщения
      , Value = @message.ТелоСообщения -- Тело сообщения
      , Topic = @message.ТипСообщения  -- Топик Kafka

2.  Отправка сообщений: план обмена - топик Apache Kafka

DECLARE @Источник string = 'mssql://zhichkin/dajet-exchange'
DECLARE @Приёмник string = 'test-topic' -- Топик Apache Kafka
DECLARE @changes  object -- Запись таблицы регистрации регистрации изменений
DECLARE @message  object -- Данные объекта базы данных для отправки

-- ********************************************************************************
-- * Источник сообщений, таблица регистрации изменений справочника "Номенклатура" *
-- ********************************************************************************
USE '{@Источник}'

DECLARE @Получатель entity = SELECT Ссылка
                               FROM ПланОбмена.ПланОбменаРИБ
                              WHERE Код = 'KAFKA'
                                AND ПометкаУдаления = false

CONSUME TOP 1000 Ссылка INTO @changes
   FROM Справочник.Номенклатура.Изменения
  WHERE УзелОбмена = @Получатель
  ORDER BY Ссылка ASC

SELECT Ссылка          = UUIDOF(Ссылка)
     , Код             = LTRIM(RTRIM(Данные.Код))
     , Наименование    = Данные.Наименование
     , ПометкаУдаления = Данные.ПометкаУдаления
  INTO @message
  FROM Справочник.Номенклатура AS Данные
APPEND (SELECT Период, Цена
          FROM РегистрСведений.ЦеныНоменклатуры
         WHERE Номенклатура = @changes.Ссылка
         ORDER BY Период ASC) AS Цены
 WHERE Данные.Ссылка = @changes.Ссылка

-- *******************************************************
-- * Приёмник сообщений Apache Kafka, топик "test-topic" *
-- *******************************************************
PRODUCE 'kafka'
   WITH Acks              = 'all'                 -- acks
      , ClientId          = 'dajet'               -- client.id
      , MaxInFlight       = 1                     -- max.in.flight
      , BootstrapServers  = '192.168.237.77:9092' -- bootstrap.servers (csv)
      , MessageTimeoutMs  = 30000                 -- message.timeout.ms
      , EnableIdempotence = false                 -- enable.idempotence
 SELECT Key   = 'Справочник.Номенклатура' -- Ключ сообщения строкой
      , Value = DaJet.Json(@message)      -- Тело сообщения в формате JSON
      , Topic = @Приёмник                 -- Топик Apache Kafka

3. Получение сообщений: топик Apache Kafka - регистр сведений

DECLARE @Источник string = 'test-topic'
DECLARE @Приёмник string = 'pgsql://postgres:postgres@127.0.0.1:5432/dajet-exchange'
DECLARE @message  object -- Данные сообщения Apache Kafka

-- *******************************************************
-- * Источник сообщений Apache Kafka, топик "test-topic" *
-- *******************************************************

CONSUME 'kafka'
   WITH Topic               = @Источник             -- Топик Apache Kafka
      , GroupId             = 'dajet'               -- group.id
      , ClientId            = 'dajet'               -- client.id
      , BootstrapServers    = '192.168.237.77:9092' -- bootstrap.servers (csv)
      , EnableAutoCommit    = false                 -- enable.auto.commit
      , AutoOffsetReset     = 'earliest'            -- auto.offset.reset
      , SessionTimeoutMs    = 60000                 -- session.timeout.ms
      , HeartbeatIntervalMs = 20000                 -- heartbeat.interval.ms
   INTO @message

-- ************************************************************
-- * Приёмник сообщений, регистр сведений "ВходящиеСообщения" *
-- ************************************************************
USE '{@Приёмник}'

INSERT РегистрСведений.ВходящиеСообщения
SELECT НомерСообщения = VECTOR('so_incoming_queue')
     , ОтметкаВремени = NOW()
     , Отправитель    = @message.Topic
     , ТипСообщения   = @message.Key
     , ТелоСообщения  = @message.Value

 

Вступайте в нашу телеграмм-группу Инфостарт

DaJet Stream обмен данными интеграция Apache Kafka

См. также

WEB-интеграция Загрузка и выгрузка в Excel Программист Пользователь 1С:Предприятие 8 1С:ERP Управление предприятием 2 1С:Управление торговлей 11 Розничная и сетевая торговля (FMCG) Россия Платные (руб)

Расширение освободит вас от необходимости вручную обновлять товары в группах ВКонтакте. Достаточно задать правила один раз, и система автоматически синхронизирует ваш каталог. Вы сможете легко выбирать, какие товары публиковать, создавая гибкие критерии отбора. Например, можно добавить важные для покупателей параметры: цвет, размер или другие характеристики.

12200 руб.

29.08.2025    1840    5    0    

6

WEB-интеграция Программист 1С:Предприятие 8 1С:Бухгалтерия 3.0 Бытовые услуги, сервис Платные (руб)

Расширение для автоматизации передачи данных между сервисом Vetmanager с 1С: Бухгалтерия 3.0. Решение позволяет загружать документы и справочники из Ветменеджер в 1С:Бухгалтерию, сокращая время на ручной ввод данных и минимизируя ошибки.

24000 руб.

02.02.2021    22128    65    52    

41

WEB-интеграция Анализ продаж Системный администратор Программист Пользователь 1С:Предприятие 8 1С:Розница 2 1С:Управление нашей фирмой 1.6 1С:ERP Управление предприятием 2 1С:Бухгалтерия 3.0 1С:Управление торговлей 11 1С:Комплексная автоматизация 2.х 1С:Управление нашей фирмой 3.0 1С:Розница 3.0 Управленческий учет Платные (руб)

Модуль "Подсистема интеграции AmoCRM с 1С" позволяет обеспечить единое информационное пространство, в котором пользователи могут эффективно управлять клиентской базой, следить за статусами сделок и поддерживать актуальность данных как в AmoCRM, так и в 1С.

60000 руб.

07.05.2019    41692    75    45    

31

WEB-интеграция Программист Бизнес-аналитик 1С:Предприятие 8 1С:ERP Управление предприятием 2 1С:Бухгалтерия 3.0 1С:Управление торговлей 11 1С:Комплексная автоматизация 2.х 1С:Управление нашей фирмой 3.0 1С:Розница 3.0 Оптовая торговля, дистрибуция, логистика ИТ-компания Платные (руб)

Модуль "Экспортер" — это расширение для 1С, предназначенное для автоматизации процессов выгрузки данных. Оно позволяет эффективно извлекать, преобразовывать и передавать данные из систем 1С в интеграционную платформу Spot2D. Подсистема упрощает настройку, снижает количество ручных операций и обеспечивает удобный контроль данных.

17568 руб.

20.12.2024    5504    23    4    

26

Обмен с ГосИС WEB-интеграция Бухгалтер Пользователь 1С:Предприятие 8 1С:Комплексная автоматизация 1.х 1С:Бухгалтерия 2.0 1С:Управление торговлей 10 1С:Управление производственным предприятием 1С:Управление нашей фирмой 1.6 1С:Бухгалтерия государственного учреждения 1С:Документооборот 1С:ERP Управление предприятием 2 1С:Бухгалтерия 3.0 1С:Управление торговлей 11 1С:Комплексная автоматизация 2.х Платные (руб)

Обработка является альтернативой механизму, разработанному фирмой 1С и заполняющему реквизиты контрагента по ИНН или наименованию. Не требуется действующей подписки ИТС. Вызывается как внешняя дополнительная обработка, т.е. используется, непосредственно, из карточки контрагента. Заполнение по ИНН или наименованию реквизитов контрагента по данным сайта ФНС (egrul.nalog.ru) для БП 2.0, БП 3.0, БГУ 1.0, БГУ 2.0, УТ 10.3, УТ 11.x, КА 1.1, КА 2.x, УПП 1.x, ERP 2.x, УНФ 1.5, УНФ 1.6, УНФ 3.0, ДО 2.1

5283 руб.

28.04.2016    100259    116    219    

364
Для отправки сообщения требуется регистрация/авторизация