Отправка сообщений о событиях по HTTP

Содержание раздела
  1. HTTP-метод обработки сообщений о событиях
  2. Управление отправкой сообщений
  3. Порядок отправки сообщений
    1. Немедленная отправка или накопление сообщений
    2. Отправка сообщений подписчикам
    3. Буфер неотправленных сообщений
  4. Формат сообщений
    1. Параметры сообщения
    2. Параметры массива eventParams
  5. Примеры сообщений

Система поддерживает отправку сообщений о событиях по HTTP. Сообщения отправляются внешним сервисам-подписчикам (далее — подписчики), которые заданы в конфигурации ноды, с использованием указанного в конфигурации HTTP-метода.

Список и параметры подписчиков настраиваются в секции конфигурации postStatusEvent.subscriberGroup. Для каждого подписчика может быть указано несколько узлов на случай недоступности некоторых из них.

HTTP-метод обработки сообщений о событиях

HTTP-метод обработки сообщений о событиях должен быть реализован на стороне подписчиков. Подписчик выделяет URL-адрес, обращаясь по которому Prostore будет отправлять сообщения о событиях, а также определяет порядок обработки сообщений и формирования ответа.

HTTP-метод должен поддерживать прием сообщений о событиях в формате, в котором их формирует система Prostore. Спецификацию OpenAPI для HTTP-метода см. ниже или по ссылке.

Спецификация HTTP-метода обработки сообщений о событиях
openapi: 3.0.0
info:
  title: StatusEventAPI
  version: 1.0.2
  description: Status Event Rest API
  contact:
    name: Alexander Senko
servers:
  - url: 'http://{host}:{port}'
    variables:
      host:
        default: localhost
      port:
        default: '8080'
paths:
  /custom-path/custom-endpointS:
    post:
      summary: Status event
      operationId: post-status-event
      responses:
        '200':
          description: OK
        '500':
          description: Internal Server Error
      description: Custom status event endpoint
      tags:
        - Status event
      requestBody:
        content:
          application/json:
            schema:
              type: array
              items:
                allOf:
                  - $ref: '#/components/schemas/status-event-common'
                  - $ref: '#/components/schemas/status-event-params'
            examples:
              DELTA_OPEN:
                value:
                  - datamart: testdb
                    datetime: '2019-08-24 14:15:22.123'
                    event: DELTA_OPEN
                    eventLogId: 1a8ae2c9-9963-4453-848b-31a196c58c03
                    eventParams:
                      deltaNum: 5
              DELTA_CLOSE:
                value:
                  - datamart: testdb
                    datetime: '2019-08-24 14:15:23.123'
                    event: DELTA_CLOSE
                    eventLogId: 1a8ae2c9-9963-4453-848b-31a196c58c03
                    eventParams:
                      deltaNum: 5
                      deltaDate: '2019-08-24 14:15:22'
              DELTA_CANCEL:
                value:
                  - datamart: testdb
                    datetime: '2019-08-24 14:15:22.123'
                    event: DELTA_CANCEL
                    eventLogId: 1a8ae2c9-9963-4453-848b-31a196c58c03
                    eventParams:
                      deltaNum: 6
              DATAMART_SCHEMA_CHANGED:
                value:
                  - datamart: testdb
                    datetime: '2019-08-24 14:15:23.123'
                    event: DATAMART_SCHEMA_CHANGED
                    eventLogId: 1a8ae2c9-9963-4453-848b-31a196c58c03
                    eventParams:
                      datamart: testdb
                      entityName: tbl
                      entityDefinition: TABLE.DEFAULT
                      changeDateTime: '2019-08-24 14:15:22.123'
              WRITE_OK:
                value:
                  - datamart: testdb
                    datetime: '2019-08-24 14:15:23.123'
                    event: WRITE_OK
                    eventLogId: 1a8ae2c9-9963-4453-848b-31a196c58c03
                    eventParams:
                      entity: tbl
                      cn: 21
                      ts: 1566656122000000
                      rowsAffected: 10
              WRITE_CANCEL:
                value:
                  - datamart: testdb
                    datetime: '2019-08-24 14:15:22.123'
                    event: WRITE_CANCEL
                    eventLogId: 1a8ae2c9-9963-4453-848b-31a196c58c03
                    eventParams:
                      entity: tbl
                      cn: 22
              ARRAY:
                value:
                  - datamart: testdb
                    datetime: '2019-08-24 14:15:22.123'
                    event: DELTA_OPEN
                    eventLogId: 1a8ae2c9-9963-4453-848b-31a196c58c03
                    eventParams:
                      deltaNum: 5
                  - datamart: testdb
                    datetime: '2019-08-24 14:15:23.123'
                    event: DELTA_CLOSE
                    eventLogId: 1a8ae2c9-9963-4453-848b-31a196c58c03
                    eventParams:
                      deltaNum: 5
                      deltaDate: '2019-08-24 14:15:22'
                  - datamart: testdb
                    datetime: '2019-08-24 14:15:23.123'
                    event: WRITE_OK
                    eventLogId: 1a8ae2c9-9963-4453-848b-31a196c58c03
                    eventParams:
                      entity: tbl
                      cn: 21
                      ts: 1566656122000000
                      rowsAffected: 10
