Использовать компоненту можно на Сервере, так и на Клиенте.
Для использования потребуется Kafka, я запустил в Docker с дефолтными параметрами, а также внешняя компонента (v8Kafka.zip /win64)
Пример использования:
// Producer
ПутьККомпоненте = "...\v8AddInNative.dll";
Попытка
Результат = ПодключитьВнешнююКомпоненту(ПутьККомпоненте ,"Kafka", ТипВнешнейКомпоненты.Native);
Компонента = Новый ("AddIn.Kafka.Producer");
Исключение
Сообщить(ОписаниеОшибки());
КонецПопытки;
Компонента.УстановитьПараметр("bootstrap.servers","127.0.0.1:9092");
Компонента.ВызватьОтчетОДоставке = Истина;
Попытка
Результат = Компонента.Поставщик();
Результат = Компонента.Опубликовать("topic.test", "hello from 1C", "1С", "key1:value1,key2:value2");
Результат = Компонента.ОжидатьДоставкуСообщений(1000);
ОтчетОДоставке = Компонента.ОтчетОДоставке();
Исключение
ОбщегоНазначенияКлиентСервер.СообщитьПользователю(Компонента.ТекстОшибки);
КонецПопытки;
// KafkaConsumer
ПутьККомпоненте = "...\v8AddInNative.dll";
Попытка
Результат = ПодключитьВнешнююКомпоненту(ПутьККомпоненте ,"Kafka", ТипВнешнейКомпоненты.Native, ТипПодключенияВнешнейКомпоненты.НеИзолированно);
Компонента = Новый("AddIn.Kafka.KafkaConsumer");
Исключение
Сообщить(ОписаниеОшибки());
КонецПопытки;
Попытка
Компонента.УстановитьПараметр("bootstrap.servers","127.0.0.1:9092");
Компонента.УстановитьПараметр("group.id", "group_1");
Компонента.УстановитьПараметр("enable.auto.commit", "false");
Компонента.УстановитьПараметр("auto.offset.reset", "earliest");
Компонента.УстановитьПараметр("enable.partition.eof", "true");
Результат = Компонента.Потребитель();
Результат = Компонента.Подписаться("topic.test");
Пока Истина Цикл
Результат = Компонента.Читать(1000);
Если Результат = 0 Тогда
Топик = Компонента.ТопикСообщения();
Ключ = Компонента.Ключ();
Текст = Компонента.Текст();
Заголовки = Компонента.Заголовки();
Раздел = Компонента.Раздел();
Смещение = Компонента.Смещение();
ВремяЧислом = Компонента.ОтметкаВремени();
Резельтат = Компонента.Зафиксировать();
КонецЕсли;
Если Результат = -191 Тогда // Дошли до конца очереди
Прервать;
КонецЕсли;
КонецЦикла;
Результат = Компонента.Закрыть();
Исключение
КонецПопытки;
// Consumer
ПутьККомпоненте = "...\v8AddInNative.dll";
Попытка
Результат = ПодключитьВнешнююКомпоненту(ПутьККомпоненте ,"Kafka", ТипВнешнейКомпоненты.Native, ТипПодключенияВнешнейКомпоненты.НеИзолированно);
Компонента = Новый("AddIn.Kafka.Consumer");
Исключение
Сообщить(ОписаниеОшибки());
КонецПопытки;
Попытка
Компонента.УстановитьПараметр("bootstrap.servers","127.0.0.1:9092");
Компонента.УстановитьПараметр("group.id", "group_1");
Компонента.УстановитьПараметр("auto.offset.reset", "earliest");
Компонента.УстановитьПараметр("enable.partition.eof", "true");
Результат = Компонента.Потребитель();
Результат = Компонента.Топик("topic.test");
Раздел = 0;
Смещение = "27";
Результат = Компонента.Старт(Раздел, Смещение); // start(partition, offset)
Пока Истина Цикл
Результат = Компонента.Читать(Раздел, 1000);
Если Результат = 0 Тогда
Топик = Компонента.Топик();
Ключ = Компонента.Ключ();
Текст = Компонента.Текст();
Заголовки = Компонента.Заголовки();
Раздел = Компонента.Раздел();
Смещение = Компонента.Смещение();
ВремяЧислом = Компонента.ОтметкаВремени();
КонецЕсли;
Если Результат = -191 Тогда // Дошли до конца очереди
Прервать;
КонецЕсли;
КонецЦикла;
Результат = Компонента.Стоп(Раздел);
Исключение
КонецПопытки;
Поддерживает:
- сжатие gzip, lz4, zstd
- SSL
Полный список объектов и методов в документации, кратко:
Общие методы
ru | en | описание |
---|---|---|
УстановитьПараметр | SetParam | принимает 2 значение |
КодОшибки | ErrorCode | возвращает описание кода ошибки |
РезультатСобытия | EventResult | event callback |
Producer
ru | en | описание |
---|---|---|
Поставщик | Init | Создает экземпляр объекта Producer |
Опубликовать | SendMessage | Отправить сообщение |
ОпроситьОчередь | Poll | проталкивает из буффера |
ОжидатьДоставкуСообщений | Flush | обертка poll ожидает мах переданное время |
ДлинаИсходящейОчереди | OutQueueLength | число сообщений в буфере, не отправленных |
KafkaConsumer
ru | en | описание |
---|---|---|
Потребитель | Init | Создает экземпляр объекта KafkaConsumer |
Подписаться | Subscribe | Подписка на топики |
Отписаться | Unsubscribe | Отписка от топиков |
Читать | Consume | вычитывает сообщения |
Зафиксировать | CommitSync | коммитит сообщения в очереди |
Назначить | Assign | Позволяет явно подключиться к конкретному разделу |
Закрыть | Close | закрывает чтение |
** компонента предоставлена для тестового использования, будет активна до 23.12.2023