Мы выполняли задачу для компании «Галамарт» – это сеть из 500+ магазинов, расположенных по всей России, и 100+ информационных систем, функционирующих в едином контуре. В качестве решения было предложено создать брокер сообщений на базе ActiveMQ.
В этой статье я сначала объясню, зачем вообще нужен брокер сообщений и в каких случаях он применяется. Затем расскажу, из каких компонентов он состоит. Опишу, почему мы дорабатывали стандартное решение, что именно нас не устраивало, и какими преимуществами оно обладает сейчас. А также затрону вопрос выбора ActiveMQ вместо Kafka – это часто возникающий запрос. Так как у нас есть брокер и на Kafka, я могу поделится опытом: рассказать, в чем принципиальные различия между этими технологиями.
Для чего нужна шина передачи данных
Шина передачи данных – это центральный элемент информационной системы, предназначенный для обмена сообщениями между источниками и получателями.
Если у вас небольшое количество систем – например, две или три – их можно соединить напрямую по принципу Mesh («каждый с каждым»). Такая архитектура проста и работает эффективно на начальных этапах. Однако по мере роста инфраструктуры, когда количество информационных систем превышает десяток или даже полтора десятка, такой подход становится неэффективным. Любое изменение в одной из систем требует модификации всех подключенных к ней коннекторов – это трудоемко, долго и подвержено ошибкам.
Именно в таких случаях и применяется шина передачи данных. Она выступает в роли единого централизованного канала обмена, упрощая интеграцию и снижая связанность между системами.
Яркий пример – компания с развитой филиальной сетью: множество офисов, расположенных в разных городах и даже странах, работающих в разных часовых поясах. При этом важно гарантировать доставку данных независимо от состояния сети – была ли связь, пропала ли она или восстановилась.
Шина данных как раз обеспечивает надежную доставку сообщений в таких условиях. Например, она может использоваться для передачи чеков продаж из удаленных магазинов, актуальных остатков товаров, цен на номенклатуру и другой критически важной информации в центральный офис.
Второй типичный сценарий – это интеграция разрозненных систем внутри одного контура. Ситуация знакомая многим: помимо 1С, в компании используются КХД, WMS, MDM, MDO и другие специализированные системы. Все их нужно друг с другом подружить и передавать данные внутри одного контура.
Вот здесь и становится очевидно, что центральным звеном всей инфраструктуры снова выступает шина передачи данных.
Третий, и сейчас особенно актуальный кейс – передача данных с носимых устройств. Например, с кардиомониторов, которые пациенты носят в повседневной жизни. Раньше человеку надевали устройство и он на сутки оставался в больнице, занимая койку, хотя при этом мог быть практически здоров и не нуждаться в стационарной помощи. Сейчас подход изменился: пациент надевает кардиомонитор и продолжает жить обычной жизнью – ходит на работу, занимается делами. Устройство в режиме реального времени передает данные через шину данных в центральное хранилище, например, в медицинский центр или больницу. Позже врач анализирует собранные данные и, исходя из результатов, принимает решение – направить на госпитализацию или, наоборот, успокоить пациента и пожелать ему крепкого здоровья.
Архитектура шины данных для «Галамарт»
Итак, из чего состоит шина данных, разработанная нами для компании «Галамарт»? Ниже – ее общая архитектурная схема.
Давайте подробно рассмотрим каждый компонент.
У компании настроено два интернет-канала: один – основной, второй – резервный. Переключение между ними осуществляется с помощью keepalived. Стоят два haproxy, чтобы сохранить IP-адреса при переключении.
Далее – роутер ESB, разработанный нами самостоятельно. Он подключен к кластеру PostgreSQL (на базе Patroni), в котором хранятся все сообщения.
Центральным элементом является кластер ActiveMQ, состоящий из трех брокеров. Отдельно выделен кластер на базе SpringBoot для запуска Java-приложений, а также кластер из адаптеров, о которых я расскажу чуть ниже.
Вся логи системы собираются в Graylog, Elasticsearch и другие компоненты, а визуализируются в Grafana.
Процесс передачи сообщений через шину
С чего начинается работа по передаче сообщений?
У нас есть информационная система – например, 1С:Предприятие. База данных этой системы публикуется с помощью стандартных средств: Internet Information Server (IIS), Apache или Nginx.
Далее устанавливается адаптер – наше собственное решение. Вы можете либо разработать его самостоятельно, либо использовать готовый вариант от нас.Основная задача адаптера – подключиться к публикации базы данных и извлекать из нее сообщения, которые необходимо передать дальше. После этого адаптер отправляет эти сообщения в шину данных.
В конфигурации адаптера указывается IP-адрес публикации, благодаря чему он «знает», откуда забирать данные. Если на одном сервере развернуто несколько баз данных, для каждой можно настроить отдельную публикацию и установить свой адаптер. Это не создает ограничений.
Структура сообщения в шине данных
Сообщение в шине данных состоит из двух основных частей.
Тело сообщения – это полезная нагрузка, то есть непосредственные данные, которые необходимо передать из одной информационной системы в другую.
Конверт сообщения – это набор метаданных, позволяющих выполнить маршрутизацию сообщения и его унифицированную обработку. В него входят такие параметры, как:
-
адрес источника,
-
адрес получателя,
-
маршрут доставки,
-
тип данных,
-
приоритет сообщения,
-
другие технические атрибуты.
Эти поля заранее согласовываются с заказчиком на этапе проектирования и фиксируются в техническом задании.
Конверт размещается в заголовке транспортного протокола:
-
при использовании JMS (Java Message Service) – в заголовке сообщения брокера (например, в JMS Header);
-
при передаче по HTTP – в HTTP-заголовках.
Тело сообщения, в свою очередь, помещается в основную часть транспортного пакета.
Формат полезной нагрузки может быть любым: XML, JSON, CSV, DB или др. Шина данных не привязана к конкретному формату – она способна принимать данные в любом виде и при необходимости трансформировать их в тот формат, который ожидает система-получатель.
Функции адаптера: маршрутизация, доставка и нагрузка
Одна из ключевых задач, выполняемых адаптером – формирование конверта сообщения на основе HTTP-заголовков исходной информационной системы при отправке в шину данных. После парсинга метаданных адаптер формирует корректный запрос (например, REST-вызов), который затем направляется в следующую систему через шину.
Важнейшей функцией адаптера является гарантия доставки сообщений. Часто источник и получатель данных находятся в разных сетях – например, одна система работает в локальной сети магазина, а другая – удаленно, возможно, даже в другой стране. При этом связь может временно отсутствовать.
Если адаптер не может подключиться к шине данных, он не теряет сообщение, а сохраняет его в собственной локальной базе данных. Затем, в соответствии с настроенными сценариями доставки, он автоматически пытается отправить сообщение повторно.
Настройка таких сценариев в нашей реализации простая и гибкая. Например, можно задать попытку отправки раз в час в течение 24 часов, или 10 попыток подряд с коротким интервалом, или любую другую стратегию в зависимости от требований системы.
Кроме того, адаптер позволяет регулировать нагрузку. В течение дня объем сообщений может значительно возрастать, а вечером – снижаться. Чтобы сгладить пиковые нагрузки, в адаптере можно динамически увеличивать количество так называемых консьюмеров – компонентов, отвечающих за прием и обработку сообщений из информационной системы. Это позволяет равномерно распределять нагрузку и избегать перегрузок.
Адаптер также выполняет проверку сообщений на версионность – это необходимо для предотвращения дублирования. Часто возникает ситуация, когда одно и то же сообщение по ошибке или из-за сбоев в системе многократно отправляется – например, из-за зацикливания в информационной системе или повторной публикации. Если не перехватывать такие дубликаты на раннем этапе, они могут попасть в шину в огромном количестве – мы сталкивались с кейсами, когда в шину поступало по 100 тысяч, миллиону и даже по 20 миллионов одинаковых сообщений. Шина, в свою очередь, будет пытаться их все доставить в систему-получатель, что приводит к перегрузке, ложным срабатываниям и серьезным сбоям. Именно поэтому проверка на дубликаты и контроль версионности реализованы на уровне адаптера.
Еще одна важная задача адаптера – обработка ошибок при приеме и отправке сообщений. Мы разработали единый стандартный набор кодов ошибок, охватывающий все возможные сценарии сбоев при обработке сообщений, и согласовали его с заказчиком.
На этапе проектирования информационных систем разработчики закладывают поддержку этих кодов – вне зависимости от того, используется ли 1С или другая платформа. Это обеспечивает единообразие и понятность при интеграции.
Когда система-получатель не может обработать сообщение, она возвращает ошибку. Шина перехватывает ее и отправляет обратно в систему-источник, указывая точную причину сбоя. Таким образом процесс передачи данных перестает быть «черным ящиком», где неясно, что пошло не так, дошло ли сообщение и обработано ли оно. Коды ошибок возвращаются в заголовке ответа "x-gmt-error-code",
а также в заголовке "x-gmt-error-message" текста ошибки в Base64.
Код ответа от сервера в случае успеха 200, в случае возникновении ошибки – 500 с описанием в заголовке кода и текста ошибки.
Пример ошибки от адаптера:
<logs>
<log error-code="ADP-1403" timestamp="2023-12-17T09:30:47+05:00”
system="adr:shop:550e8400-e29b-41d4- a716-446655440000">
"x-gmt-data-type not found x-gmt-data-action not found"
</log>
</logs>
Код ошибки ADP-1403 означает «указаны некорректные дата и время сообщения».
В таблице приведен список кодов ошибок. Так он выглядит на этапе формирования ТЗ – номер ошибки и ее расшифровка. На маршрутизаторе есть этот список ошибок: он берет текст расшифровки и посылает в систему-источник, чтобы было понятно, почему сообщение не дошло.
Выполнение команд ОС и обращение к REST-сервисам
Следующая задача адаптера – выполнение команд операционной системы.
Адаптер предназначен не только для передачи сообщений: мы также разработали возможность для того, чтобы на компьютере, на котором установлен адаптер, можно было выполнить какие-то примитивные действия самой операционной системы и даже перезагрузить компьютер.
Это особенно актуально в случае с магазинами «Галамарт», многие из которых находятся в удаленных регионах. Если на месте возникает сбой, выезд специалиста для диагностики и перезагрузки оборудования может занять много времени. Проще послать в адаптер обычное сообщение о перезагрузке, адаптер распарсит это сообщение, поймет, что это команды для операционной системы и перезагрузит компьютер.
Когда адаптер получает специализированное технологическое сообщение, он становится на блокировку, то есть сообщения он не принимает. Разблокировка происходит только после успешного выполнения всех команд, которые ему послали.
Еще одна задача – обращение к REST-сервисам информационной системы.
<rest name=" " url-path=" " method="HTT-">
<param name=" 1"> 1</param>
…
<param name=" N"> N</param>
<body> </body>
</rest>
Для этого достаточно указать атрибут url-path, который определяет относительный адрес сервера, а также префикс пути. Тогда полный путь будет выглядеть привычным для всех программистов способом. Ничего сложного для обращения к REST-сервису здесь нет.
Работа маршрутизатора и обработка недоставленных сообщений
Итак, адаптер отправил сообщение в шину данных – и на этом этапе в работу вступает маршрутизатор.
Наш маршрутизатор построен по принципу микросервисной архитектуры и включает три основных модуля.
DLQ Manager – модуль сбора и хранения неотправленных, ошибочных сообщений, в котором работают правила автоповтора. Дело в том, что в зависимости от качества связи, ее стабильности и так далее сообщения из шины могут не дойти до приемника. Mаршрутизатор складывает такие сообщения в свою собственную базу данных (я ранее показывал, зачем нужен Patroni-кластер из Postgres). И дальше в этой очереди работают правила автоповтора: хранилище недоставленных сообщений делится на очереди в зависимости от того, для кого предназначались эти сообщения, и для каждой очереди можно строить свои собственные правила автоповтора. Можно построить несколько правил, и они будут идти поочередно, например:
-
Попытаться отправить сообщение 10 раз подряд с коротким интервалом.
-
Если не удалось – второе правило: переходить на попытки раз в 5 минут в течение 24 часов.
-
При необходимости – третье правило: активировать интенсивную отправку (например, 100 попыток за минуту), а после – сформировать уведомление об ошибке: «источник недоступен».
Второй компонент маршрутизатора – WebUI, графический интерфейс управления брокером. Мы разработали его, чтобы избавить администраторов от необходимости постоянно работать в консоли и вручную выполнять рутинные операции – такие как добавление источников, приемника или маршрутов.
WebUI получился удобным, интуитивно понятным и визуально приятным. Через него можно полностью управлять всей инфраструктурой шины – все необходимое под рукой, без лишних сложностей.
И третий компонент – это модуль конфигурирования настройками брокера Configuration, который соединен непосредственно с WebUI.
Передача файлов через шину данных
Один из самых частых вопросов, который мне задавали на конференциях: «Хорошо, мы отправляем текстовые сообщения – понятно. Но что делать, если бизнес требует прикрепить к сообщению файл – например, PDF, изображение или документ?»
И тут важно понимать: брокер – это брокер сообщений. Его задача – надежная доставка данных, а не передача файлов. А при отправке файлов объемом более одного мегабайта, вне зависимости от используемой платформы – ActiveMQ, Kafka или другой – происходит сильная деградация производительности. Шины данных изначально не предназначены для отправки файлов, а ориентированы на небольшие текстовые сообщения.
Наглядный пример – один из московских банков, где жестко ограничили размер сообщения на шине до 40 килобайт. Превысить лимит нельзя – система просто отклонит запрос. Разработчики информационных систем знают об этом и адаптируют свои решения.
Но у нас пошел другой путь – более гибкий и безопасный.
Мы интегрировали объектное хранилище, которое называется MinIO S3. Если вам нужно отправить файл:
-
Система-источник загружает файл в хранилище с помощью обычного POST-запроса.
-
В ответ она получает постоянную ссылку на хранение данного объекта.
-
В сообщении, отправляемом через шину, передается уже не сам файл, а только ссылка на него.
-
Система-приемник, получив сообщение, самостоятельно забирает файл по этой ссылке напрямую из хранилища.
Таким образом, сама шина не участвует в передаче файла, хотя к файлу можно прикрепить любую текстовую информацию – она придет вместе с сообщением и вместе с этой ссылкой. А сам файл в данном случае забирается непосредственно из кластера MinIO S3.
Это стандартный S3-протокол. В своем решении мы использовали open-source решение от MinIO, тем самым защитив шину от деградации сервиса при посылке больших сообщений.
Преимущества использования хранилища S3
Еще один важный и выигрышный сценарий – это случай, знакомый многим: в информационной системе хранится карточка товара – изображение, описание, цена и другие атрибуты. Эта информация передается на сайт, где отображается для покупателей.
Но что происходит, если товар остался тем же, а изменилось только изображение? Например, производитель обновил упаковку, сменил цвет или добавил новую функцию, но цена и описание не изменились.
Именно здесь на помощь приходит наше объектное хранилище на базе MinIO S3. Мы просто заменяем старый файл на новый – например, загружаем обновленное изображение с тем же ключом. Хранилище автоматически обновляет объект, а ссылка на него остается неизменной.
При этом у нас настроены триггеры на уровне хранилища. Как только файл обновляется, система фиксирует изменение и через ActiveMQ отправляет уведомление в систему-получатель – например, на сайт – с информацией: «Объект по ссылке X был изменен». Сайт автоматически ловит эти триггеры и в автоматическом режиме подменяет изображение. Здесь человек практически не участвует.
Такая автоматизация кардинально снизила нагрузку на категорийных и товарных менеджеров, которые раньше вручную искали несоответствия между изображениями на сайте и реальным товаром. И это не просто неудобство – рассогласованность данных может привести к нарушению законодательства. Если покупателю показывают один товар, а по факту он получает другой (например, другого цвета или комплектации), это считается введением в заблуждение и нарушает права потребителей согласно российскому законодательству. Теперь же вся информация остается актуальной, обновляется мгновенно, а риски минимизированы.
На изображении показан скриншот нашего рабочего хранилища S3. Вы видите, что для каждой очереди или типа данных создан отдельный бакет – своего рода изолированная корзина, где хранятся соответствующие файлы.
Если провалиться внутрь корзины, то можно настроить правила. Хранить объекты вечно и вручную следить за ними не нужно.
Для каждой корзины можно сделать соответствующие настройки. Например, вы точно знаете, что если вы высылаете цены через хранилище, то эти цены станут неактуальны через месяц, потому что через месяц вы их обновите. Можно настроить, что хранить данные объекты нужно всего лишь 29 дней, после чего они будут автоматически удалены хранилищем, и корзина будет освобождена. То есть за местом вручную следить не нужно.
Преимущества разработанного решения
Хочу кратко остановиться на преимуществах нашего решения.
Мы более пяти лет работали на платформе ActiveMQ, и после этого начали переход на новое поколение – Apache Kafka. У нас за годы эксплуатации накоплено множество наработок.
Web-интерфейс управления брокером сообщений (создание/редактирование/удаление маршрутов, политик, правил отправки сообщений, автоповторов, трансформация сообщений, маппинг полей в сообщении). Как правило, «из коробки» решение представляет собой консольную версию, где каждое действие выполняется через командную строку. У самого ActiveMQ есть веб-интерфейс, но он настолько простой и примитивный, что большинство операций все равно приходится выполнять через консоль. Тем не менее, создание, редактирование и удаление маршрутов, политик и правил возможно осуществлять через веб-интерфейс с помощью обычных кликов.
Мониторинг компонентов шины через веб-интерфейс маршрутизатора или преднастроенными триггерами Zabbix. Мониторинг компонентов шины мы разработали по нисходящему принципу. На центральной странице отображается несколько квадратов, окрашенных в зеленый, желтый или красный цвет. Они показывают состояние систем: какие находятся в критическом состоянии (долго отсутствует связь), какие вот-вот могут выйти из строя и так далее. Переходя внутрь квадрата, можно получить детализированную информацию: какая именно система недоступна, вплоть до конкретного магазина, а также какая часть в нем не работает – адаптер, публикация или сама информационная система. Таким образом, мы получаем детализацию сверху вниз.
Управление очередями, источниками и приемниками сообщений через веб-интерфейс (поиск сообщений по ID, быстрый просмотр тела сообщения, фильтр по условиям). Это позволяет быстро находить сообщения, отслеживать, когда они проходили и через какие адаптеры.
Настроенная кластеризация и отказоустойчивость. Все параметры подбирались опытным путем. Конфигурация, которую я показывал на самом верхнем слайде, на наш взгляд, является оптимальной для объемов около 30 миллионов сообщений в месяц,
Настроенные регламентные задания (резервное копирование, вакуумирование БД и т.д.). Мы можем передать готовые скрипты, которые все это будут делать,
Готовые телеграмм боты (мобильное управление компонентами шины, Alert-bot). Это наше ноу-хау. Как вы знаете, программисты и айтишники в целом – люди ленивые. Поэтому мы решили управлять всей шиной, не вставая с дивана. Например, ночью, если что-то случается, нет необходимости подходить к компьютеру или включать его. Для этого у нас настроены Telegram-боты с кнопками. Сейчас в Telegram появился новый функционал: можно прикреплять не только кнопки и картинки, но даже целые сайты. Благодаря этому дежурный системный инженер может взять телефон, войти в чат-бот и управлять основными компонентами шины: посмотреть, где находится сообщение, проверить график загрузки конкретной очереди (в виде снапшота из Zabbix), перезагрузить компоненты шины и так далее – все это прямо с телефона через Telegram-бота.
Простой и понятный веб-интерфейс визуализации прохождения сообщения по всем компонентам шины (источник -> адаптер -> брокер -> адаптер -> приемник). Источник, адаптер, брокер, адаптер приемника и сам приемник отображаются в виде квадратиков системных администраторов с вопросами вроде: «Найдите мое сообщение, я отправил его пять минут назад, где оно?» – все видно прямо в интерфейсе. Введя уникальный ID сообщения, можно увидеть, через какие квадратики оно прошло и когда. Интерфейс специально сделан простым, чтобы люди, не глубоко знакомые с шиной и IT, могли легко отслеживать сообщения. Например, можно понять: когда я отправил цены и где они сейчас – дошли ли до системы приемника или застряли из-за проблем со связью. Если в цепочке отсутствует приемник, сразу становится понятно, что нужно бить тревогу.
Готовая и проверенная временем интеграция с более чем 150-ю системами (Честный Знак, Axelot, MDM, OMS, 1С: ЗУП, 1С: УТ, 1С: ERP 2.5, WMS Manhattan, WMS Инфор, WMS 1С: Склад и тд.). Мы потратили много времени на интеграцию с государственными системами, в частности с «Меркурием».
Интеграция с хранилищем объектов по протоколу S3 (MinIO, Amazon) для обмена файлами через брокер (включая уведомление об изменение файла в хранилище). Я анализировал предложения конкурентов и везде, где задавал вопрос «А как отправлять файлы?», получал примерно одинаковый ответ: «До одного мегабайта можно отправлять, а больше – не очень». Либо предлагают использовать простое файловое хранилище: складываешь файл туда, а в сообщении отправляешь ссылку на диск или каталог. Но проблема в том, что файл можно переместить, удалить и так далее – и ссылка протухнет. С объектным хранилищем ситуация другая: внутри бакета объект можно перемещать, и ссылка при этом не изменится; вытащить объект из бакета и переместить в другой – невозможно. Соответственно, ссылка, которую передает шина, всегда остается актуальной. И это одно из наших преимуществ при разработке решения.
Интеграция с CI/CD, GitLab для разработки своих собственных компонент, в частности модификация адаптеров, которые мы предоставляем.
Вот, пожалуй, и все, что хотелось рассказать о ключевых элементах нашей шины данных. Это результат многолетней работы, тысяч интеграций и решения реальных бизнес-задач.
*************
Статья написана по итогам доклада (видео), прочитанного на конференции INFOSTART TECH EVENT.
Вступайте в нашу телеграмм-группу Инфостарт