components:
  schemas:
    status-event-common:
      title: status-event-common
      type: object
      description: Model for status event common information
      properties:
        datamart:
          type: string
        datetime:
          type: string
          format: date-time
        event:
          type: string
          enum:
            - DELTA_OPEN
            - DELTA_CLOSE
            - DELTA_CANCEL
            - DATAMART_SCHEMA_CHANGED
            - WRITE_OK
            - WRITE_CANCEL
        eventLogId:
          type: string
          format: uuid
      required:
        - datamart
        - datetime
        - event
        - eventLogId
    status-event-params:
      title: status-event-params
      type: object
      description: Model for status event params
      properties:
        eventParams:
          anyOf:
            - $ref: '#/components/schemas/status-event-delta-open'
            - $ref: '#/components/schemas/status-event-delta-close'
            - $ref: '#/components/schemas/status-event-delta-cancel'
            - $ref: '#/components/schemas/status-event-datamart-schema-changed'
            - $ref: '#/components/schemas/status-event-write-ok'
            - $ref: '#/components/schemas/status-event-write-cancel'
          type: object
      required:
        - eventParams
    status-event-delta-open:
      title: status-event-delta-open
      type: object
      description: Model for DELTA_OPEN status event specific params
      properties:
        deltaNum:
          type: integer
      required:
        - deltaNum
    status-event-delta-close:
      title: status-event-delta-close
      type: object
      description: Model for DELTA_CLOSE status event specific params
      properties:
        deltaNum:
          type: integer
        deltaDate:
          type: string
          format: date-time
      required:
        - deltaNum
        - deltaDate
    status-event-delta-cancel:
      title: status-event-delta-cancel
      type: object
      description: Model for DELTA_CANCEL status event specific params
      properties:
        deltaNum:
          type: integer
      required:
        - deltaNum
    status-event-datamart-schema-changed:
      title: status-event-datamart-schema-changed
      type: object
      description: Model for DATAMART_SCHEMA_CHANGED status event specific params
      properties:
        datamart:
          type: string
        entityName:
          type: string
        entityDefinition:
          type: string
        changeDateTime:
          type: string
          format: date-time
      required:
        - datamart
        - entityName
        - entityDefinition
        - changeDateTime
    status-event-write-ok:
      title: status-event-write-ok
      type: object
      description: Model for WRITE_OK status event specific params
      properties:
        entity:
          type: string
        cn:
          type: integer
        ts:
          type: integer
        rowsAffected:
          type: integer
      required:
        - entity
        - cn
        - ts
        - rowsAffected
    status-event-write-cancel:
      title: status-event-write-cancel
      type: object
      description: Model for WRITE_CANCEL status event specific params
      properties:
        entity:
          type: string
        cn:
          type: integer
      required:
        - entity
        - cn
tags:
  - name: Status event
    description: defines status event endpoints

Управление отправкой сообщений

