Skip to main content Link Menu Expand (external link) Document Search Copy Copied

Вы находитесь на странице архива. Актуальная документация доступна по ссылке.

Конфигурация коннекторов

Содержание раздела
  1. Переопределение параметров конфигурации
  2. Конфигурация коннектора Kafka-Clickhouse reader
  3. Конфигурация коннектора Kafka-Clickhouse writer
  4. Конфигурация коннектора Kafka-Postgres reader
  5. Конфигурация коннектора Kafka-Postgres writer

Следующие коннекторы требуют настройки конфигурации:

  • Kafka-Clickhouse reader connector,
  • Kafka-Clickhouse writer connector,
  • Kafka-Postgres reader connector,
  • Kafka-Postgres writer connector.

Ссылки на репозитории с исходным кодом коннекторов доступны в разделе Ресурсы.

Конфигурация коннектора представляет собой текстовый YAML-файл, параметры которого организованы в древовидную структуру. Ниже приведены примеры конфигурационных файлов, где для каждого параметра указаны:

  • назначение,
  • имя,
  • имя переменной окружения,
  • значение параметра.

Переопределение параметров конфигурации

По умолчанию коннектор использует конфигурационный файл, входящий в состав JAR-файла коннектора. Значения параметров конфигурации можно переопределить любым из способов:

  • поместить свой YAML-файл с нужными значениями параметров в директорию <рабочая_директория_коннектора>/config;
  • задать переменные окружения.

Если параметр определен в нескольких источниках, то наиболее приоритетным считается значение переменной окружения, следующим источником по приоритету — конфигурация в директории config, затем — конфигурация в составе JAR-файла.

Переопределить значение параметра с помощью переменной окружения можно, если эта переменная определена в конфигурационном файле. Если вы поместили свой конфигурационный файл в директорию config, он должен определять нужные переменные. При использовании конфигурационного файла из JAR-файла ничего определять не нужно, так как все переменные в нем уже определены.

Конфигурация коннектора Kafka-Clickhouse reader

# настройки Vertx
vertx:
  # признак кластеризованного режима Vertx
  clustered:true

# настройки журналирования
logging:
  level:
    # уровень важности сообщений, записываемых в лог-файл
    ru.datamart.kafka.clickhouse.reader: ${LOG_LEVEL:DEBUG}

# настройки вертиклов Vertx
verticle:
  worker:
    task-worker:
      # максимальный размер пула подключений веб-клиентов к ADQM
      poolSize: ${TASK_WORKER_POOL_SIZE:12}
      # имя пула подключений веб-клиентов к ADQM
      poolName: ${TASK_WORKER_POOL_NAME:task-worker}
      # внутренний таймаут обработки запросов (в миллисекундах)
      responseTimeoutMs: ${TASK_WORKER_RESPONSE_TIMEOUT_MS:86400000}

# настройки HTTP-подключений
http:
  # порт, на котором работает коннектор
  port: ${SERVER_PORT:8086}

# временная зона коннектора
timezone: ${MPPR_TIME_ZONE:UTC}

# настройки для работы с ADQM
datasource:
  clickhouse:
    # имя базы данных в ADQM
    database: ${CLICKHOUSE_DB_NAME:test1}
    # имя пользователя/логин для авторизации в ADQM
    user: ${CLICKHOUSE_USERNAME:default}
    # пароль для авторизации в ADQM
    password: ${CLICKHOUSE_PASS:}
    # сетевой адрес хоста с ADQM и номер порта на хосте
    hosts: ${CLICKHOUSE_HOSTS:clickhouse.host:8123}
    # максимальный размер результата, возвращаемого по FETCH-запросу к ADQM
    fetchSize: ${CLICKHOUSE_FETCH_SIZE:1000}

# настройки для работы с брокером сообщений Kafka
kafka:
  clickhouse:
    # настройки производителя данных
    producer:
      property:
        # сериализатор строковых ключей   
        key.serializer: org.apache.kafka.common.serialization.ByteArraySerializer
        # сериализатор строковых значений
        value.serializer: org.apache.kafka.common.serialization.ByteArraySerializer
    # настройки кластера
    cluster:
      # сетевой адрес хоста Zookeeper для брокера сообщений Kafka    
      zookeeperHosts: ${ZOOKEEPER_HOSTS:zk-1.dtm.local}
      # корневой путь к хосту Zookeeper для брокера сообщений Kafka   
      rootPath: ${KAFKA_CLUSTER_ROOTPATH:arenadata/cluster/21}

