Подсистема поверх Внешняя компонента 1с (native) для взаимодействия с Apache Kafka.
Принцип работы
Узлом сервисной шины является кластер серверов Kafka. Для него указываются bootstrap сервера, для подключения и имя группы получателей, для возможности чтения сообщений из узла. В состав узла добавляются метаданные для выгрузки/загрузки изменений, указываются топики в которые будут отправляться или читаться изменения и указывается направление обмена входящее или исходящее. Изменения объектов регистрируются в узлы плана обмена сервисной шины, упаковываются в сообщения и отправляются в брокер. Изменения из шины получаются путем подписки на топики узла с последующим разбором полученных из брокера сообщений. Для отправки и получения сообщений из шины используются специализированные объекты внешней компоненты - Отправитель и Получатель соответственно.
Как с этим работать
Для добавления обработки метаданных сервисной шиной требуется описать соответствующие методы сбора/разбора сообщений. Методы описываются в модуле менеджера метаданных.
Без описанных методов отправка и разбор полученных будет по умолчанию записывать через сериализатор XDTO и через него же и читать.
Сообщения на отправку можно собирать менеджером как по одному. Узел сам получает каждый изменённый объект, передает его менеджеру вместе с сообщением и менеджер наполняет сообщение о изменении на основании переданного объекта.
СообщениеДляОтправкиВУзел(Сообщение, ОбъектИзменений, Узел = Неопределено)
// Процедура - Сообщение для отправки в узел
//
// Параметры:
// Сообщение - Структура - Сообщение для отправки в сервисную шину предприятия см. СервиснаяШинаПредприятияКлиентСервер.Сообщение()
// ОбъектИзменений - СправочникОбъект - Объект справочника изменения которого отправляются
// Узел - ПланОбменаОбъект - Узел для которого формируется сообщение
//
Процедура СообщениеДляОтправкиВУзел(Сообщение, ОбъектИзменений, Узел = Неопределено) Экспорт
Сообщение.Key = XMLСтрока(ОбъектИзменений.Ссылка);
Если ТипЗнч(ОбъектИзменений) = Тип("УдалениеОбъекта") Тогда Возврат; КонецЕсли;
СтруктураОбмена = СтруктураОбмена();
ОписаниеЗаполнения = ?(ОбъектИзменений.ЭтоГруппа, ОписаниеЗаполненияГруппы(), ОписаниеЗаполнения());
УправлениеМастерДаннымиОбменДанными.ЗаполнитьСтруктуруОбменаИзОбъекта(СтруктураОбмена, ОбъектИзменений, ОписаниеЗаполнения);
ЗаписьJSON = Новый ЗаписьJSON;
ЗаписьJSON.УстановитьСтроку();
ЗаписатьJSON(ЗаписьJSON, СтруктураОбмена);
Сообщение.Value = ЗаписьJSON.Закрыть();
КонецПроцедуры
Так и возвратом множества сообщений. В этом случае вся ответственность на получении изменений из узла лежит на менеджере метаданных объекта, узел ожидает от него массива собранных сообщений.
ПолучитьСообщенияДляСервиснойШиныПредприятия(ПараметрыУзла, ПараметрыТопика, НомерСообщенияОбмена)
// Отправка сообщений
Функция ПолучитьСообщенияДляСервиснойШиныПредприятия(ПараметрыУзла, ПараметрыТопика, НомерСообщенияОбмена) Экспорт
СервиснаяШинаПредприятия.ПолучитьПорциюНаОтправку(ПараметрыУзла.Ссылка, "Справочник.Номенклатура", НомерСообщенияОбмена);
Сообщения = Новый Массив;
ДанныеИзменений = СобратьДанныеПоИзменениям(Узел, НомерСообщенияОбмена);
Для Каждого ОбъектИзменений Из ДанныеИзменений Цикл
Сообщение = СервиснаяШинаПредприятияКлиентСервер.Сообщение();
Сообщение.Key = XMLСтрока(ОбъектИзменений.Ссылка);
Сообщения.Добавить(Сообщение);
Если ОбъектИзменений.УдалениеОбъекта_ Тогда Продолжить; КонецЕсли;
СтруктураОбмена = ?(ОбъектИзменений.ЭтоПапка, СтруктураОбменаПапка(), СтруктураОбмена());
УправлениеМастерДаннымиОбменДанными.ЗаполнитьСтруктуруОбменаИзОбъекта(СтруктураОбмена, ОбъектИзменений, ОписаниеЗаполнения());
ЗаписьJSON = Новый ЗаписьJSON;
ЗаписьJSON.УстановитьСтроку(Новый ПараметрыЗаписиJSON(ПереносСтрокJSON.Нет));
ЗаписатьJSON(ЗаписьJSON, СтруктураОбмена);
Сообщение.Value = ЗаписьJSON.Закрыть();
Сообщение.Headers.Добавить(Новый Структура("AvailableCountries", СтруктураОбмена["AvailableCountries"]));
КонецЦикла;
Возврат Сообщения;
КонецФункции
Аналогично и для разбора сообщений. Они либо по одному разбираются:
РазобратьВходящееСообщение(Сообщение, Узел = Неопределено)
// Функция - Разобрать входящее сообщение
//
// Параметры:
// Сообщение - Структура - Входящее сообщение из сервисной шины предприятия (Apache Kafka)
// * Headers - Массив из Структур:
// ** Ключ - Строка - Ключ заголовка
// ** Значение - Произвольный - Значение заголовка
// * Key - Строка - Ключ сообщения
// * Offset - Число - Смещение сообщения
// * Partition - Число - Номер раздела
// * Timestamp - Число - Отпечаток времени
// * Topic - Строка - Имя топика
// * Value - Строка - Тело сообщения
//
// Возвращаемое значение:
// Булево - Истина при успешном результате разбора сообщения
//
Функция РазобратьВходящееСообщение(Сообщение, Узел = Неопределено) Экспорт
Если НРег(Сообщение.Value) = "null" Тогда Возврат Истина; КонецЕсли;
ЧтениеJSON = Новый ЧтениеJSON;
ЧтениеJSON.УстановитьСтроку(Сообщение.Value);
СтруктураОбмена = ПрочитатьJSON(ЧтениеJSON, Истина);
СсылкаИзменяемого = ПолучитьСсылку(Новый УникальныйИдентификатор(Сообщение.Key));
ИзменяемыйОбъект = СсылкаИзменяемого.ПолучитьОбъект();
Если ИзменяемыйОбъект = Неопределено Тогда
ИзменяемыйОбъект = ?(СтруктураОбмена["IsFolder"] = Истина, СоздатьГруппу(), СоздатьЭлемент());
КонецЕсли;
ОписаниеЗаполнения = ?(ИзменяемыйОбъект.ЭтоГруппа, ОписаниеЗаполненияГруппы(), ОписаниеЗаполнения());
УправлениеМастерДаннымиОбменДанными.ЗаполнитьОбъектИзСтруктурыОбмена(ИзменяемыйОбъект, СтруктураОбмена, ОписаниеЗаполнения);
Если ЗначениеЗаполнено(Узел) Тогда
Попытка
ИзменяемыйОбъект.ОбменДанными.Отправитель = Узел;
Исключение
КонецПопытки;
КонецЕсли;
ИзменяемыйОбъект.ОбменДанными.Загрузка = Истина;
ИзменяемыйОбъект.Записать();
Возврат Истина;
КонецФункции
Либо сразу вся полученная порция передается менеджеру для разбора:
РазобратьВходящиеСообщенияСервиснойШины(Сообщения, Узел)
//----- Разбор
Функция РазобратьВходящиеСообщенияСервиснойШины(Сообщения, Узел) Экспорт
ТаблицыВходныхДанных = ТаблицыВходныхДанных();
ОсновныеДанные = ТаблицыВходныхДанных.ОсновныеДанные;
Для Каждого Сообщение Из Сообщения Цикл
// TODO требуется определить поведение: null значения - УдалениеОбъекта, при удалении объекта в базе источнике изменений
Если НРег(Сообщение.Value) = "null" Тогда Продолжить; КонецЕсли;
Для Каждого Заголовок Из Сообщение.Headers Цикл
КлючЗаголовка = ОбщегоНазначенияКлиентСервер.СвойствоСтруктуры(Заголовок, "Key");
Если КлючЗаголовка = "DataCountry" Тогда
КодСтраныДанных = ОбщегоНазначенияКлиентСервер.СвойствоСтруктуры(Заголовок, "Value");
КонецЕсли;
КонецЦикла;
Если ЗначениеЗаполнено(КодСтраныДанных) Тогда
СтранаДанных = Справочники.КлассификаторСтранМира.НайтиПоРеквизиту("КодАльфа2", КодСтраныДанных);
КонецЕсли;
ЧтениеJSON = Новый ЧтениеJSON;
ЧтениеJSON.УстановитьСтроку(Сообщение.Value);
ОбъектXDTO = ФабрикаXDTO.ПрочитатьJSON(ЧтениеJSON, ТипОбъектаXDTO());
СтрокаТз = ОсновныеДанные.Добавить();
СтрокаТз.ОтпечатокВремени = Сообщение.Offset;
СтрокаТз.Представления = ТаблицыВходныхДанных.Представления.СкопироватьКолонки();
СтрокаТз.Родители = ТаблицыВходныхДанных.Родители.СкопироватьКолонки();
СтрокаТз.ДоступныеСтраны = ТаблицыВходныхДанных.ДоступныеСтраны.СкопироватьКолонки();
СтрокаТз.СертификатыРСТ = ТаблицыВходныхДанных.СертификатыРСТ.СкопироватьКолонки();
СтрокаТз.СтранаДанных = СтранаДанных;
ОписаниеСтруктуры = ?(ОбъектXDTO.IsFolder, ОписаниеСтруктурыXDTOДляОбменаГруппа(), ОписаниеСтруктурыXDTOДляОбмена());
УправлениеМастерДаннымиОбменДанными.ЗаполнитьПоСоответствиюРекурсивно(СтрокаТз, ОбъектXDTO, ОписаниеСтруктуры);
// Заполнение доп. данные по номенклатуре
УправлениеМастерДаннымиОбменДанными.ЗаполнитьПоСоответствиюРекурсивно(СтрокаТз, ОбъектXDTO, ОписаниеСтруктурыXDTOСтатусовНоменклатуры());
// Поля могут быть Булево или неопределено, заполняем вручную
ЗаполнитьЗначенияСвойств(СтрокаТз, ОбъектXDTO, "Available, doc_traceability, has_asc_warranty");
КонецЦикла;
ОбъектыНаОбновление = ПолучитьОбъектыНаОбновление(ОсновныеДанные);
Для Каждого Объект Из ОбъектыНаОбновление Цикл
ВТранзакции = ТипЗнч(Объект) = Тип("Массив");
Если ВТранзакции Тогда
НачатьТранзакцию();
Попытка
Для Каждого ОбновляемыйОбъект Из Объект Цикл
ОбновляемыйОбъект.ОбменДанными.Загрузка = Истина;
ОбновляемыйОбъект.Записать();
КонецЦикла;
ЗафиксироватьТранзакцию();
Исключение
ОтменитьТранзакцию();
ВызватьИсключение;
КонецПопытки;
Иначе
Объект.ОбменДанными.Загрузка = Истина;
Объект.Записать();
КонецЕсли;
КонецЦикла;
Возврат Истина;
КонецФункции
В обоих случаях групповая обработка имеет больший приоритет. Если реализован метод для обработки порции изменений/сообщений, то используется он, вне зависимости описан ли метод единичной обработки..
После описания методов обработки сообщений настраивается узел:

