Минимальное развертывание с Kafka

Содержание раздела
  1. Предустановленные программные средства
  2. Сборка Prostore
  3. Конфигурационный файл Prostore application.yml
  4. Настройка СУБД Postgres
  5. Сборка и установка коннектора Kafka-Postgres
  6. Запуск сервисов Apache ZooKeeper и Apache Kafka
  7. Запуск коннектора Kafka-Postgres
  8. Запуск Prostore
  9. Подключение к Prostore с помощью SQL-клиента
  10. Демонстрационный сценарий
    1. Создание необходимых логических сущностей
    2. Создание топика Kafka для последующей загрузки данных
    3. Создание бинарного avro-файла kafka_upload_sales.avro из avro-схемы и данных
    4. Загрузка avro-файла kafka_upload_sales.avro
    5. Загрузка данных
    6. Вставка данных
    7. Выборка данных
    8. Выгрузка в топик Kafka
    9. Удаление логических сущностей

В этом разделе описаны шаги по развертыванию среды в конфигурации, предполагающей единственный датасорс типа ADP. Дополнительная информация приведена в разделе Схемы развертывания.

Предустановленные программные средства

  • OC «Альт 8 СП» (8.4);
  • yum-utils;
  • curl;
  • git;
  • wget;
  • OpenJDK 8;
  • Apache Maven 3.6.3;
  • СУБД PostgreSQL 13;
  • Apache ZooKeeper;
  • Apache Kafka (например, в каталоге /opt/kafka);
  • SQL-клиент, например DBeaver;
  • docker;
  • браузер топиков Kafka с возможностью загрузки бинарных данных, например kafkacat.

Сборка Prostore

# клонирование репозитория Prostore
git clone https://repository.datamart.ru/datamarts/prostore ~/prostore
# запуск сборки Prostore средствами Apache Maven
cd ~/prostore
mvn clean install -DskipTests=true
# создание символической ссылки на файл конфигурации
sudo ln -s ~/prostore/dtm-query-execution-core/config/application.yml ~/prostore/dtm-query-execution-core/target/application.yml
# приведение конфигурационного файла к виду, показанному ниже
sudo nano ~/prostore/dtm-query-execution-core/config/application.yml

Конфигурационный файл Prostore application.yml

# настройки сервера Prostore
server:
# номер порта сервиса метрик
  port: ${DTM_METRICS_PORT:8080}
# настройки сервиса исполнения запросов
core:
# настройки плагинов
  plugins:
# список используемых типов датасорсов
    active: ${CORE_PLUGINS_ACTIVE:ADP}
# настройки сетевых подключений через HTTP-протокол
  http:
# номер порта сервиса исполнения запросов
    port: ${DTM_CORE_HTTP_PORT:9090}
# настройки окружения
  env:
# имя окружения для формирования полных имен логических БД
    name: ${DTM_NAME:test}
# настройки механизма управления кластером Raft
  raft:
# идентификатор этой ноды
    myId: ${RAFT_ID:1}
# номера и сетевые адреса нод кластера, включая эту ноду; адрес указывается в формате host:http_port; 
# строки заполняютcя по количеству нод, остальные остаются незаполненными
    servers:
      pnode.1: ${RAFT_SERVER_1:127.0.0.1:9090}
# настройки сервисной базы данных, хранящей метаинформацию
    servicedb:
# имя БД, в которой размещается сервисная БД; возможно размещение системных и бизнес-данных в одной БД с разделением по разным схемам
      db: ${RAFT_SERVICEDB_DATABASE:test}
# имя схемы СУБД, выделенной для размещения сервисной БД
      schema: ${RAFT_SERVICEDB_SCHEMA:sys1}
# имя пользователя/логин для авторизации в СУБД
      user: ${RAFT_SERVICEDB_USER:}
# пароль для авторизации в СУБД
      password: ${RAFT_SERVICEDB_PASS:}
# сетевой адрес хоста с сервисной БД
      host: ${RAFT_SERVICEDB_HOST:localhost}
# сетевой порт на хосте сервисной БД
      port: ${RAFT_SERVICEDB_PORT:5432}
