import re
import datetime
from typing import Dict
from elasticsearch import helpers, Elasticsearch
import hashlib
import argparse
Users = {}
Apps = {}
Events = {}
Metas = {}
def ParseDict(file_path):
file = open(file_path, 'r', encoding='utf-8')
for line in file:
line = line.replace('\n','')
res = re.search('\{(1|3|4|5),.+,\d+\},', line)
if res != None:
if res.group(1) == '1':
res = re.search('\{1,.+,\"(.*)\",(\d+)\},', line)
Users[res.group(2)] = res.group(1)
if res.group(1) == '5':
res = re.search('\{5,.+,\"(.*)\",(\d+)\},', line)
Metas[res.group(2)] = res.group(1)
if res.group(1) == '3':
res = re.search('\{3,\"(.*)\",(\d+)\},', line)
Apps[res.group(2)] = res.group(1)
if res.group(1) == '4':
res = re.search('\{4,\"(.*)\",(\d+)\},', line)
Events[res.group(2)] = res.group(1)
def ParseLog(file_path):
file = open(file_path, 'r', encoding='utf-8')
text = file.read()
strEvents = re.findall('\{\d{14},.+?^\},\n', text, re.DOTALL|re.MULTILINE)
i = 0
for match in strEvents:
line = re.sub('\n', '@', match)
res = re.match('\{(\d{14}),(\w),@\{(.+?)\},(\d+),(\d+),(\d+),(\d+),(\d+),(\w),(.*?),(\d+),.+?\},(.*?),(\d+),(\d+),(\d+),(\d+),(\d+),@\{\d+\}@\},', line)
if res == None:
continue
i = i + 1
if i%10000==0:
print(file_path + ' ' + str(i))
Tran = res.group(2).replace('N', '-').replace('U', 'V').replace('R', 'InProc').replace('C', 'X')
Importance = res.group(9).replace('I', 'Информация').replace('E', 'Ошибка').replace('W', 'Предупреждение').replace('N', 'Примечание')
Usr = Users.get(res.group(4))
App = Apps.get(res.group(6))
Event = Events.get(res.group(8))
strDate = res.group(1)
Date = strDate[:4] + '-' + strDate[4:6]+ '-' + strDate[6:8] + 'T' + strDate[8:10] + ':' + strDate[10:12]+ ':' + strDate[12:14]
MetaData = Metas.get(res.group(11))
if sys_event_aliases.get(Event) != None:
Event = sys_event_aliases[Event]
strDoc = str(Date) + str(Event) + str(Usr) + str(MetaData) + str(Tran) + '(' + str(res.group(3)) + ')' + str(App) + str(res.group(10)) + str(Importance) + str(res.group(12)) + str(res.group(16))
hash_object = hashlib.md5(strDoc.encode())
idDoc = hash_object.hexdigest()
doc = {
"_id": idDoc,
"Date": Date,
"Event": Event,
"User": Usr,
"Meta": MetaData,
"Tran": Tran + '(' +res.group(3) + ')',
"App": App,
"Comment": res.group(10),
"Importance": Importance,
"Data": res.group(12),
"Session": res.group(16)
}
yield doc
def create_index(client, suff):
print("Имя индекса: events" + '_' +suff)
client.indices.create(
index="events" + '_' +suff,
body={
"settings": {"number_of_shards": 1, "lifecycle": {"name": "Events_del"}},
"mappings": {
"properties": {
"Date": {"type": "date","format": "yyyy-MM-dd'T'HH:mm:ss"},
"Event": {"type": "keyword"},
#"Day": {"type": "text"},
"User": {"type": "keyword"},
"Meta": {"type": "keyword"},
"Tran": {"type": "text"},
"App": {"type": "keyword"},
"Comment": {"type": "text"},
"Importance": {"type": "keyword"},
"Session": {"type": "text"},
"Data": {"type": "text"},
}
},
},
ignore=400,
)
client.indices.put_alias(index="events" + '_' +suff, name='events_journal')
if __name__ == '__main__':
sys_event_aliases = {"_$Session$_.Start": "Сеанс. Начало", "_$Session$_.Finish": "Сеанс. Завершение", "_$Session$_.Authentication": "Сеанс. Аутентификация",
"_$InfoBase$_.ConfigUpdate": "Информационная база. Изменение конфигурации",
"_$InfoBase$_.DBConfigUpdate": "Информационная база. Изменение конфигурации базы данных",
"_$InfoBase$_.EventLogSettingsUpdate": "Информационная база. Изменение параметров журнала регистрации",
"_$InfoBase$_.InfoBaseAdmParamsUpdate": "Информационная база. Изменение параметров информационной базы",
"_$InfoBase$_.MasterNodeUpdate": "Информационная база. Изменение главного узла",
"_$InfoBase$_.RegionalSettingsUpdate": "Информационная база. Изменение региональных установок",
"_$InfoBase$_.TARInfo": "Тестирование и исправление. Сообщение",
"_$InfoBase$_.TARMess": "Тестирование и исправление. Предупреждение",
"_$InfoBase$_.TARImportant": "Тестирование и исправление. Ошибка",
"_$Data$_.New": "Данные. Добавление", "_$Data$_.Update": "Данные. Изменение",
"_$Data$_.Delete": "Данные. Удаление",
"_$Data$_.TotalsPeriodUpdate": "Данные. Изменение периода рассчитанных итогов",
"_$Data$_.Post": "Данные. Проведение", "_$Data$_.Unpost": "Данные. Отмена проведения",
"_$User$_.New": "Пользователи. Добавление", "_$User$_.Update": "Пользователи. Изменение",
"_$User$_.Delete": "Пользователи. Удаление", "_$Job$_.Start": "Фоновое задание. Запуск",
"_$Job$_.Succeed": "Фоновое задание. Успешное завершение",
"_$Job$_.Fail": "Фоновое задание. Ошибка выполнения",
"_$Job$_.Cancel": "Фоновое задание. Отмена", "_$PerformError$_": "Ошибка выполнения",
"_$Transaction$_.Begin": "Транзакция. Начало",
"_$Transaction$_.Commit": "Транзакция. Фиксация",
"_$Transaction$_.Rollback": "Транзакция. Отмена"}
parser = argparse.ArgumentParser()
parser.add_argument("--i", required=True, type=str, help="Идентификатор индекса")
parser.add_argument("--c", required=True, type=str, help="Каталог с файлами журнала")
parser.add_argument("--t", default=1, type=int,
help="Временное смещение файлов. Например, когда на одном сервере обрабатываются логи разных временных зон")
parser.add_argument("--p", default='h', choices=["h", "d"], required=True, type=str,
help="Периодичность журнала. h - почасовой; d - подневной")
parser.add_argument("--f", default='', help="Имя Файла журнала, если требуется загрузить конктретный")
parser.add_argument("--es", required=True, default='', help="ES server")
args = parser.parse_args()
path_log = args.c
index_id = args.i
hour_delta = args.t
periodicity = args.p
file_name = args.f
ServerPort = args.es
print('Args:')
print(args)
print('*')
print('Каталог: ' + path_log)
file_dict = path_log + '/1Cv8.lgf'
now_date = datetime.datetime.now()
if file_name == '':
if periodicity == 'h':
last_date = now_date - datetime.timedelta(hours=hour_delta)
file_name = last_date.strftime("%Y%m%d%H0000.lgp")
else:
last_date = now_date - datetime.timedelta(days=1)
file_name = last_date.strftime("%Y%m%d000000.lgp")
file_exec = path_log + '/' + file_name
print('Обрабатывается: ' + file_exec)
daySuff = file_name[0:8]
print('DaySuff ' + daySuff)
print('Start parse dict')
ParseDict(file_dict)
print('End parse dict')
es = Elasticsearch(ServerPort)
print('Create index ' + index_id + '_' + daySuff)
create_index(es, index_id + '_' + daySuff)
print('Create index end')
print('Начало загрузки данных:')
failed = 0
success = 0
for ok, action in helpers.streaming_bulk(
client=es, index="events_" + index_id + '_' + daySuff, actions=ParseLog(file_exec),
):
if not ok:
failed += 1
else:
success += 1
print('Загрузка завершена')
print('success: ' + str(success))
print('failed: ' + str(failed))