- Код и наименование - С указанием кода узла будут формироваться фоновые задания на выгрузку/загрузку сообщений
- Bootstrap servers - Строка подключения к кластеру Kafka, сервера кластера перечисляются через запятую
- Имя группы получателей - Используется брокером кластера для фиксации смещений прочитанных сообщений и отдачи следующих непрочитанных.
- Состав:
- Направление - Следовательно в какую сторону идет обмен, читаем из топика или отправляем в него
- Имя топика - Топик в кластере Kafka
- Метаданные - Изменения каких метаданных отправляем и чьим менеджером собираем/разбираем сообщения.


Из полезных настроек чтения можно выделить:
- Количество потоков чтения - По факту это количество фоновых заданий на чтение из топика. Устанавливать его больше чем количество разделов в топике не имеет смысла, потому что каждый поток подписывается на получение сообщений. А брокер каждому подписчику назначает раздел с которого ему читать сообщения, по одному на подписчика. Если подписчиков будет больше разделов топика то подписчики которым не хватило раздела будут висеть в ожидании не получая никаких сообщений.
- Количество в порции - Количество сообщений получаемых за одну итерацию чтения. Настраивается в зависимости от размера единичного сообщения в топике и трудозатратности его разбора. Если сообщение содержит +100500 строк, а при его разборе приходится обновлять множество объектов или выполнять какие ни будь сложные алгоритмы вычислений, то не стоит вычитывать большое количество единовременно, лучше читать меньшими порциями но чаще.
Для исходящих направлений, как и в любом плане обмена, метаданные должны быть добавлены в состав, для регистрации изменений по ним (ПланОбмена.ApacheKafka). Для разбора входящих сообщений менеджером метаданных, добавлять их в состав не обязательно.