Конфигурация коннектора Kafka-Clickhouse writer

# настройки HTTP-подключений
http:
  # порт, на котором работает коннектор
  port: ${SERVER_PORT:8090}

# настройки для работы с брокером сообщений Kafka
client:
  kafka:
    # настройки консьюмера (потребителя) Kafka
    consumer:
      # периодичность проверки (в миллисекундах) статуса брокера сообщений Kafka 
      checkingTimeoutMs: ${KAFKA_CHECKING_TIMEOUT_MS:10000}
      # время ожидания (в миллисекундах) ответа брокера сообщений Kafka до тайм-аута
      responseTimeoutMs: ${KAFKA_RESPONSE_TIMEOUT_MS:10000}
      # количество консьюмеров Kafka
      consumerSize: ${KAFKA_CONSUMER_SIZE:10}
      # время ожидания (в миллисекундах) до закрытия соединения с брокером сообщений Kafka   
      closeConsumersTimeout: ${KAFKA_CLOSE_CONSUMER_TIMEOUT:15000}
      # свойства консьюмера в соответствии с конфигурацией консьюмеров Kafka (https://kafka.apache.org/documentation/#consumerconfigs)       
      property:
        # список адресов и портов Bootstrap-серверов брокера сообщений Kafka       
        bootstrap.servers: ${KAFKA_BOOTSTRAP_SERVERS:kafka.host:9092}
        # имя консьюмер-группы для загрузки данных в ADQM
        group.id: ${KAFKA_CONSUMER_GROUP_ID:clickhouse-query-execution}
        # режим обнуления смещения в топиках Kafka
        auto.offset.reset: ${KAFKA_AUTO_OFFSET_RESET:earliest}
        # признак автоматической записи смещения в топиках Kafka   
        enable.auto.commit: ${KAFKA_AUTO_COMMIT:false}
        # периодичность (в миллисекундах) автоматической записи смещения в топиках Kafka       
        auto.commit.interval.ms: ${KAFKA_AUTO_INTERVAL_MS:1000}

# настройки окружения
env:
  # имя окружения
  name: ${ENV:test}

# настройки, связанные с системными полями
datamart:
  # системное поле, хранящее номер операции записи
  hot-delta-field-name: sys_from

# настройки для работы с ADQM
datasource:
  clickhouse:
    # имя базы данных в ADQM
    database: ${CLICKHOUSE_DB_NAME:test1}
    # имя пользователя/логин для авторизации в ADQM    
    user: ${CLICKHOUSE_USERNAME:default}
    # пароль для авторизации в ADQM
    password: ${CLICKHOUSE_PASS:}
    # список хостов ADQM (адрес:порт через запятую)   
    hosts: ${CLICKHOUSE_HOSTS:clockhouse.host:8123}

# настройки VertX
verticle:
  worker:
    # настройки компонента, обрабатывающего запросы
    new-data-worker:
      # максимальный размер пула потоков
      poolSize: ${DATA_WORKER_POOL_SIZE:20}
      # имя пула потоков
      poolName: ${DATA_WORKER_POOL_NAME:new-data-worker}
    # настройки компонента, обрабатывающего полученные данные
    task-worker:
      # максимальный размер пула потоков
      poolSize: ${TASK_WORKER_POOL_SIZE:12}
      # имя пула потоков
      poolName: ${TASK_WORKER_POOL_NAME:task-worker}
      # внутренний таймаут обработки полученных данных (в миллисекундах)
      responseTimeoutMs: ${TASK_WORKER_RESPONSE_TIMEOUT_MS:86400000}
    # настройки компонента, выполняющего вставку данных
    insert-worker:
      # максимальный размер пула потоков
      poolSize: ${INSERT_WORKER_POOL_SIZE:32}
      # имя пула потоков
      poolName: ${INSERT_WORKER_POOL_NAME:insert-worker}
      # внутренний таймаут обработки вставки данных (в миллисекундах)
      responseTimeoutMs: ${INSERT_WORKER_RESPONSE_TIMEOUT_MS:86400000}
      # периодичность вставки данных (в миллисекундах)
      insertPeriodMs: ${INSERT_PERIOD_MS:1000}
      # размер пакета операций при вставке данных 
      batchSize: ${INSERT_BATCH_SIZE:500}
    # настройки компонента, считывающего данные их топиков Kafka  
    kafka-consumer-worker:
      # максимальный размер пула потоков
      poolSize: ${KAFKA_CONSUMER_WORKER_POOL_SIZE:32}
      # имя пула потоков
      poolName: ${KAFKA_CONSUMER_WORKER_POOL_NAME:insert-worker}
      # внутренний таймаут обработки консьюмера данных (в миллисекундах)
      responseTimeoutMs: ${KAFKA_CONSUMER_WORKER_RESPONSE_TIMEOUT_MS:86400000}
      # максимальный размер результата, возвращаемого по FETCH-запросу к ADQM
      maxFetchSize: ${KAFKA_CONSUMER_MAX_FETCH_SIZE:10000}
    # настройки компонента, записывающего смещение в топиках Kafka   
    kafka-commit-worker:
      # максимальный размер пула потоков
      poolSize: ${KAFKA_COMMIT_WORKER_POOL_SIZE:1}
      # имя пула потоков
      poolName: ${KAFKA_COMMIT_WORKER_POOL_NAME:commit-worker}
      # периодичность записи смещения в топиках Kafka (в миллисекундах)
      commitPeriodMs: ${KAFKA_COMMIT_WORKER_COMMIT_PERIOD_MS:1000}

