Команда RESTORE_FROM_KAFKA

Содержание раздела
  1. Восстанавливаемые сущности
  2. Восстанавливаемые данные
    1. Восстановление данных из инкрементального бэкапа
    2. Восстановление данных из снапшот-бэкапа
  3. Аутентификация
  4. Как работает команда
    1. Загрузка данных из топика
    2. Создание сущностей и восстановление данных
    3. Обработка ошибок
  5. Использование команды
    1. Проверка целостности бэкапа
    2. Восстановление логической БД
    3. Создание копии логической БД в другом окружении
    4. Отмена изменений в случае сбоя
  6. Синтаксис
    1. Параметры
    2. Ключевые слова
      1. --connect
      2. --topic
      3. --kafkaBrokersCore
      4. --kafkaBrokers
      5. --delta
      6. --partitionsCount
      7. --threadsCount
      8. --messageLimit
      9. --readonly
      10. --deltaFrom
      11. --kafkaRetries
  7. Ограничения
  8. Примеры
    1. Развертывание логической БД по состоянию на последнюю дельту
    2. Развертывание логической БД по состоянию на последнюю дельту с указанием токена
    3. Развертывание логической БД по состоянию на указанную дельту
    4. Пошаговый пример развертывания логической БД из бэкапа
      1. Проверка топика с бэкапом
      2. Развертывание логической БД из бэкапа
      3. Проверка развернутой логической БД
  9. Варианты вывода команды
    1. Вывод при успешной проверке бэкапа
    2. Вывод при успешном развертывании логической БД из бэкапа
    3. Вывод при неуспешном развертывании логической БД из бэкапа

Команда RESTORE_FROM_KAFKA разворачивает логическую базу данных из бэкапа, сохраненного в топике Kafka командой BACKUP_TO_KAFKA.

Примеры использования команды:

  • восстановление сбойной или удаленной логической БД;
  • создание копии логической БД в другом окружении.

По умолчанию команда загружает бэкап из топика того брокера сообщений Kafka, который задан в команде с помощью ключевого слова kafkaBrokersCore и (или) в конфигурации с помощью параметра ZOOKEEPER_KAFKA_ADDRESS. Чтобы загрузить бэкап из другого брокера сообщений, укажите в команде ключевое слово kafkaBrokers с адресом нужного брокера.

Для запуска команды на хосте должна быть установлена среда исполнения Java (JRE) или набор инструментов для разработки на Java (JDK).

Восстанавливаемые сущности

Команда восстанавливает логическую и физическую структуры следующих сущностей:

Команда не восстанавливает внешние таблицы и standalone-таблицы исходной логической БД, так как они отсутствуют в бэкапе. При необходимости вы можете их восстановить вручную после успешного завершения работы команды.

Восстанавливаемые данные

Команда восстанавливает данные следующих сущностей:

Данные материализованных представлений восстанавливаются не командой, а механизмом синхронизации представлений. Синхронизация представления запускается после того, как команда завершает восстановление связанных таблиц представления.

Данные восстанавливаются в следующем состоянии:

  • на указанную или последнюю дельту — если развертывание выполняется из инкрементального бэкапа;
  • на зафиксированный момент времени — если развертывание выполняется из снапшот-бэкап.

Подробнее о видах бэкапа см. в разделе Команда BACKUP_TO_KAFKA.

Восстановление данных из инкрементального бэкапа

В зависимости от заданных параметров команда восстанавливает данные по состоянию на указанную или последнюю дельту с сохранением всей истории изменений этих данных.

Номера и метки времени дельт, развернутых из бэкапа, соответствуют исходной логической БД. Соответствие номеров операций записи (CN) в дельтах не гарантируется.

Если данные бэкапа разворачиваются в непустой логической БД, это логическая БД наполняется только недостающими сущностями и дельтами. Текущие сущности и дельты в логической БД не перезаписываются, даже если отличаются от сохраненных в бэкапе.

При восстановлении из инкрементального бэкапа данные прокси-таблиц не восстанавливаются.

