Конфигурация коннекторов

Содержание раздела
  1. Переопределение параметров конфигурации
  2. Конфигурация коннектора Kafka-Clickhouse reader
  3. Конфигурация коннектора Kafka-Clickhouse writer
  4. Конфигурация коннектора Kafka-Postgres reader
  5. Конфигурация коннектора Kafka-Postgres writer
  6. Конфигурация коннектора Kafka Jet writer

Следующие коннекторы требуют настройки конфигурации:

  • Kafka-Clickhouse reader connector,
  • Kafka-Clickhouse writer connector,
  • Kafka-Postgres reader connector,
  • Kafka-Postgres writer connector,
  • Kafka Jet writer connector.

Ссылки на репозитории с исходным кодом коннекторов доступны в разделе Ресурсы.

Конфигурация коннектора представляет собой текстовый YAML-файл, параметры которого организованы в древовидную структуру. Ниже приведены примеры конфигурационных файлов, где для каждого параметра указаны:

  • назначение,
  • имя,
  • имя переменной окружения,
  • значение параметра.

Переопределение параметров конфигурации

По умолчанию коннектор использует конфигурационный файл, входящий в состав JAR-файла коннектора. Значения параметров конфигурации можно переопределить любым из способов:

  • поместить свой YAML-файл с нужными значениями параметров в директорию <рабочая_директория_коннектора>/config;
  • задать переменные окружения.

Если параметр определен в нескольких источниках, то наиболее приоритетным считается значение переменной окружения, следующим источником по приоритету — конфигурация в директории config, затем — конфигурация в составе JAR-файла.

Переопределить значение параметра с помощью переменной окружения можно, если эта переменная определена в конфигурационном файле. Если вы поместили свой конфигурационный файл в директорию config, он должен определять нужные переменные. При использовании конфигурационного файла из JAR-файла ничего определять не нужно, так как все переменные в нем уже определены.

Конфигурация коннектора Kafka-Clickhouse reader

# настройки журналирования
logging:
  # уровни журналирования событий
  level:
    # уровень важности сообщений, записываемых в лог-файл
    ru.datamart.kafka: ${LOG_LEVEL:DEBUG}
    # уровень важности сообщений, записываемых в лог-файл по событиям Kafka
    org.apache.kafka: ${KAFKA_LOG_LEVEL:INFO}

# настройки HTTP-подключений
http:
  # порт, на котором работает коннектор
  port: ${SERVER_PORT:8080}

# настройки для работы с Vertx
vertx:
  # настройки пулов Vertx
  pools:
    # максимальный размер пула потоков, обрабатывающих события Vertx
    eventLoopPoolSize: ${VERTX_EVENT_LOOP_SIZE:12}
    # максимальный размер пула потоков, выполняющих долгие операции
    workersPoolSize: ${VERTX_WORKERS_POOL_SIZE:32}
  # настройки вертиклов Vertx
  verticle:
    query:
      # количество экземпляров, принимающих запросы
      instances: ${QUERY_VERTICLE_INSTANCES:12}
    producer:
      # количество экземпляров, записывающих данные
      instances: ${PRODUCER_VERTICLE_INSTANCES:6}
      # максимальный размер пула потоков, записывающих данные
      poolSize: ${PRODUCER_POOL_SIZE:12}
    executor:
      # максимальный размер пула потоков, обрабатывающих запросы в базу данных
      poolSize: ${EXECUTOR_POOL_SIZE:12}

reader:
  # максимальный размер буфера вычитанных строк из базы данных
  uploadChunkBuffer: ${UPLOAD_CHUNK_BUFFER:1000}

# настройки для работы с ADQM
datasource:
  clickhouse:
    # имя базы данных в ADQM
    database: ${CLICKHOUSE_DB_NAME:test1}
    # имя пользователя/логин для авторизации в ADQM
    user: ${CLICKHOUSE_USERNAME:default}
    # пароль для авторизации в ADQM
    password: ${CLICKHOUSE_PASS:}
    # сетевой адрес хоста с ADQM и номер порта на хосте
    hosts: ${CLICKHOUSE_HOSTS:clickhouse.host:8123}
    # максимальный размер результата, возвращаемого по FETCH-запросу к ADQM
    fetchSize: ${CLICKHOUSE_FETCH_SIZE:1000}

