Команда RESTORE_FROM_KAFKA

Содержание раздела
  1. Восстанавливаемые сущности
  2. Восстанавливаемые данные
  3. Аутентификация
  4. Порядок работы команды
    1. Загрузка инкрементов из топика
    2. Создание сущностей и загрузка данных
    3. Обработка ошибок
  5. Использование команды
    1. Проверка целостности бэкапа
    2. Восстановление сбойной или удаленной логической БД
    3. Создание реплики логической БД
    4. Обновление реплики логической БД
    5. Отмена изменений в случае сбоя
  6. Синтаксис
    1. Параметры
    2. Ключевые слова
      1. --connect
      2. --topic
      3. --kafkaBrokersCore
      4. --kafkaBrokers
      5. --delta
      6. --partitionsCount
      7. --threadsCount
      8. --messageLimit
      9. --readonly
      10. --deltaFrom
      11. --kafkaRetries
  7. Ограничения
  8. Примеры
    1. Развертывание логической БД по состоянию на последнюю дельту
    2. Развертывание логической БД по состоянию на последнюю дельту с указанием токена
    3. Развертывание логической БД по состоянию на указанную дельту
    4. Пошаговый пример развертывания логической БД из бэкапа
      1. Проверка топика с бэкапом
      2. Развертывание логической БД из бэкапа
      3. Проверка развернутой логической БД
  9. Варианты вывода команды
    1. Вывод при успешной проверке бэкапа
    2. Вывод при успешном развертывании логической БД из бэкапа
    3. Вывод при неуспешном развертывании логической БД из бэкапа

Команда RESTORE_FROM_KAFKA разворачивает логическую базу данных из бэкапа, сохраненного командой BACKUP_TO_KAFKA в топике Kafka. Логическую базу данных можно развернуть по состоянию на указанную или последнюю дельту бэкапа.

Примеры использования команды:

  • восстановление сбойной или удаленной логической БД;
  • создание реплики логической БД на другом сервере;
  • дополнение реплики логической БД, развернутой ранее, новыми данными.

По умолчанию команда загружает бэкап из топика того брокера сообщений Kafka, который задан в конфигурации с помощью параметра ZOOKEEPER_KAFKA_ADDRESS. Чтобы загрузить бэкап из другого брокера сообщений, укажите в команде ключевое слово kafkaBrokers с адресом нужного брокера.

Для запуска команды на хосте должна быть установлена среда исполнения Java (JRE) или набор инструментов для разработки на Java (JDK).

Восстанавливаемые сущности

Команда восстанавливает следующие сущности:

Команда не восстанавливает внешние и standalone-таблицы исходной логической БД, так как они отсутствуют в бэкапе. При необходимости вы можете их восстановить вручную после успешного завершения работы команды.

Восстанавливаемые данные

В зависимости от заданных параметров команда восстанавливает данные по состоянию на последнюю или указанную дельту с сохранением всей истории изменений этих данных.

Номера и метки времени дельт, развернутых из бэкапа, соответствуют исходной логической БД. Соответствие номеров операций записи (CN) в дельтах не гарантируется.

Если данные бэкапа разворачиваются в непустой логической БД, это логическая БД наполняется только недостающими сущностями и дельтами. Текущие сущности и дельты в логической БД не перезаписываются, даже если отличаются от сохраненных в бэкапе.

Аутентификация

Команда поддерживает передачу JWT-токена для последующей аутентификации в системе.

Если в системе включена аутентификация запросов, необходимо указать информацию о токене в составе строки подключения команды. Подробнее о формате строки подключения см. в секции Синтаксис.

Порядок работы команды

Команда последовательно загружает отдельные части — инкременты — из топика в логическую базу данных. Каждый такой инкремент содержит изменения данных с одной меткой времени и все записи журнала.

Загрузка инкрементов из топика

Инкременты, загружаемые из топика с бэкапом, записываются в промежуточные топики. Команда не фиксирует прочитанное смещение в топике с бэкапом, позволяя многократно загружать бэкап из одного и того же топика. По мере наполнения промежуточных топиков команда загружает инкременты из них в логическую БД. Перегрузка данных через отдельные топики исключает влияние сбоев, которые могут возникнуть при развертывании, на целостность бэкапа в топике Kafka.