# настройки jet-коннектора
  jet:
# список сетевых адресов jet-коннекторов для загрузки данных в формате: http://host_1:port_1,host_2:port_2,...
    connectionString: ${KAFKA_JET_WRITERS:}
# имя консьюмер-группы для загрузки данных с помощью jet-коннектора
    consumerGroup: ${KAFKA_JET_CONSUMER_GROUP:jet-load}
# время вычитывания (в миллисекундах) данных из Kafka
    pollDurationMs: ${KAFKA_JET_POLL_DURATION_MS:1000}
# предельный размер буфера чтения из Kafka
    pollBufferSize: ${KAFKA_JET_POLL_BUFFER_SIZE:1000}
# предельный размер буфера вставки в БД
    dbBufferSize: ${KAFKA_JET_DB_BUFFER_SIZE:10000}
# время ожидания (в миллисекундах) get-метода коннектора до тайм-аута
    getTimeoutMs: ${KAFKA_JET_GET_TIMEOUT_MS:5000}
# время ожидания (в миллисекундах) до тайм-аута при проверке доступности коннектора
    checkTimeoutMs: ${KAFKA_JET_CHECK_TIMEOUT_MS:2000}
# количество попыток установки соединения при обращении к коннектору (0 — количество не ограничено, 1 — без повторных попыток)
    retryCount: ${KAFKA_JET_RETRY_COUNT:1}
# настройки источника данных
  datasource:
# настройки для EDML-операторов
    edml:
# количество записей, по умолчанию выгружаемых в одном сообщении топика Каfka
      defaultChunkSize: ${EDML_DEFAULT_CHUNK_SIZE:1000}
# период проверки статуса плагина в миллисекундах
      pluginStatusCheckPeriodMs: ${EDML_STATUS_CHECK_PERIOD_MS:1000}
# время ожидания (в миллисекундах) до тайм-аута при работе с первым смещением в топике Kafka
      firstOffsetTimeoutMs: ${EDML_FIRST_OFFSET_TIMEOUT_MS:15000}
# время ожидания (в миллисекундах) до тайм-аута при ожидании смены смещения в топике Kafka
      changeOffsetTimeoutMs: ${EDML_CHANGE_OFFSET_TIMEOUT_MS:10000}
# настройки взаимодействия сервиса исполнения запросов с брокером сообщений Kafka
  kafka:
# максимальное количество повторных попыток получения офсетов из брокера Kafka
    getOffsetsRetryCount: ${ZOOKEEPER_KAFKA_GET_OFFSETS_RETRY_COUNT:3}
# время ожидания между попытками получения офсетов в миллисекундах
    getOffsetsRetryTimeoutMs: ${ZOOKEEPER_KAFKA_GET_OFFSETS_RETRY_TIMEOUT_MS:1000}
    producer:
      property:
# сериализатор строковых ключей
        key.serializer: org.apache.kafka.common.serialization.StringSerializer
# сериализатор строковых значений
        value.serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      property:
# десериализатор ключей как байтов
        key.deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
# десериализатор значений как байтов
        value.deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
# настройки кластера ZooKeeper для взаимодействия с брокером сообщений Kafka
    cluster:
      zookeeper:
# сетевой адрес хоста ZooKeeper для брокера сообщений Kafka
        connection-string: ${ZOOKEEPER_KAFKA_ADDRESS:}
# время ожидания (в миллисекундах) до тайм-аута при соединении брокера Kafka с хостом ZooKeeper
        connection-timeout-ms: ${ZOOKEEPER_KAFKA_CONNECTION_TIMEOUT_MS:30000}
# время бездействия (в миллисекундах) брокера Kafka в сессии хоста ZooKeeper до тайм-аута
        session-timeout-ms: ${ZOOKEEPER_KAFKA_SESSION_TIMEOUT_MS:86400000}
# корневой путь к хосту ZooKeeper для брокера сообщений Kafka
        chroot: ${ZOOKEEPER_KAFKA_CHROOT:}