# настройки для работы с брокером сообщений Kafka
kafka:
  clickhouse:
    # настройки производителя данных
    producer:
      property:
        # сериализатор строковых ключей
        key.serializer: org.apache.kafka.common.serialization.ByteArraySerializer
        # сериализатор строковых значений
        value.serializer: org.apache.kafka.common.serialization.ByteArraySerializer
    # настройки кластера
    cluster:
      # сетевой адрес хоста Zookeeper для брокера сообщений Kafka
      zookeeperHosts: ${ZOOKEEPER_HOSTS:zk-1.dtm.local}
      # корневой путь к хосту Zookeeper для брокера сообщений Kafka
      rootPath: ${KAFKA_CLUSTER_ROOTPATH:arenadata/cluster/21}

Конфигурация коннектора Kafka-Clickhouse writer

# настройки журналирования
logging:
  # уровни журналирования
  level:
    # уровень важности сообщений, записываемых в лог-файл
    ru.datamart.kafka: ${LOG_LEVEL:DEBUG}
    # уровень важности сообщений, записываемых в лог-файл по событиям Kafka
    org.apache.kafka: ${KAFKA_LOG_LEVEL:INFO}

# настройки HTTP-подключений
http:
  # порт, на котором работает коннектор
  port: ${SERVER_PORT:8080}

# настройки для работы с Vertx
vertx:
  # настройки пулов Vertx
  pools:
    # максимальный размер пула потоков, обрабатывающих события Vertx
    eventLoopPoolSize: ${VERTX_EVENT_LOOP_SIZE:12}
    # максимальный размер пула потоков, выполняющих долгие операции
    workersPoolSize: ${VERTX_WORKERS_POOL_SIZE:32}
  # настройки вертиклов Vertx
  verticle:
    query:
      # количество экземпляров вертиклов, обрабатывающих запросы
      instances: ${QUERY_VERTICLE_INSTANCES:12}
    insert:
      # максимальный размер пула потоков, вставляющих данные
      poolSize: ${INSERT_WORKER_POOL_SIZE:32}
      # периодичность вставки новых данных (в миллисекундах)
      insertPeriodMs: ${INSERT_PERIOD_MS:1000}
      # размер пакета операций при вставке данных
      batchSize: ${INSERT_BATCH_SIZE:500}
    consumer:
      # максимальный размер пула потоков, считывающих данные
      poolSize: ${KAFKA_CONSUMER_WORKER_POOL_SIZE:32}
      # максимальный размер результата, возвращаемого по FETCH-запросу к ADQM
      maxFetchSize: ${KAFKA_CONSUMER_MAX_FETCH_SIZE:10000}
    commit:
      # размер пула потоков, записывающих смещение в топиках Kafka
      poolSize: ${KAFKA_COMMIT_WORKER_POOL_SIZE:1}
      # периодичность записи смещения в топиках Kafka (в миллисекундах)
      commitPeriodMs: ${KAFKA_COMMIT_WORKER_COMMIT_PERIOD_MS:1000}

# настройки для работы с брокером сообщений Kafka
client:
  kafka:
    # настройки консьюмера (потребителя) Kafka
    consumer:
      # периодичность проверки (в миллисекундах) статуса брокера сообщений Kafka
      checkingTimeoutMs: ${KAFKA_CHECKING_TIMEOUT_MS:10000}
      # время ожидания (в миллисекундах) ответа от брокера сообщений Kafka до тайм-аута
      responseTimeoutMs: ${KAFKA_RESPONSE_TIMEOUT_MS:10000}
      # количество консьюмеров Kafka
      consumerSize: ${KAFKA_CONSUMER_SIZE:10}
      # время ожидания (в миллисекундах) до закрытия соединения с брокером сообщений Kafka
      closeConsumersTimeout: ${KAFKA_CLOSE_CONSUMER_TIMEOUT:15000}
      # свойства консьюмера в соответствии с конфигурацией консьюмеров Kafka (https://kafka.apache.org/documentation/#consumerconfigs)
      property:
        # список адресов и портов Bootstrap-серверов брокера сообщений Kafka
        bootstrap.servers: ${KAFKA_BOOTSTRAP_SERVERS:kafka.host:9092}
        # режим обнуления смещения в топиках Kafka
        auto.offset.reset: ${KAFKA_AUTO_OFFSET_RESET:earliest}
        # признак автоматической записи смещения в топиках Kafka
        enable.auto.commit: ${KAFKA_AUTO_COMMIT:false}
        # периодичность (в миллисекундах) автоматической записи смещения в топиках Kafka
        auto.commit.interval.ms: ${KAFKA_AUTO_INTERVAL_MS:1000}