При необходимости можно отключить отправку сообщений по следующим видам событий:

  • завершение и отмена операций записи — отправка сообщений отключается/включается параметром конфигурации POST_STATUS_EVENT_SUBSCRIBER_GROUP_WRITE_OPERATIONS_ENABLED;
  • создание и удаление внешних таблиц в логической схеме данных — отправка сообщений отключается/включается параметром конфигурации POST_STATUS_EVENT_SUBSCRIBER_GROUP_EXTERNAL_DDL_ENABLED.

Порядок отправки сообщений

Немедленная отправка или накопление сообщений

Сообщения о событиях могут отправляться сразу или заданное время накапливаться в пачку перед отправкой. Время накопления сообщений задается параметром конфигурации POST_STATUS_EVENT_SUBSCRIBER_GROUP_BUFFERING_PERIOD_MS. По умолчанию это время равно 0, и сообщения отправляются сразу после их формирования, без ожидания других сообщений.

В режиме немедленной отправки сообщений система может отправить несколько сообщений сразу, если за время обработки одного события успели произойти другие регистрируемые события.

Отправка сообщений подписчикам

Сообщения о событиях отправляются всем настроенным подписчикам. Каждое сообщение или пачка сообщений (в зависимости от настроек накопления сообщений) отправляется случайному узлу каждого подписчика.

Если выбранный узел подписчика недоступен, система перебирает сконфигурированные узлы этого подписчика по очереди, пытаясь отправить сообщения кому-нибудь из них.

Сообщения отправляются последовательно. Следующее сообщение или пачка сообщений отправляется после успешной отправки предыдущего сообщения (пачки).

Буфер неотправленных сообщений

Если сообщения не удалось доставить ни одному из узлов подписчика, система добавляет их в буфер неотправленных сообщений и периодически пытается отправить их повторно. Интервал между попытками повторной отправки задается параметром конфигурации POST_STATUS_EVENT_SUBSCRIBER_GROUP_ENDPOINT_RETRY_TIMEOUT_MS, по умолчанию он равен 5 секундам.

Размер буфера неотправленных сообщений задается параметром конфигурации POST_STATUS_EVENT_SUBSCRIBER_GROUP_BUFFER_SIZE. По умолчанию буфер вмещает 1000 сообщений. При переполнении буфера старые сообщения перезаписываются более свежими.

Формат сообщений

Сообщения о событиях имеют следующий формат:

{
  "datamart": "<logical_db>",
  "datetime": "<event_timestamp>",
  "event": "<event_code>",
  "eventLogId": "<event_UUID>",
  "eventParams": {
    ...
  }
}

Параметры сообщения

logical_db

Имя логической базы данных, в которой произошло событие.

event_timestamp

Дата и время события в формате YYYY-MM-DD hh:mm:ss[.microseconds]. Подробнее о формате см. в разделе Строковый формат даты и времени в ответах.

event_code

Код события. Возможные значения:

  • DATAMART_SCHEMA_CHANGED — выполнение DDL-запроса (независимо от того, изменил запрос логическую схему данных или нет);
  • DELTA_OPEN — открытие дельты;
  • DELTA_CLOSE — закрытие дельты;
  • DELTA_CANCEL — откат дельты;
  • WRITE_OK — завершение операции записи;
  • WRITE_CANCEL — отмена операции записи.
event_UUID

Уникальный идентификатор события.

eventParams

Массив параметров события. Набор параметров в массиве зависит от вида события, как показано в таблице ниже.

