Как загрузить данные

Содержание раздела
  1. Подготовка к загрузке
    1. Создание топика (опционально)
    2. Подготовка данных
    3. Подготовка логических сущностей
      1. Подготовка сущностей для загрузки в логическую таблицу
      2. Подготовка сущностей для загрузки в прокси-таблицу
      3. Подготовка сущностей для загрузки в standalone-таблицу
  2. Загрузка данных
  3. Примеры
    1. Загрузка в логическую таблицу
    2. Загрузка в прокси-таблицу
    3. Загрузка в standalone-таблицу

Система позволяет параллельно загружать большие объемы данных из Kafka, добавляя, обновляя и удаляя записи.

Загружать данные можно в следующие логические сущности:

Загрузка данных в логические и материализованные представления недоступна.

Под большим объемом данных подразумевается количество записей от нескольких сотен до нескольких миллионов. Для загрузки небольшого объема данных можно использовать функцию обновления данных.

Перед загрузкой данных выполните действия, описанные в секции Подготовка к загрузке.

Подготовка к загрузке

Создание топика (опционально)

Данные загружаются в систему из сообщений топиков Kafka. Поэтому, если в брокере сообщений Kafka выключено автоматическое создание топиков, создайте топики вручную.

Чтобы создать топик, следуйте любой из инструкций в документации Kafka:

Рекомендации о разделении данных по топикам см. в разделе Рекомендации по топикам Kafka.

Подготовка данных

Чтобы подготовить данные, загрузите данные, соответствующие формату, из внешней информационной системы в топик Kafka:

  • Чтобы добавить или обновить запись логической таблицы, включите в сообщение Kafka запись Avro со значением sys_op, равным 0.
  • Чтобы удалить запись из логической таблицы, включите в сообщение Kafka запись Avro со значением sys_op, равным 1.
  • Чтобы добавить, обновить* или удалить* запись прокси-таблицы или standalone-таблицы, включите в сообщение Kafka запись Avro без значения sys_op.

*Обновление и удаление записей в прокси-таблицах и standalone-таблицах с помощью функции загрузки данных доступны только в СУБД ADP.
Обновить и удалить записи прокси-таблиц и standalone-таблиц, размещенных в СУБД ADB и ADP, также можно с помощью запросов UPSERT VALUES и DELETE.

Подготовка логических сущностей

Подготовка сущностей для загрузки в логическую таблицу

  1. Создайте логическую таблицу, если она отсутствует.
  2. Создайте внешнюю таблицу для загрузки данных, если она отсутствует:

После завершения загрузки созданные внешние таблицы можно использовать повторно или удалить.

Подготовка сущностей для загрузки в прокси-таблицу

  1. Создайте прокси-таблицу, если она отсутствует.
  2. Создайте внешнюю таблицу для загрузки данных, если она отсутствует:

После завершения загрузки созданные внешние таблицы можно использовать повторно или удалить.

Подготовка сущностей для загрузки в standalone-таблицу

  1. Создайте внешнюю таблицу для загрузки данных, если она отсутствует:
  2. Создайте внешнюю writable-таблицу, если она отсутствует:
    • если standalone-таблица отсутствует и ее нужно создать при создании внешней таблицы, укажите в запросе ключевое слово OPTIONS со значением auto.create.table.enable=true,
    • иначе выполните запрос без ключевого слова OPTIONS.

После завершения загрузки созданные внешние таблицы можно использовать повторно или удалить.

Загрузка данных

Чтобы загрузить данные:

  1. Если необходимо загрузить данные в логическую таблицу и включить операцию загрузки в дельту, а дельта еще не открыта, откройте ее.
  2. Выполните запрос INSERT SELECT FROM external_table:
    • чтобы выполнить запрос в синхронном режиме, отправьте его по JDBC или по HTTP с помощью POST-метода query без параметра async (или с параметром async, имеющим значение false);
    • чтобы выполнить запрос в асинхронном режиме, отправьте его по HTTP с помощью POST-метода query, где в теле сообщения укажите параметр async со значением true.
  3. Если данные загружались в логическую таблицу в рамках дельты и эта операция обновления должна быть последней в дельте, закройте дельту.

При загрузке данных в прокси-таблицы и standalone-таблицы учитывайте ограничения таблиц целевой СУБД.

Примеры

Загрузка в логическую таблицу

-- создание логической таблицы
CREATE TABLE marketing.sales (
  id BIGINT NOT NULL,
  transaction_date TIMESTAMP NOT NULL,
  product_code VARCHAR(256) NOT NULL,
  product_units BIGINT NOT NULL,
  store_id BIGINT NOT NULL,
  description VARCHAR(256),
  PRIMARY KEY (id)
)
DISTRIBUTED BY (id);