Промежуточные топики разделяются на количество партиций, равное значению ключевого слова partitionsCount.

Создание сущностей и загрузка данных

При загрузке инкрементов в логическую БД команда создает необходимые логические сущности и загружает недостающие дельты.

С каждой дельтой разворачивается вся схема данных, актуальная на момент создания инкремента. Для обычных логических таблиц и партиций из бэкапа загружается схема данных и сами данные, для партиционированных таблиц, логических и материализованных представлений — только схема данных. После создания материализованные представления автоматически наполняются данными из развернутых таблиц.

Операции записи, выполненные в исходной логической БД вне дельт, при загрузке инкрементов в логическую БД включаются в состав ближайшей дельты, следующей за ними. Номера и метки времени дельт сохраняются такими же, какими они были в исходной логической БД.

Обработка ошибок

Если во время работы команды возникает ошибка загрузки данных или в топике отсутствует дельта из интервала дельт, выбранных к развертыванию, команда останавливает работу и выдает ошибку. В тексте ошибки указывается, какие дельты удалось загрузить и на какой дельте возникла ошибка.

Использование команды

Проверка целостности бэкапа

Чтобы проверить корректность и полноту бэкапа в топике Kafka без его фактического развертывания, выполните команду RESTORE_FROM_KAFKA с ключевым словом readonly.

Восстановление сбойной или удаленной логической БД

Чтобы восстановить сбойную или удаленную логическую БД, выполните команду RESTORE_FROM_KAFKA. В команде укажите топик, в котором находится бэкап логической БД, с помощью ключевого слова topic.

Создание реплики логической БД

Чтобы создать реплику логической БД на том же сервере, не прерывая работу исходной логической БД:

  1. Разверните еще один экземпляр системы с новым окружением.
  2. В новом окружении создайте реплику логической БД, выполнив команду RESTORE_FROM_KAFKA. В качестве имени логической БД-реплики укажите имя исходной логической БД, для которой создавался бэкап.

Обновление реплики логической БД

Чтобы обновить развернутую ранее реплику логической БД, выполните команду RESTORE_FROM_KAFKA. В команде укажите имя существующей логической БД-реплики и топик, который содержит свежую версию бэкапа исходной логической БД.

При успешном выполнении действий команда дополнит развернутую логическую БД отсутствующими сущностями и данными.

Отмена изменений в случае сбоя

Если во время работы команды произошел сбой и какая-то из дельт разворачиваемой логической БД осталась незакрытой, команда выводит ошибку Delta is not committed.

Чтобы доразвернуть логическую БД в этом случае:

  1. Отмените изменения сбойной дельты, выполнив для нее ROLLBACK DELTA.
  2. Повторите исходную команду RESTORE_FROM_KAFKA.

Синтаксис

java -jar <dtm_tools_file_name>.jar \
--connect <prostore_host>:<prostore_port>/<db_name>[?user=jwt&password=<jwt_token>] \
restore_from_kafka \
--topic <backup_topic> \
[–-kafkaBrokersCore <core_kafka_brokers_list>] \
[--kafkaBrokers <backup_kafka_broker_list>] \
[--delta  <delta_num>] \
[--partitionsCount <partitions_count>] \
[--threadsCount <threads_count>] \
[--messageLimit <message_limit>] \
[--readonly [<log_level>] \
[--deltaFrom <delta_from>]] \
[--kafkaRetries <kafka_retries>]

Параметры

dtm_tools_file_name

Имя jar-файла утилиты DTM Tools.

Ключевые слова

--connect

Задает строку подключения к ноде Prostore. Значение состоит из следующих элементов:

  1. prostore_host — IP-адрес или доменное имея ноды Prostore;
  2. db_name — номер порта для подключения к ноде Prostore, равный значению параметра конфигурации DTM_CORE_HTTP_PORT;
  3. db_name — имя логической БД, которую нужно развернуть из бэкапа;
  4. (если включена аутентификация запросов) строка ?user=jwt&password=<jwt_token>, где jwt_token — авторизационный токен.