# настройки окружения
env:
  # имя окружения
  name: ${ENV:test}

# настройки для работы с ADQM
datasource:
  clickhouse:
    # имя базы данных в ADQM
    database: ${CLICKHOUSE_DB_NAME:test1}
    # имя пользователя/логин для авторизации в ADQM    
    user: ${CLICKHOUSE_USERNAME:default}
    # пароль для авторизации в ADQM
    password: ${CLICKHOUSE_PASS:}
    # список хостов ADQM в формате: address1:port1,address2:port2...addressN:portN  
    hosts: ${CLICKHOUSE_HOSTS:clickhouse.host:8123}

Конфигурация коннектора Kafka-Postgres reader

# настройки журналирования
logging:
  # уровни журналирования событий
  level:
    # уровень важности сообщений, записываемых в лог-файл
    ru.datamart.kafka: ${LOG_LEVEL:DEBUG}
    # уровень важности сообщений, записываемых в лог-файл по событиям Kafka 
    org.apache.kafka: ${KAFKA_LOG_LEVEL:INFO}

# настройки HTTP-подключений
http:
  # порт, на котором работает коннектор
  port: ${SERVER_PORT:8094}

# настройки для работы с Vertx
vertx:
  # настройки пулов Vertx
  pools:
    # максимальный размер пула потоков, обрабатывающих события Vertx
    eventLoopPoolSize: ${VERTX_EVENT_LOOP_SIZE:12}
    # максимальный размер пула потоков, выполняющих долгие операции
    workersPoolSize: ${VERTX_WORKERS_POOL_SIZE:32}
  # настройки вертиклов Vertx
  verticle:
    query:
      # количество экземпляров, принимающих запросы
      instances: ${QUERY_VERTICLE_INSTANCES:12}
    producer:
      # количество экземпляров, записывающих данные
      instances: ${PRODUCER_VERTICLE_INSTANCES:6}
      # максимальный размер пула потоков, записывающих данные
      poolSize: ${PRODUCER_POOL_SIZE:12}
    executor:
      # максимальный размер пула потоков, обрабатывающих запросы в базу данных
      poolSize: ${EXECUTOR_POOL_SIZE:12}

reader:
  # максимальный размер буфера вычитанных строк из базы данных
  uploadChunkBuffer: ${UPLOAD_CHUNK_BUFFER:1000}

# настройки для работы с ADP
datasource:
  postgres:
    # имя базы данных ADP
    database: ${POSTGRES_DB_NAME:db}
    # имя пользователя/логин для авторизации в ADP
    user: ${POSTGRES_USERNAME:user}
    # пароль для авторизации в ADP
    password: ${POSTGRES_PASS:password}
    # сетевой адрес хоста с ADP и номер порта на хосте
    hosts: ${POSTGRES_HOSTS:postgres.host:5432}
    # максимальный размер пула потоков
    poolSize: ${POSTGRES_POOLSIZE:10}
    pipeliningLimit: ${POSTGRES_PIPELINING_LIMIT:1}
    # максимальный размер кэша запроса prepared statement 
    preparedStatementsCacheMaxSize: ${POSTGRES_CACHE_MAX_SIZE:256}
    # максимальный размер запроса prepared statement, который может быть закэширован  
    preparedStatementsCacheSqlLimit: ${POSTGRES_CACHE_SQL_LIMIT:2048}
    # признак кэширования запросов prepared statement
    preparedStatementsCache: ${POSTGRES_CACHE:true}
    # максимальный размер результата, возвращаемого по FETCH-запросу к ADP  
    fetchSize: ${POSTGRES_FETCH_SIZE:10000}

