Загрузка данных

Содержание раздела
  1. Подготовка к загрузке
    1. Создание топика (опционально)
    2. Подготовка данных
    3. Подготовка логических сущностей
      1. Подготовка сущностей для загрузки в логическую таблицу
      2. Подготовка сущностей для загрузки в standalone-таблицу
  2. Загрузка данных в логическую таблицу
  3. Загрузка данных в standalone-таблицу
  4. Примеры
    1. Загрузка в логическую таблицу
    2. Загрузка в standalone-таблицу

Система позволяет параллельно загружать большие объемы данных, добавляя, обновляя и удаляя записи.

Загружать данные можно в логические таблицы и standalone-таблицы.

Перед загрузкой данных выполните действия, описанные в секции Подготовка к загрузке.

Об успешности загрузки данных можно судить по успешному ответу на запрос загрузки данных. Другие факторы такие, как смещение офсетов в топике или наличие данных в физической таблице <table>_staging, не означают успешность загрузки данных.

Подготовка к загрузке

Создание топика (опционально)

Данные загружаются в систему из сообщений топиков Kafka. Поэтому, если в брокере сообщений Kafka выключено автоматическое создание топиков, создайте топики вручную.

Чтобы создать топик, следуйте любой из инструкций в документации Kafka:

Рекомендации о разделении данных по топикам см. в разделе Рекомендации по топикам Kafka.

Подготовка данных

Чтобы подготовить данные к загрузке:

  1. Используя средства внешней системы, приведите данные в формат, подходящий для загрузки.
  2. Убедись, что каждая запись Avro содержит следующее значения sys_op, или добавьте его, если оно отсутствует:
    • 0 — если нужно добавить или обновить запись логической таблицы;
    • 1 — если нужно удалить запись логической таблицы;
    • пустое значение — если нужно добавить* запись standalone-таблицы.
  3. Загрузите подготовленные данные в топик Kafka.

* Обновление и удаление данных standalone-таблиц с помощью функции загрузки данных недоступно. Обновить записи standalone-таблицы можно запросом UPSERT VALUES, удалить — запросом DELETE.

Система не валидирует данные при их загрузке. Данные должны быть проверены на соответствие формату загрузки и, если нужно, скорректированы внешней системой перед их загрузкой в Prostore.

Подготовка логических сущностей

Подготовка сущностей для загрузки в логическую таблицу

  1. Создайте логическую таблицу, если она отсутствует.
  2. Создайте внешнюю таблицу для загрузки данных, если она отсутствует:

После завершения загрузки созданные внешние таблицы можно использовать повторно или удалить.

Подготовка сущностей для загрузки в standalone-таблицу

  1. Создайте внешнюю таблицу для загрузки данных, если она отсутствует:
  2. Создайте внешнюю writable-таблицу, если она отсутствует:
    • если standalone-таблица отсутствует и ее нужно создать при создании внешней таблицы, укажите в запросе ключевое слово OPTIONS со значением auto.create.table.enable=true,
    • иначе выполните запрос без ключевого слова OPTIONS.

После завершения загрузки созданные внешние таблицы можно использовать повторно или удалить.

Загрузка данных в логическую таблицу

Чтобы загрузить данные из топика Kafka в логическую таблицу, выполните INSERT SELECT FROM external_table. В запросе укажите внешнюю таблицу, определяющую параметры загрузки.

Операцию по загрузке данных можно выполнить в дельте или вне ее. Чтобы загрузить данные в дельте:

  1. Откройте дельту.
  2. Загрузите нужные данные.
  3. Закройте дельту.

Чтобы данные, записанные вне дельт, были доступны во всех видах запросов на чтение и выгрузку данных, их должна замыкать дельта. Замыкающая дельта может быть пустой. Подробнее см. в разделе Операции в дельте и вне дельты.

Загрузка данных в standalone-таблицу

Чтобы загрузить данные из внешней информационной системы в standalone-таблицу, выполните INSERT SELECT FROM external_table.

В запросе укажите внешнюю таблицу, определяющую параметры загрузки, а также внешнюю writable-таблицу, ссылающуюся на целевую standalone-таблицу.

При загрузке данных в standalone-таблицу учитывайте ограничения таблиц в конкретной СУБД.