--topic

Задает имя топика Kafka, содержащего бэкап логической БД.

--kafkaBrokersCore

Задает список адресов брокеров сообщений Kafka, с которыми работает Prostore, и служит для ускорения работы команды.

Если ключевое слово не указано, команда запрашивает список адресов из конфигурации ноды Prostore.

--kafkaBrokers

Задает список брокеров сообщений Kafka, содержащих топик с бэкапом. Значение указывается в формате host1:port1,host2:port2,..hostN:portN.

Ключевое слово нужно указывать, если список брокеров отличается от значения параметра конфигурации ZOOKEEPER_KAFKA_ADDRESS.

--delta

Задает номер дельты, по состоянию на которую восстанавливаются схема данных и данные логической БД.

Если ключевое слово не указано, команда восстанавливает логическую БД по состоянию на максимальную дельту из непрерывного диапазона дельт, для которых есть инкременты в указанном топике Kafka на момент запуска команды.

--partitionsCount

Задает количество партиций в промежуточных служебных топиках, которые команда использует при загрузке данных из топика с бэкапом в восстанавливаемую логическую БД. Разделение топиков на несколько партиций может ускорить загрузку данных, если ее выполняют несколько процессов параллельно (см. threadsCount).

Если ключевое слово не указано, все данные в каждом топике содержатся в одной партиции.

--threadsCount

Задает количество параллельных потоков, загружающих бэкап в логическую БД.

Если ключевое слово не указано, количество потоков равно количеству числу партиций в топике с бэкапом.

--messageLimit

Задает максимальное количество сообщений Kafka, записываемых в один промежуточный топик.

Если ключевое слово не указано, в каждый топик записывается до 1000 сообщений.

--readonly

Включает режим проверки бэкапа и вывода метаинформации с заданным уровнем детализации log_level. При этом фактического развертывания логической БД не происходит.

Параметр log_level определяет уровень детализации логов, выводимых в режиме readonly. Возможные значения:

  • 0 (по умолчанию) — выводятся ключи сообщений Kafka и список некорректных, неполных и отсутствующих дельт в топике с бэкапом;
  • 1 — выводится список некорректных, неполных и отсутствующих дельт в топике с бэкапом.

Если ключевое слово не указано, команда загружает данные в логическую БД без вывода метаинформации по бэкапу.

--deltaFrom

Задает дельту, с которой начинается проверка бэкапа в режиме readonly.

Если ключевое слово не указано, бэкап проверяется с нулевой дельты.

--kafkaRetries

Задает количество попыток загрузки бэкапа из топика, которое команда делает при запуске команды.

Если ключевое слово не указано, делается 10 попыток.

Ограничения

  • Перед развертыванием логической БД команда не проверяет полноту журнала, сохраненного в топике с бэкапом. Если журнал отсутствует или заполнен частично, развертывание логической БД приведет к некорректным результатам.
  • Во время работы команды не допускаются следующие действия с разворачиваемой логической БД:
    • открытие, закрытие и откат дельт,
    • загрузка и обновление данных,
    • создание, удаление и изменение логических сущностей.
  • Команда игнорирует ошибки, не связанные с загрузкой данных из бэкапа, и продолжает развертывание. Например, команда пропускает создание логического представления, если оно связано с другой логической БД, отсутствующей на момент запуска команды.
  • Нумерация операций записи, а также значения delta_num в журнале исходной и развернутой логических базах данных могут отличаться.

Примеры

Развертывание логической БД по состоянию на последнюю дельту

java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/marketing restore_from_kafka --topic marketing_backup_topic --kafkaBrokers 10.123.2.43:9092,10.123.2.44:9092 --partitionsCount 4 --threadsCount 3 --messageLimit 2000 --kafkaRetries 5

Развертывание логической БД по состоянию на последнюю дельту с указанием токена

java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/marketing?user=jwt&password=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c restore_from_kafka --topic marketing_backup_topic --kafkaBrokers 10.123.2.43:9092,10.123.2.44:9092 --partitionsCount 4 --threadsCount 3 --messageLimit 2000  --kafkaRetries 5

