Потоковая загрузка данных

Содержание раздела
  1. Предварительный шаг для standalone-таблиц
  2. Как загрузить поток данных
    1. Пример реализации HTTP-клиента для потоковой загрузки данных в CSV-формате
    2. Пример реализации HTTP-клиента для потоковой загрузки данных в Avro-формате

Система поддерживает потоковую загрузку данных в следующие логические сущности:

Потоковая загрузка данных доступна при программном подключении по протоколам HTTP/2 и HTTP/1.1.

Для вставки небольших объемов данных до сотни записей можно использовать функцию обновления данных.

Предварительный шаг для standalone-таблиц

Загрузка данных в standalone-таблицу выполняется с помощью внешней writable-таблицы. Если она отсутствует, ее необходимо создать.

Чтобы создать внешнюю writable-таблицу, выполните запрос CREATE WRITABLE EXTERNAL TABLE:

  • если standalone-таблица отсутствует и ее нужно создать при создании внешней таблицы, укажите в запросе ключевое слово OPTIONS со значением auto.create.table.enable=true;
  • иначе выполните запрос без ключевого слова OPTIONS.

Как загрузить поток данных

Чтобы загрузить поток данных (см. примеры реализации HTTP-клиента для потоковой загрузки ниже):

  1. Откройте HTTP-соединение по протоколу HTTP/2 или HTTP/1.1.
  2. Вызовите HTTP-метод upload для отправки потока данных.
  3. Отправьте порции данных в формате Avro или CSV.
  4. Закройте HTTP-соединение.

Пример реализации HTTP-клиента для потоковой загрузки данных в CSV-формате

Ниже показан пример реализации HTTP-клиента для потоковой загрузки данных в CSV-формате. Код написан на Java с использованием Vertx.

// создание инстанса vertx
Vertx vertx = Vertx.vertx();

// создание HTTP-клиента с заданными параметрами
HttpClientOptions clientOptions = new HttpClientOptions();
// установка протокола HTTP/2
clientOptions.setProtocolVersion(HttpVersion.HTTP_2);
// отключение переключения с HTTP/1.1 на HTTP/2
clientOptions.setHttp2ClearTextUpgrade(false);
HttpClient httpClient = vertx.createHttpClient(clientOptions);

// отправка запроса на потоковую загрузку данных в логическую таблицу marketing.sales
// запрос с идентификатором 12345 (queryId=12345) добавит и (или) обновит записи таблицы (sysOp=0)
// при разрыве соединения уже загруженные данные будут сохранены (commitOnDisconnect=true)
Future<Void> requestFuture = httpClient.request(HttpMethod.POST, 9090, "localhost", "/api/v1/datamarts/marketing/entities/sales/upload?sysOp=0&queryId=12345&commitOnDisconnect=true")
    .compose(request -> {
        // включение разделения данных на порции
        request.setChunked(true);
        // добавление заголовка Content-Type, определяющего тип загружаемых данных (CSV)
        request.putHeader(HttpHeaders.CONTENT_TYPE, "text/csv");
        // старт потока загрузки данных
        Future<Void> requestSendFuture = request.sendHead()
            .compose(unused -> {
                // отправка пользовательских данных в выбранном формате
                // используйте request.write(Buffer) для отправки порции данных
                // после отправки всех порций данных закончите запрос с помощью request.end()
              
                // пример: загрузка всех порций данных одним вызовом
                return request.end(csvData());
            });
        // обработка ответов от Prostore
        Future<Void> responseFuture = request.response()
            .compose(response -> {
                // вывод кода ответа
                System.out.println("Response status code: " + response.statusCode());
                Promise<Void> promise = Promise.promise();
                // обработка ошибок
                response.exceptionHandler(promise::tryFail);
                // обработка данных, полученных от Prostore
                response.handler(dataFromServer -> {
                    System.out.println("Response data: " + dataFromServer.toString());
                });
                // завершение формирования и обработки ответа
                response.endHandler(v -> {
                    promise.tryComplete();
                });
                return promise.future();
            })
            // финальная обработка ответа: вывод успешного ответа или логирование ошибки
            .onComplete(ar -> {
                request.reset();
                if (ar.succeeded()) {
                    System.out.println("Response success");
                }
                if (ar.failed()) {
                    ar.cause().printStackTrace();
                }
            });
        return Future.join(requestSendFuture, responseFuture)
            .<Void>mapEmpty();
    })
    // завершение потоковой загрузки данных: вывод успешного ответа или логирование ошибки
    .onSuccess(v -> System.out.println("Upload done"))
    .onFailure(Throwable::printStackTrace);
