Формат сообщений в сигнальном топике Kafka

Содержание раздела
  1. Виды записываемых сообщений
  2. Управление записью сообщений
  3. Формат ключа сообщения
    1. Параметры ключа сообщения
    2. Примеры ключей сообщения
  4. Формат тела сообщения
    1. Параметры тела сообщения
    2. Примеры тела сообщения

Система регистрирует события, записывая сообщения о них в сигнальный топик Kafka.

Сообщения записываются в JSON-формате в топик, имя которого задано с помощью параметра конфигурации KAFKA_STATUS_EVENT_TOPIC. Каждое сообщение состоит из ключа и тела.

Рекомендуется настроить автоматическую очистку сигнального топика в конфигурации брокера Kafka.

Виды записываемых сообщений

Система записывает сообщения по следующим событиям:

Управление записью сообщений

Запись некоторых видов сообщений можно отключать и включать с помощью следующих параметров конфигурации:

  • KAFKA_STATUS_EVENT_WRITE_OPERATIONS_ENABLED — управляет записью сообщений о завершении и отмене операций записи;
  • KAFKA_STATUS_EVENT_EXTERNAL_DDL_ENABLED — управляет записью событий о создании и удалении внешних таблиц в логической схеме данных.

Формат ключа сообщения

Ключ сообщения в сигнальном топике Kafka имеет формат, показанный ниже. Описание параметров ключа сообщения см. в секции ниже.

{
  "datamart": "<logical_db>",
  "datetime": "<event_datetime>",
  "event": "<event_code>",
  "eventLogId": "<event_UUID>"
}

Параметры ключа сообщения

logical_db

Имя логической базы данных, в которой произошло событие.

event_datetime

Дата и время события в формате YYYY-MM-DD hh:mm:ss[.microseconds]. Подробнее о формате см. в разделе Строковый формат даты и времени в ответах.

event_code

Код события. Возможные значения:

  • DATAMART_SCHEMA_CHANGED — изменение логической схемы,
  • DELTA_OPEN — открытие дельты,
  • DELTA_CLOSE — закрытие дельты,
  • DELTA_CANCEL — откат дельты,
  • WRITE_OK — завершение операции записи,
  • WRITE_CANCEL — отмена операции записи.
event_UUID

Уникальный идентификатор события.

Примеры ключей сообщения

Пример ключа в сообщении о закрытии дельты:

{
  "datamart": "marketing", 
  "datetime": "2022-02-28 08:02:40",
  "event": "DELTA_CLOSE",
  "eventLogId": "6b80c392-3198-492f-a631-6a684e521b79"
}

Пример ключа в сообщении о завершении операции записи:

{
  "datamart": "marketing", 
  "datetime": "2022-02-28 09:15:43.532564",
  "event": "WRITE_OK",
  "eventLogId": "8b8dfa17-6605-4874-8ebf-1b8b47c201f3"
}

Пример ключа в сообщении об отмене операции записи:

{
  "datamart": "marketing", 
  "datetime": "2022-02-28 09:51:11.532",
  "event": "WRITE_CANCEL",
  "eventLogId": "3d963cbc-8782-498e-923d-602d7ccf7c2d"
}

Формат тела сообщения

Тело сообщения в сигнальном топике Kafka имеет формат, описанный в таблице ниже. Описание параметров тела сообщения см. в секции ниже.

Событие Формат тела сообщения
Изменение логической схемы {"datamart": "<logical_db>", "entityName": "<entity_name>", "entityDefinition": "<entity_definition>", changeDateTime": "<change_datetime>"}
Открытие дельты {"deltaNum": <delta_number>}
Закрытие дельты {"deltaNum": <delta_number>, "deltaDate": "<delta_commit_datetime>"}
Откат дельты {"deltaNum": <delta_number>}
Завершение операции записи {"entity": "<entity_name>", "cn": <sys_cn>, "ts": "<ts_datetime>", "rowsAffected": <rows_affected_quantity>}
Отмена операции записи {"entity": "<entity_name>", "cn": <sys_cn>}

Параметры тела сообщения

logical_db

Имя логической базы данных, к которой относится изменение.

entity_name

Имя логической сущности, к которой относится изменение или операция записи.

entity_definition

Определение логической сущности. Возможные значения:

  • TABLE.DEFAULT — обычная логическая таблица;
  • TABLE.PROXY — прокси-таблица;
  • TABLE.PARTITION — партиция;
  • TABLE.PARTITIONED — партиционированная таблица;
  • MATERIALIZED VIEW.DEFAULT — материализованное представление;
  • VIEW.DEFAULT — логическое представление;
  • READABLE EXTERNAL TABLE.CORE:<datasource_alias> — readable-таблица для работы со standalone-таблицей, расположенной в датасорсе <datasource_alias>;
  • READABLE EXTERNAL TABLE.KAFKA — readable-таблица для загрузки данных из брокера сообщений Kafka;
  • WRITABLE EXTERNAL TABLE.CORE:<datasource_alias> — writable-таблица для работы со standalone-таблицей, расположенной в датасорсе <datasource_alias>;
  • UPLOAD EXTERNAL TABLE.KAFKA — внешняя таблица загрузки;
  • DOWNLOAD EXTERNAL TABLE.KAFKA — внешняя таблица выгрузки.
change_datetime

Дата и время изменения в формате YYYY-MM-DD hh:mm:ss.

delta_number

Номер дельты.

delta_commit_datetime

Дата и время закрытия дельты в формате YYYY-MM-DD hh:mm:ss.SSSSSS.

sys_cn

Номер операции записи. Для прокси-таблиц и standalone-таблиц указывается значение null.

ts_datetime

Дата и время выполнения или отмены операции записи в формате YYYY-MM-DD hh:mm:ss.SSSSSS. Для прокси-таблиц и standalone-таблиц указывается значение null.

rows_affected_quantity

Количество добавленных, измененных и удаленных операцией строк.

Примеры тела сообщения

Пример тела в сообщении об изменении логической схемы:

{
  "changeDateTime": "2022-02-24 15:25:47",
  "datamart": "marketing"
}

Пример тела в сообщении о закрытии дельты:

{
  "deltaNum": 8,
  "deltaDate": "2022-02-28 08:02:40.420923"
}