Загрузка данных

Содержание раздела
  1. Подготовка к загрузке
    1. Создание топика (опционально)
    2. Подготовка данных
    3. Подготовка логических сущностей
      1. Подготовка сущностей для загрузки в логическую таблицу
      2. Подготовка сущностей для загрузки в standalone-таблицу
  2. Загрузка данных в логическую таблицу
  3. Загрузка данных в standalone-таблицу
  4. Примеры
    1. Загрузка в логическую таблицу
    2. Загрузка в standalone-таблицу

Система позволяет параллельно загружать большие объемы данных, добавляя, обновляя и удаляя записи.

Загружать данные можно в логические таблицы и standalone-таблицы. Загрузка данных в логические и материализованные представления недоступна.

Под большим объемом данных подразумевается количество записей от нескольких сотен до нескольких миллионов. Для загрузки небольшого объема данных можно использовать функцию обновления данных.

Действия по загрузке данных в логические и standalone-таблицы отличаются, поэтому рассмотрены по отдельности. Перед загрузкой данных выполните действия, описанные в секции Подготовка к загрузке.

Подготовка к загрузке

Создание топика (опционально)

Данные загружаются в систему из сообщений топиков Kafka. Поэтому, если в брокере сообщений Kafka выключено автоматическое создание топиков, создайте топики вручную.

Чтобы создать топик, следуйте любой из инструкций в документации Kafka:

Рекомендации о разделении данных по топикам см. в разделе Рекомендации по топикам Kafka.

Подготовка данных

Чтобы подготовить данные, загрузите данные, соответствующие формату, из внешней информационной системы в топик Kafka:

  • Чтобы добавить или обновить запись логической таблицы, включите в сообщение Kafka запись Avro со значением sys_op, равным 0.
  • Чтобы удалить запись из логической таблицы, включите в сообщение Kafka запись Avro со значением sys_op, равным 1.
  • Чтобы добавить* запись в standalone-таблицу, включите в сообщение Kafka запись Avro без значения sys_op.

* Обновление и удаление данных standalone-таблиц с помощью функции загрузки данных недоступно. Обновить записи standalone-таблицы можно запросом UPSERT VALUES, удалить — запросом DELETE.

Подготовка логических сущностей

Подготовка сущностей для загрузки в логическую таблицу

  1. Создайте логическую таблицу, если она отсутствует.
  2. Создайте внешнюю таблицу загрузки, если она отсутствует.

Созданные внешние таблицы загрузки можно использовать повторно или удалить.

Подготовка сущностей для загрузки в standalone-таблицу

  1. Создайте внешнюю таблицу загрузки, если она отсутствует.
  2. Создайте внешнюю writable-таблицу, если она отсутствует:
    • если standalone-таблица отсутствует и ее нужно создать при создании внешней таблицы, укажите ключевое слово OPTIONS со значением auto.create.table.enable=true,
    • иначе пропустите ключевое слово OPTIONS или укажите для него значение auto.create.table.enable=false.

Созданные внешние таблицы загрузки можно использовать повторно или удалить.

Загрузка данных в логическую таблицу

Чтобы загрузить данные из топика Kafka в логическую таблицу, выполните INSERT SELECT FROM upload_external_table. В запросе укажите внешнюю таблицу загрузки, определяющую параметры загрузки.

Загрузку данных можно включить в дельту. Чтобы загрузить данные в дельте:

  1. Откройте дельту.
  2. Загрузите нужные данные.
  3. Закройте дельту.

Чтобы данные, записанные вне дельт, были доступны во всех видах запросов на чтение и выгрузку данных, их должна замыкать дельта. При этом дельта может быть пустой. Подробнее см. в разделе Операции в дельте и вне дельты.

Загрузка данных в standalone-таблицу

Чтобы загрузить данные из внешней информационной системы в standalone-таблицу, выполните INSERT SELECT FROM upload_external_table.

В запросе укажите внешнюю таблицу загрузки, определяющую параметры загрузки, а также внешнюю writable-таблицу, ссылающуюся на целевую standalone-таблицу.

При загрузке данных в standalone-таблицу учитывайте ограничения таблиц в конкретной СУБД.

Примеры