Событие Массив параметров eventParams
Выполнение DDL-запроса {"datamart": "<logical_db>", "entityName": "<entity_name>", "entityDefinition": "<entity_definition>", changeDateTime": "<ddl_timestamp>"}
Открытие дельты {"deltaNum": <delta_number>}
Закрытие дельты {"deltaNum": <delta_number>, "deltaDate": "<delta_commit_timestamp>"}
Откат дельты {"deltaNum": <delta_number>}
Завершение операции записи {"entity": "<entity_name>", "cn": <sys_cn>, "ts": <ts_unix_time>, "rowsAffected": <rows_affected_quantity>}
Отмена операции записи {"entity": "<entity_name>", "cn": <sys_cn>}

Параметры массива eventParams

logical_db

Имя логической базы данных, в которой выполнен DDL-запрос.

entity_name

Имя логической сущности, к которой относится DDL-запрос или операция записи.

entity_definition

Определение логической сущности. Возможные значения:

  • TABLE.DEFAULT — обычная логическая таблица;
  • TABLE.PROXY — прокси-таблица;
  • TABLE.PARTITION — партиция;
  • TABLE.PARTITIONED — партиционированная таблица;
  • MATERIALIZED VIEW.DEFAULT — материализованное представление;
  • VIEW.DEFAULT — обычное логическое представление (не относящее к категории простых*);
  • VIEW.PLAIN.FILTERED — простое* представление с условием;
  • VIEW.PLAIN.UNFILTERED — простое* представление без условия;
  • READABLE EXTERNAL TABLE.CORE:<datasource_name> — readable-таблица для работы со standalone-таблицей, расположенной в датасорсе <datasource_name>;
  • READABLE EXTERNAL TABLE.KAFKA — readable-таблица для загрузки данных из брокера сообщений Kafka;
  • WRITABLE EXTERNAL TABLE.CORE:<datasource_name> — writable-таблица для работы со standalone-таблицей, расположенной в датасорсе <datasource_name>;
  • UPLOAD EXTERNAL TABLE.KAFKA — внешняя таблица загрузки;
  • DOWNLOAD EXTERNAL TABLE.KAFKA — внешняя таблица выгрузки.

* Простое логическое представление — представление, основанное на запросе к одной таблице и принадлежащее той же логической базе данных, что и таблица. Такое представление может включать условие без подзапросов и соединений.

ddl_timestamp

Дата и время выполнения DDL-запроса в формате YYYY-MM-DD hh:mm:ss[.microseconds]. Подробнее о формате см. в разделе Строковый формат даты и времени в ответах.

delta_number

Номер дельты.

delta_commit_timestamp

Дата и время закрытия дельты в формате YYYY-MM-DD hh:mm:ss[.microseconds]. Подробнее о формате см. в разделе Строковый формат даты и времени в ответах.

sys_cn

Номер операции записи. Для прокси-таблиц и standalone-таблиц указывается значение null.

ts_unix_time

Дата и время выполнения или отмены операции записи в Unix-формате, равное целому числу микросекунд с 00:00:00 UTC 1 января 1970 года. Для прокси-таблиц и standalone-таблиц указывается значение null.

rows_affected_quantity

Количество добавленных, измененных и удаленных операцией строк.

Примеры сообщений

Пример сообщения о выполнении DDL-запроса:

[
  {
    "datamart": "marketing",
    "datetime": "2024-05-10 11:10:46.104",
    "event": "DATAMART_SCHEMA_CHANGED",
    "eventLogId": "1a8ae2c9-9963-4453-848b-31a196c58c03",
    "eventParams": {
      "datamart": "marketing",
      "entityName": "sales",
      "entityDefinition": "TABLE.DEFAULT",
      "changeDateTime": "2024-05-10 11:10:46.085"
    }
  }
]

Пример сообщения о завершении операции записи:

[
  {
    "datamart": "marketing",
    "datetime": "2024-08-24 14:15:23.278",
    "event": "WRITE_OK",
    "eventLogId": "1a8ae2c9-9963-4453-848b-31a196c58c03",
    "eventParams": {
      "entity": "sales",
      "cn": 21,
      "ts": 1724508923278000,
      "rowsAffected": 10
    }
  }
]

Пример сообщения о закрытии дельты:

[
  {
    "datamart": "marketing",
    "datetime": "2024-08-24 14:20:06.755",
    "event": "DELTA_CLOSE",
    "eventLogId": "1a8ae2c9-9963-4453-848b-31a196c58c03",
    "eventParams": {
      "deltaNum": 5,
      "deltaDate": "2024-08-24 14:20:06"
    }
  }
]