Yandex DataLens — это сервис для бизнес-аналитики. Сервис позволяет подключаться к различным источникам данных, строить визуализации, собирать дашборды и делиться полученными результатами.
С помощью Yandex DataLens вы можете отслеживать продуктовые и бизнес-метрики напрямую из источников, чтобы принимать решения, основанные на данных.
На данный момент Yandex DataLens выложен в общий доступ и его можно развернуть на собственных ресурсах, но на практике и в данной статье лучше использоваться облачный DataLens.
Цель статьи: продемонстрировать сбор данных о продажах в режиме реального времени с множества торговых точек, загрузку этих данных в базу данных, расположенную в сервисе Яндекс.Облако и визуализацию собранных данных в чартах Yandex DataLens.
В качестве демонстрации мы будем использовать бессерверные (serverless) сервисы платформы Яндекс.Облако. Код реализации задачи предоставлен в упрощенном виде, сервисы авторизации не используются.
Изменения в конфигурации 1С
В качестве примера будет использоваться конфигурация 1С: Розница, редакция 3.0. Однако можно использовать любую другую конфигурацию, в том числе 1С:РМК и 1С:Мобильная касса.
Создаем новое расширение ExportToBI. Поскольку расширение изменяет общие модули, безопасный режим следует выключить.
Далее заимствуем следующие объекты конфигурации: Общий модуль ДополнительныеОтчетыИОбработки, Справочники Номенклатура и СтруктурныеЕдиницы, Регистр накопления Продажи.
Для регистрации изменений к выгрузке создается новый план обмена expBI_Обмен, в состав которого включаем заимствованные ранее справочники и регистр накопления. Для упрощения задачи авторегистрацию ставим в режим Разрешить. Разумеется в проде следует не использовать авторегистрацию, а регистрировать изменения через подписки на события
Добавляем в расширение серверный модуль expBI_ОбменДанными со следующими процедурами и функциями:
Для конвертации массивов и структур в формат JSON
Функция СтруктураВJSON(Значение)
НастройкиСериализацииJSON = Новый НастройкиСериализацииJSON();
НастройкиСериализацииJSON.ФорматСериализацииДаты = ФорматДатыJSON.ISO;
ПараметрыЗаписи = Новый ПараметрыЗаписиJSON(ПереносСтрокJSON.Нет);
ЗаписьJSON = Новый ЗаписьJSON();
ЗаписьJSON.УстановитьСтроку(ПараметрыЗаписи);
ЗаписатьJSON(ЗаписьJSON, Значение, НастройкиСериализацииJSON);
Возврат ЗаписьJSON.Закрыть();
КонецФункции
Получение узла плана обмена для использования в методах механизмов обмена данными (упрощённый режим, в проде лучше использовать константы или другие средства)
Функция ПолучитьУзел()
Выб = ПланыОбмена.expBI_Обмен.Выбрать();
Пока Выб.Следующий() Цикл
Если Не Выб.ЭтотУзел Тогда
Возврат Выб.Ссылка;
КонецЕсли;
КонецЦикла;
УстановитьПривилегированныйРежим(Истина);
Нов = ПланыОбмена.expBI_Обмен.СоздатьУзел();
Нов.Наименование = "Облако";
Нов.ОбменДанными.Загрузка=Истина;
Нов.Код = Строка(Новый УникальныйИдентификатор());
Нов.Записать();
Возврат Нов.Ссылка;
КонецФункции
Получение для выгрузки структуры с данными элемента справочника
Функция ДанныеСправочника(Имя, СправочникОбъект)
Результат = Новый Структура;
Результат.Вставить("type", Имя);
Результат.Вставить("guid", Строка(СправочникОбъект.Ссылка.УникальныйИдентификатор()));
Результат.Вставить("name", СправочникОбъект.Наименование);
Результат.Вставить("code", СправочникОбъект.Код);
Возврат Результат;
КонецФункции
Получение для выгрузки структуры с данными набора записей регистра накопления
Функция ДанныеРегистраНакопления(Имя, Набор)
Результат = Новый Массив;
Для каждого Стр из Набор Цикл
Если Стр.Количество=0 и Стр.Сумма=0 Тогда
Продолжить;
КонецЕсли;
Данные = Новый Структура;
Данные.Вставить("type", Имя);
Данные.Вставить("guid", Строка(Набор.Отбор.Регистратор.Значение.УникальныйИдентификатор()));
Данные.Вставить("date", Формат(Стр.Период, "ДФ=yyyy-MM-ddTHH:mm:ss"));
Данные.Вставить("id", Стр.НомерСтроки);
Данные.Вставить("product", Строка(Стр.Номенклатура.УникальныйИдентификатор()));
Данные.Вставить("unit", Строка(Стр.Склад.УникальныйИдентификатор()));
Данные.Вставить("count", Стр.Количество);
Данные.Вставить("sum", Стр.Сумма);
Результат.Добавить(Данные);
КонецЦикла;
Возврат Результат;
КонецФункции
Подготавливаем пакет с измененными данными для выгрузки. Поскольку используемые для решения задачи сервисы в Яндекс.Облаке имеют ограничения по объему передаваемых данных мы будем использовать для отправки не все данные, а небольшие порции. Опять же предоставленный алгоритм сильно упрощен.
Функция ПодготовитьПакет()
РазмерПакета = 30;
Выбрано=0;
Результат = Новый Структура;
Ссылки = Новый Массив;
УдалитьРегистрацию = Новый Массив;
ДанныеДляОтправки = Новый Массив;
Узел = ПолучитьУзел();
ВыборкаИзменений = ПланыОбмена.ВыбратьИзменения(Узел, Узел.НомерОтправленного + 1);
Пока ВыборкаИзменений.Следующий() Цикл
Данные = ВыборкаИзменений.Получить();
Если Данные = Неопределено Тогда
Продолжить;
КонецЕсли;
Если ТипЗнч(Данные) = Тип("УдалениеОбъекта") Тогда //Не учитываем для учебных целей
Продолжить
ИначеЕсли ТипЗнч(Данные) = Тип("СправочникОбъект.Номенклатура")Тогда
Ссылки.Добавить(Данные);
Если Данные.ЭтоГруппа Тогда
ДанныеСправочника = ДанныеСправочника("groups", Данные)
Иначе
ДанныеСправочника = ДанныеСправочника("products", Данные);
ДанныеСправочника.Вставить("group", Строка(Данные.Родитель.УникальныйИдентификатор()))
КонецЕсли;
ДанныеДляОтправки.Добавить(ДанныеСправочника);
ИначеЕсли ТипЗнч(Данные) = Тип("СправочникОбъект.СтруктурныеЕдиницы")Тогда
Ссылки.Добавить(Данные.Ссылка);
ДанныеДляОтправки.Добавить(ДанныеСправочника("units", Данные));
ИначеЕсли ТипЗнч(Данные) = Тип("РегистрНакопленияНаборЗаписей.Продажи") Тогда
ДанныеРегистра = ДанныеРегистраНакопления("sales", Данные);
Если ДанныеРегистра.Количество()>0 Тогда
Ссылки.Добавить(Данные);
ОбщегоНазначенияКлиентСервер.ДополнитьМассив(ДанныеДляОтправки, ДанныеРегистра);
Выбрано = Выбрано + ДанныеРегистра.Количество();
Иначе
УдалитьРегистрацию.Добавить(Данные);
КонецЕсли;
КонецЕсли;
Выбрано = Выбрано + 1;
Если Выбрано>РазмерПакета Тогда
Прервать
КонецЕсли;
КонецЦикла;
Результат.Вставить("УдалитьРегистрацию", УдалитьРегистрацию);
Результат.Вставить("Ссылки", Ссылки);
Результат.Вставить("Данные", ДанныеДляОтправки);
Возврат Результат;
КонецФункции
Создаем объект HTTPСоединение на основании адреса
Функция ПолучитьСоединение(Адрес) Экспорт
СтруктураURI=ОбщегоНазначенияКлиентСервер.СтруктураURI(Адрес);
Прокси = ПолучениеФайловИзИнтернета.ПолучитьПрокси("https");
ЗащищенноеСоединение = ?(НРег(СтруктураURI.Схема) = "http", Неопределено, ОбщегоНазначенияКлиентСервер.НовоеЗащищенноеСоединение());
Попытка
Возврат Новый HTTPСоединение(СтруктураURI.Хост, СтруктураURI.Порт, , , Прокси, 120, ЗащищенноеСоединение);
Исключение
ВызватьИсключение ПодробноеПредставлениеОшибки(ИнформацияОбОшибке());
КонецПопытки;
КонецФункции
Непосредственной процедура обмена.
Процедура ВыполнитьОбмен() Экспорт
Пока Истина Цикл
Пакет = ПодготовитьПакет();
Узел = Неопределено;
Если Пакет.УдалитьРегистрацию.Количество()>0 Тогда
Узел = ПолучитьУзел();
Для каждого Рег из Пакет.УдалитьРегистрацию Цикл
ПланыОбмена.УдалитьРегистрациюИзменений(Узел, Рег);
КонецЦикла;
КонецЕсли;
Если Пакет.Данные.Количество()=0 Тогда
Возврат
КонецЕсли;
Адрес = "https://*************************.apigw.yandexcloud.net";
Соединение = ПолучитьСоединение(Адрес);
Запрос = Новый HTTPЗапрос("/data");
Запрос.Заголовки.Вставить("Content-Type", "application/json");
Запрос.УстановитьТелоИзСтроки(СтруктураВJSON(Пакет.Данные));
//ПолучитьДвоичныеДанныеИзСтроки(Запрос.ПолучитьТелоКакСтроку()).Записать("C:\Users\sedaiko\test.json");
Ответ = Соединение.ОтправитьДляОбработки(Запрос);
Если Ответ.КодСостояния<>200 Тогда
ВызватьИсключение Ответ.ПолучитьТелоКакСтроку()
КонецЕсли;
Если Узел = Неопределено Тогда
Узел = ПолучитьУзел();
КонецЕсли;
Для каждого Стр из Пакет.Ссылки Цикл
ПланыОбмена.УдалитьРегистрациюИзменений(Узел, Стр);
КонецЦикла;
КонецЦикла;
КонецПроцедуры
Обратите внимание на обязательную передачу в HTTPЗапросе заголовка "Content-Type: application/json", иначе в функцию обработке будет передана json-структура, закодированная в base64. Адрес для обмена берем из сервиса API-Gateway, создание которого описано далее.
На данный момент разработчики Библиотеки стандартных подсистем не предоставили возможность использовать собственные регламентные задания из расширений, будем использовать костыль. Для этого создаем общий серверный модуль expBI_РегламентноеЗаданиеОбмена со следующими процедурами и функциями
Имя регламентного задания для поиска
Функция КлючРегламентногоЗадания() Экспорт
возврат "expBI_РегламентноеЗаданиеОбмена.ВыполнитьСинхронзациюВФоне"
КонецФункции
Создание регламентного задания
ДобавитьРегламентноеЗадание
Функция ДобавитьРегламентноеЗадание()
КлючРЗ = КлючРегламентногоЗадания();
РЗ = РегламентныеЗадания.СоздатьРегламентноеЗадание("ЗапускДополнительныхОбработок");
РЗ.Наименование = "Обмен с облаком";
РЗ.Ключ = КлючРЗ;
РЗ.Использование = Истина;
РЗ.Параметры.Добавить(Справочники.ДополнительныеОтчетыИОбработки.ПустаяСсылка());
РЗ.Параметры.Добавить(КлючРЗ);
РЗ.Записать();
Возврат РЗ
КонецФункции
Установка периода запуска регламентного задания
Процедура УстановитьИнтервалОбмена(Интервал) Экспорт
КлючРЗ = КлючРегламентногоЗадания();
РезультатПоиска = РегламентныеЗаданияСервер.НайтиЗадания(Новый Структура("Ключ", КлючРЗ));
Если РезультатПоиска.Количество() = 0 Тогда
РегламентноеЗадание = ДобавитьРегламентноеЗадание();
Иначе
РегламентноеЗадание = РезультатПоиска[0];
КонецЕсли;
Расписание = Новый РасписаниеРегламентногоЗадания;
Расписание.ПериодПовтораДней=1;
Расписание.ПериодПовтораВТечениеДня = Интервал*60;
РегламентноеЗадание.Расписание = Расписание;
РегламентноеЗадание.Записать();
КонецПроцедуры
Вызов процедура обмена
ВыполнитьСинхронзациюВФоне
Функция ВыполнитьСинхронзациюВФоне(ИдентификаторКоманды, АдресРезультата = Неопределено) Экспорт
Отказ = Ложь;
ТекстОшибки = ""; //сюда пишем подробности по ошибкам обработки
expBI_ОбменДанными.ВыполнитьОбмен();
Если Не ПустаяСтрока(ТекстОшибки) Тогда
ЗаписьЖурналаРегистрации(
"Обмен с облаком", УровеньЖурналаРегистрации.Ошибка,,,ТекстОшибки);
КонецЕсли;
Результат = Новый Структура("Успешно, ТекстОшибки", Не Отказ, ТекстОшибки);
Если АдресРезультата <> Неопределено Тогда
ПоместитьВоВременноеХранилище(Результат, АдресРезультата);
КонецЕсли;
Возврат Результат;
КонецФункции
И сам костыль - в заимствованном модуле ДополнительныеОтчетыИОбработки переопределяем процедуру ВыполнитьКоманду директивой &Вместо
&Вместо("ВыполнитьКоманду")
&Вместо("ВыполнитьКоманду")
Функция expBI_ВыполнитьКоманду(ПараметрыКоманды, АдресРезультата)
Если Найти(ПараметрыКоманды.ИдентификаторКоманды, expBI_РегламентноеЗаданиеОбмена.КлючРегламентногоЗадания())=1 Тогда
Возврат expBI_РегламентноеЗаданиеОбмена.ВыполнитьСинхронзациюВФоне(ПараметрыКоманды.ИдентификаторКоманды, АдресРезультата)
КонецЕсли;
// Вставить содержимое метода.
Результат = ПродолжитьВызов(ПараметрыКоманды, АдресРезультата);
Возврат Результат;
КонецФункции
Для интерактивности и отладки создаем общую форму expBI_Настройка с командой ВыполнитьОбмен, реквизитом Интервал.
В модуле описываем следующие процедуры и обработчики. Регламентное задание обмена создастся и активируется при первом открытии формы. Опять же все сделано для упрощения демонстрации.
Модуль формы
expBI_Настройка
&НаСервере
Процедура ПриСозданииНаСервере(Отказ, СтандартнаяОбработка)
Если Интервал=0 Тогда
Интервал=1;
expBI_РегламентноеЗаданиеОбмена.УстановитьИнтервалОбмена(Интервал);
КонецЕсли;
КонецПроцедуры
&НаСервере
Процедура ИнтервалПриИзмененииНаСервере()
expBI_РегламентноеЗаданиеОбмена.УстановитьИнтервалОбмена(Интервал);
КонецПроцедуры
&НаКлиенте
Процедура ИнтервалПриИзменении(Элемент)
ИнтервалПриИзмененииНаСервере();
КонецПроцедуры
&НаСервереБезКонтекста
Процедура ВыполнитьОбменНаСервере()
expBI_ОбменДанными.ВыполнитьОбмен();
КонецПроцедуры
&НаКлиенте
Процедура ВыполнитьОбмен(Команда)
ВыполнитьОбменНаСервере();
КонецПроцедуры
На этом расширение конфигурации закончено. Можно переходить к настройкам сервисов облака.
Создаем сервисы в Яндекс.Облако
Для нашей задачи будем использовать следующие облачные ресурсы: API-Gateway, Cloud Functions и Базу данных YDB.
Для начала заходим в консоль Яндекс.Облако, создаем новое облако и переходим созданный каталог ресурсов. Подробно о первоначальной настройки описано в документации.
В разделе Сервисные аккаунты каталога ресурсов создаем сервисный аккаунт. Поскольку мы настраиваем облако в учебных целях, не будем углубляться в настройку прав и дадим аккаунту роль admin.
База данных YDB.
Создаем первый ресурс в облаке - База данных YDB (Managed Service for YDB). В качестве типа базы данных указываем Serverless. Пропускную способность указываем Без ограничения. Нажимаем кнопку Создать базу данных.
В созданной базе данных переходим в раздел навигация и нажимаем справа кнопку Новый SQL-запрос. Вставляем DDL-запросы и жмем Выполнить.
CREATE TABLE `products`
(
`guid` Utf8 NOT NULL,
`code` Utf8,
`group` Utf8,
`name` Utf8,
PRIMARY KEY (`guid`)
);
CREATE TABLE `units`
(
`guid` Utf8 NOT NULL,
`code` Utf8,
`group` Utf8,
`name` Utf8,
PRIMARY KEY (`guid`)
);
CREATE TABLE `groups`
(
`guid` Utf8 NOT NULL,
`code` Utf8,
`name` Utf8,
PRIMARY KEY (`guid`)
);
CREATE TABLE `sales`
(
`guid` Utf8 NOT NULL,
`id` Int32,
`date` Datetime,
`unit` Utf8,
`product` Utf8,
`count` Float,
`sum` Float,
PRIMARY KEY (`guid`, `id`)
);
Теперь у нас есть таблицы для аналитики.
В разделе Обзор копируем имя базы данных. Это значение параметра database в пункте Эндпоинт, оно будет вида /ru-central1/*********/***********. Это значение нам понадобится в функции ниже
Cloud Functions
На локальном компьютере создаем два файла следующего содержания и упаковываем из в zip-архив
import re
from json import loads as json_loads
from datetime import datetime
from ydb import iam, aio as ydb_aio
credentials = iam.MetadataUrlCredentials()
# credentials=iam.ServiceAccountCredentials.from_file('key.json')
database = "/ru-central1/b1gi0b0h*******/etn8094o***********"
driver = ydb_aio.Driver(
database=database,
endpoint="grpcs://ydb.serverless.yandexcloud.net:2135",
credentials=credentials,
)
pool = ydb_aio.SessionPool(driver, size=50) # Max number of available session
async def execute_sql(session, query: str, parameters: dict):
prepared_query = await session.prepare(query)
await session.transaction().execute(prepared_query, parameters, commit_tx=True)
def get_type(val):
if isinstance(val, bool):
return 'Bool'
elif isinstance(val, int):
return 'Int32'
elif isinstance(val, float):
return 'Float'
elif re.match(r'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}$', val) is not None:
return 'DateTime'
return 'Utf8'
def convert_val(value):
if isinstance(value, str) and re.match(r'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}$', value) is not None:
return int(datetime.strptime(value, "%Y-%m-%dT%H:%M:%S").timestamp())
return value
async def load_data(messages):
tables = []
session = await pool.acquire()
for msg in messages:
table = msg['type']
fields = [f for f in msg.keys() if f != 'type']
values = [f"${f}" for f in fields]
declare = [f'DECLARE ${f} AS {get_type(msg[f])}' for f in fields]
parameters = {f"${f}": convert_val(msg[f]) for f in fields}
query = f"{'; '.join(declare)}; UPSERT INTO {table} ({','.join(fields)}) VALUES ({','.join(values)})"
await execute_sql(session, query, parameters)
async def handler(event, context):
body = json_loads(event.get('body'))
await load_data(body)
return {
'statusCode': 200,
'body': dict(result=True),
}
В переменную database подставляем значение параметра database скопированное в предыдущем пункте.
В консоли в каталоге ресурсов создаем функцию с именем save-db. Переходим в раздел Редактор.
Указываем среду выполнения Python 3.12, способ создания zip-архив. В качестве файла указываем созданный ранее zip-архив. Таймаут ставим 300секунд, память 128Мб, указываем ранее созданный сервисный аккаунт. Нажимаем Сохранить изменения.
API-Gateway.
Создаем в каталоге ресурсов API-Gateway (API-шлюз). Указываем следующую спецификацию.
openapi: 3.0.0
info:
title: Sample API
version: 1.0.0
paths:
/data:
post:
x-yc-apigateway-integration:
type: cloud_functions
function_id: d4euiv********
service_account_id: ajegd***********
Значение function_id: указываем идентификатор (не имя!) ранее созданной функции, а в service_account_id идентификатор сервисного аккаунта.
Сохраняем изменения. После того как шлюз буден создан, в разделе Общая информация будет доступен Служебный домен, адрес которого необходимо присвоить переменной Адрес в функции expBI_ОбменДанными.ВыполнитьОбмен расширения.
Собственно сервисы готовы. Открываем консоль windows и выполняем команду
curl -X POST -H "Content-Type: application/json" https://********.apigw.yandexcloud.net/data -d"[{\"type\": \"products\",\"guid\": \"abcd\",\"name\": \"Товар\"}]
Сервер должен вернуть:
{"result":true}
На этом настройку сервисов облака можно считать законченной.
Синхронизация
Для первичной синхронизации Складов и Номенклатуры можно зарегистрировать изменения следующим кодом во внешней обработке
Код регистрации объектов к обмену
Выб = ПланыОбмена.expBI_Обмен.Выбрать();
Пока Выб.Следующий() Цикл
Если Не Выб.ЭтотУзел Тогда
Узел = Выб.Ссылка;
КонецЕсли;
КонецЦикла;
ПланыОбмена.ЗарегистрироватьИзменения(Узел, Метаданные.Справочники.Номенклатура);
ПланыОбмена.ЗарегистрироватьИзменения(Узел, Метаданные.Справочники.СтруктурныеЕдиницы);
После чего заходим в раздел Главное конфигурации, находим команду открытия общей формы expBI_Настройка и выполняем синхронизацию. В разделе Навигация базы данных YDB можно увидеть загруженные справочники.
Настройка Yandex DataLens
Для примера настроим настроим один чарт. Не будем углубляться в тонкости настройки и терминологию, при необходимости можно обратится к документации.
Переходим в Yandex DataLens и, в-первую очередь, подключимся к базе данных YDB
Теперь необходимо создать датасет. Везде указываем тип связи LEFT JOIN
Настройка связей в датасете
Переименовываем необходимые поля датасета, а ненужные скрываем
Настраиваем чарт. Для примера будем использовать линейную диаграмму с отбором по складу Розничный магазин
Настройка линейной диаграммы
Создаем дашборд с настроенным чартом. Для наглядности в настройках дашборда лучше указать автообновление через интервал, скажем, 30 секунд.
На этом настройка нашего тестового стенда выполнены. Пробуем пробить чек или провести продажу по складу Розничный магазин. Через пару минут продажа должна отразится в чарте.
Послесловие
В данной статье реализован тестовый пример сбора данных и их визуализации в облачной BI-системе.
Разумеется, описанные алгоритмы и настройки подходят только для целей ознакомления. Для промышленной реализации следует создать дополнительные функции и настройки: авторизация в API-gateway, тонкая настройка сервисных аккаунтов (при необходимости), учесть удаление движений набора записей регистра накопления и т.д.
Для больших баз полную первоначальную синхронизацию через API-gateway сделать вряд ли получится, т.к. имеется ограничение размера для тела запроса. Тут уже лучше использовать обмен через Object Storage.
Провайдеры облачных ресурсов дают широкий набор инструментов, главное верно их использовать для решения поставленных задач.