Восстановление данных из снапшот-бэкапа

При восстановлении логической БД из снапшот-бэкапа данные восстанавливаются по состоянию на момент времени, зафиксированный в снапшоте. Дополнительно восстанавливается та история изменений данных, которая была выгружена вместе со снапшотом, если такая была (см. ключевое слово snapshotDepth команды BACKUP_TO_KAFKA).

Номера дельт, развернутых из снапшот-бэкапа, отличаются от исходной логической БД. Первая по номеру дельта бэкапа становится нулевой дельтой в развернутой логической БД, а последующие — следующими по порядку. Например:

  • выгруженные данные: снапшот на дельту 21 и изменения данных в дельтах 22–24;
  • результат в развернутой логической БД: дельты 0–3 (дельта 21 → дельта 0, дельта 22 → дельта 1 и т.д.).

Нумерация операций записи в восстановленной логической БД может отличаться от источника.

Аутентификация

Команда поддерживает передачу JWT-токена для последующей аутентификации в системе.

Если в системе включена аутентификация запросов, необходимо указать информацию о токене в составе строки подключения команды. Подробнее о формате строки подключения см. в секции Синтаксис.

Как работает команда

Загрузка данных из топика

Данные последовательно загружаются из топика следующего брокера Kafka:

  • указанного с помощью ключевого слова kafkaBrokers — если команда содержит kafkaBrokers;
  • указанного с помощью ключевого слова kafkaBrokersCore и (или) параметра конфигурации ZOOKEEPER_KAFKA_ADDRESS — если команда не содержит kafkaBrokers.

Загрузка данных происходит через промежуточные топики. Количество партиций в промежуточных топиках регулируется значением ключевого слова partitionsCount.

По мере наполнения промежуточных топиков данные из них загружаются в логическую БД. Перегрузка данных через отдельные топики исключает влияние сбоев, которые могут возникнуть при развертывании, на целостность бэкапа в топике Kafka.

Команда не фиксирует прочитанное смещение в топике с бэкапом, позволяя многократно загружать бэкап из одного и того же топика.

Создание сущностей и восстановление данных

При загрузке бэкапа в логическую БД команда создает необходимые логические сущности и наполняет их данными. Вместе с каждой дельтой разворачивается схема данных, актуальная на момент выгрузки бэкапа.

Операции записи, выполненные в исходной логической БД вне дельт, при загрузке бэкапа в логическую БД включаются в состав ближайшей дельты, следующей за ними.

Из всех записей с одним первичным ключом, которые были добавлены и удалены в дельте и отдельных операциях записи с той же меткой времени, как у этой дельты, при восстановлении логической БД остается только одна итоговая запись. Например, для таблицы с первичным ключом id:

  • отдельная операция записи: добавлена запись id=100, value=10;
  • следующая дельта (номер 5): изменена запись id=100, value=10id=100, value=33;
  • результат восстановления для дельты 5: добавлена запись id=100, value=33.

Обработка ошибок

Если команде не удалось восстановить логическую базу данных, она останавливает работу и выдает ошибку. Дополнительно, если часть данных удалось восстановить, команда возвращает номера восстановленных дельт.

В случае ошибки команда не отменяет сделанные изменения, и для логической БД может потребоваться повторный запуск команды и (или) ручной разбор результатов восстановления.

Использование команды

Проверка целостности бэкапа

Чтобы проверить корректность и полноту бэкапа в топике Kafka без его фактического развертывания, выполните команду RESTORE_FROM_KAFKA с ключевым словом readonly.

Восстановление логической БД

Чтобы восстановить сбойную или удаленную логическую БД, выполните команду RESTORE_FROM_KAFKA. В команде укажите топик, в котором находится бэкап логической БД, с помощью ключевого слова topic.

Инкрементальный бэкап можно загрузить в пустую или непустую логическую БД, снапшот-бэкапа — только в пустую логическую БД.

Если логическая БД восстанавливается из снапшот-бэкапа, топик topic должен содержать не более одного снапшота этой логической БД; иначе возможны ошибки восстановления.

