Загрузка данных

Для загрузки данных нужен существующий топик Kafka. Если в брокере сообщений Kafka настроено автоматическое создание топиков, то дополнительные действия не требуются. Иначе необходимо создать топик, если он еще не создан. Подробнее о создании топиков см. в документации Kafka:

Примечание: загрузка данных возможна только в логическую таблицу. Загрузка данных в логические и материализованные представления недоступна.

Чтобы загрузить данные из внешней информационной системы в логическую таблицу:

  1. Загрузите данные из внешней информационной системы в топик Kafka.
    Данные должны иметь формат, описанный в разделе Формат загрузки данных.
  2. Создайте логическую таблицу, если она еще не создана.
  3. Создайте внешнюю таблицу загрузки, если она еще не создана.
  4. Выполните запрос BEGIN DELTA на открытие дельты, если она еще не открыта.
  5. Выполните запрос INSERT INTO logical_table на загрузку данных из топика в логическую таблицу. В запросе нужно указать внешнюю таблицу загрузки, определяющую параметры загрузки.
    После успешного окончания загрузки данных система вернет ответ с пустым объектом ResultSet.
  6. Если необходимо, загрузите другие данные, например в другие логические таблицы.
    В рамках одной открытой дельты можно выполнять произвольное количество запросов INSERT INTO logical_table, при этом не допускается загрузка различных состояний одного и того же объекта.
  7. Выполните запрос COMMIT DELTA для сохранения изменений и закрытия дельты.

При успешном выполнении последовательности действий загруженные данные сохраняются в качестве актуальных, а предыдущая версия данных, если такая была, становится архивной. Подробнее о версионировании см. в разделе Версионирование данных.

Пока дельта не закрыта, все изменения данных, выполненные в рамках нее, можно отменить (см. ROLLBACK DELTA). Созданные внешние таблицы загрузки можно использовать повторно или удалить.

Пример

-- выбор логической базы данных sales в качестве базы данных по умолчанию
USE sales

-- создание логической таблицы sales
CREATE TABLE sales (
  identification_number INT NOT NULL,
  transaction_date TIMESTAMP NOT NULL,
  product_code VARCHAR(256) NOT NULL,
  product_units INT NOT NULL,
  store_id INT NOT NULL,
  description VARCHAR(256),
  PRIMARY KEY (identification_number)
)
DISTRIBUTED BY (identification_number)

-- создание внешней таблицы загрузки
CREATE UPLOAD EXTERNAL TABLE sales_ext_upload (
  identification_number INT,
  transaction_date TIMESTAMP,
  product_code VARCHAR(256),
  product_units INT,
  store_id INT,
  description VARCHAR(256)
)
LOCATION  'kafka://zk1:2181,zk2:2181,zk3:2181/sales'
FORMAT 'AVRO'
MESSAGE_LIMIT 1000

-- открытие новой (горячей) дельты
BEGIN DELTA

-- запуск загрузки данных в логическую таблицу sales
INSERT INTO sales SELECT * FROM sales.sales_ext_upload

-- закрытие дельты (фиксация изменений)
COMMIT DELTA