Как загрузить данные

Содержание раздела
  1. Подготовка к загрузке
    1. Создание топика (опционально)
    2. Подготовка данных
    3. Подготовка логических сущностей
      1. Подготовка сущностей для загрузки в логическую таблицу
      2. Подготовка сущностей для загрузки в прокси-таблицу
      3. Подготовка сущностей для загрузки в standalone-таблицу
  2. Загрузка данных
  3. Примеры
    1. Загрузка в логическую таблицу
    2. Загрузка в прокси-таблицу
    3. Загрузка в standalone-таблицу

Загружать данные из топика Kafka можно в следующие логические сущности:

Перед загрузкой данных выполните действия, описанные в секции Подготовка к загрузке.

Об успешности загрузки данных можно судить по успешному ответу на запрос загрузки данных. Другие факторы такие, как смещение офсетов в топике или наличие данных в физической таблице <table>_staging, не означают успешность загрузки данных.

Подготовка к загрузке

Создание топика (опционально)

Данные загружаются в систему из сообщений топиков Kafka. Поэтому, если в брокере сообщений Kafka выключено автоматическое создание топиков, создайте топики вручную.

Чтобы создать топик, следуйте любой из инструкций в документации Kafka:

Рекомендации о разделении данных по топикам см. в разделе Рекомендации по топикам Kafka.

Подготовка данных

Чтобы подготовить данные к загрузке:

  1. Используя средства внешней системы, приведите данные в формат, подходящий для загрузки.
  2. Убедись, что каждая запись Avro содержит следующее значения sys_op, или добавьте его, если оно отсутствует:
    • 0 — если нужно добавить или обновить запись логической таблицы;
    • 1 — если нужно удалить запись логической таблицы;
    • пустое значение — если нужно добавить, обновить или удалить запись прокси- или standalone-таблицы. Обновление и удаление записей прокси- или standalone-таблиц таким способом доступно только для СУБД ADP.
  3. Загрузите подготовленные данные в топик Kafka.

Система не валидирует данные при их загрузке. Данные должны быть проверены на соответствие формату загрузки и, если нужно, скорректированы внешней системой перед их загрузкой в Prostore.

Подготовка логических сущностей

Подготовка сущностей для загрузки в логическую таблицу

  1. Создайте логическую таблицу, если она отсутствует.
  2. Создайте внешнюю таблицу для загрузки данных, если она отсутствует:

После завершения загрузки созданные внешние таблицы можно использовать повторно или удалить.

Подготовка сущностей для загрузки в прокси-таблицу

  1. Создайте прокси-таблицу, если она отсутствует.
  2. Создайте внешнюю таблицу для загрузки данных, если она отсутствует:

После завершения загрузки созданные внешние таблицы можно использовать повторно или удалить.

Подготовка сущностей для загрузки в standalone-таблицу

  1. Создайте внешнюю таблицу для загрузки данных, если она отсутствует:
  2. Создайте внешнюю writable-таблицу, если она отсутствует:
    • если standalone-таблица отсутствует и ее нужно создать при создании внешней таблицы, укажите в запросе ключевое слово OPTIONS со значением auto.create.table.enable=true,
    • иначе выполните запрос без ключевого слова OPTIONS.

После завершения загрузки созданные внешние таблицы можно использовать повторно или удалить.

Загрузка данных

Чтобы загрузить данные:

  1. Если необходимо загрузить данные в логическую таблицу и включить операцию загрузки в дельту, а дельта еще не открыта, откройте ее.
  2. Выполните запрос INSERT SELECT FROM external_table:
    • чтобы выполнить запрос в синхронном режиме, отправьте его по JDBC или по HTTP с помощью POST-метода query без параметра async (или с параметром async, имеющим значение false);
    • чтобы выполнить запрос в асинхронном режиме, отправьте его по HTTP с помощью POST-метода query, где в теле сообщения укажите параметр async со значением true.
  3. Если данные загружались в логическую таблицу в рамках дельты и эта операция обновления должна быть последней в дельте, закройте дельту.

При загрузке данных в прокси-таблицы и standalone-таблицы учитывайте ограничения таблиц целевой СУБД.

Примеры

Загрузка в логическую таблицу

-- создание логической таблицы
CREATE TABLE marketing.sales (
  id BIGINT NOT NULL,
  transaction_date TIMESTAMP NOT NULL,
  product_code VARCHAR(256) NOT NULL,
  product_units BIGINT NOT NULL,
  store_id BIGINT NOT NULL,
  description VARCHAR(256),
  PRIMARY KEY (id)
)
DISTRIBUTED BY (id);