Примеры

Загрузка в логическую таблицу

-- выбор логической базы данных marketing в качестве базы данных по умолчанию
USE marketing;

-- создание логической таблицы
CREATE TABLE sales (
  id BIGINT NOT NULL,
  transaction_date TIMESTAMP NOT NULL,
  product_code VARCHAR(256) NOT NULL,
  product_units BIGINT NOT NULL,
  store_id BIGINT NOT NULL,
  description VARCHAR(256),
  PRIMARY KEY (id)
)
DISTRIBUTED BY (id);

-- создание внешней таблицы загрузки
CREATE UPLOAD EXTERNAL TABLE sales_ext_upload (
  id BIGINT,
  transaction_date TIMESTAMP,
  product_code VARCHAR(256),
  product_units BIGINT,
  store_id BIGINT,
  description VARCHAR(256)
)
LOCATION  'kafka://zk1:2181,zk2:2181,zk3:2181/sales'
FORMAT 'AVRO'
MESSAGE_LIMIT 1000;

-- загрузка данных в логическую таблицу вне дельты
INSERT INTO sales SELECT * FROM sales_ext_upload;

-- открытие новой (горячей) дельты
BEGIN DELTA;

-- загрузка данных в логическую таблицу в дельте
INSERT INTO sales SELECT * FROM sales_ext_upload;

-- закрытие дельты
COMMIT DELTA;

Загрузка в standalone-таблицу

Создание standalone-таблицы с помощью системы (см. параметр auto.create.table.enable=true) и загрузка данных в созданную таблицу:

-- создание внешней writable-таблицы с созданием связанной standalone-таблицы в датасорсе adp
CREATE WRITABLE EXTERNAL TABLE marketing.agreements_ext_write_adp (
  id BIGINT NOT NULL,
  client_id BIGINT NOT NULL,
  number VARCHAR NOT NULL,
  signature_date DATE,
  effective_date DATE,
  closing_date DATE,
  description VARCHAR,
  PRIMARY KEY(id)
)
DISTRIBUTED BY (id)
LOCATION 'core:adp://marketing.agreements'
OPTIONS ('auto.create.table.enable=true');

-- создание внешней таблицы загрузки
CREATE UPLOAD EXTERNAL TABLE marketing.agreements_ext_upload (
  id BIGINT NOT NULL,
  client_id BIGINT NOT NULL,
  number VARCHAR NOT NULL,
  signature_date DATE,
  effective_date DATE,
  closing_date DATE,
  description VARCHAR
) 
LOCATION  'kafka://$kafka/agreements'
FORMAT 'AVRO'
OPTIONS ('auto.create.sys_op.enable=false')

-- загрузка данных в standalone-таблицу, на которую указывает внешняя writable-таблица agreements_ext_write_adp
INSERT INTO marketing.agreements_ext_write_adp SELECT * FROM marketing.agreements_ext_upload;

Подробнее об опции auto.create.table.enable см. в разделах CREATE WRITABLE EXTERNAL TABLE и CREATE READABLE EXTERNAL TABLE.

Загрузка в существующую standalone-таблицу:

-- создание внешней writable-таблицы, указывающей на существующую standalone-таблицу в датасорсе adqm
CREATE WRITABLE EXTERNAL TABLE marketing.payments_ext_write_adqm (
id BIGINT NOT NULL,
agreement_id BIGINT,
code VARCHAR(16),
amount DOUBLE,
currency_code VARCHAR(3),
description VARCHAR
)
LOCATION 'core:adqm://dtm__marketing.payments';

-- создание внешней таблицы загрузки
CREATE UPLOAD EXTERNAL TABLE marketing.payments_ext_upload (
id BIGINT NOT NULL,
agreement_id BIGINT,
code VARCHAR(16),
amount DOUBLE,
currency_code VARCHAR(3),
description VARCHAR
)
LOCATION  'kafka://$kafka/payments_in'
FORMAT 'AVRO'
OPTIONS ('auto.create.sys_op.enable=false');

-- загрузка данных в standalone-таблицу, на которую указывает внешняя writable-таблица payments_ext_write_adqm
INSERT INTO marketing.payments_ext_write_adqm SELECT * FROM marketing.payments_ext_upload;