Создание копии логической БД в другом окружении

Чтобы создать копию логической БД на том же сервере, не прерывая работу исходной логической БД:

  1. Разверните еще один экземпляр системы с новым окружением.
  2. В новом окружении создайте копию логической БД, выполнив команду RESTORE_FROM_KAFKA. В качестве имени разворачиваемой логической БД укажите имя исходной логической БД, для которой создавался бэкап.

Отмена изменений в случае сбоя

Если во время работы команды произошел сбой и какая-то из дельт разворачиваемой логической БД осталась незакрытой, команда выводит ошибку Delta is not committed.

Чтобы доразвернуть логическую БД в этом случае:

  1. Отмените изменения сбойной дельты, выполнив для нее ROLLBACK DELTA.
  2. Повторите исходную команду RESTORE_FROM_KAFKA.

Синтаксис

java -jar <dtm_tools_file_name>.jar \
--connect <prostore_host>:<prostore_port>/<db_name>[?user=jwt&password=<jwt_token>] \
restore_from_kafka \
--topic <backup_topic> \
[–-kafkaBrokersCore <core_kafka_brokers_list>] \
[--kafkaBrokers <backup_kafka_broker_list>] \
[--delta <delta_num>] \
[--partitionsCount <partitions_count>] \
[--threadsCount <threads_count>] \
[--messageLimit <message_limit>] \
[--readonly [<log_level>] \
[--deltaFrom <delta_from>]] \
[--kafkaRetries <kafka_retries>]

Параметры

dtm_tools_file_name

Имя jar-файла утилиты DTM Tools.

Ключевые слова

--connect

Задает строку подключения к ноде Prostore. Значение состоит из следующих элементов:

  1. prostore_host — IP-адрес или доменное имея ноды Prostore;
  2. db_name — номер порта для подключения к ноде Prostore, равный значению параметра конфигурации DTM_CORE_HTTP_PORT;
  3. db_name — имя логической БД, которую нужно развернуть из бэкапа;
  4. (если включена аутентификация запросов) строка ?user=jwt&password=<jwt_token>, где jwt_token — авторизационный токен.

--topic

Задает имя топика Kafka, содержащего бэкап логической БД.

--kafkaBrokersCore

Задает список адресов брокеров сообщений Kafka, с которыми работает Prostore, и служит для ускорения работы команды.

Ключевое слово обязательно при запуске команды в инсталляциях, где сервисная БД в ZooKeeper недоступна со стороны утилиты DTM Tools и утилита не может запросить информацию о брокерах из конфигурации Prostore. В остальных случаях ключевое слово опционально.

Если ключевое слово не указано, команда запрашивает список адресов из конфигурации ноды Prostore.

--kafkaBrokers

Задает список адресов брокеров сообщений Kafka, содержащих топик с бэкапом, в формате host1:port1,host2:port2,..hostN:portN.

Ключевое слово нужно указывать, если требуется загрузить бэкап из брокеров, отличных от тех, с которыми работает Prostore.

На рисунке ниже показана схема работы утилиты с двумя брокерами Kafka: один брокер используется утилитой и системой Prostore, второй — только утилитой. Первый брокер находится в одной сети с Prostore, а второй может находиться в другой сети, например, в удаленном дата-центре.

Схема работы утилиты с двумя брокерами Kafka

--delta

Задает номер дельты, по состоянию на которую восстанавливаются данные логической БД из инкрементального бэкапа.

Если ключевое слово не указано и восстановление происходит из инкрементального бэкапа, команда восстанавливает данные по состоянию на максимальную дельту из непрерывного диапазона дельт в топике (на момент запуска команды). Например:

  • инкременты дельт в топике: 0, 1, 5, 6, 7;
  • восстанавливаются дельты: 0, 1.

--partitionsCount

Задает количество партиций в промежуточных служебных топиках, которые команда использует при загрузке данных из топика с бэкапом в восстанавливаемую логическую БД. Разделение топиков на несколько партиций может ускорить загрузку данных, если ее выполняют несколько процессов параллельно (см. threadsCount).