# настройки журналирования
logging:
  level:
    # уровень важности сообщений, записываемых в лог-файл   
    ru.datamart.prostore: DEBUG
    # уровень важности сообщений, записываемых в лог-файл по событиям Kafka     
    org.apache.kafka: INFO

Конфигурация коннектора Kafka-Postgres reader

# настройки журналирования
logging:
  level:
    # уровень важности сообщений, записываемых в лог-файл
    ru.datamart.kafka: ${LOG_LEVEL:DEBUG}
    # уровень важности сообщений, записываемых в лог-файл по событиям Kafka 
    org.apache.kafka: ${KAFKA_LOG_LEVEL:INFO}

# настройки HTTP-подключений
http:
  # порт, на котором работает коннектор
  port: ${SERVER_PORT:8094}

# настройки дла работы с Vertx
vertx:
  pools:
    # максимальный размер пула потоков, обрабатывающих события Vertx
    eventLoopPoolSize: ${VERTX_EVENT_LOOP_SIZE:12}
    # максимальный размер пула потоков, выполняющих долгие операции
    workersPoolSize: ${VERTX_WORKERS_POOL_SIZE:32}
  verticle:
    query:
      # количество экземпляров, принимающих запросы
      instances: ${QUERY_VERTICLE_INSTANCES:12}

# настройки для работы с ADP
datasource:
  postgres:
    # имя базы данных ADP
    database: ${POSTGRES_DB_NAME:db}
    # имя пользователя/логин для авторизации в ADP
    user: ${POSTGRES_USERNAME:user}
    # пароль для авторизации в ADP
    password: ${POSTGRES_PASS:password}
    # сетевой адрес хоста с ADP и номер порта на хосте
    hosts: ${POSTGRES_HOSTS:postgres.host:5432}
    # максимальный размер пула потоков
    poolSize: ${POSTGRES_POOLSIZE:10}
    # максимальный размер кэша запроса prepared statement 
    preparedStatementsCacheMaxSize: ${POSTGRES_CACHE_MAX_SIZE:256}
    # максимальный размер запроса prepared statement, который может быть закэширован  
    preparedStatementsCacheSqlLimit: ${POSTGRES_CACHE_SQL_LIMIT:2048}
    # признак кэширования запросов prepared statement
    preparedStatementsCache: ${POSTGRES_CACHE:true}
    # максимальный размер результата, возвращаемого по FETCH-запросу к ADP  
    fetchSize: ${POSTGRES_FETCH_SIZE:1000}

# настройки для работы с брокером сообщений Kafka
kafka:
  client:
    property:
      # сериализатор строковых ключей
      key.serializer: org.apache.kafka.common.serialization.ByteArraySerializer
      # сериализатор строковых значений
      value.serializer: org.apache.kafka.common.serialization.ByteArraySerializer

Конфигурация коннектора Kafka-Postgres writer

# настройки журналирования
logging:
  level:
    # уровень важности сообщений, записываемых в лог-файл
    ru.datamart.kafka: ${LOG_LEVEL:DEBUG}
    # уровень важности сообщений, записываемых в лог-файл по событиям Kafka   
    org.apache.kafka: ${KAFKA_LOG_LEVEL:INFO}

# настройки HTTP-подключений
http:
  # порт, на котором работает коннектор
  port: ${SERVER_PORT:8096}