Загрузка в логическую таблицу

-- выбор логической базы данных marketing в качестве базы данных по умолчанию
USE marketing;

-- создание логической таблицы
CREATE TABLE sales (
  id BIGINT NOT NULL,
  transaction_date TIMESTAMP NOT NULL,
  product_code VARCHAR(256) NOT NULL,
  product_units BIGINT NOT NULL,
  store_id BIGINT NOT NULL,
  description VARCHAR(256),
  PRIMARY KEY (id)
)
DISTRIBUTED BY (id);

-- создание внешней таблицы загрузки
CREATE UPLOAD EXTERNAL TABLE sales_ext_upload (
  id BIGINT,
  transaction_date TIMESTAMP,
  product_code VARCHAR(256),
  product_units BIGINT,
  store_id BIGINT,
  description VARCHAR(256)
)
LOCATION  'kafka://zk1:2181,zk2:2181,zk3:2181/sales'
FORMAT 'AVRO'
MESSAGE_LIMIT 1000;

-- загрузка данных в логическую таблицу вне дельты
INSERT INTO sales SELECT * FROM sales_ext_upload;

-- открытие новой (горячей) дельты
BEGIN DELTA;

-- загрузка данных в логическую таблицу в дельте
INSERT INTO sales SELECT * FROM sales_ext_upload;

-- закрытие дельты
COMMIT DELTA;

Загрузка в standalone-таблицу

Создание standalone-таблицы через систему (см. параметр auto.create.table.enable=true) и загрузка данных в нее:

-- создание внешней writable-таблицы с созданием связанной standalone-таблицы в датасорсе adp
CREATE WRITABLE EXTERNAL TABLE marketing.agreements_ext_write_adp (
  id BIGINT NOT NULL,
  client_id BIGINT NOT NULL,
  number VARCHAR NOT NULL,
  signature_date DATE,
  effective_date DATE,
  closing_date DATE,
  description VARCHAR,
  PRIMARY KEY(id)
)
DISTRIBUTED BY (id)
LOCATION 'core:adp://marketing.agreements'
OPTIONS ('auto.create.table.enable=true');

-- создание внешней таблицы загрузки
CREATE UPLOAD EXTERNAL TABLE marketing.agreements_ext_upload (
  id BIGINT NOT NULL,
  client_id BIGINT NOT NULL,
  number VARCHAR NOT NULL,
  signature_date DATE,
  effective_date DATE,
  closing_date DATE,
  description VARCHAR
) 
LOCATION  'kafka://$kafka/agreements'
FORMAT 'AVRO'
OPTIONS ('auto.create.sys_op.enable=false')

-- загрузка данных в standalone-таблицу, на которую указывает внешняя writable-таблица agreements_ext_write_adp
INSERT INTO marketing.agreements_ext_write_adp SELECT * FROM marketing.agreements_ext_upload;

Подробнее об опции auto.create.table.enable см. в разделах CREATE WRITABLE EXTERNAL TABLE и CREATE READABLE EXTERNAL TABLE.

Загрузка в существующую standalone-таблицу:

-- создание внешней writable-таблицы, указывающей на существующую standalone-таблицу в датасорсе adg
CREATE WRITABLE EXTERNAL TABLE marketing.payments_ext_write_adg (
id BIGINT NOT NULL,
agreement_id BIGINT,
code VARCHAR(16),
amount DOUBLE,
currency_code VARCHAR(3),
description VARCHAR
)
LOCATION 'core:adg://dtm__marketing__payments';

-- создание внешней таблицы загрузки
CREATE UPLOAD EXTERNAL TABLE marketing.payments_ext_upload (
id BIGINT NOT NULL,
agreement_id BIGINT,
code VARCHAR(16),
amount DOUBLE,
currency_code VARCHAR(3),
description VARCHAR
)
LOCATION  'kafka://$kafka/payments'
FORMAT 'AVRO'
OPTIONS ('auto.create.sys_op.enable=false');

-- загрузка данных в standalone-таблицу, на которую указывает внешняя writable-таблица payments_ext_write_adg
INSERT INTO marketing.payments_ext_write_adg SELECT * FROM marketing.payments_ext_upload;