Формат сообщений в сигнальном топике Kafka
Содержание раздела
Система регистрирует события, записывая сообщения о них в сигнальный топик Kafka.
Сообщения записываются в JSON-формате в топик, имя которого задано с помощью параметра конфигурации KAFKA_STATUS_EVENT_TOPIC
. Каждое сообщение состоит из ключа и тела.
Рекомендуется настроить автоматическую очистку сигнального топика в конфигурации брокера Kafka.
Виды записываемых сообщений
Система записывает сообщения по следующим событиям:
- изменение логической схемы данных;
- открытие дельты;
- закрытие дельты;
- откат дельты;
- завершение операции записи;
- отмена операции записи.
Управление записью сообщений
Запись некоторых видов сообщений можно отключать и включать с помощью следующих параметров конфигурации:
KAFKA_STATUS_EVENT_WRITE_OPERATIONS_ENABLED
— управляет записью сообщений о завершении и отмене операций записи;KAFKA_STATUS_EVENT_EXTERNAL_DDL_ENABLED
— управляет записью событий о создании и удалении внешних таблиц в логической схеме данных.
Формат ключа сообщения
Ключ сообщения в сигнальном топике Kafka имеет формат, показанный ниже. Описание параметров ключа сообщения см. в секции ниже.
{
"datamart": "<logical_db>",
"datetime": "<event_datetime>",
"event": "<event_code>",
"eventLogId": "<event_UUID>"
}
Параметры ключа сообщения
logical_db
-
Имя логической базы данных, в которой произошло событие.
event_datetime
-
Дата и время события в формате
YYYY-MM-DD hh:mm:ss[.microseconds]
. Подробнее о формате см. в разделе Строковый формат даты и времени в ответах. event_code
-
Код события. Возможные значения:
DATAMART_SCHEMA_CHANGED
— изменение логической схемы,DELTA_OPEN
— открытие дельты,DELTA_CLOSE
— закрытие дельты,DELTA_CANCEL
— откат дельты,WRITE_OK
— завершение операции записи,WRITE_CANCEL
— отмена операции записи.
event_UUID
-
Уникальный идентификатор события.
Примеры ключей сообщения
Пример ключа в сообщении о закрытии дельты:
{
"datamart": "marketing",
"datetime": "2022-02-28 08:02:40",
"event": "DELTA_CLOSE",
"eventLogId": "6b80c392-3198-492f-a631-6a684e521b79"
}
Пример ключа в сообщении о завершении операции записи:
{
"datamart": "marketing",
"datetime": "2022-02-28 09:15:43.532564",
"event": "WRITE_OK",
"eventLogId": "8b8dfa17-6605-4874-8ebf-1b8b47c201f3"
}
Пример ключа в сообщении об отмене операции записи:
{
"datamart": "marketing",
"datetime": "2022-02-28 09:51:11.532",
"event": "WRITE_CANCEL",
"eventLogId": "3d963cbc-8782-498e-923d-602d7ccf7c2d"
}
Формат тела сообщения
Тело сообщения в сигнальном топике Kafka имеет формат, описанный в таблице ниже. Описание параметров тела сообщения см. в секции ниже.
Событие | Формат тела сообщения |
---|---|
Изменение логической схемы | {"datamart": "<logical_db>", "entityName": "<entity_name>", "entityDefinition": "<entity_definition>", changeDateTime": "<change_datetime>"} |
Открытие дельты | {"deltaNum": <delta_number>} |
Закрытие дельты | {"deltaNum": <delta_number>, "deltaDate": "<delta_commit_datetime>"} |
Откат дельты | {"deltaNum": <delta_number>} |
Завершение операции записи | {"entity": "<entity_name>", "cn": <sys_cn>, "ts": "<ts_datetime>", "rowsAffected": <rows_affected_quantity>} |
Отмена операции записи | {"entity": "<entity_name>", "cn": <sys_cn>} |
Параметры тела сообщения
logical_db
-
Имя логической базы данных, к которой относится изменение.
entity_name
-
Имя логической сущности, к которой относится изменение или операция записи.
entity_definition
-
Определение логической сущности. Возможные значения:
TABLE.DEFAULT
— обычная логическая таблица;TABLE.PROXY
— прокси-таблица;TABLE.PARTITION
— партиция;TABLE.PARTITIONED
— партиционированная таблица;MATERIALIZED VIEW.DEFAULT
— материализованное представление;VIEW.DEFAULT
— логическое представление;READABLE EXTERNAL TABLE.CORE:<datasource_alias>
— readable-таблица для работы со standalone-таблицей, расположенной в датасорсе<datasource_alias>
;READABLE EXTERNAL TABLE.KAFKA
— readable-таблица для загрузки данных из брокера сообщений Kafka;WRITABLE EXTERNAL TABLE.CORE:<datasource_alias>
— writable-таблица для работы со standalone-таблицей, расположенной в датасорсе<datasource_alias>
;UPLOAD EXTERNAL TABLE.KAFKA
— внешняя таблица загрузки;DOWNLOAD EXTERNAL TABLE.KAFKA
— внешняя таблица выгрузки.
change_datetime
-
Дата и время изменения в формате
YYYY-MM-DD hh:mm:ss
. delta_number
-
Номер дельты.
delta_commit_datetime
-
Дата и время закрытия дельты в формате
YYYY-MM-DD hh:mm:ss.SSSSSS
. sys_cn
-
Номер операции записи. Для прокси-таблиц и standalone-таблиц указывается значение
null
. ts_datetime
-
Дата и время выполнения или отмены операции записи в формате
YYYY-MM-DD hh:mm:ss.SSSSSS
. Для прокси-таблиц и standalone-таблиц указывается значениеnull
. rows_affected_quantity
-
Количество добавленных, измененных и удаленных операцией строк.
Примеры тела сообщения
Пример тела в сообщении об изменении логической схемы:
{
"changeDateTime": "2022-02-24 15:25:47",
"datamart": "marketing"
}
Пример тела в сообщении о закрытии дельты:
{
"deltaNum": 8,
"deltaDate": "2022-02-28 08:02:40.420923"
}