Идея проекта DaJet Stream заключается в том, чтобы описать поток данных между узлами обмена при помощи скрипта на SQL-подобном языке запросов в терминах платформы 1С:Предприятие 8. Синтаксически и архитектурно DaJet Stream больше всего похож на Azure U-SQL.
Специализированный процессор скрипта организует его выполнение в виде высокопроизводительного конвейера обработки и передачи сообщений между узлами. Узлами обмена могут быть базы данных 1С, брокеры сообщений RabbitMQ или Apache Kafka, web api и так далее.
Добавлены адаптеры DaJet Stream для HTTP/S-сервисов, RabbitMQ и Apache Kafka.
DaJet Stream: HTTP сервисы
DaJet Stream: RabbitMQ
DaJet Stream: Apache Kafka
Пример вызова HTTP-сервиса
DECLARE @response object -- HTTP ответ на запрос
REQUEST 'http://localhost/1c/hs/test/query'
WITH User-Agent = 'DaJet Stream'
, Content-Type = 'text/plain; charset=utf-8'
SELECT Method = 'POST' -- HTTP метод запроса
, OnError = 'break' -- break или continue
, Content = 'Привет!' -- Тело HTTP запроса
INTO @response -- { "Code": "200", "Value": "text" }
Пример отправки сообщения в RabbitMQ
PRODUCE 'amqp://guest:guest@localhost:5672/dajet'
WITH Exchange = 'test-exchange' -- Наименование топика
, RoutingKey = 'AAA' -- Ключ маршрутизации
, CarbonCopy = 'BBB,CCC,DDD' -- Дополнительные ключи маршрутизации (csv)
, MessageId = 'msg-no-1234' -- Идентификатор сообщения
SELECT AppId = 'test sender' -- Отправитель
, Type = 'Тестовый тип сообщения' -- Тип сообщения
, Body = 'Тестовое сообщение' -- Тело сообщения
, ContentType = 'text/plain'
Код отправки для Apache Kafka аналогичен (только настройки и имена свойств разные).
Пример получения сообщения из Apache Kafka
DECLARE @Источник string = 'test-topic'
DECLARE @Приёмник string = 'pgsql://postgres:postgres@127.0.0.1:5432/dajet-exchange'
DECLARE @message object
-- *******************************************************
-- * Источник сообщений Apache Kafka, топик "test-topic" *
-- *******************************************************
CONSUME 'kafka'
WITH Topic = @Источник -- Топик Apache Kafka
, GroupId = 'dajet' -- group.id
, ClientId = 'dajet' -- client.id
, BootstrapServers = '192.168.237.77:9092' -- bootstrap.servers (csv)
, EnableAutoCommit = false -- enable.auto.commit
, AutoOffsetReset = 'earliest' -- auto.offset.reset
, SessionTimeoutMs = 60000 -- session.timeout.ms
, HeartbeatIntervalMs = 20000 -- heartbeat.interval.ms
INTO @message
-- ************************************************************
-- * Приёмник сообщений, регистр сведений "ВходящиеСообщения" *
-- ************************************************************
USE '{@Приёмник}'
INSERT РегистрСведений.ВходящиеСообщения
SELECT НомерСообщения = VECTOR('so_incoming_queue')
, ОтметкаВремени = NOW()
, Отправитель = @message.Topic
, ТипСообщения = @message.Key
, ТелоСообщения = @message.Value
Скрипт DaJet Stream обрабатывается и выполняется последовательно "сверху-вниз" и состоит из нескольких основных программных блоков:
0. Контекст выполнения блоков конвейера
Контекст выполнения блоков конвейера определяется командой USE языка запросов DaJet Stream. Параметром этой команды является строка подключения к базе данных или брокеру сообщений. Конвейер может использовать несколько контекстов, переключаясь между ними в процессе своего выполнения.
USE "pgsql://postgres:postgres@127.0.0.1:5432/pg-dajet-stream"
1. Инициализация потока данных (сообщений)
Командой инициализации потока данных может быть обычный SELECT, UPDATE или специальная команда деструктивного чтения CONSUME.
Выполнение команды последовательно изменяет значение переменной контекста выполнения, которая является доступной всем блокам конвейера для обработки. То есть, другими словами, каждая запись выборки из базы данных превращается в сообщение (объект), структура которого определяется и выводится процессором скрипта из описания команды. Далее это сообщение передаётся по конвейеру "сверху-вниз".
Аналогом типа данных такой переменной в 1С является "Структура".
1. Пример SELECT
DECLARE @message object -- Переменная для организации потока
SELECT ТипСообщения, ТелоСообщения -- Структура и значения объекта потока
INTO @message -- Изменение переменной потока
FROM РегистрСведений.ИсходящаяОчередь -- Источник данных для потока
WHERE SUBSTRING(ТипСообщения, 1, 10) = "Справочник" -- Условия отбора из источника данных
ORDER BY НомерСообщения ASC -- Порядок последовательной обработки сообщений
2. Пример UPDATE
DECLARE @message object -- Переменная для организации потока
UPDATE РегистрСведений.ИсходящаяОчередь -- Источник данных для потока
WHERE Статус = Перечисление.СтатусСообщения.Создано -- Отбор по статусу сообщения
SET Статус = Перечисление.СтатусСообщения.Отправлено -- Меняем статус сообщения
, ДатаВремя = NOW() -- Фиксируем время отправки сообщения
OUTPUT ТипСообщения, ТелоСообщения -- Структура и значения объекта потока
INTO @message -- Изменение переменной потока
3. Пример CONSUME
DECLARE @message object -- Переменная для организации потока
CONSUME TOP 1000 -- Размер одного пакета обработки в транзакции
ТипСообщения, ТелоСообщения -- Структура и значения объекта потока
INTO @message -- Изменение переменной потока
FROM РегистрСведений.ИсходящаяОчередь -- Источник данных для потока
WHERE ТипСообщения = "Справочник.Номенклатура" -- Условия отбора из источника данных
ORDER BY НомерСообщения ASC -- Порядок последовательной обработки сообщений
2. Настройка пакетного многопоточного выполнения
Пакетная обработка потока данных выполняется при помощи наборов параметров, которые используются для выборки данных при помощи, например, команды SELECT. Наборы параметров создаются при помощи команды SELECT и помещаются в переменную-итератор (массив структур). Аналогом типа данных такой переменной в 1С является "Массив".
Далее выполняется циклическая обработка каждого пакета. При этом имеется возможность организовать многопоточную обработку этих пакетов при помощи специальной настройки MAXDOP (maximum degree of parallelism).
Пример многопоточного скрипта DaJet Stream
Выполняется обработка записей справочника "Номенклатура". Записи разделяются на четыре пакета с отбором по ставке НДС. Все пакеты записей обрабатываются по отдельности многопоточно. Максимально возможное количество потоков выполнения = 2 (два). При этом процессор выполнения скрипта DaJet Stream сам регулирует загрузку потоков операционной системы наиболее эффективным способом.
DECLARE @message object -- Переменная для организации потока
DECLARE @batch object -- Значения параметров для одного пакета обработки данных
DECLARE @iterator array -- Массив наборов (структур) параметров (итератор)
-- Выборка или формирование значений параметров для пакетов обработки данных
SELECT VAT = Перечисление.СтавкиНДС.НДС0 INTO @iterator -- Пакет 1
UNION ALL SELECT VAT = Перечисление.СтавкиНДС.НДС10 -- Пакет 2
UNION ALL SELECT VAT = Перечисление.СтавкиНДС.НДС18 -- Пакет 3
UNION ALL SELECT VAT = Перечисление.СтавкиНДС.БезНДС -- Пакет 4
FOR EACH @batch IN @iterator MAXDOP 2 -- Команда параллельного выполнения пакетов в два потока ОС
-- Далее выполняется многопоточная обработка: 1 поток = 1 копия конвейера
SELECT Ссылка = UUIDOF(Ссылка) -- Структура и значения объекта потока
, Код = RTRIM(LTRIM(Код)) -- Структура и значения объекта потока
, Наименование = RTRIM(LTRIM(Наименование)) -- Структура и значения объекта потока
INTO @message -- Изменение переменной объекта потока
FROM Справочник.Номенклатура -- Источник данных для потока
WHERE СтавкаНДС = @batch.VAT -- Формирование пакета согласно значениям параметров
Аналогичный код на 1С (код запроса к справочнику "Номенклатура" опущен для краткости)
МассивПараметров = Новый Массив();
СтруктураПараметров1 = Новый Структура();
СтруктураПараметров1.Вставить("СтавкаНДС", Перечисления.СтавкиНДС.НДС0);
СтруктураПараметров2 = Новый Структура();
СтруктураПараметров2.Вставить("СтавкаНДС", Перечисления.СтавкиНДС.НДС10);
СтруктураПараметров3 = Новый Структура();
СтруктураПараметров3.Вставить("СтавкаНДС", Перечисления.СтавкиНДС.НДС18);
СтруктураПараметров4 = Новый Структура();
СтруктураПараметров4.Вставить("СтавкаНДС", Перечисления.СтавкиНДС.БезНДС);
МассивПараметров.Добавить(СтруктураПараметров1);
МассивПараметров.Добавить(СтруктураПараметров2);
МассивПараметров.Добавить(СтруктураПараметров3);
МассивПараметров.Добавить(СтруктураПараметров4);
Для Каждого СтруктураПараметров Из МассивПараметров Цикл
ВыполнитьФоновоеЗадание(СтруктураПараметров);
КонецЦикла;
3. Маршрутизация потока (правила регистрации объектов)
Правила регистрации объектов конвертации данных являются по сути своей функцией, которая по объекту-источнику получает список узлов получателей. Такую функцию можно выразить в виде запроса SELECT.
Например, нижеследующий код получает все регистрации изменений справочника "Номенклатура" по всем планам обмена и при этом все товары без НДС маршрутизируются на узел "N001", а все остальные - на узел "N002". Результат сохраняется в переменную контекста выполнения конвейера @routes, которую можно затем использовать так, как это было описано ранее в пункте "Настройка пакетного многопоточного выполнения" выше.
DECLARE @routes array -- Массив узлов получателей - результат маршрутизации
SELECT DISTINCT
Ссылка = Изменения.Ссылка -- Ключ объета метаданных
, Получатель = CASE WHEN Данные.СтавкаНДС = Перечисление.СтавкиНДС.БезНДС -- Условие маршрутизации
THEN "N001" -- Код узла получателя товаров без НДС
ELSE "N002" -- Код узла получателя товаров с НДС
END
INTO @routes
FROM Справочник.Номенклатура.Изменения AS Изменения
INNER JOIN Справочник.Номенклатура AS Данные
ON Изменения.Ссылка = Данные.Ссылка
P.S. Это всего лишь только один из примеров возможного кода =)
4. Трансформация данных (правила конвертации объектов)
Правила конвертации объектов являются по сути своей функцией преобразования (проекции) значений базы данных в объект, имеющий набор некоторых свойств или даже граф таких объектов. Для формирования произвольной структуры объекта данных можно использовать команду SELECT, а также специализированный оператор обогащения APPEND такой структуры (объекта) табличными частями или другими объектами.
Например, следующий ниже код формирует структуру данных соответствующую вот такому JSON:
{
"Ссылка": "8d400f9d-935c-8ecc-11ee-c2db228ea72a",
"Код": "MS-01",
"Наименование": "Товар 01",
"Цены":
[
{
"Период": "2023-01-01T00:00:00", "Цена": 100.00
},
{
"Период": "2023-01-02T00:00:00", "Цена": 123.00
},
{
"Период": "2023-01-03T00:00:00", "Цена": 321.00
}
]
}
Cоответствующий код DaJet Stream
DECLARE @message object
SELECT Ссылка = UUIDOF(Ссылка)
, Код = RTRIM(LTRIM(Код))
, Наименование = SUBSTRING(Наименование, 1, 10)
INTO @message
FROM Справочник.Номенклатура
APPEND (SELECT Период, Цена
FROM РегистрСведений.ЦеныНоменклатуры
WHERE Номенклатура = @message.Ссылка
ORDER BY Период ASC) AS Цены
Здесь следует обратить внимание на использование переменной @message и обращение к её свойству "Ссылка" во вложенном запросе оператора APPEND (предложение WHERE). Результат выполнения оператора APPEND в данном случае добавляется к структуре данных @message в виде табличной части, однако данные взяты из регистра сведений "ЦеныНоменклатуры".
Формирование JSON из данных переменной @message выполняется специализированной функцией DaJet Stream. Пример такого преобразования приведён в разделе "Запись сообщения во входящую очередь приёмника данных" ниже.
5. Запись сообщения во входящую очередь приёмника данных
Сформированное ранее по конвейеру сообщение данных сохраняется в потоковой переменной контекста выполнения DaJet Stream. В предыдущих примерах такой переменной была @message. Когда мы готовы записать сообщение во входящую очередь приёмника данных нам, прежде всего , нужно переключить контекст при помощи команды USE, и только затем выполнить саму запись. В коде ниже для этого используется команда INSERT. Запись выполняется в регистр сведений базы данных приёмника.
-- Переключение контекста приёмника данных
USE "pgsql://postgres:postgres@127.0.0.1:5432/pg-dajet-stream"
INSERT РегистрСведений.ВходящиеСообщения -- Очередь входящих сообщений
SELECT НомерСообщения = (SELECT ISNULL(MAX(НомерСообщения), 0.0) + 1.0 -- Формирование очередного
FROM РегистрСведений.ВходящиеСообщения) -- номера входящего сообщения
, Отправитель = "MAIN" -- Код узла отправителя
, ТипСообщения = "Справочник.Номенклатура" -- Тип сообщения
, ТелоСообщения = DaJet.Json(@message) -- Сериализация JSON
, ОтметкаВремени = NOW() -- Текущая дата и время
Сравнительный тест производительности
Изначально тестирование проводилось в рамках подготовки к докладу по технологии DaJet Exchange для ежегодной конференции Infostart Tech Event 2023. К сожалению, этот доклад не прошёл отбор. Однако, мне стало интересно сравнить как DaJet Stream покажет себя в тех же самых тестах. Речь идёт о выгрузке объектов из таблицы регистрации изменений плана обмена.
Подробное описание теста
DaJet Stream показал очень похожие на DaJet Exchange результаты, и даже оказался немного быстрее. В приницпе это можно объяснить тем, что "под капотом" у двух этих проектов примерно одни и те же технологии, а именно подсистема DaJet Flow для построения конвейеров потоковой обработки и обмена данными.
Результаты тестирования DaJet Stream
Для выполнения теста использовался следующий скрипт (однопоточный):
-- ********************************************************************************************
-- * Источник сообщений SQL Server - таблица регистрации изменений справочника "Номенклатура" *
-- ********************************************************************************************
USE "mssql://zhichkin/dajet-exchange"
DECLARE @empty_uuid uuid = "00000000-0000-0000-0000-000000000000"
DECLARE @ЭтотУзел string = SELECT Код
FROM ПланОбмена.ПланОбменаРИБ
WHERE Предопределённый <> @empty_uuid
DECLARE @Получатель entity = SELECT Ссылка
FROM ПланОбмена.ПланОбменаРИБ
WHERE Код = "РИБ-0001"
AND ПометкаУдаления = false
DECLARE @message object
CONSUME TOP 1000 Ссылка, УзелОбмена
INTO @message
FROM Справочник.Номенклатура.Изменения
WHERE УзелОбмена = @Получатель
ORDER BY Ссылка ASC
SELECT Ссылка = UUIDOF(Изменения.Ссылка)
, Код = ISNULL(Данные.Код, "deleted")
, Наименование = ISNULL(Данные.Наименование, "")
, ПометкаУдаления = ISNULL(Данные.ПометкаУдаления, true)
INTO @message
FROM (SELECT Ссылка = @message.Ссылка) AS Изменения
LEFT JOIN Справочник.Номенклатура AS Данные
ON Изменения.Ссылка = Данные.Ссылка
-- ************************************************************************
-- * Приёмник сообщений PostgreSQL - регистр сведений "ВходящиеСообщения" *
-- ************************************************************************
USE "pgsql://postgres:postgres@127.0.0.1:5432/dajet-exchange"
INSERT РегистрСведений.ВходящиеСообщения
SELECT НомерСообщения = VECTOR('so_incoming_queue')
, Отправитель = @ЭтотУзел
, ТипСообщения = "Справочник.Номенклатура"
, ТелоСообщения = DaJet.Json(@message)
, ОтметкаВремени = NOW()
Результат выполнения: приблизительно 7 секунд.
Для сравнения:
1. БСП + КД-2 (подсистема "Обмен данными") = 56 секунд.
2. Обработка "УниверсальныйОбменДаннымиXML" + КД-2 по узлу РИБ = 53 секунды.
3. Выгрузка средствами РИБ без КД-2 = 26 секунд.
Для выполнения теста использовался следующий скрипт (многопоточный):
-- ********************************************************************************************
-- * Источник сообщений SQL Server - таблица регистрации изменений справочника "Номенклатура" *
-- ********************************************************************************************
USE "mssql://zhichkin/dajet-exchange"
DECLARE @empty_uuid uuid = "00000000-0000-0000-0000-000000000000"
DECLARE @ЭтотУзел string = SELECT Код
FROM ПланОбмена.ПланОбменаРИБ
WHERE Предопределённый <> @empty_uuid
DECLARE @Получатель entity = SELECT Ссылка
FROM ПланОбмена.ПланОбменаРИБ
WHERE Код = "РИБ-0001"
AND ПометкаУдаления = false
DECLARE @message object
DECLARE @batch object
DECLARE @iterator array
SELECT Size = 2500, Name = CONCAT(@ЭтотУзел, " : Поток 1") INTO @iterator
UNION ALL SELECT Size = 2500, Name = CONCAT(@ЭтотУзел, " : Поток 2")
UNION ALL SELECT Size = 2500, Name = CONCAT(@ЭтотУзел, " : Поток 3")
UNION ALL SELECT Size = 2500, Name = CONCAT(@ЭтотУзел, " : Поток 4")
FOR EACH @batch IN @iterator MAXDOP 4
CONSUME TOP (@batch.Size) Ссылка, УзелОбмена
INTO @message
FROM Справочник.Номенклатура.Изменения
WHERE УзелОбмена = @Получатель
ORDER BY Ссылка ASC
SELECT Ссылка = UUIDOF(Изменения.Ссылка)
, Код = ISNULL(Данные.Код, "deleted")
, Наименование = ISNULL(Данные.Наименование, "")
, ПометкаУдаления = ISNULL(Данные.ПометкаУдаления, true)
INTO @message
FROM (SELECT Ссылка = @message.Ссылка) AS Изменения
LEFT JOIN Справочник.Номенклатура AS Данные
ON Изменения.Ссылка = Данные.Ссылка
-- ************************************************************************
-- * Приёмник сообщений PostgreSQL - регистр сведений "ВходящиеСообщения" *
-- ************************************************************************
USE "pgsql://postgres:postgres@127.0.0.1:5432/dajet-exchange"
INSERT РегистрСведений.ВходящиеСообщения
SELECT НомерСообщения = VECTOR('so_incoming_queue')
, Отправитель = @batch.Name
, ТипСообщения = "Справочник.Номенклатура"
, ТелоСообщения = DaJet.Json(@message)
, ОтметкаВремени = NOW()
Результат выполнения: чуть больше 2-х секунд.
Примеры возможных скриптов DaJet Stream
1. Самый простой пример (исходящая очередь - входящая очередь)
-- ***********************************************************************
-- * Источник сообщений SQL Server - регистр сведений "ИсходящаяОчередь" *
-- ***********************************************************************
USE "mssql://sql-server/ms-dajet-stream"
DECLARE @empty_uuid uuid = "00000000-0000-0000-0000-000000000000"
DECLARE @ЭтотУзел string = SELECT Код
FROM ПланОбмена.ПланОбменаДанными
WHERE Предопределённый <> @empty_uuid
DECLARE @message object
SELECT ТипСообщения, ТелоСообщения INTO @message
FROM РегистрСведений.ИсходящаяОчередь
ORDER BY НомерСообщения ASC
-- ************************************************************************
-- * Приёмник сообщений PostgreSQL - регистр сведений "ВходящиеСообщения" *
-- ************************************************************************
USE "pgsql://postgres:postgres@127.0.0.1:5432/pg-dajet-stream"
INSERT РегистрСведений.ВходящиеСообщения
SELECT НомерСообщения = VECTOR('so_incoming_queue')
, Отправитель = @ЭтотУзел
, ТипСообщения = @message.ТипСообщения
, ТелоСообщения = @message.ТелоСообщения
, ОтметкаВремени = NOW()
2. Сложный пример выгрузки из плана обмена в регистр сведений
-- ********************************************************************************************
-- * Источник сообщений SQL Server - таблица регистрации изменений справочника "Номенклатура" *
-- ********************************************************************************************
USE "mssql://sql-server/ms-dajet-stream"
DECLARE @empty_uuid uuid = "00000000-0000-0000-0000-000000000000"
DECLARE @ЭтотУзел string = SELECT Код
FROM ПланОбмена.ПланОбменаДанными
WHERE Предопределённый <> @empty_uuid
DECLARE @Получатель entity = SELECT Ссылка
FROM ПланОбмена.ПланОбменаДанными
WHERE Код = "N001"
AND ПометкаУдаления = false
DECLARE @message object
CONSUME TOP 2 Ссылка, УзелОбмена
INTO @message
FROM Справочник.Номенклатура.Изменения
WHERE УзелОбмена = @Получатель
SELECT ref = UUIDOF(Изменения.Ссылка)
, code = ISNULL(Данные.Код, "deleted")
, name = ISNULL(Данные.Наименование, "")
INTO @message
FROM (SELECT Ссылка = @message.Ссылка) AS Изменения
LEFT JOIN Справочник.Номенклатура AS Данные
ON Изменения.Ссылка = Данные.Ссылка
-- ************************************************************************
-- * Приёмник сообщений PostgreSQL - регистр сведений "ВходящиеСообщения" *
-- ************************************************************************
USE "pgsql://postgres:postgres@127.0.0.1:5432/pg-dajet-stream"
INSERT РегистрСведений.ВходящиеСообщения
SELECT НомерСообщения = VECTOR('so_incoming_queue')
, Отправитель = @ЭтотУзел
, ТипСообщения = "Справочник.Номенклатура"
, ТелоСообщения = DaJet.Json(@message)
, ОтметкаВремени = NOW()
С целью более предметного обсуждения состоятельности концепции DaJet Stream разработан минимально функциональный прототип утилиты для выполнения скриптов. Подробное описание прототипа и его функциональных возможностей рассмотрено в видео-презентации, где представлено ещё большее количество примеров и различных типовых сценариев использования.
Установка утилиты исполнения скриптов DaJet Stream:
1. Установить платформу Microsoft .NET 7.0
2. Скачать дистрибутив платформы DaJet 3.3.3
3. Распаковать архив дистрибутива в любой каталог.
4. Настроить демонстрационный скрипт под свой контекст.
5. Перейти в каталог установки платформы DaJet
6. Выполнить скрипт при помощи утилиты dajet
Значением параметра --file утилиты dajet (смотри ниже) является путь к соответствующему файлу исполняемого скрипта DaJet Stream.
Поставка платформы DaJet включает в себя набор демонстрационных скриптов. Они располагаются в корневом каталоге установки в папке "stream".
Выполнение скрипта DaJet Stream в среде Windows
(из корневого каталога установки)
dajet stream --file ./stream/script.sql
Выполнение скрипта DaJet Stream в среде Linux
(из корневого каталога установки)
dotnet ./dajet.dll stream --file ./stream/script.sql
Планы развития DaJet Stream
Управление скриптами трансформации/маршрутизации
Для условного динамического выбора, например, скрипта трансформации данных, конвертации сообщения в JSON, предполагается использовать аналогичную описанной выше методику шаблонизации адреса скрипта и динамическое его подключение в процессе выполнения конвейера обмена данными.
При этом предполагается, что такие скрипты хранятся в отдельных файлах, например, по одному файлу для каждого объекта метаданных. Дополнительным преимуществом такого подхода является возможность подключения к каталогу размещения скриптов DaJet Stream такой системы контроля версий как, например, git.
Для включения внешнего скрипта в уже существующий предполагается разработать специальную команду INLINE. Её использование могло бы выглядеть нижеследующим образом.
DECLARE @Настройки object
SELECT TOP 1 ТипОбъекта
INTO @Настройки
FROM РегистрСведений.НастройкиОбменаДанными
INLINE "file://database/ПланОбменаДанными/{@Настройки.ТипОбъекта}"
В данном конкретном случае подстановка значения свойства "ТипОбъекта" переменной @Настройки выполняется для шаблона {@route.КодУзла}.
Дополнительные материалы:
1. Видео-презентация DaJet Stream
2. Видео-презентация платформы DaJet