Загрузка данных

Система позволяет параллельно загружать большие объемы данных. Данные можно загружать только в логические таблицы. Загрузка данных в логические и материализованные представления недоступна.

Под большим объемом данных подразумевается количество записей от нескольких сотен до нескольких миллионов. Для загрузки небольшого объема данных можно использовать функцию обновления данных.

Данные загружаются в виде сообщений Kafka, поэтому для их загрузки нужен топик Kafka. Если в брокере сообщений Kafka настроено автоматическое создание топиков, то дополнительные действия не требуются. Иначе топик необходимо создать, если он отсутствует. Подробнее о создании топиков см. в документации Kafka:

Чтобы загрузить данные из внешней информационной системы в логическую таблицу:

  1. Загрузите данные из внешней информационной системы в топик Kafka.
    Данные должны иметь формат, описанный в разделе Формат загрузки данных.
  2. Создайте логическую таблицу, если она еще не создана.
  3. Создайте внешнюю таблицу загрузки, если она еще не создана.
  4. Выполните запрос BEGIN DELTA на открытие дельты, если она еще не открыта.
  5. Выполните запрос INSERT INTO logical_table на загрузку данных из топика в логическую таблицу. В запросе нужно указать внешнюю таблицу загрузки, определяющую параметры загрузки.
  6. Если необходимо, загрузите и (или) обновите другие данные.
    В открытой дельте можно выполнить произвольное количество запросов на обновление и загрузку данных, а также отменить изменения. При этом не допускается добавление информации о различных состояниях одного объекта (то есть различных записей с одинаковым первичным ключом) в одной дельте.
  7. Выполните запрос COMMIT DELTA для сохранения изменений и закрытия дельты.

При успешном выполнении действий состояние данных системы обновляется, как описано в разделе Версионирование данных.

Пока дельта не закрыта, изменения данных этой дельты можно отменить с помощью запроса ROLLBACK DELTA. Созданные внешние таблицы загрузки можно использовать повторно или удалить.

Примеры

-- выбор логической базы данных sales в качестве базы данных по умолчанию
USE sales;

-- создание логической таблицы sales
CREATE TABLE sales (
  id INT NOT NULL,
  transaction_date TIMESTAMP NOT NULL,
  product_code VARCHAR(256) NOT NULL,
  product_units INT NOT NULL,
  store_id INT NOT NULL,
  description VARCHAR(256),
  PRIMARY KEY (id)
)
DISTRIBUTED BY (id);

-- создание внешней таблицы загрузки
CREATE UPLOAD EXTERNAL TABLE sales_ext_upload (
  id INT,
  transaction_date TIMESTAMP,
  product_code VARCHAR(256),
  product_units INT,
  store_id INT,
  description VARCHAR(256)
)
LOCATION  'kafka://zk1:2181,zk2:2181,zk3:2181/sales'
FORMAT 'AVRO'
MESSAGE_LIMIT 1000;

-- открытие новой (горячей) дельты
BEGIN DELTA;

-- запуск загрузки данных в логическую таблицу sales
INSERT INTO sales SELECT * FROM sales.sales_ext_upload;

-- закрытие дельты (фиксация изменений)
COMMIT DELTA;