requestFuture.toCompletionStage()
    .toCompletableFuture()
    // ожидание завершения процесса
    .get();
// закрытие инстанса vertx
vertx.close();

Пример реализации HTTP-клиента для потоковой загрузки данных в Avro-формате

Ниже показан пример реализации HTTP-клиента для потоковой загрузки данных в Avro-формате. Код написан на Java с использованием Vertx.

// создание инстанса vertx
Vertx vertx = Vertx.vertx();

// создание HTTP-клиента с заданными параметрами
HttpClientOptions clientOptions = new HttpClientOptions();
// установка протокола HTTP/2
clientOptions.setProtocolVersion(HttpVersion.HTTP_2);
// отключение переключения с HTTP/1.1 на HTTP/2
clientOptions.setHttp2ClearTextUpgrade(false);
HttpClient httpClient = vertx.createHttpClient(clientOptions);

// отправка запроса на потоковую загрузку данных в логическую таблицу marketing.sales
// запрос с идентификатором 12345 (queryId=12345) добавит и (или) обновит записи таблицы (sysOp=0)
// при разрыве соединения уже загруженные данные будут сохранены (commitOnDisconnect=true)
Future<Void> requestFuture = httpClient.request(HttpMethod.POST, 9090, "localhost", "/api/v1/datamarts/marketing/entities/sales/upload?sysOp=0&queryId=12345&commitOnDisconnect=true")
    .compose(request -> {
        // включение разделения данных на порции
        request.setChunked(true);
        // добавление заголовка Content-Type, определяющего тип загружаемых данных (Avro)
        request.putHeader(HttpHeaders.CONTENT_TYPE, "application/avro");
        // старт потока загрузки данных
        Future<Void> requestSendFuture = request.sendHead()
            .compose(unused -> {
                // отправка пользовательских данных в выбранном формате
                // используйте request.write(Buffer) для отправки порции данных
                // после отправки всех порций данных закончите запрос с помощью request.end()
          
                // пример: загрузка всех порций данных одним вызовом
                return request.end(avroData());
            });
        // обработка ответов от Prostore
        Future<Void> responseFuture = request.response()
            .compose(response -> {
                // вывод кода ответа
                System.out.println("Response status code: " + response.statusCode());
                Promise<Void> promise = Promise.promise();
                // обработка ошибок
                response.exceptionHandler(promise::tryFail);
                // обработка данных, полученных от Prostore
                response.handler(dataFromServer -> {
                    System.out.println("Response data: " + dataFromServer.toString());
                });
                // завершение формирования и обработки ответа
                response.endHandler(v -> {
                    promise.tryComplete();
                });
                return promise.future();
            })
            // финальная обработка ответа: вывод успешного ответа или логирование ошибки
            .onComplete(ar -> {
                request.reset();
                if (ar.succeeded()) {
                    System.out.println("Response success");
                }
                if (ar.failed()) {
                    ar.cause().printStackTrace();
                }
            });
        return Future.join(requestSendFuture, responseFuture)
            .<Void>mapEmpty();
    })
    // завершение потоковой загрузки данных: вывод успешного ответа или логирование ошибки
    .onSuccess(v -> System.out.println("Upload done"))
    .onFailure(Throwable::printStackTrace);
requestFuture.toCompletionStage()
    .toCompletableFuture()
    // ожидание завершения процесса
    .get();
// закрытие инстанса vertx
vertx.close();