Пример построения микросервисов с использованием apache kafka.
Задача:
Построить событийную архитектуру между приложениями(микросервисы)(не только 1с).
Общая логика решения задачи():
1. Приложение А генерирует событие
2. Другое приложение Б(В, Г..) "слушает" кафка (подписывается на событие)(если коротко)
3. Приложение А отправляет данные кафка (Поставщик)
4. кафка отправляет данные всем кто подписан на это событие (Потребитель)
5. Приложение Б(В, Г..) получает данные
Технологии:
1. 1с (поставщик, потребитель, tcp клиент)
2. Разнородный бэкенд (python, и тд) (поставщик, потребитель)
3. с++ 1с вк native (транспорт для tcp клиента 1с)
4. apache kafka (шина данных) https://kafka.apache.org
5. flask (python micro web сервер) http://flask.pocoo.org
6. tcp сервер (python)
7. KafkaConsumer, KafkaProducer (python) https://github.com/dpkp/kafka-python
Общая принцип работы(модули):
1с(1):
При запуске 1с стартует tcp клиент (вк native)(3) .Устанавливает соединение с tcp сервером(6) (передавая ид пользователя из 1с). Подписывается на событие вк native. При наступления события уходит в контекст 1с через ОбработкаВнешнегоСобытия для последующей обработки. Для генерации событий выполняет гет, пост запросы на веб сервер(5).
tcp сервер(6):
Хеш таблица открытых сокетов(активных соединений) и ид пользователя. Подписывается на события шины данных для отправки клиенту 1с. При наступлении событий отправляет данные в сокет(клиенту 1с). Генерирует события при авторизации(установка сокета), выхода(разрыв, закрытие сокета) в шину данных.
web сервер(5):
Обрабатывает запросы со стороны 1с(прокси для кафка) и отправляет в шину данных. Также подписывается на события сторонних приложений требующих отправку данных для 1с(изменился статус звонка asterisk, прилетел тригер для обновления формы, перегнать что то с &НаСервере на &НаКлиенте , и тд).
Шина данных(4):
Управляет оркестром приложений(упорядоченная очередь). При наступлении события маршутизирует подписчикам, если подсписчиков нет, хранит у себя, доставит потом когда появятся.
Разнородный бэкенд(2):
Подписывается, генерирует события в шину данных.(например web app time line(журнал записи), asterisk ami client(автообзвон, фонер), из предыдущих статей)
Шаг 1. tcp сервер
from socketserver import TCPServer, ThreadingMixIn, BaseRequestHandler
from datetime import datetime, date
from kafka import KafkaConsumer
from kafka import KafkaProducer
import _thread
class ThreadedTCPServer(ThreadingMixIn, TCPServer):
pass
class OnesSocketServerHandler(BaseRequestHandler):
def handle(self):
self.callback(self.server, self.request, self.client_address)
class OnesSocketServer():
handler = OnesSocketServerHandler
users = {}
producer = KafkaProducer(bootstrap_servers=['192.168.777.555:9092'])
def __init__(self):
self.handler.callback = self.callback
_thread.start_new_thread(self.start_listen_event)
def callback(self, server, request, client_address):
print(f"""CONNECTED LISTENER {client_address}""")
u_ref = None
while True:
try:
buf = request.recv(256).decode('utf-8')
except:
break
#print(f"""recv {buf}""")
if not buf:
break
buf = buf.strip('\n')
if buf == 'logout':
break
elif buf[0:5] == 'u_ref' and len(buf[6:]) > 1:
u_ref = buf[6:]
u_ref = u_ref.upper()
self.users.setdefault(u_ref, dict())
self.users[u_ref][client_address] = request
authorize(user = u_ref, address = client_address[0], producer = self.producer)
print(f"""DISCONNECTED LISTENER {client_address}""")
if not u_ref is None:
user = self.users.get(u_ref)
if not user is None:
sock = user[client_address]
if not sock is None:
if sock._closed != True:
sock.close()
del user[client_address]
#for x in user:
# sock = self.users[u_ref][client_address]
# if sock._closed:
# continue
# sock.close()
# del self.users[x][address]
if len(user) == 0:
del self.users[u_ref]
unauthorize(user = u_ref, address = client_address[0], producer = self.producer)
pass
def start_listen_event(self):
consumer = KafkaConsumer('ones_socket_send_user', 'ones_socket_send_user_send_all', 'ones_socket_add_listener',
group_id='my-group', bootstrap_servers=['192.168.777.555:9092'])
for message in consumer:
if message.topic=="ones_socket_send_user":
u_ref = message.key.decode('utf-8')
user = self.users.get(u_ref)
if user is None:
continue
for x in user:
sock = user[x]
if sock._closed:
continue
sock.sendall(message.value)
elif message.topic=="ones_socket_send_user_send_all":
for x in self.users:
for y in self.users[x]:
sock = self.users[x][y]
if sock._closed:
continue
sock.sendall(message.value)
elif message.topic=="ones_socket_add_listener":
pass
#user = self.users.get(u_ref)
#if user is None:
# continue
#for x in user:
# sock = user[x]
# if sock._closed:
# continue
# sock.sendall(message.value)
def authorize(**data):
user = data['user']
address = data['address']
producer = data['producer']
key = user.encode('utf-8')
value = address.encode('utf-8')
try:
producer.send(f'user_auth', key=key, value=value)
except:
print(f"""send to kafka failed {value}""")
pass
def unauthorize(**data):
user = data['user']
address = data['address']
producer = data['producer']
key = user.encode('utf-8')
value = address.encode('utf-8')
try:
producer.send(f'user_unauth', key=key, value=value)
except:
print(f"""send to kafka failed {value}""")
pass
TCP_IP = '192.168.777.222'
TCP_PORT = 11000
if __name__ == '__main__':
ones_serv = OnesSocketServer()
server = ThreadedTCPServer((TCP_IP, TCP_PORT), OnesSocketServerHandler)
print('starting ones socket server '+str(TCP_IP)+':'+str(TCP_PORT)+' (use <Ctrl-C> to stop)')
server.serve_forever()
установка сокета(прослушка):
while True:
try:
buf = request.recv(256).decode('utf-8')
except:
break
отправка в сокет:
sock.sendall(message.value)
генерация события для шины данных:
producer.send(f'user_auth', key=key, value=value)
подписка,обработка события:
consumer = KafkaConsumer('ones_socket_send_user', 'ones_socket_send_user_send_all', 'ones_socket_add_listener',
group_id='my-group', bootstrap_servers=['192.168.777.555:9092'])
for message in consumer:
if message.topic=="ones_socket_send_user":
Шаг 2. web сервер
from server import app
if __name__ == '__main__':
HOST = '192.168.777.222'
PORT = 8095
app.run(HOST, PORT, threaded=True)
from flask import request
from server import app, producer
import json
HEADERS = {"Access-Control-Allow-Origin": "*", "Access-Control-Allow-Methods": "POST", "Access-Control-Allow-Headers": "Content-Type"}
@app.route('/')
@app.route('/index')
def index():
return [], 200, HEADERS
@app.route('/api_user_event', methods=['GET', 'POST'])
def api_user_event():
if request.method == 'POST':
body = request.data.decode('utf-8-sig')
data = json.loads(body)
user = data.get('u_ref', '')
jdata = data.get('jdata', '')
else:
user = request.args.get('u_ref', '')
jdata = request.args.get('jdata', '')
if len(user) == 0 or len(jdata) == 0:
return 'bad args', 400, HEADERS
key = user.encode('utf-8')
value = jdata.encode('utf-8')
try:
producer.send(f'ones_socket_send_user', key=key, value=value)
except:
print(f"""send to kafka failed {value}""")
return 'send to kafka', 200, HEADERS
@app.route('/api_user_event_send_all', methods=['GET', 'POST'])
def api_user_event_broadcast():
if request.method == 'POST':
body = request.data.decode('utf-8-sig')
data = json.loads(body)
user = data.get('u_ref', '')
jdata = data.get('jdata', '')
else:
user = request.args.get('u_ref', '')
jdata = request.args.get('jdata', '')
if len(user) == 0 or len(jdata) == 0:
return 'bad args', 400, HEADERS
key = user.encode('utf-8')
value = jdata.encode('utf-8')
try:
producer.send(f'ones_socket_send_user_send_all', key=key, value=value)
except:
print(f"""send to kafka failed {value}""")
return 'send to kafka', 200, HEADERS
from flask import Flask
from flask_cors import CORS
from kafka import KafkaProducer, KafkaConsumer
import _thread
def start_listen_event():
consumer = KafkaConsumer('ami_client_event', 'upd_external_event_trigger',group_id='my-group', bootstrap_servers=['192.168.77.555:9092'])
for message in consumer:
try:
producer.send(f'ones_socket_send_user', key=message.key, value=message.value)
except:
print(f"""send to kafka failed {value}""")
app = Flask(__name__)
producer = KafkaProducer(bootstrap_servers=['192.168.777.555:9092'])
CORS(app, support_credentials=True)
import server.views
_thread.start_new_thread(start_listen_event)
регистрация енд поинт(точка входа, путь и тд):
@app.route('/api_user_event', methods=['GET', 'POST'])
def api_user_event(): ...
Шаг 3. Пример генерации из стороннего приложения
if len(user_list) != 0:
d = {}
d['Действия'] = 'External_Record_Log_Event'
d['Данные'] = {}
d['Данные']['d_ref'] = d_ref
d['Данные']['date'] = date
d['event_date'] = datetime.now()
jdata = json.dumps(d, default=common.json_serial)
user_list = self.users_cache.get(d_ref)
for u_ref in user_list:
key = u_ref.encode('utf-8')
value = jdata.encode('utf-8')
try:
self.producer.send(f'upd_external_event_trigger', key=key, value=value)
except:
print(f"""send to kafka failed {jdata}""")
asterisk ami event triger
def event_listener(event, **kwargs):
if event.name in EVENTS_NOT_LISTEN:
return
#print(f"""{str(event)}""")
if event.name == 'Newstate' or event.name == 'Hangup' or event.name == 'VarSet':
if event.name == 'VarSet':
if not event.keys['Variable'] in EVENTS_VARS_LISTEN:
return
for x in USER_EVENTS:
if USER_EVENTS[x]['channel'] in event.keys['Channel']:
d = {}
d['Действия'] = 'Asterisk_Event'
d['Данные'] = event.keys
d['Данные']['name'] = event.name
d['Данные']['id_ext'] = USER_EVENTS[x]['id_ext']
d['Данные']['event_date'] = datetime.now()
jdata = json.dumps(d, default=json_serial)
do_user_event(jdata, USER_EVENTS[x]['user'])
print(f"""{str(event)}""")
Прослушка ами:
client = AMIClient(address=AMI_ADDRESS, port=AMI_PORT)
future = client.login(username=AMI_USER, secret=AMI_SECRET)
client.add_event_listener(event_listener)
Шаг 4. 1с
Процедура ПодключитьСокет(Кнопка)
ПодключитьВнешнююКомпоненту("СокетВК", "Socket", ТипВнешнейКомпоненты.Native);
Сокет = Новый("AddIn.Socket.EventListener");
Сокет.хост = "192.168.777.222";
Сокет.ссылка = "A0BDD89D6773B96411E789535C2BC380";
Сокет.подключить();
события будут приходить в ОбработкаВнешнегоСобытия(Источник, Событие, Данные),
обработку нужно делать там
КонецПроцедуры
Процедура Get(Кнопка)
СтрокаДжейсон = "тест";
пользователь = "A0BDD89D6773B96411E789535C2BC380";
Сервер = "192.168.777.222";
ПортСервера = "8095";
ТаймАут = 1;
Прокси = Новый ИнтернетПрокси(Ложь);
ТелоЗапроса = "jdata="+СтрокаДжейсон+"&u_ref="+пользователь;
Соединение = Новый HTTPСоединение(Сервер, ПортСервера,,,Прокси,ТаймАут);
Запрос = Новый HTTPЗапрос("/api_user_event?"+ТелоЗапроса);
Результат = Соединение.Получить(Запрос);
КонецПроцедуры
Процедура Post(Элемент)
СтрокаДжейсон = "тест";
пользователь = "A0BDD89D6773B96411E789535C2BC380";
Сервер = "192.168.777.222";
ПортСервера = "8095";
ТаймАут = 1;
Прокси = Новый ИнтернетПрокси(Ложь);
структура = новый Структура;
структура.Вставить("jdata", СтрокаДжейсон);
структура.Вставить("u_ref", пользователь);
ТелоЗапроса = JSON.лЗаписатьJSON(структура);
Соединение = Новый HTTPСоединение(Сервер, ПортСервера,,,Прокси,ТаймАут);
Запрос = Новый HTTPЗапрос("/api_user_event");
Запрос.УстановитьТелоИзСтроки(ТелоЗапроса);
Результат = Соединение.ОтправитьДляОбработки(Запрос);
КонецПроцедуры
Процедура Post_500(Элемент)
Для Сч=0 По 500 Цикл
СтрокаДжейсон = "тест";
пользователь = "A0BDD89D6773B96411E789535C2BC380";
Сервер = "192.168.777.222";
ПортСервера = "8095";
ТаймАут = 1;
Прокси = Новый ИнтернетПрокси(Ложь);
структура = новый Структура;
структура.Вставить("jdata", СтрокаДжейсон+Строка(Сч));
структура.Вставить("u_ref", пользователь);
ТелоЗапроса = JSON.лЗаписатьJSON(структура);
Соединение = Новый HTTPСоединение(Сервер, ПортСервера,,,Прокси,ТаймАут);
Запрос = Новый HTTPЗапрос("/api_user_event");
Запрос.УстановитьТелоИзСтроки(ТелоЗапроса);
Результат = Соединение.ОтправитьДляОбработки(Запрос);
КонецЦикла;
КонецПроцедуры
Шаг 5. вк native
Полное создание вк native в статье не рассматривается.
int EventListener::connect_socket(char *host, u_short port, char *subject) {
SOCKET s;
WSADATA wsadata;
int error = WSAStartup(0x0202, &wsadata);
if (error) return error;
if (wsadata.wVersion != 0x0202) {
WSACleanup();
return wsadata.wVersion;
}
char buf[MAXLEN];
std::wstring ws_buf(MAXLEN, 0);
while (1) {
SOCKADDR_IN target;
target.sin_family = AF_INET;
target.sin_port = htons(port);
target.sin_addr.s_addr = inet_addr(host);
s = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (s == INVALID_SOCKET) return s;
try {
if (::connect(s, (SOCKADDR *)&target, sizeof(target)) == SOCKET_ERROR) {
throw std::exception("SOCKET ERROR");
}
}
catch (std::exception &e) {
std::string es(e.what());
std::wstring ew;
copy(es.begin(), es.end(), back_inserter(ew));
callback(std::wstring(L"Socket"),
(std::wstring) L"Connection to server:" + std::to_wstring((long long)port) + L" failed! " + ew);
Sleep(30 * 1000);
continue;
}
callback(std::wstring(L"Socket"), (std::wstring) L"Connected to server:" + std::to_wstring((long long)port));
::send(s, subject, strlen(subject), 0);
buf[0] = 0;
long size;
while (1) {
size = recv(s, buf, MAXLEN, 0);
if (!s || size <= 0 || !strlen(buf)) break;
buf[size] = 0;
size = MultiByteToWideChar(CP_UTF8, 0, &buf[0], sizeof(buf), NULL, 0);
MultiByteToWideChar(CP_UTF8, 0, &buf[0], sizeof(buf), &ws_buf[0], size);
callback(std::wstring(L"Socket"), ws_buf);
}
::closesocket(s);
}
return 0;
}
Шаг 6. 1c web services
Используется для передачи сообщения из бизнес приложения для отправки в 1с (серверная часть)(для дальнейшей обработка бизнес логики в контексте 1с) (создание документа, старт бизнес процесса, заполнение справочника) (конечно можно сделать сделать прямой вызов без использования сообщений, здесь идея в том что на один ответ 1c ws может быть в дальнейшем подписано несколько подписчиков, тем самым все получат сообщение и будут выполнят свою логику далее)
import json
from suds.cache import NoCache
from suds.client import Client
from kafka import KafkaProducer, KafkaConsumer
import _thread
import time
DEF_USER = 'administrator'
DEF_PASSWORD = 'administrator'
DEF_WS = "http://my_ip_base/my_base/ws/my_gate.1cws?wsdl"
upd_event_lists = []
def send(username, password, jdata, method, ws, client):
#try:
# client = Client(ws, username = username, password = password, cache = NoCache(), timeout = 60)
#except Exception as e:
# print(e)
# return
eval_str = 'client.service.%s(%s)' % (method, jdata and "%r" % jdata or '')
try:
res = eval(eval_str)
except Exception as e:
print(e)
return
res = json.loads(res)
client = None
print(f"""eval_str->{res}""")
return res
def upd_loop(ind_upd_list):
while True:
do_upd_loop(ind_upd_list)
def do_upd_loop(ind_upd_list):
t_loop = 0.250
for x in upd_event_lists[ind_upd_list]:
try:
client = Client(ws, username = username, password = password, cache = NoCache(), timeout = 60)
except Exception as e:
print(e)
continue
send(x['username'], x['password'], x['jdata'], x['method'], x['ws'], client)
upd_event_lists[ind_upd_list].remove(x)
time.sleep(t_loop)
upd_event_lists.append([])
upd_event_lists.append([])
upd_event_lists.append([])
upd_event_lists.append([])
#upd_event_lists.append([])
#upd_event_lists.append([])
#upd_event_lists.append([])
#upd_event_lists.append([])
for i, val in enumerate(upd_event_lists):
_thread.start_new_thread(upd_loop, (i,))
#producer = KafkaProducer(bootstrap_servers=['192.168.5.131:9092'])
consumer = KafkaConsumer('ones_ws_event', group_id='my-group', bootstrap_servers=['192.168.5.131:9092'])
for message in consumer:
print(f"""ones_ws_event->{message}""")
method = message.key.decode('utf-8')
data = message.value.decode('utf-8')
jdata = data
username = DEF_USER
password = DEF_PASSWORD
ws = DEF_WS
data = json.loads(data)
if not data.get('event_setting') is None:
event_setting = data.get('event_setting')
username = event_setting.get('username')
password = event_setting.get('password')
ws = event_setting.get('ws')
jdata = event_setting.get('jdata')
lens = []
for i, val in enumerate(upd_event_lists):
lens.append((len(upd_event_lists[i]), i))
min_list = min(lens)
upd_event_lists[min_list[1]].append({'username':username, 'password':password, 'ws':ws, 'jdata':jdata, 'method':method})
#_thread.start_new_thread(send, (username, password, jdata, method, ws,))
#res = send(username, password, jdata, method, ws)
Подключение к ws:
client = Client(ws, username = username, password = password, cache = NoCache(), timeout = 60)
Вызов метода в контексте 1с(метод ws сервиса):
eval_str = 'client.service.%s(%s)' % (method, jdata and "%r" % jdata or '')
try:
res = eval(eval_str)
except Exception as e:
print(e)
return
Конструкция вида: upd_event_lists.append([]), и затем _thread.start_new_thread(upd_loop, (i,)), сделано для того чтобы не завалить ws 1c. Те может случится так что прилетит много запросов и если делать send сразу происходит отказ в установке соединения (Удаленный компьютер отверг запрос на подключение, еррор вин сок и тд.)(проверено примерно на 1000 сообщения поданных вход модулю который выполнял _thread.start_new_thread(send, (username, password, jdata, method, ws,)) для каждого принятого сообщения, сгенерировав их в кафку предварительно с другого модуля, произошел отказ(пользователей было 400 на момент приема сообщений) ну и 1с чета затупила сразу, в плане установки новых соединений с рпхостом(рагент сдох короче)).
import json
from kafka import KafkaProducer, KafkaConsumer
#import _thread
import time
import requests
from requests.auth import HTTPBasicAuth
headers = {'Content-type': 'application/json'}
#'Accept': 'text/plain'}
ONES_USER = ''
ONES_PASSWORD = ''
ONES_HOST = '192.168.555.560'
ONES_HTTP = 'http://192.168.555.560/AA'
consumer = KafkaConsumer('ones_http_event', group_id='my-group', bootstrap_servers=['192.168.5.131:9092'])
producer = KafkaProducer(bootstrap_servers=['192.168.5.131:9092'])
for message in consumer:
print(f"""ones_http_event->{message}""")
method = message.key.decode('utf-8')
data = message.value.decode('utf-8')
jdata = data
username = ONES_USER
password = ONES_PASSWORD
data = json.loads(data)
id_request = None
if not data.get('id_request') is None:
id_request = data.get('id_request')
if not data.get('event_setting') is None:
event_setting = data.get('event_setting')
username = event_setting.get('username')
password = event_setting.get('password')
jdata = event_setting.get('jdata')
if not jdata.get('id_request') is None:
id_request = jdata.get('id_request')
auth_ones = HTTPBasicAuth(username, password)
r = None
try:
r = requests.get(f"""{ONES_HTTP}/hs/gate/v1?method={method}&data={jdata}""", headers=headers, auth=auth_ones)
#data=res_str,
except Exception as e:
print(e)
res = ''
if not r is None:
if r.status_code != 200:
res = r.reason
print(r.reason)
else:
res = r.text
if not id_request is None:
key = id_request.encode('utf-8')
value = res.encode('utf-8')
producer.send('ones_http_event_response', key=key, value=value)
в 1с это http сервис. Это работает быстрее ws, потому что ws это soap over http. Чтобы понять нужно cмотреть в структуру вызовов клиент-сервер SOAP.(получение wsdl, вызов метода soap и тд). http сервис - это голый http - get/post/put/head
Примеры работы:
1c посылает гет во внешнее приложение->внешнее приложение получает событие, обрабытвает генерирует событие на обновление клиента 1с отправляет в кафка->кафка марштузирует tcp серверу->tcp сервер отпраляет в сокет всем клиентам которые зарегистрированы по данному ид->1с обрабатывает внешние событие.
1c посылает гет, пост на веб сервер->веб сервер отправляет в кафка->кафка марштузирует tcp серверу->tcp сервер отпраляет в сокет всем клиентам которые зарегистрированы по данному ид->1с обрабатывает внешние событие.
Пример отправки и получения 500 запросов + в процессе отправки генерация из внешних приложений.
Пример подключения/отключения в процессе отправки.
Деплой и идеология apache kafka в статье не рассматривается(написано тысячи статей).
Для мониторинга apache kafka используется https://github.com/yahoo/kafka-manager.
Метрика:
upd для тех кто мало что понял мануал для dot net от ms, смотреть в общие принципы и идеи