# настройки для работы с брокером сообщений Kafka
kafka:
  client:
    property:
      # сериализатор строковых ключей
      key.serializer: org.apache.kafka.common.serialization.ByteArraySerializer
      # сериализатор строковых значений
      value.serializer: org.apache.kafka.common.serialization.ByteArraySerializer

Конфигурация коннектора Kafka-Postgres writer

# настройки журналирования
logging:
  # уровни журналирования событий
  level:
    # уровень важности сообщений, записываемых в лог-файл
    ru.datamart.kafka: ${LOG_LEVEL:DEBUG}
    # уровень важности сообщений, записываемых в лог-файл по событиям Kafka
    org.apache.kafka: ${KAFKA_LOG_LEVEL:INFO}

# настройки HTTP-подключений
http:
  # порт, на котором работает коннектор
  port: ${SERVER_PORT:8096}

# настройки для работы с Vertx
vertx:
  # настройки пулов Vertx
  pools:
    # максимальный размер пула потоков, обрабатывающих события Vertx
    eventLoopPoolSize: ${VERTX_EVENT_LOOP_SIZE:12}
    # максимальный размер пула потоков, выполняющих долгие операции
    workersPoolSize: ${VERTX_WORKERS_POOL_SIZE:32}
  # настройки вертиклов Vertx
  verticle:
    query:
      # количество экземпляров вертиклов, обрабатывающих запросы
      instances: ${QUERY_VERTICLE_INSTANCES:12}
    insert:
      # максимальный размер пула потоков, вставляющих данные
      poolSize: ${INSERT_WORKER_POOL_SIZE:32}
      # периодичность вставки новых данных (в миллисекундах)
      insertPeriodMs: ${INSERT_PERIOD_MS:1000}
      # размер пакета операций при вставке данных
      batchSize: ${INSERT_BATCH_SIZE:500}
    consumer:
      # максимальный размер пула потоков, считывающих данные
      poolSize: ${KAFKA_CONSUMER_WORKER_POOL_SIZE:32}
      # максимальный размер результата, возвращаемого по FETCH-запросу к ADP
      maxFetchSize: ${KAFKA_CONSUMER_MAX_FETCH_SIZE:10000}
    commit:
      # размер пула потоков, записывающих смещение в топиках Kafka
      poolSize: ${KAFKA_COMMIT_WORKER_POOL_SIZE:1}
      # периодичность записи смещения в топиках Kafka (в миллисекундах)
      commitPeriodMs: ${KAFKA_COMMIT_WORKER_COMMIT_PERIOD_MS:1000}

# настройки для работы с брокером сообщений Kafka
client:
  kafka:
    # настройки консьюмера (потребителя) Kafka
    consumer:
      # периодичность проверки (в миллисекундах) статуса брокера сообщений Kafka
      checkingTimeoutMs: ${KAFKA_CHECKING_TIMEOUT_MS:10000}
      # время ожидания (в миллисекундах) ответа от брокера сообщений Kafka до тайм-аута
      responseTimeoutMs: ${KAFKA_RESPONSE_TIMEOUT_MS:10000}
      # количество консьюмеров Kafka
      consumerSize: ${KAFKA_CONSUMER_SIZE:10}
      # время ожидания (в миллисекундах) до закрытия соединения с брокером сообщений Kafka
      closeConsumersTimeout: ${KAFKA_CLOSE_CONSUMER_TIMEOUT:15000}
      # свойства консьюмера в соответствии с конфигурацией консьюмеров Kafka (https://kafka.apache.org/documentation/#consumerconfigs)
      property:
        # список адресов и портов Bootstrap-серверов брокера сообщений Kafka
        bootstrap.servers: ${KAFKA_BOOTSTRAP_SERVERS:kafka.host:9092}
        # режим обнуления смещения в топиках Kafka
        auto.offset.reset: ${KAFKA_AUTO_OFFSET_RESET:earliest}
        # признак автоматической записи смещения в топиках Kafka
        enable.auto.commit: ${KAFKA_AUTO_COMMIT:false}
        # периодичность (в миллисекундах) автоматической записи смещения в топиках Kafka
        auto.commit.interval.ms: ${KAFKA_AUTO_INTERVAL_MS:1000}