Если ключевое слово не указано, все данные в каждом топике содержатся в одной партиции.

--threadsCount

Задает количество параллельных потоков, загружающих бэкап в логическую БД.

Если ключевое слово не указано, количество потоков равно количеству числу партиций в топике с бэкапом.

--messageLimit

Задает максимальное количество сообщений Kafka, записываемых в один промежуточный топик.

Если ключевое слово не указано, в каждый топик записывается до 1000 сообщений.

--readonly

Включает режим проверки бэкапа и вывода метаинформации с заданным уровнем детализации log_level. При этом фактического развертывания логической БД не происходит.

Параметр log_level определяет уровень детализации логов, выводимых в режиме readonly. Возможные значения:

  • 0 (по умолчанию) — выводятся ключи сообщений Kafka и список некорректных, неполных и отсутствующих дельт в топике с бэкапом;
  • 1 — выводится список некорректных, неполных и отсутствующих дельт в топике с бэкапом.

Если ключевое слово не указано, команда загружает данные в логическую БД без вывода метаинформации по бэкапу.

--deltaFrom

Задает дельту, с которой начинается проверка бэкапа в режиме readonly.

Если ключевое слово не указано, бэкап проверяется с нулевой дельты.

--kafkaRetries

Задает количество попыток загрузки бэкапа из топика, которое команда делает при запуске команды.

Если ключевое слово не указано, делается 10 попыток.

Ограничения

  • Перед развертыванием логической БД команда не проверяет полноту журнала, сохраненного в топике с бэкапом. Если журнал отсутствует или заполнен частично, развертывание логической БД приведет к некорректным результатам.
  • Во время работы команды не допускаются следующие действия с разворачиваемой логической БД:
    • открытие, закрытие и откат дельт,
    • загрузка и обновление данных,
    • создание, удаление и изменение логических сущностей.
  • Команда игнорирует ошибки, не связанные с загрузкой данных из бэкапа, и продолжает развертывание. Например, команда пропускает создание логического представления, если оно связано с другой логической БД, отсутствующей на момент запуска команды.
  • Нумерация операций записи, а также значения delta_num в журнале исходной и развернутой логических базах данных могут отличаться.
  • Если логическая БД восстанавливается из инкрементального бэкапа, данные прокси-таблиц не восстанавливаются. Восстанавливаются только их логическая и физическая схемы.
  • Развертывание из снапшот-бэкапа:
    • Нумерация дельт в развернутой логической БД отличается от нумерации в исходной логической БД.
    • Топик, из которого восстанавливается логическая БД, должен содержать не более одного снапшота этой логической БД. Иначе возможны ошибки восстановления.

Примеры

Развертывание логической БД по состоянию на последнюю дельту

java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/marketing restore_from_kafka --topic marketing_backup_topic --kafkaBrokers 10.123.2.43:9092,10.123.2.44:9092 --partitionsCount 4 --threadsCount 3 --messageLimit 2000 --kafkaRetries 5

Развертывание логической БД по состоянию на последнюю дельту с указанием токена

java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/marketing?user=jwt&password=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c restore_from_kafka --topic marketing_backup_topic --kafkaBrokers 10.123.2.43:9092,10.123.2.44:9092 --partitionsCount 4 --threadsCount 3 --messageLimit 2000  --kafkaRetries 5

Развертывание логической БД по состоянию на указанную дельту

java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/marketing restore_from_kafka --topic marketing_backup_topic --delta 27 --threadsCount 2

Пошаговый пример развертывания логической БД из бэкапа

Для примера понадобится топик backup_topic_sourcedb с бэкапом логической БД sourcedb, выгруженным в соответствии с примером раздела Команда BACKUP_TO_KAFKA > Пошаговый пример выгрузки бэкапа.

Проверка топика с бэкапом

Проверка бэкапа в топике backup_topic_sourcedb с использованием ноды Prostore, доступной по адресу 10.92.3.86:9090:

java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/sourcedb restore_from_kafka --topic backup_topic_sourcedb --kafkaBrokers 10.92.3.86:9092 --readonly

В команде используется топик, который принадлежит брокеру Kafka, доступному по адресу 10.92.3.86:9092.

Развертывание логической БД из бэкапа

Команды, приведенные ниже в этой секции, иллюстрируют разные способы восстановления логической БД. Чтобы восстановить тестовую БД, используйте любую из команд, но не обе сразу.

В командах используются топики, которые принадлежит брокеру Kafka, доступному по адресу 10.92.3.86:9092.

Развертывание логической БД из топика, содержащего инкрементальный бэкап (данные восстанавливается по состоянию на последнюю дельту исходной логической БД с полной историей изменений):

java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/sourcedb restore_from_kafka --topic backup_topic_sourcedb --kafkaBrokers 10.92.3.86:9092 --partitionsCount 1 --threadsCount 1 --messageLimit 2000

Развертывание логической БД из топика, содержащего снапшот-бэкап (данные восстанавливаются по состоянию на последнюю дельту исходной логической БД без истории изменений, так как бэкап был выгружен без истории)

java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/sourcedb restore_from_kafka --topic backup_topic_sourcedb_snapshot --kafkaBrokers 10.92.3.86:9092 --partitionsCount 1 --threadsCount 1 --messageLimit 2000

Проверка развернутой логической БД

Опционально после завершения развертывания можно проверить результат развертывания, выполнив запрос:

SELECT * from sourcedb.table1_adqm

В ответе на запрос вернется таблица с одной записью, имеющей следующие значения:

Имя столбца Значение
id 1
boolean_col false
int32_col 32768
int_col 65537
bigint_col 1280000
float_col 3.1280000
double_col 2.718281828
char_col char_value
varchar_col varchar_value
uuid_col 12345678-1234-abcd-ef00-1234567899ab
link_col https://www.google.com
date_col 1970-01-11
time_col 13:10:30
timestamp_col 1970-01-01 13:10:30.000000

Варианты вывода команды

Вывод при успешной проверке бэкапа

Ниже показан примерный вид вывода команды при успешной проверке бэкапа в режиме readonly (набор сообщений и их текст могут изменяться от версии к версии):

deltaNum: ...
icrementId: "..."
restoreInfo:
  deltaDate: ...
  streamTotal: ...
  tableEntries:
  - tableName: "..."
    topicName: "..."
  - tableName: "..."
    topicName: "..."
    ...
  deltaChangesEntries:
  - changeNum: 0
    changeQuery: "..."
  - changeNum: 1
    changeQuery: "..."
    ...
  streams:
    1:
      chunkTotal: ...
      chunks:
      - ...
      - ...
    2:
      chunkTotal: ...
      chunks:
      - ...
    ...
  done: true
...
There no incomplete deltas in range ...

Вывод при успешном развертывании логической БД из бэкапа

Ниже показан примерный вид вывода команды при успешном развертывании логической БД (набор сообщений и их текст могут изменяться от версии к версии):

Started RESTORE process with params...
...
Successfully created destination datamart ...
RestoreProcessChunkTask completed successfully ...
Topic ... already exists
...
Trying to send data message ...
Topic ... with ... partitions successfully created
...
Increment ... fully processed
Restoring delta ... of increment ...
Got changes for datamart ...
Successfully applied change to ...
Begin delta in ... datamart
Can't drop upload ext table ...
Successfully created upload ext table ...
Consumer task completed successfully, partitions: ...
Data extracted from kafka ...
Successfully dropped upload ext table ...
Committed delta with date ... in ... datamart
Restoring delta ... of increment ...
...
Break. All deltas in backup topic are processed
Topic ... successfully deleted
...
RESTORE is done. ... restored deltas ...

Вывод при неуспешном развертывании логической БД из бэкапа

При неуспешном развертывании логической БД из бэкапа команда выводит текст ошибки.