-- создание внешней таблицы загрузки
CREATE UPLOAD EXTERNAL TABLE marketing.sales_ext_upload (
  id BIGINT,
  transaction_date TIMESTAMP,
  product_code VARCHAR(256),
  product_units BIGINT,
  store_id BIGINT,
  description VARCHAR(256)
)
LOCATION 'kafka_brokers://10.129.0.22:9092,10.129.0.123:9092/sales_in'
FORMAT 'AVRO'
MESSAGE_LIMIT 1000;

-- загрузка данных в логическую таблицу вне дельты
INSERT INTO marketing.sales SELECT * FROM marketing.sales_ext_upload;

-- выбор логической базы данных marketing в качестве базы данных по умолчанию
USE marketing;

-- открытие новой дельты
BEGIN DELTA;

-- загрузка данных в логическую таблицу в дельте
INSERT INTO sales SELECT * FROM sales_ext_upload;

-- закрытие дельты
COMMIT DELTA;

Загрузка в прокси-таблицу

-- создание внешней readable-таблицы для загрузки данных
CREATE READABLE EXTERNAL TABLE marketing.payments_ext_upload_to_proxy (
  id BIGINT,
  agreement_id BIGINT,
  type_code VARCHAR(16),
  amount DOUBLE,
  currency_code VARCHAR(3),
  description VARCHAR,
  PRIMARY KEY(id)
)
DISTRIBUTED BY (id)
LOCATION 'kafka://$kafka/payments_adp_in'
FORMAT 'AVRO';

-- загрузка данных в прокси-таблицу payments_proxy
INSERT INTO marketing.payments_proxy SELECT * FROM marketing.payments_ext_upload_to_proxy;

Загрузка в standalone-таблицу

Создание standalone-таблицы с помощью системы (см. параметр auto.create.table.enable=true) и загрузка данных в созданную таблицу:

-- создание внешней writable-таблицы с созданием связанной standalone-таблицы в датасорсе adp
CREATE WRITABLE EXTERNAL TABLE marketing.agreements_ext_write_adp (
  id BIGINT NOT NULL,
  client_id BIGINT NOT NULL,
  number VARCHAR NOT NULL,
  signature_date DATE,
  effective_date DATE,
  closing_date DATE,
  description VARCHAR,
  PRIMARY KEY(id)
)
DISTRIBUTED BY (id)
LOCATION 'core:adp://marketing.agreements'
OPTIONS ('auto.create.table.enable=true');

-- создание внешней таблицы загрузки
CREATE UPLOAD EXTERNAL TABLE marketing.agreements_ext_upload (
  id BIGINT NOT NULL,
  client_id BIGINT NOT NULL,
  number VARCHAR NOT NULL,
  signature_date DATE,
  effective_date DATE,
  closing_date DATE,
  description VARCHAR
) 
LOCATION  'kafka://$kafka/agreements'
FORMAT 'AVRO'
OPTIONS ('auto.create.sys_op.enable=false')

-- загрузка данных в standalone-таблицу, на которую указывает внешняя writable-таблица agreements_ext_write_adp
INSERT INTO marketing.agreements_ext_write_adp SELECT * FROM marketing.agreements_ext_upload;

Подробнее об опции auto.create.table.enable см. в разделах CREATE WRITABLE EXTERNAL TABLE и CREATE READABLE EXTERNAL TABLE.

Загрузка в существующую standalone-таблицу:

-- создание внешней writable-таблицы, указывающей на существующую standalone-таблицу в датасорсе adqm
CREATE WRITABLE EXTERNAL TABLE marketing.payments_ext_write_adqm (
id BIGINT NOT NULL,
agreement_id BIGINT,
code VARCHAR(16),
amount DOUBLE,
currency_code VARCHAR(3),
description VARCHAR
)
LOCATION 'core:adqm://dtm__marketing.payments';

-- создание внешней таблицы загрузки
CREATE UPLOAD EXTERNAL TABLE marketing.payments_ext_upload (
id BIGINT NOT NULL,
agreement_id BIGINT,
code VARCHAR(16),
amount DOUBLE,
currency_code VARCHAR(3),
description VARCHAR
)
LOCATION  'kafka://$kafka/payments_in'
FORMAT 'AVRO'
OPTIONS ('auto.create.sys_op.enable=false');

-- загрузка данных в standalone-таблицу, на которую указывает внешняя writable-таблица payments_ext_write_adqm
INSERT INTO marketing.payments_ext_write_adqm SELECT * FROM marketing.payments_ext_upload;