# настройки для работы с Vertx
vertx:
  pools:
    # максимальный размер пула потоков, обрабатывающих события Vertx 
    eventLoopPoolSize: ${VERTX_EVENT_LOOP_SIZE:12}
    # максимальный размер пула потоков, выполняющих долгие операции 
    workersPoolSize: ${VERTX_WORKERS_POOL_SIZE:32}
  verticle:
    query:
      # количество экземпляров вертиклов, обрабатывающих запросы
      instances: ${QUERY_VERTICLE_INSTANCES:12}
    insert:
      # максимальный размер пула потоков, вставляющих данные 
      poolSize: ${INSERT_WORKER_POOL_SIZE:32}
      # периодичность вставки новых данных (в миллисекундах)
      insertPeriodMs: ${INSERT_PERIOD_MS:1000}
      # размер пакета операций при вставке данных
      batchSize: ${INSERT_BATCH_SIZE:500}
    consumer:
      # максимальный размер пула потоков, считывающих данные        
      poolSize: ${KAFKA_CONSUMER_WORKER_POOL_SIZE:32}
      # максимальный размер результата, возвращаемого по FETCH-запросу к ADP
      maxFetchSize: ${KAFKA_CONSUMER_MAX_FETCH_SIZE:10000}
    commit:
      # размер пула потоков, записывающих смещение в топиках Kafka
      poolSize: ${KAFKA_COMMIT_WORKER_POOL_SIZE:1}
      # периодичность записи смещения в топиках Kafka (в миллисекундах)
      commitPeriodMs: ${KAFKA_COMMIT_WORKER_COMMIT_PERIOD_MS:1000}

# настройки для работы с брокером сообщений Kafka
client:
  kafka:
    # настройки консьюмера (потребителя) Kafka
    consumer:
      # периодичность проверки (в миллисекундах) статуса брокера сообщений Kafka  
      checkingTimeoutMs: ${KAFKA_CHECKING_TIMEOUT_MS:10000}
      # время ожидания (в миллисекундах) ответа от брокера сообщений Kafka до тайм-аута    
      responseTimeoutMs: ${KAFKA_RESPONSE_TIMEOUT_MS:10000}
      # количество консьюмеров Kafka
      consumerSize: ${KAFKA_CONSUMER_SIZE:10}
      # время ожидания (в миллисекундах) до закрытия соединения с брокером сообщений Kafka    
      closeConsumersTimeout: ${KAFKA_CLOSE_CONSUMER_TIMEOUT:15000}
      # свойства консьюмера в соответствии с конфигурацией консьюмеров Kafka (https://kafka.apache.org/documentation/#consumerconfigs)  
      property:
        # список адресов и портов Bootstrap-серверов брокера сообщений Kafka
        bootstrap.servers: ${KAFKA_BOOTSTRAP_SERVERS:kafka.host:9092}
        # имя консьюмер-группы для загрузки данных в ADP
        group.id: ${KAFKA_CONSUMER_GROUP_ID:postgres-query-execution}
        # режим обнуления смещения в топиках Kafka
        auto.offset.reset: ${KAFKA_AUTO_OFFSET_RESET:earliest}
        # признак автоматической записи смещения в топиках Kafka
        enable.auto.commit: ${KAFKA_AUTO_COMMIT:false}
        # периодичность (в миллисекундах) автоматической записи смещения в топиках Kafka
        auto.commit.interval.ms: ${KAFKA_AUTO_INTERVAL_MS:1000}

# настройки для работы с ADP
datasource:
  postgres:
    # имя базы данных ADP
    database: ${POSTGRES_DB_NAME:db}
    # имя пользователя/логин для авторизации в ADP
    user: ${POSTGRES_USERNAME:user}
    # пароль для авторизации в ADP
    password: ${POSTGRES_PASS:password}
    # сетевой адрес хоста с ADP и номер порта на хосте   
    hosts: ${POSTGRES_HOSTS:postgres.host:5432}
    # максимальный размер пула потоков
    poolSize: ${POSTGRES_POOLSIZE:10}
    # максимальный размер кэша запроса prepared statement   
    preparedStatementsCacheMaxSize: ${POSTGRES_CACHE_MAX_SIZE:256}
    # максимальный размер запроса prepared statement, который может быть закэширован     
    preparedStatementsCacheSqlLimit: ${POSTGRES_CACHE_SQL_LIMIT:2048}
    # признак кэширования запросов prepared statement
    preparedStatementsCache: ${POSTGRES_CACHE:true}