# настройки для работы с ADP
datasource:
  postgres:
    # имя базы данных ADP
    database: ${POSTGRES_DB_NAME:db}
    # имя пользователя/логин для авторизации в ADP
    user: ${POSTGRES_USERNAME:user}
    # пароль для авторизации в ADP
    password: ${POSTGRES_PASS:password}
    # сетевой адрес хоста с ADP и номер порта на хосте
    hosts: ${POSTGRES_HOSTS:postgres.host:5432}
    # максимальный размер пула потоков
    poolSize: ${POSTGRES_POOLSIZE:10}
    # максимальный размер кэша запроса prepared statement   
    preparedStatementsCacheMaxSize: ${POSTGRES_CACHE_MAX_SIZE:256}
    # максимальный размер запроса prepared statement, который может быть закэширован
    preparedStatementsCacheSqlLimit: ${POSTGRES_CACHE_SQL_LIMIT:2048}
    # признак кэширования запросов prepared statement
    preparedStatementsCache: ${POSTGRES_CACHE:true}

Конфигурация коннектора Kafka Jet writer

# настройки журналирования событий
logging:
  # уровни журналирования
  level:
    # уровень важности сообщений, записываемых в лог-файл
    ru.datamart.jet: ${LOG_LEVEL:DEBUG}

# настройки HTTP-подключений
http:
  # порт, на котором работает коннектор
  port: ${SERVER_PORT:8080}

# настройки для работы с Vertx
vertx:
  # настройки пулов Vertx
  pools:
    # максимальный размер пула потоков, обрабатывающих события Vertx
    eventLoopPoolSize: ${VERTX_EVENT_LOOP_SIZE:12}
    # максимальный размер пула потоков, выполняющих долгие операции
    workersPoolSize: ${VERTX_WORKERS_POOL_SIZE:32}
  # настройки вертиклов Vertx
  verticle:
    query:
      # количество экземпляров вертиклов, обрабатывающих запросы
      instances: ${QUERY_VERTICLE_INSTANCES:12}
    postgres:
      # количество экземпляров вертиклов, выполняющих запросы в postgres
      instances: ${POSTGRES_VERTICLE_INSTANCES:12}

# настройки для работы с брокером сообщений Kafka
client:
  kafka:
    # настройки консьюмера (потребителя) Kafka
    consumer:
      # свойства консьюмера в соответствии с конфигурацией консьюмеров Kafka (https://kafka.apache.org/documentation/#consumerconfigs)
      properties:
        # режим обнуления смещения в топиках Kafka
        auto.offset.reset: ${KAFKA_AUTO_OFFSET_RESET:earliest}
        # признак автоматической записи смещения в топиках Kafka
        enable.auto.commit: ${KAFKA_AUTO_COMMIT:false}
        # периодичность (в миллисекундах) автоматической записи смещения в топиках Kafka
        auto.commit.interval.ms: ${KAFKA_AUTO_INTERVAL_MS:1000}

datasource:
  # настройки для работы с ADP
  postgres:
    # максимальный размер пула потоков
    poolSize: ${POSTGRES_POOLSIZE:10}
    # максимальный размер кэша запроса prepared statement  
    preparedStatementsCacheMaxSize: ${POSTGRES_CACHE_MAX_SIZE:256}
    # максимальный размер запроса prepared statement, который может быть закэширован
    preparedStatementsCacheSqlLimit: ${POSTGRES_CACHE_SQL_LIMIT:2048}
    # признак кэширования запросов prepared statement
    preparedStatementsCache: ${POSTGRES_CACHE:true}
    # ограничение количества одновременных запросов в рамках одного подключения к postgres
    pipeliningLimit: ${POSTGRES_PIPELINING_LIMIT:1}