-- создание внешней таблицы загрузки
CREATE UPLOAD EXTERNAL TABLE marketing.sales_ext_upload (
  id BIGINT,
  transaction_date TIMESTAMP,
  product_code VARCHAR(256),
  product_units BIGINT,
  store_id BIGINT,
  description VARCHAR(256)
)
LOCATION 'kafka_brokers://10.129.0.22:9092,10.129.0.123:9092/sales_in'
FORMAT 'AVRO'
MESSAGE_LIMIT 1000;

-- загрузка данных в логическую таблицу вне дельты
INSERT INTO marketing.sales SELECT * FROM marketing.sales_ext_upload;

-- выбор логической базы данных marketing в качестве базы данных по умолчанию
USE marketing;

-- открытие новой дельты
BEGIN DELTA;

-- загрузка данных в логическую таблицу в дельте
INSERT INTO sales SELECT * FROM sales_ext_upload;

-- закрытие дельты
COMMIT DELTA;

Загрузка в прокси-таблицу

-- создание внешней readable-таблицы для загрузки данных
CREATE READABLE EXTERNAL TABLE marketing.payments_ext_upload_to_proxy (
  id BIGINT,
  agreement_id BIGINT,
  type_code VARCHAR(16),
  amount DOUBLE,
  currency_code VARCHAR(3),
  description VARCHAR,
  PRIMARY KEY(id)
)
DISTRIBUTED BY (id)
LOCATION 'kafka://$kafka/payments_adp_in'
FORMAT 'AVRO';

-- загрузка данных в прокси-таблицу payments_proxy
INSERT INTO marketing.payments_proxy SELECT * FROM marketing.payments_ext_upload_to_proxy;

Загрузка в standalone-таблицу

Создание standalone-таблицы с помощью системы (см. параметр auto.create.table.enable=true) и загрузка данных в созданную таблицу:

-- создание внешней writable-таблицы с созданием связанной standalone-таблицы в датасорсе adp
CREATE WRITABLE EXTERNAL TABLE marketing.agreements_ext_write_adp (
  id BIGINT NOT NULL,
  client_id BIGINT NOT NULL,
  number VARCHAR NOT NULL,
  signature_date DATE,
  effective_date DATE,
  closing_date DATE,
  description VARCHAR,
  PRIMARY KEY(id)
)
DISTRIBUTED BY (id)
LOCATION 'core:adp://marketing.agreements'
OPTIONS ('auto.create.table.enable=true');

-- создание внешней таблицы загрузки
CREATE UPLOAD EXTERNAL TABLE marketing.agreements_ext_upload (
  id BIGINT NOT NULL,
  client_id BIGINT NOT NULL,
  number VARCHAR NOT NULL,
  signature_date DATE,
  effective_date DATE,
  closing_date DATE,
  description VARCHAR
) 
LOCATION  'kafka://$kafka/agreements'
FORMAT 'AVRO'
OPTIONS ('auto.create.sys_op.enable=false')

-- загрузка данных в standalone-таблицу, на которую указывает внешняя writable-таблица agreements_ext_write_adp
INSERT INTO marketing.agreements_ext_write_adp SELECT * FROM marketing.agreements_ext_upload;

Подробнее об опции auto.create.table.enable см. в разделах CREATE WRITABLE EXTERNAL TABLE и CREATE READABLE EXTERNAL TABLE.

Загрузка в существующую standalone-таблицу:

-- создание внешней writable-таблицы, указывающей на существующую standalone-таблицу в датасорсе adqm
CREATE WRITABLE EXTERNAL TABLE marketing.payments_ext_write_adqm (
id BIGINT NOT NULL,
agreement_id BIGINT,
code VARCHAR(16),
amount DOUBLE,
currency_code VARCHAR(3),
description VARCHAR
)
LOCATION 'core:adqm://dtm__marketing.payments';

-- создание внешней таблицы загрузки
CREATE UPLOAD EXTERNAL TABLE marketing.payments_ext_upload (
id BIGINT NOT NULL,
agreement_id BIGINT,
code VARCHAR(16),
amount DOUBLE,
currency_code VARCHAR(3),
description VARCHAR
)
LOCATION  'kafka://$kafka/payments_in'
FORMAT 'AVRO'
OPTIONS ('auto.create.sys_op.enable=false');

-- загрузка данных в standalone-таблицу, на которую указывает внешняя writable-таблица payments_ext_write_adqm
INSERT INTO marketing.payments_ext_write_adqm SELECT * FROM marketing.payments_ext_upload;