Развертывание логической БД по состоянию на указанную дельту

java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/marketing restore_from_kafka --topic marketing_backup_topic --delta 27 --threadsCount 2

Пошаговый пример развертывания логической БД из бэкапа

Для примера понадобится топик backup_topic_sourcedb с бэкапом логической БД sourcedb, выгруженным в соответствии с примером раздела Команда BACKUP_TO_KAFKA > Пошаговый пример выгрузки бэкапа.

Проверка топика с бэкапом

Проверка бэкапа в топике backup_topic_sourcedb с использованием ноды Prostore, доступной по адресу 10.92.3.86:9090:

java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/sourcedb restore_from_kafka --topic backup_topic_sourcedb --kafkaBrokers 10.92.3.86:9092 --readonly

В команде используется топик, который принадлежит брокеру Kafka, доступному по адресу 10.92.3.86:9092.

Развертывание логической БД из бэкапа

Развертывание логической БД по состоянию на последнюю дельту исходной логической БД с использованием ноды Prostore, доступной по адресу 10.92.3.86:9090:

java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/sourcedb restore_from_kafka --topic backup_topic_sourcedb --kafkaBrokers 10.92.3.86:9092 --partitionsCount 1 --threadsCount 1 --messageLimit 2000

В команде используется топик, который принадлежит брокеру Kafka, доступному по адресу 10.92.3.86:9092.

Проверка развернутой логической БД

Опционально после завершения развертывания можно проверить результат развертывания, выполнив запрос:

SELECT * from sourcedb.table1_adqm

В ответе на запрос вернется таблица с одной записью, имеющей следующие значения:

Имя столбца Значение
id 1
boolean_col false
int32_col 32768
int_col 65537
bigint_col 1280000
float_col 3.1280000
double_col 2.718281828
char_col char_value
varchar_col varchar_value
uuid_col 12345678-1234-abcd-ef00-1234567899ab
link_col https://www.google.com
date_col 1970-01-11
time_col 13:10:30
timestamp_col 1970-01-01 13:10:30.000000

Варианты вывода команды

Вывод при успешной проверке бэкапа

Ниже показан примерный вид вывода команды при успешной проверке бэкапа в режиме readonly (набор сообщений и их текст могут изменяться от версии к версии):

deltaNum: ...
icrementId: "..."
restoreInfo:
  deltaDate: ...
  streamTotal: ...
  tableEntries:
  - tableName: "..."
    topicName: "..."
  - tableName: "..."
    topicName: "..."
    ...
  deltaChangesEntries:
  - changeNum: 0
    changeQuery: "..."
  - changeNum: 1
    changeQuery: "..."
    ...
  streams:
    1:
      chunkTotal: ...
      chunks:
      - ...
      - ...
    2:
      chunkTotal: ...
      chunks:
      - ...
    ...
  done: true
...
There no incomplete deltas in range ...

Вывод при успешном развертывании логической БД из бэкапа

Ниже показан примерный вид вывода команды при успешном развертывании логической БД (набор сообщений и их текст могут изменяться от версии к версии):

Started RESTORE process with params...
...
Successfully created destination datamart ...
RestoreProcessChunkTask completed successfully ...
Topic ... already exists
...
Trying to send data message ...
Topic ... with ... partitions successfully created
...
Increment ... fully processed
Restoring delta ... of increment ...
Got changes for datamart ...
Successfully applied change to ...
Begin delta in ... datamart
Can't drop upload ext table ...
Successfully created upload ext table ...
Consumer task completed successfully, partitions: ...
Data extracted from kafka ...
Successfully dropped upload ext table ...
Committed delta with date ... in ... datamart
Restoring delta ... of increment ...
...
Break. All deltas in backup topic are processed
Topic ... successfully deleted
...
RESTORE is done. ... restored deltas ...

Вывод при неуспешном развертывании логической БД из бэкапа

При неуспешном развертывании логической БД из бэкапа команда выводит текст ошибки.