Kafka Configuration and Monitoring

**Referenced Files in This Document**

  • [config.go](file/bi-common/mq/kafkax/config.go)
  • [env.go](file/bi-common/mq/kafkax/env.go)
  • [kafkax-default.yaml](file/bi-common/mq/kafkax/kafkax-default.yaml)
  • [setup.go](file/bi-common/mq/kafkax/setup.go)
  • [option.go](file/bi-common/mq/kafkax/option.go)
  • [producer.go](file/bi-common/mq/kafkax/producer.go)
  • [consumer.go](file/bi-common/mq/kafkax/consumer.go)
  • [consume.go](file/bi-common/mq/kafkax/consume.go)
  • [multi_consumer.go](file/bi-common/mq/kafkax/multi-consumer.go)
  • [consumer_manager.go](file/bi-common/mq/kafkax/consumer-manager.go)
  • [tracing.go](file/bi-common/mq/kafkax/tracing.go)
  • [logger.go](file/bi-common/observability/logger/logger.go)

Table of Contents

  1. Introduction
  2. Project Structure
  3. Core Components
  4. Architecture Overview
  5. Detailed Component Analysis
  6. Dependency Analysis
  7. Performance Considerations
  8. Troubleshooting Guide
  9. Conclusion
  10. Appendices

Introduction

This document explains how Kafka configuration is managed and monitored within the project’s messaging library. It covers:

  • Configuration loading from environment variables for brokers, topics, authentication, and TLS/SASL
  • TLS and SASL configuration options for secure connections
  • Monitoring and statistics collection via built-in APIs (consumer lag, connection health checks, and reader/writer stats)
  • Configuration validation, environment-specific overrides, and default value management
  • Production-ready configuration examples, security hardening, and performance tuning
  • Troubleshooting using built-in monitoring tools, logging, and health checks
  • Capacity planning and scaling guidance

Project Structure

The Kafka integration resides under the messaging library and is composed of:

  • Configuration model and defaults
  • Environment variable loading
  • Producer and consumer builders with options
  • Multi-topic and manager-based orchestration
  • Health checks and statistics
  • Optional OpenTelemetry tracing integration

Core Components

  • Configuration model: RootConfig, ProducerConfig, ConsumerConfig, SASLConfig, TLSConfig
  • Defaults: Embedded YAML defines sensible defaults for producers and consumers
  • Environment loading: Dedicated constants and loaders for producer/consumer and shared settings
  • Builders: NewProducer/NewConsumer with extensive Option helpers
  • Protobuf setup: Convert proto config to options for programmatic setup
  • Health and stats: Ping for connectivity, Stats/Lag for monitoring
  • Retry and DLQ: Built-in retry/backoff and dead-letter queue support
  • Multi-consumer and manager: Orchestrate multiple topics and groups
  • Tracing: Optional OpenTelemetry integration for produce/consume spans

Architecture Overview

The configuration lifecycle:

  • Defaults loaded from embedded YAML
  • Environment variables override defaults
  • Programmatic options further refine configuration
  • Producers/Consumers are constructed with merged configuration
  • Health checks and stats exposed via Ping and Stats APIs
  • Optional tracing integrates with OpenTelemetry
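The layering above (defaults, then environment overrides, then programmatic options) is the classic Go functional-options pattern. The sketch below illustrates the precedence order only; the type and option names (`Config`, `Option`, `WithBrokers`, `WithBatchSize`) are simplified assumptions, not the library's actual API.

```go
package main

import "fmt"

// Config is a pared-down stand-in for the library's producer configuration.
type Config struct {
	Brokers   []string
	Topic     string
	BatchSize int
}

// Option mutates a Config; options are applied last, so they win over
// defaults and environment overrides.
type Option func(*Config)

func WithBrokers(brokers ...string) Option { return func(c *Config) { c.Brokers = brokers } }
func WithBatchSize(n int) Option           { return func(c *Config) { c.BatchSize = n } }

// NewConfig applies embedded defaults first, then (hypothetically) env
// overrides, then programmatic options — the precedence described above.
func NewConfig(opts ...Option) Config {
	cfg := Config{Brokers: []string{"localhost:9092"}, BatchSize: 100} // embedded defaults
	// ...environment overrides would be applied here...
	for _, opt := range opts {
		opt(&cfg)
	}
	return cfg
}

func main() {
	cfg := NewConfig(WithBrokers("kafka-1:9092", "kafka-2:9092"), WithBatchSize(500))
	fmt.Println(cfg.Brokers, cfg.BatchSize)
}
```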

Detailed Component Analysis

Configuration Model and Defaults

  • RootConfig nests ProducerConfig and ConsumerConfig
  • ProducerConfig includes brokers, topic, client ID, batching, reliability, performance, timeouts, SASL, TLS, and runtime loggers
  • ConsumerConfig mirrors producer fields plus consumer-specific settings (group ID, partition, offsets, intervals, isolation level, partition change monitoring)
  • SASLConfig supports enabling mechanism and credentials
  • TLSConfig supports enabling, skipping verification, and certificate paths
  • Defaults are loaded from embedded YAML and returned by DefaultProducerConfig/DefaultConsumerConfig
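To make the nesting concrete, the structs might look roughly like this. These are hypothetical shapes mirroring only the fields listed above; the real definitions in config.go carry many more fields (batching, timeouts, loggers, intervals).

```go
package main

import "fmt"

// SASLConfig: mechanism and credentials, gated by an enable flag.
type SASLConfig struct {
	Enabled   bool
	Mechanism string // e.g. "PLAIN", "SCRAM-SHA-256"
	Username  string
	Password  string
}

// TLSConfig: enablement, verification policy, and certificate paths.
type TLSConfig struct {
	Enabled    bool
	SkipVerify bool
	CAFile     string
	CertFile   string
	KeyFile    string
}

type ProducerConfig struct {
	Brokers  []string
	Topic    string
	ClientID string
	SASL     SASLConfig
	TLS      TLSConfig
}

type ConsumerConfig struct {
	Brokers   []string
	Topic     string
	GroupID   string
	Partition int // sentinel (e.g. <0) for "not set": consumer-group mode
	SASL      SASLConfig
	TLS       TLSConfig
}

// RootConfig nests both, as described above.
type RootConfig struct {
	Producer ProducerConfig
	Consumer ConsumerConfig
}

func main() {
	root := RootConfig{Consumer: ConsumerConfig{Partition: -1}}
	fmt.Println(root.Consumer.Partition < 0) // consumer-group mode
}
```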

Environment Variable Loading

  • Constants define environment variable names for brokers, client ID, SASL, TLS, producer, and consumer settings
  • LoadProducerFromEnv and LoadConsumerFromEnv parse and apply environment overrides
  • Broker lists are comma-separated and trimmed
  • Partition environment handling distinguishes between “set” and “not set” to select single-partition vs consumer-group modes
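The broker-list trimming and the "set vs. not set" partition distinction can be sketched as follows. The environment variable names come from the appendix below; the helper names and return conventions are illustrative assumptions.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
	"strings"
)

// parseBrokers splits a comma-separated broker list and trims whitespace,
// as the env loader described above does.
func parseBrokers(raw string) []string {
	parts := strings.Split(raw, ",")
	brokers := make([]string, 0, len(parts))
	for _, p := range parts {
		if b := strings.TrimSpace(p); b != "" {
			brokers = append(brokers, b)
		}
	}
	return brokers
}

// lookupPartition distinguishes "not set" (consumer-group mode) from an
// explicit partition (single-partition mode) using os.LookupEnv, which
// reports presence separately from the value.
func lookupPartition() (partition int, explicit bool, err error) {
	raw, ok := os.LookupEnv("KAFKAX_CONSUMER_PARTITION")
	if !ok || raw == "" {
		return 0, false, nil // not set: join a consumer group
	}
	p, err := strconv.Atoi(raw)
	return p, true, err
}

func main() {
	fmt.Println(parseBrokers(" kafka-1:9092, kafka-2:9092 ,"))
}
```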

TLS and SASL Configuration

  • SASLConfig supports enabling mechanism and credentials; applied via WithProducerSASL/WithConsumerSASL or environment
  • TLSConfig supports enabling, skipping verification, and providing CA/cert/key paths; applied via WithProducerTLS*/WithConsumerTLS*
  • Transport/Dialer construction uses SASL/TLS settings during producer/consumer creation
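The translation from the declarative TLSConfig into a usable `*tls.Config` for the transport/dialer might look like this, using only the standard library. The struct and function names are assumptions; the real construction lives in producer/consumer creation.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"os"
)

// TLSConfig mirrors the fields described above (names are an assumption).
type TLSConfig struct {
	Enabled    bool
	SkipVerify bool
	CAFile     string
	CertFile   string
	KeyFile    string
}

// buildTLS turns the declarative config into a *tls.Config, returning nil
// when TLS is disabled so the dialer falls back to plaintext.
func buildTLS(c TLSConfig) (*tls.Config, error) {
	if !c.Enabled {
		return nil, nil
	}
	cfg := &tls.Config{InsecureSkipVerify: c.SkipVerify}
	if c.CAFile != "" {
		pem, err := os.ReadFile(c.CAFile)
		if err != nil {
			return nil, err
		}
		pool := x509.NewCertPool()
		if !pool.AppendCertsFromPEM(pem) {
			return nil, fmt.Errorf("no certificates found in %s", c.CAFile)
		}
		cfg.RootCAs = pool
	}
	if c.CertFile != "" && c.KeyFile != "" {
		cert, err := tls.LoadX509KeyPair(c.CertFile, c.KeyFile)
		if err != nil {
			return nil, err
		}
		cfg.Certificates = []tls.Certificate{cert}
	}
	return cfg, nil
}

func main() {
	cfg, _ := buildTLS(TLSConfig{Enabled: true, SkipVerify: true})
	fmt.Println(cfg.InsecureSkipVerify)
}
```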

Monitoring and Statistics

  • Producer: Stats() returns WriterStats; Ping() validates connectivity
  • Consumer: Stats() returns ReaderStats; Lag() reports consumer lag; Ping() validates connectivity
  • MultiTopicConsumer: Stats() aggregates per-topic ReaderStats; Ping() delegates to the first consumer
  • ConsumerManager: Orchestrates multiple consumers and exposes RunningCount()
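Consumer lag is the distance between a partition's high watermark (the offset of the next record to be written) and the group's committed offset, summed over partitions. A minimal sketch of that arithmetic, with illustrative type and field names:

```go
package main

import "fmt"

// PartitionOffsets pairs a partition's high watermark with the consumer
// group's committed offset (field names are illustrative).
type PartitionOffsets struct {
	Partition     int
	HighWatermark int64
	Committed     int64
}

// totalLag sums per-partition lag, clamping negatives to zero — the kind of
// quantity a Lag() accessor exposes for monitoring backlog.
func totalLag(offsets []PartitionOffsets) int64 {
	var lag int64
	for _, o := range offsets {
		if d := o.HighWatermark - o.Committed; d > 0 {
			lag += d
		}
	}
	return lag
}

func main() {
	fmt.Println(totalLag([]PartitionOffsets{
		{Partition: 0, HighWatermark: 1200, Committed: 1000},
		{Partition: 1, HighWatermark: 500, Committed: 500},
	})) // 200
}
```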

Retry, Dead Letter Queue, and Delivery Guarantees

  • ConsumeConfig supports OnError callback, RetryConfig for exponential backoff, and DLQTopic/DLQWriter
  • Automatic/manual commit modes supported
  • Batch consumption with configurable batch size and timeout

Multi-Topic and Manager-Based Consumption

  • MultiTopicConsumer creates a Consumer per topic with shared security and consumer settings
  • ConsumerManager registers handlers globally or per-instance and starts workers with configurable concurrency
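The worker fan-out that a manager performs can be sketched with goroutines and a channel. This shows only the concurrency shape; the real ConsumerManager wires workers to Kafka readers and registered handlers.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// startWorkers launches a configurable number of workers that drain msgs and
// invoke the handler, returning a WaitGroup to await shutdown.
func startWorkers(workers int, msgs <-chan string, handle func(string)) *sync.WaitGroup {
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for m := range msgs {
				handle(m)
			}
		}()
	}
	return &wg
}

func main() {
	msgs := make(chan string)
	var processed atomic.Int64
	wg := startWorkers(4, msgs, func(string) { processed.Add(1) })
	for i := 0; i < 100; i++ {
		msgs <- fmt.Sprintf("msg-%d", i)
	}
	close(msgs) // closing the channel drains and stops the workers
	wg.Wait()
	fmt.Println(processed.Load()) // 100
}
```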

OpenTelemetry Tracing

  • ProducerWithTracing and ConsumerWithTracing wrap underlying components
  • Inject/extract tracing context into/from message headers
  • Spans capture messaging attributes and status
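Header injection/extraction works by adapting message headers to the Get/Set/Keys trio that OpenTelemetry's `propagation.TextMapCarrier` interface expects. A self-contained sketch of such a carrier (the `Header` shape and carrier name are illustrative; tracing.go's actual implementation may differ):

```go
package main

import "fmt"

// Header mirrors the key/value shape of a Kafka record header.
type Header struct {
	Key   string
	Value []byte
}

// headerCarrier adapts a header slice to the Get/Set/Keys methods of
// OpenTelemetry's TextMapCarrier, so a propagator can inject and extract
// trace context (e.g. the "traceparent" header).
type headerCarrier struct{ headers *[]Header }

func (c headerCarrier) Get(key string) string {
	for _, h := range *c.headers {
		if h.Key == key {
			return string(h.Value)
		}
	}
	return ""
}

func (c headerCarrier) Set(key, value string) {
	for i, h := range *c.headers {
		if h.Key == key {
			(*c.headers)[i].Value = []byte(value) // overwrite existing key
			return
		}
	}
	*c.headers = append(*c.headers, Header{Key: key, Value: []byte(value)})
}

func (c headerCarrier) Keys() []string {
	keys := make([]string, 0, len(*c.headers))
	for _, h := range *c.headers {
		keys = append(keys, h.Key)
	}
	return keys
}

func main() {
	var headers []Header
	carrier := headerCarrier{headers: &headers}
	carrier.Set("traceparent", "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")
	fmt.Println(carrier.Get("traceparent"))
}
```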

Dependency Analysis

  • Configuration depends on embedded defaults and environment variables
  • Builders depend on options and configuration
  • Consumers depend on Kafka reader internals; producers depend on Kafka writer internals
  • Tracing depends on OpenTelemetry SDK
  • Logging is decoupled via Logger interface

Performance Considerations

  • Batching: Tune BatchSize, BatchBytes, BatchTimeout for throughput/latency balance
  • Compression: Choose appropriate codec (e.g., snappy) to trade CPU for bandwidth
  • Balancer: Select partitioning strategy aligned with workload (round-robin, hash)
  • Async mode: Enable for fire-and-forget scenarios; monitor WriterStats for failures
  • Timeouts: Adjust WriteTimeout/ReadTimeout to match network conditions
  • Consumer intervals: Tune CommitInterval, HeartbeatInterval, SessionTimeout for stability and latency
  • Partitioning: Prefer consumer group mode for horizontal scaling; use single-partition mode sparingly
  • Retries and DLQ: Configure RetryConfig and DLQTopic to handle transient failures without blocking
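As a starting point, a throughput-leaning override profile might group the knobs above as follows. The key names and values here are illustrative assumptions and may not match the actual keys in kafkax-default.yaml:

```yaml
producer:
  batch_size: 1000        # more records per request
  batch_bytes: 1048576    # 1 MiB cap per batch
  batch_timeout: 50ms     # bound on flush latency
  compression: snappy     # trade CPU for bandwidth
  required_acks: all      # durability over latency
  async: false            # keep write errors observable
consumer:
  min_bytes: 1048576      # wait for larger fetches
  max_wait: 500ms         # bound on fetch latency
  commit_interval: 1s     # amortize commit traffic
```

Lower `batch_timeout`/`max_wait` and smaller batches favor latency; the values above lean toward throughput.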

Troubleshooting Guide

  • Health checks: Use Ping() on Producer/Consumer/MultiTopicConsumer to validate connectivity
  • Logs: Set Logger/ErrorLogger via WithProducerLogger/WithConsumerLogger; integrate with structured logging stack
  • Stats: Inspect WriterStats/ReaderStats for connection and throughput anomalies
  • Lag: Use Lag() to detect consumer backlog; investigate slow handlers or insufficient workers
  • DLQ: Verify DLQTopic delivery and payload headers for failed messages
  • Environment overrides: Confirm environment variables are set correctly and parsed (comma-separated brokers)
  • Tracing: Enable ProducerWithTracing/ConsumerWithTracing to correlate end-to-end flows

Conclusion

The Kafka integration provides a robust, configurable, and observable foundation for messaging. Defaults ensure safe operation, environment variables enable flexible deployments, and comprehensive APIs expose health and performance signals. With optional tracing and structured logging, operators can monitor and troubleshoot effectively while tuning performance and hardening security.

Appendices

Configuration Validation and Defaults

  • Defaults are embedded and loaded via DefaultProducerConfig/DefaultConsumerConfig
  • Validation occurs at builder time (e.g., missing brokers or topic yields an error)
  • Environment variables override defaults; partition environment distinguishes explicit single-partition vs consumer-group modes
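The builder-time checks can be sketched as a validate step that fails fast instead of handing back a half-configured client. Struct and method names here are assumptions:

```go
package main

import (
	"errors"
	"fmt"
)

// ProducerConfig is trimmed to the fields the checks below need.
type ProducerConfig struct {
	Brokers []string
	Topic   string
}

// validate enforces the invariants described above: at least one broker and
// a non-empty topic.
func (c ProducerConfig) validate() error {
	if len(c.Brokers) == 0 {
		return errors.New("kafkax: at least one broker is required")
	}
	if c.Topic == "" {
		return errors.New("kafkax: topic is required")
	}
	return nil
}

func main() {
	err := ProducerConfig{}.validate()
	fmt.Println(err != nil) // empty config is rejected
}
```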

Environment-Specific Settings and Examples

  • Broker lists: Comma-separated values via KAFKAX_BROKERS
  • Producer topic and client ID: KAFKAX_PRODUCER_TOPIC, KAFKAX_CLIENT_ID
  • Consumer topic, group ID, partition: KAFKAX_CONSUMER_TOPIC, KAFKAX_CONSUMER_GROUP_ID, KAFKAX_CONSUMER_PARTITION
  • SASL/TLS: KAFKAX_SASL_* and KAFKAX_TLS_* flags and paths
  • Example patterns:
    • Minimal production: set brokers, topic, group ID, and SASL/TLS as needed
    • Single-partition testing: set KAFKAX_CONSUMER_PARTITION to target a specific partition
    • TLS-only: set KAFKAX_TLS_ENABLED and provide CA/cert/key paths

Security Hardening

  • Prefer TLS with strict certificate verification; disable SkipVerify in production
  • Use SASL with strong mechanisms and credentials; avoid plaintext passwords
  • Limit broker access via network policies and firewall rules
  • Rotate certificates and credentials regularly; manage secrets externally

Performance Tuning Parameters

  • Producer: BatchSize, BatchBytes, BatchTimeout, RequiredAcks, MaxAttempts, Async, Compression, Balancer, WriteTimeout, ReadTimeout
  • Consumer: MinBytes, MaxBytes, MaxWait, CommitInterval, HeartbeatInterval, SessionTimeout, RebalanceTimeout, IsolationLevel, WatchPartitionChanges, PartitionWatchInterval
  • Multi-consumer: Share settings across topics; tune workers per ConsumerManager

Capacity Planning and Scaling

  • Scale consumers horizontally via consumer groups; ensure the partition count matches concurrency needs, since partitions cap useful parallelism within a group
  • Monitor ReaderStats and Lag to assess throughput and backpressure
  • Use DLQ to isolate problematic messages and prevent system stalls
  • Right-size batching and compression for workload characteristics
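A first-cut sizing rule is to divide target throughput by what one consumer can sustain and round up; the partition count then also bounds how many group members can do useful work. A worked sketch with illustrative numbers:

```go
package main

import "fmt"

// partitionsNeeded gives a rough partition count: target throughput divided
// by per-consumer throughput, rounded up. Real sizing should also account
// for headroom, key skew, and peak-vs-average load.
func partitionsNeeded(targetMsgsPerSec, perConsumerMsgsPerSec int) int {
	if perConsumerMsgsPerSec <= 0 {
		return 0
	}
	return (targetMsgsPerSec + perConsumerMsgsPerSec - 1) / perConsumerMsgsPerSec
}

func main() {
	// 50k msgs/s target, each consumer sustains ~8k msgs/s -> 7 partitions,
	// and at most 7 consumers in the group can contribute.
	fmt.Println(partitionsNeeded(50000, 8000)) // 7
}
```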