# максимальное количество повторных попыток соединения с брокером Kafka
        connectionRetryCount: ${ZOOKEEPER_KAFKA_CONNECTION_RETRY_COUNT:1}
# настройки администратора Kafka
    admin:
# время ожидания (в миллисекундах) входного потока данных брокером Kafka до тайм-аута
      inputStreamTimeoutMs: ${KAFKA_INPUT_STREAM_TIMEOUT_MS:2000}
# параметры регистрации системных событий в сигнальном топике Kafka
    status.event.publish:
# имя топика Kafka, в котором регистрируются события
      topic: ${KAFKA_STATUS_EVENT_TOPIC:status.event.topic}
# разрешение на регистрацию событий в топике
      enabled: ${KAFKA_STATUS_EVENT_ENABLED:false}
# дополнительное разрешение на регистрацию событий по операциям записи
      writeOperationsEnabled: ${KAFKA_STATUS_EVENT_WRITE_OPERATIONS_ENABLED:false}
# дополнительное разрешение на регистрацию событий по созданию и удалению внешних таблиц
      externalDDLEnabled: ${KAFKA_STATUS_EVENT_EXTERNAL_DDL_ENABLED:false}
# настройки ADP
adp:
# настройки датасорсов типа ADP
  datasource:
# отметка элемента в массиве датасорсов и имя датасорса; каждый элемент массива содержит настройки одного датасорса типа ADP
    - name: ADP
# окружение (указывает базу данных в датасорсе)
      env: ${ADP_ENV_NAME:${DTM_NAME:test}}
# имя пользователя/логин для авторизации в датасорсе
      user: ${ADP_USERNAME:dtm}
# пароль для авторизации в датасорсе
      password: ${ADP_PASS:}
# сетевой адрес хоста датасорса
      host: ${ADP_HOST:localhost}
# сетевой порт на хосте датасорса
      port: ${ADP_PORT:5432}
# настройки механизма загрузки данных (Kafka-Postgres writer) в датасорс
      mppw:
# список сетевых адресов и путь к REST-интерфейсу для загрузки данных в формате: http://host_1:port_1,host_2:port_2,.../newdata/start
        restStartLoadUrl: ${ADP_REST_START_LOAD_URL:http://localhost:8096/newdata/start}
# список сетевых адресов и путь к REST-интерфейсу для остановки загрузки данных в формате: http://host_1:port_1,host_2:port_2,.../newdata/stop
        restStopLoadUrl: ${ADP_REST_STOP_LOAD_URL:http://localhost:8096/newdata/stop}
# список сетевых адресов и путь для получения информации о версии коннектора в формате: http://host_1:port_1,host_2:port_2,.../versions
        restVersionUrl: ${ADP_MPPW_CONNECTOR_VERSION_URL:http://localhost:8096/versions}
# путь метода get-коннектора
        restGetEndpoint: ${ADP_MPPW_GET_ENDPOINT:/newdata/get}
# уникальное имя консьюмер-группы для загрузки данных в датасорс; для разных датасорсов одного типа должны быть указаны разные имена
        kafkaConsumerGroup: ${ADP_KAFKA_CONSUMER_GROUP:adp-load}
# время ожидания (в миллисекундах) get-метода коннектора до тайм-аута
        restGetTimeoutMs: ${ADP_REST_GET_TIMEOUT_MS:5000}
# время ожидания (в миллисекундах) до тайм-аута при проверке доступности коннектора
        checkTimeoutMs: ${ADP_MPPW_CONNECTOR_CHECK_TIMEOUT_MS:2000}
# количество попыток установки соединения при обращении к коннектору (0 — количество не ограничено, 1 — без повторных попыток)
        retryCount: ${ADP_MPPW_CONNECTOR_RETRY_COUNT:1}
# настройки механизма выгрузки данных (Kafka-Postgres reader) из датасорса
      mppr:
# список сетевых адресов и путь для запросов на выгрузку данных в формате: http://host_1:port_1,host_2:port_2,.../query
        restLoadUrl: ${ADP_MPPR_QUERY_URL:http://localhost:8094/query}
# список сетевых адресов и путь для получения информации о версии коннектора в формате: http://host_1:port_1,host_2:port_2,.../versions
        restVersionUrl: ${ADP_MPPR_CONNECTOR_VERSION_URL:http://localhost:8094/versions}
# время ожидания (в миллисекундах) до тайм-аута при проверке доступности коннектора
        checkTimeoutMs: ${ADP_MPPR_CONNECTOR_CHECK_TIMEOUT_MS:2000}
 # количество попыток установки соединения при обращении к коннектору (0 — количество не ограничено, 1 — без повторных попыток)
        retryCount: ${ADP_MPPR_CONNECTOR_RETRY_COUNT:1}

Далее конфигурационный файл application.yml обозначается термином «конфигурация Prostore».

Настройка СУБД Postgres

# создание в СУБД Postgres SUPERUSER-пользователя c именем и паролем,
# указанными в конфигурации Prostore
# (значения параметров (adp:datasource:user) и (adp:datasource:password) соответственно)
cd /
sudo -u postgres psql -c 'CREATE ROLE dtm WITH LOGIN SUPERUSER'
sudo -u postgres psql -c "ALTER ROLE dtm WITH PASSWORD 'dtm'"
# создание базы данных с именем test, указанным в конфигурации Prostore (env: name)
sudo -u postgres psql -c 'CREATE DATABASE test'
# перезапуск сервиса Postgresql
sudo systemctl reload postgresql-13

Сборка и установка коннектора Kafka-Postgres

# клонирование репозитория kafka-postgres-connector
git clone https://repository.datamart.ru/datamarts/kafka-postgres-connector ~/kafka-postgres-connector
# запуск сборки коннектора kafka-postgres средствами Apache Maven
cd ~/kafka-postgres-connector
mvn clean install -DskipTests=true
# приведение конфигурационных файлов kafka-postgres-writer и kafka-postgres-reader к виду,
# показанному ниже, чтобы значения параметров совпадали со значениями соответствующих параметров конфигурации Prostore
# datasource: postgres: database ~ env: name,
# datasource: postgres: user     ~ adp: datasource: user,
# datasource: postgres: password ~ adp: datasource: password,
# datasource: postgres: hosts    ~ adp: datasource: host, adp: datasource: port
sudo nano ~/kafka-postgres-connector/kafka-postgres-writer/src/main/resources/application.yml
sudo nano ~/kafka-postgres-connector/kafka-postgres-reader/src/main/resources/application.yml
# создание символических ссылок на файлы конфигурации
sudo ln -s ~/kafka-postgres-connector/kafka-postrges-writer/src/main/resources/application.yml ~/kafka-postgres-connector/kafka-postrges-writer/target/application.yml
sudo ln -s ~/kafka-postgres-connector/kafka-postrges-reader/src/main/resources/application.yml ~/kafka-postgres-connector/kafka-postrges-reader/target/application.yml
Конфигурационный файл kafka-postgres-writer `application.yml`
# настройки журналирования
logging:
  # уровни журналирования событий
  level:
    # уровень важности сообщений, записываемых в лог-файл
    ru.datamart.kafka: ${LOG_LEVEL:DEBUG}
    # уровень важности сообщений, записываемых в лог-файл по событиям Kafka
    org.apache.kafka: ${KAFKA_LOG_LEVEL:INFO}

# настройки HTTP-подключений
http:
  # порт, на котором работает коннектор
  port: ${SERVER_PORT:8096}

# настройки для работы с Vertx
vertx:
  # настройки пулов Vertx
  pools:
    # максимальный размер пула потоков, обрабатывающих события Vertx
    eventLoopPoolSize: ${VERTX_EVENT_LOOP_SIZE:12}
    # максимальный размер пула потоков, выполняющих долгие операции
    workersPoolSize: ${VERTX_WORKERS_POOL_SIZE:32}
  # настройки вертиклов Vertx
  verticle:
    query:
      # количество экземпляров вертиклов, обрабатывающих запросы
      instances: ${QUERY_VERTICLE_INSTANCES:12}
    insert:
      # максимальный размер пула потоков, вставляющих данные
      poolSize: ${INSERT_WORKER_POOL_SIZE:32}
      # периодичность вставки новых данных (в миллисекундах)
      insertPeriodMs: ${INSERT_PERIOD_MS:1000}
      # размер пакета операций при вставке данных
      batchSize: ${INSERT_BATCH_SIZE:500}
    consumer:
      # максимальный размер пула потоков, считывающих данные
      poolSize: ${KAFKA_CONSUMER_WORKER_POOL_SIZE:32}
      # максимальный размер результата, возвращаемого по FETCH-запросу к ADP
      maxFetchSize: ${KAFKA_CONSUMER_MAX_FETCH_SIZE:10000}
    commit:
      # размер пула потоков, записывающих смещение в топиках Kafka
      poolSize: ${KAFKA_COMMIT_WORKER_POOL_SIZE:1}
      # периодичность записи смещения в топиках Kafka (в миллисекундах)
      commitPeriodMs: ${KAFKA_COMMIT_WORKER_COMMIT_PERIOD_MS:1000}

# настройки для работы с брокером сообщений Kafka
client:
  kafka:
    # настройки консьюмера (потребителя) Kafka
    consumer:
      # периодичность проверки (в миллисекундах) статуса брокера сообщений Kafka
      checkingTimeoutMs: ${KAFKA_CHECKING_TIMEOUT_MS:10000}
      # время ожидания (в миллисекундах) ответа от брокера сообщений Kafka до тайм-аута
      responseTimeoutMs: ${KAFKA_RESPONSE_TIMEOUT_MS:10000}
      # количество консьюмеров Kafka
      consumerSize: ${KAFKA_CONSUMER_SIZE:10}
      # время ожидания (в миллисекундах) до закрытия соединения с брокером сообщений Kafka
      closeConsumersTimeout: ${KAFKA_CLOSE_CONSUMER_TIMEOUT:15000}
      # свойства консьюмера в соответствии с конфигурацией консьюмеров Kafka (https://kafka.apache.org/documentation/#consumerconfigs)
      property:
        # режим обнуления смещения в топиках Kafka
        auto.offset.reset: ${KAFKA_AUTO_OFFSET_RESET:earliest}
        # признак автоматической записи смещения в топиках Kafka
        enable.auto.commit: ${KAFKA_AUTO_COMMIT:false}
        # периодичность (в миллисекундах) автоматической записи смещения в топиках Kafka
        auto.commit.interval.ms: ${KAFKA_AUTO_INTERVAL_MS:1000}

# настройки для работы с ADP
datasource:
  postgres:
    # имя базы данных ADP
    database: ${POSTGRES_DB_NAME:test}
    # имя пользователя/логин для авторизации в ADP
    user: ${POSTGRES_USERNAME:dtm}
    # пароль для авторизации в ADP
    password: ${POSTGRES_PASS:dtm}
    # сетевой адрес хоста с ADP и номер порта на хосте
    hosts: ${POSTGRES_HOSTS:postgres.host:5432}
    # максимальный размер пула потоков
    poolSize: ${POSTGRES_POOLSIZE:10}
    # максимальный размер кэша запроса prepared statement   
    preparedStatementsCacheMaxSize: ${POSTGRES_CACHE_MAX_SIZE:256}
    # максимальный размер запроса prepared statement, который может быть закэширован
    preparedStatementsCacheSqlLimit: ${POSTGRES_CACHE_SQL_LIMIT:2048}
    # признак кэширования запросов prepared statement
    preparedStatementsCache: ${POSTGRES_CACHE:true}
    # количество попыток установки соединения (0 — количество не ограничено, 1 — без повторных попыток)
    connectRetryCount: ${POSTGRES_CONNECT_RETRY_COUNT:3}
    # время ожидания между попытками установки соединения в миллисекундах
    connectRetryTimeoutMs: ${POSTGRES_CONNECT_RETRY_TIMEOUT_MS:1000}
    # время бездействия подключения к датасорсу до тайм-аута и закрытия подключения
    idleTimeoutMs: ${POSTGRES_IDLE_TIMEOUT_MS:60000}
    # время поддержания подключения к датасорсу до его закрытия (0 — время не ограничено)
    maxLifetimeTimeoutMs: ${POSTGRES_MAX_LIFETIME_TIMEOUT_MS:0}
Конфигурационный файл kafka-postgres-reader `application.yml`
# настройки журналирования
logging:
  # уровни журналирования событий
  level:
    # уровень важности сообщений, записываемых в лог-файл
    ru.datamart.kafka: ${LOG_LEVEL:DEBUG}
    # уровень важности сообщений, записываемых в лог-файл по событиям Kafka 
    org.apache.kafka: ${KAFKA_LOG_LEVEL:INFO}

# настройки HTTP-подключений
http:
  # порт, на котором работает коннектор
  port: ${SERVER_PORT:8094}

# настройки для работы с Vertx
vertx:
  # настройки пулов Vertx
  pools:
    # максимальный размер пула потоков, обрабатывающих события Vertx
    eventLoopPoolSize: ${VERTX_EVENT_LOOP_SIZE:12}
    # максимальный размер пула потоков, выполняющих долгие операции
    workersPoolSize: ${VERTX_WORKERS_POOL_SIZE:32}
  # настройки вертиклов Vertx
  verticle:
    query:
      # количество экземпляров, принимающих запросы
      instances: ${QUERY_VERTICLE_INSTANCES:12}
    producer:
      # количество экземпляров, записывающих данные
      instances: ${PRODUCER_VERTICLE_INSTANCES:6}
      # максимальный размер пула потоков, записывающих данные
      poolSize: ${PRODUCER_POOL_SIZE:12}
    executor:
      # максимальный размер пула потоков, обрабатывающих запросы в базу данных
      poolSize: ${EXECUTOR_POOL_SIZE:12}

reader:
  # максимальный размер буфера вычитанных строк из базы данных
  uploadChunkBuffer: ${UPLOAD_CHUNK_BUFFER:1000}

# настройки для работы с ADP
datasource:
  postgres:
    # имя базы данных ADP
    database: ${POSTGRES_DB_NAME:test}
    # имя пользователя/логин для авторизации в ADP
    user: ${POSTGRES_USERNAME:dtm}
    # пароль для авторизации в ADP
    password: ${POSTGRES_PASS:dtm}
    # сетевой адрес хоста с ADP и номер порта на хосте
    hosts: ${POSTGRES_HOSTS:postgres.host:5432}
    # максимальный размер пула потоков
    poolSize: ${POSTGRES_POOLSIZE:10}
    pipeliningLimit: ${POSTGRES_PIPELINING_LIMIT:1}
    # максимальный размер кэша запроса prepared statement 
    preparedStatementsCacheMaxSize: ${POSTGRES_CACHE_MAX_SIZE:256}
    # максимальный размер запроса prepared statement, который может быть закэширован  
    preparedStatementsCacheSqlLimit: ${POSTGRES_CACHE_SQL_LIMIT:2048}
    # признак кэширования запросов prepared statement
    preparedStatementsCache: ${POSTGRES_CACHE:true}
    # максимальный размер результата, возвращаемого по FETCH-запросу к датасорсу  
    fetchSize: ${POSTGRES_FETCH_SIZE:10000}
    # время бездействия подключения к датасорсу до тайм-аута и закрытия подключения
    idleTimeoutMs: ${POSTGRES_IDLE_TIMEOUT_MS:60000}
    # время поддержания подключения к датасорсу до его закрытия (0 — время не ограничено)
    maxLifetimeTimeoutMs: ${POSTGRES_MAX_LIFETIME_TIMEOUT_MS:0}

# настройки для работы с брокером сообщений Kafka
kafka:
  client:
    property:
      # сериализатор строковых ключей
      key.serializer: org.apache.kafka.common.serialization.ByteArraySerializer
      # сериализатор строковых значений
      value.serializer: org.apache.kafka.common.serialization.ByteArraySerializer

Запуск сервисов Apache ZooKeeper и Apache Kafka

# запуск одного экземпляра сервера ZooKeeper, если он еще не запущен
sudo systemctl start zookeeper
# запуск сервера Kafka и проверка его состояния
sudo systemctl start kafka
sudo systemctl status kafka

Запуск коннектора Kafka-Postgres

# запуск kafka-postgres-writer в отдельном окне терминала 
cd ~/kafka-postgres-connector/kafka-postgres-writer/target
java -jar kafka-postgres-writer-<version>.jar
# запуск kafka-postgres-reader в отдельном окне терминала
cd ~/kafka-postgres-connector/kafka-postgres-reader/target
java -jar kafka-postgres-reader-<version>.jar

Запуск Prostore

Запуск с номером порта для сбора метрик, указанным в конфигурации Prostore (по умолчанию — 8080):

# запуск файла dtm-query-execution-core-<version>.jar (например, dtm-query-execution-core-6.5.0.jar)
cd ~/prostore/dtm-query-execution-core/target
java -jar dtm-query-execution-core-<version>.jar

Чтобы запустить Prostore с другим номером порта для сбора метрик, задайте нужное значение с помощью параметра конфигурации server:port или переменной окружения DTM_METRICS_PORT. Подробнее о параметрах конфигурации и способах их переопределения см. в разделе Конфигурация ноды.

Подключение к Prostore с помощью SQL-клиента

Чтобы подключиться к системе, следуйте инструкциям в разделе Подключение с помощью SQL-клиента.

Демонстрационный сценарий

Создание необходимых логических сущностей

-- создание логической базы данных
CREATE DATABASE marketing;

-- выбор логической БД по умолчанию
USE marketing;

-- создание логической таблицы в БД marketing
CREATE TABLE sales (
  id BIGINT NOT NULL,
  transaction_date TIMESTAMP NOT NULL,
  product_code VARCHAR(256) NOT NULL,
  product_units BIGINT NOT NULL,
  store_id BIGINT NOT NULL,
  description VARCHAR(256),
  PRIMARY KEY (id)
)
DISTRIBUTED BY (id);

-- создание внешней таблицы загрузки
CREATE UPLOAD EXTERNAL TABLE sales_ext_upload (
  id BIGINT,
  transaction_date TIMESTAMP,
  product_code VARCHAR(256),
  product_units BIGINT,
  store_id BIGINT,
  description VARCHAR(256)
)
LOCATION  'kafka://localhost:2181/salesTopic'
FORMAT 'AVRO'
MESSAGE_LIMIT 1000;

-- создание логического представления stores_by_sold_products
CREATE VIEW stores_by_sold_products AS
  SELECT store_id, SUM(product_units) AS product_amount
  FROM sales
  GROUP BY store_id
  ORDER BY product_amount DESC
  LIMIT 30;
  
-- создание внешней таблицы выгрузки в топик Kafka "salesTopicOut"
CREATE DOWNLOAD EXTERNAL TABLE sales_ext_download (
  id BIGINT,
  transaction_date TIMESTAMP,
  product_code VARCHAR(256),
  product_units BIGINT,
  store_id BIGINT,
  description VARCHAR(256)
)
LOCATION  'kafka://localhost:2181/salesTopicOut'
FORMAT 'AVRO'
CHUNK_SIZE 1000;

Создание топика Kafka для последующей загрузки данных

Создание топика Kafka salesTopic в терминале:

cd /opt/kafka/bin
bash kafka-topics.sh --create --replication-factor 1 --partitions 1 --topic salesTopic --zookeeper localhost:2181

Создание бинарного avro-файла kafka_upload_sales.avro из avro-схемы и данных

JSON-файл avro-схемы `kafka_upload_sales.avsc`
{
  "name": "sales",
  "namespace": "sales",
  "type": "record",
  "fields": [
    {
      "name": "id",
      "type": "long"
    },
    {
      "name": "transaction_date",
      "type": {
        "type": "long",
        "logicalType": "timestamp-micros"
      }
    },
    {
      "name": "product_code",
      "type": "string"
    },
    {
      "name": "product_units",
      "type": "long"
    },
    {
      "name": "store_id",
      "type": "long"
    },
    {
      "name": "description",
      "type": "string"
    },
    {
      "name": "sys_op",
      "type": "int"
    }
  ]
}
JSON-файл данных `kafka_upload_sales.json`
[
  {
    "id": 1000111,
    "transaction_date": 1614269474000000,
    "product_code": "ABC102101",
    "product_units": 2,
    "store_id": 1000012345,
    "description": "Покупка по акции 1+1",
    "sys_op": 0
  },
  {
    "id": 1000112,
    "transaction_date": 1614334214000000,
    "product_code": "ABC102001",
    "product_units": 1,
    "store_id": 1000000123,
    "description": "Покупка без акций",
    "sys_op": 0
  },
  {
    "id": 1000020,
    "transaction_date": 1614636614000000,
    "product_code": "ABC102010",
    "product_units": 4,
    "store_id": 1000000123,
    "description": "Покупка по акции 1+1",
    "sys_op": 1
  }
]
бинарный AVRO-файл `kafka_upload_sales.avro`

Сохранить бинарный файл

Загрузка avro-файла kafka_upload_sales.avro

Загрузка avro-файла kafka_upload_sales.avro в топик Kafka salesTopic через терминал с помощью kafkacat:

# получение docker-образа kafkacat
sudo docker pull edenhill/kcat:1.7.0
# запуск docker-образа kafkacat для загрузки в топик salesTopic
# avro-файла /opt/kafka/sales/kafka_upload_sales.avro
sudo docker run -it --network host \
--volume /opt/kafka/sales/kafka_upload_sales.avro:/data/kafka_upload_sales.avro \
edenhill/kcat:1.7.0 -b localhost:9092 -t salesTopic -P /data/kafka_upload_sales.avro

Загрузка данных

-- открытие новой дельты
BEGIN DELTA;
-- запуск загрузки данных в логическую таблицу sales
INSERT INTO sales SELECT * FROM sales_ext_upload;
-- закрытие дельты
COMMIT DELTA;

Вставка данных

-- открытие новой дельты
BEGIN DELTA;

-- запуск вставки данных в логическую таблицу sales
INSERT INTO sales
(id, transaction_date, product_code, product_units, store_id, description)
VALUES
(2000111, '2020-05-01 13:14:16', 'ABC202010', 7, 1000000123, 'Покупка без акций'),
(2000112, '2020-05-02 16:13:17', 'ABC202011', 11, 1000000456, 'Покупка без акций'),
(2000113, '2020-05-03 21:15:17', 'ABC202012', 5, 1000000789, 'Покупка без акций'),
(2000114, '2020-05-04 23:03:13', 'ABC202013', 7, 1000000123, 'Покупка без акций'),
(2000115, '2020-05-05 14:10:21', 'ABC202014', 21, 1000000623, 'Покупка без акций'),
(2000116, '2020-06-12 08:43:56', 'ABC202015', 32, 1000000987, 'Покупка без акций');

-- закрытие дельты
COMMIT DELTA;

Выборка данных

-- запрос с неявным указанием столбцов и ключевым словом WHERE
SELECT * FROM sales
WHERE store_id = 1000000123;

-- запрос с агрегацией, группировкой и сортировкой данных, а также выбором первых 5 строк
SELECT s.store_id, SUM(s.product_units) AS product_amount
FROM sales AS s
GROUP BY (s.store_id)
ORDER BY product_amount DESC
LIMIT 5;

-- запрос к логическому представлению stores_by_sold_products
SELECT * FROM stores_by_sold_products;

Выгрузка в топик Kafka

-- запуск выгрузки данных из логической таблицы sales
INSERT INTO sales_ext_download 
SELECT * FROM sales WHERE product_units > 2;

Удаление логических сущностей

-- удаление внешней таблицы загрузки
DROP UPLOAD EXTERNAL TABLE sales_ext_upload;

-- удаление внешней таблицы выгрузки
DROP DOWNLOAD EXTERNAL TABLE sales_ext_download;