Kafka Producer and Consumer Implementation

**Referenced Files in This Document** - [consumer.go](file/bi-common/mq/kafkax/consumer.go) - [producer.go](file/bi-common/mq/kafkax/producer.go) - [config.go](file/bi-common/mq/kafkax/config.go) - [kafkax-default.yaml](file/bi-common/mq/kafkax/kafkax-default.yaml) - [option.go](file/bi-common/mq/kafkax/option.go) - [env.go](file/bi-common/mq/kafkax/env.go) - [dialer.go](file/bi-common/mq/kafkax/dialer.go) - [setup.go](file/bi-common/mq/kafkax/setup.go) - [consume.go](file/bi-common/mq/kafkax/consume.go) - [multi_consumer.go](file/bi-common/mq/kafkax/multi-consumer.go) - [retry.go](file/bi-common/mq/kafkax/retry.go) - [errors.go](file/bi-common/mq/kafkax/errors.go)

Table of Contents

  1. Introduction
  2. Project Structure
  3. Core Components
  4. Architecture Overview
  5. Detailed Component Analysis
  6. Dependency Analysis
  7. Performance Considerations
  8. Troubleshooting Guide
  9. Conclusion
  10. Appendices

Introduction

This document describes the Kafka producer and consumer implementation used in the BI Analysis Platform. It covers the Producer and Consumer structs, configuration options, initialization patterns, message handling functions, authentication and TLS/SASL support, error handling, logging, and graceful shutdown. It also offers operational guidance on connection management, health checks, and monitoring, along with practical patterns for producer-consumer workflows, serialization/deserialization, and batch processing.

Project Structure

The Kafka integration lives under bi-common/mq/kafkax and exposes a thin wrapper around github.com/segmentio/kafka-go. Key areas:

  • Producer and Consumer creation and lifecycle
  • Configuration via YAML defaults, environment variables, and programmatic options
  • Authentication (SASL) and TLS
  • Consumption modes: auto-commit, manual-commit, and batch
  • Dead-letter queue (DLQ) handling and retry policies
  • Multi-topic consumer orchestration
  • Health checks and statistics

Core Components

  • Producer: wraps kafka.Writer, supports batching, compression, async mode, and various ack levels. Provides helpers for JSON, headers, and topic-specific sends.
  • Consumer: wraps kafka.Reader, supports auto/manual commit, batch consumption, partition selection, and consumer groups. Includes graceful shutdown and health checks.
  • Configuration: hierarchical defaults (YAML), environment overrides, and option builders for runtime customization.
  • Authentication and TLS: SASL PLAIN and SCRAM mechanisms, and TLS client certificates with optional CA bundle and hostname verification.
  • Retry and DLQ: configurable exponential backoff retry policy and automatic dead-letter routing.

Architecture Overview

The wrapper composes kafka-go’s Reader and Writer with transport/dialer configuration for SASL/TLS. Producers and consumers are initialized with layered configuration from defaults, environment, and options. Consumers support three primary modes:

  • Auto-commit read loop
  • Manual-commit fetch loop
  • Batch accumulation with timeout and size thresholds

Detailed Component Analysis

Producer

  • Struct and fields: holds kafka.Writer, config pointer, and loggers.
  • Initialization:
    • NewProducer builds a kafka.Writer with transport (SASL/TLS), batching, timeouts, acks, compression, and balancer.
    • MustNewProducer panics on failure.
    • SetLogger updates writer loggers at runtime.
  • Health check: Ping connects to a broker via the dialer and queries the broker list to verify connectivity.
  • Message operations:
    • WriteMessages writes multiple messages.
    • SendMessage sends a record with key/value.
    • SendToTopic sends to a specific topic.
    • SendJSON marshals payload and sends.
    • SendJSONToTopic sends JSON to a specific topic.
    • SendMessageWithHeaders and SendToTopicWithHeaders add headers.
  • Monitoring: Stats returns writer stats; Writer returns underlying writer; Config returns current config; Close closes writer.
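As a rough illustration, NewProducer assembles something like the following kafka.Writer (segmentio/kafka-go types); the specific values shown are illustrative defaults, not the wrapper's actual code, and in kafkax they come from the layered config rather than literals:

```go
// Sketch of the kafka.Writer a Producer might build (values illustrative).
w := &kafka.Writer{
	Addr:         kafka.TCP("broker-1:9092", "broker-2:9092"),
	Topic:        "bi.events",
	Balancer:     &kafka.LeastBytes{},
	BatchSize:    100,
	BatchTimeout: 50 * time.Millisecond,
	RequiredAcks: kafka.RequireAll, // durability over latency
	Compression:  kafka.Snappy,
	Async:        false, // synchronous: WriteMessages reports errors directly
}
defer w.Close()

// SendJSON-style helper: marshal, then write with a key and headers.
payload, err := json.Marshal(map[string]any{"event": "report_generated"})
if err != nil {
	return err
}
err = w.WriteMessages(ctx, kafka.Message{
	Key:     []byte("tenant-42"),
	Value:   payload,
	Headers: []kafka.Header{{Key: "content-type", Value: []byte("application/json")}},
})
```

With Async set to false, WriteMessages blocks until the batch is acknowledged at the configured RequiredAcks level, which is why errors surface directly at the call site.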

Consumer

  • Struct and fields: holds kafka.Reader, config pointer, loggers, shutdown state, and cached DLQ writer.
  • Initialization:
    • NewConsumer constructs kafka.Reader with Dialer (SASL/TLS), consumer group/partition, timeouts, and offsets.
    • MustNewConsumer panics on failure.
    • SetLogger sets internal loggers.
  • Health check: Ping connects to a broker and verifies cluster information.
  • Message operations:
    • ReadMessage reads with auto-commit.
    • FetchMessage fetches without committing.
    • CommitMessages commits offsets.
    • SetOffset and SetOffsetAt seek to absolute offset/time.
    • Lag and Stats expose lag and reader stats.
    • Reader and Config expose underlying objects.
  • Lifecycle:
    • Close closes reader and DLQ writer.
    • IsShutdown indicates shutdown state.
    • GracefulShutdown cancels context and closes.
    • RunWithGracefulShutdown listens for signals and shuts down gracefully.

Configuration and Initialization Patterns

  • Defaults: YAML embedded defaults define sensible production values for batch sizes, timeouts, compression, and consumer group parameters.
  • Environment: LoadProducerFromEnv and LoadConsumerFromEnv populate configs from environment variables.
  • Options: WithProducer*/WithConsumer* builders override defaults and environment values.
  • Protobuf setup: SetupProducer/SetupConsumer convert protobuf configs to options for declarative configuration.
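The layering can be sketched with the functional-options pattern the WithProducer*/WithConsumer* builders follow. The struct and option names below are simplified stand-ins, not kafkax's actual definitions:

```go
package main

import "fmt"

// producerConfig is a simplified stand-in for kafkax's ProducerConfig; only
// a few fields are shown.
type producerConfig struct {
	Brokers   []string
	Topic     string
	BatchSize int
}

// ProducerOption follows the functional-options pattern the WithProducer*
// builders use (these particular option names are illustrative).
type ProducerOption func(*producerConfig)

func WithProducerBrokers(brokers ...string) ProducerOption {
	return func(c *producerConfig) { c.Brokers = brokers }
}

func WithProducerBatchSize(n int) ProducerOption {
	return func(c *producerConfig) { c.BatchSize = n }
}

// newConfig layers settings in precedence order: YAML defaults, then
// environment overrides, then explicit options.
func newConfig(opts ...ProducerOption) *producerConfig {
	c := &producerConfig{BatchSize: 100} // embedded YAML default
	// environment overrides (KAFKAX_*) would be applied here
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	c := newConfig(WithProducerBrokers("localhost:9092"), WithProducerBatchSize(200))
	fmt.Println(c.Brokers[0], c.BatchSize) // localhost:9092 200
}
```

Options are applied last, so a programmatic override always wins over both the YAML defaults and the environment.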

Message Handling Functions

  • Auto-commit loop: Consume continuously reads messages and invokes the handler; errors are logged or routed to an OnError callback; supports exponential backoff retry and DLQ routing.
  • Manual-commit loop: ConsumeWithManualCommit fetches messages without auto-commit; handler success triggers explicit CommitMessages.
  • Batch consumption: ConsumeBatch accumulates messages up to BatchSize or until BatchTimeout elapses, then calls handler; supports same retry/DLQ pipeline.
  • DLQ: sendToDLQ attaches original topic and error headers and writes to configured DLQ topic or cached DLQ writer.

Multi-Topic Consumer

  • MultiTopicConsumer orchestrates multiple Consumer instances for different topics, sharing security and consumer group settings.
  • Supports shared Consume/ConsumeWithManualCommit with per-topic handlers and unified graceful shutdown.
  • Stats aggregates reader stats per topic.

Authentication and TLS/SASL

  • SASL: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512 supported; disabled by default.
  • TLS: optional client certificate and CA bundle; hostname verification; skip-verify flag.
  • Transport vs Dialer: producers wire SASL/TLS through kafka.Transport, while consumers wire the same settings through kafka.Dialer.
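The Transport/Dialer split looks roughly like this fragment (segmentio/kafka-go APIs; broker addresses, topic, and credentials are placeholders):

```go
// Sketch of the SASL/SCRAM + TLS wiring on both sides.
mech, err := scram.Mechanism(scram.SHA512, "svc-user", "secret")
if err != nil {
	return err
}
tlsCfg := &tls.Config{MinVersion: tls.VersionTLS12} // add RootCAs/Certificates as needed

// Producer side: SASL/TLS plug into kafka.Transport.
writer := &kafka.Writer{
	Addr:      kafka.TCP("broker-1:9093"),
	Topic:     "bi.events",
	Transport: &kafka.Transport{SASL: mech, TLS: tlsCfg},
}

// Consumer side: the same mechanism plugs into kafka.Dialer.
reader := kafka.NewReader(kafka.ReaderConfig{
	Brokers: []string{"broker-1:9093"},
	GroupID: "bi-consumers",
	Topic:   "bi.events",
	Dialer:  &kafka.Dialer{Timeout: 10 * time.Second, SASLMechanism: mech, TLS: tlsCfg},
})
```

For PLAIN instead of SCRAM, kafka-go's plain.Mechanism{Username, Password} drops into the same SASL fields.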

Connection Management and Health Checks

  • Ping: attempts TCP/TLS connect and queries broker list to validate connectivity.
  • Stats: exposes writer or reader stats for monitoring.
  • Graceful shutdown: signals and context cancellation coordinate shutdown with a bounded timeout.

Serialization and Deserialization

  • JSON helpers: SendJSON and SendJSONToTopic marshal payloads; consumers receive []byte values.
  • Headers: SendMessageWithHeaders and related methods attach kafka.Header metadata.
  • Application-level deserialization: Consumers decode message.Value into domain types.

Practical Patterns

  • Producer-consumer pair:
    • Producer: configure brokers, topic, batching, compression, and acks; optionally enable SASL/TLS.
    • Consumer: choose auto/manual commit or batch mode; set retry/DLQ; handle graceful shutdown.
  • Batch processing: tune BatchSize and BatchTimeout to balance latency and throughput.
  • Multi-topic: use MultiTopicConsumer to manage several topics with shared settings.

Dependency Analysis

  • Internal dependencies:
    • config.go defines data structures and defaults.
    • option.go provides functional options to customize configs.
    • env.go loads environment variables into configs.
    • dialer.go constructs SASL/TLS transport/dialer.
    • setup.go converts protobuf configs to options.
    • producer.go and consumer.go depend on kafka.Reader/Writer and dialer/transport.
    • consume.go orchestrates message loops and integrates retry/DLQ.
    • multi_consumer.go composes multiple Consumer instances.
    • retry.go supplies exponential backoff policy.
    • errors.go centralizes error constants.

Performance Considerations

  • Batching: Tune BatchSize, BatchBytes, and BatchTimeout to optimize throughput and latency.
  • Compression: Enable compression (e.g., snappy) to reduce network bandwidth.
  • Acknowledgements: Adjust RequiredAcks for durability vs performance trade-offs.
  • Async mode: Producer Async enables fire-and-forget behavior; ensure application-level idempotency and DLQ handling.
  • Consumer isolation: IsolationLevel controls read_committed semantics for transactional consumers.
  • Partitioning: Use consumer groups for horizontal scaling; avoid single-partition mode unless necessary.
  • Backoff: Configure RetryConfig to avoid thundering herds on transient failures.

Troubleshooting Guide

Common issues and remedies:

  • No brokers/topic specified: Initialization fails fast with explicit errors; ensure brokers and topic are set via options/YAML/env.
  • SASL/TLS misconfiguration: Invalid mechanisms or missing credentials/certs cause connection failures; verify mechanism and paths.
  • Connectivity problems: Use Ping to validate broker reachability; check network ACLs and DNS resolution.
  • Consumer not consuming: Enable WatchPartitionChanges and adjust PartitionWatchInterval for topics created dynamically.
  • Memory/CPU spikes: Reduce BatchSize/BatchTimeout, disable Async, or switch to manual commit with smaller batches.
  • DLQ not receiving messages: Verify DLQTopic and DLQWriter configuration; ensure DLQ topic exists and permissions are granted.

Conclusion

The kafkax wrapper provides a robust, production-ready abstraction over kafka-go with strong defaults, flexible configuration, and comprehensive operational features. It supports secure connections, resilient consumption patterns, and scalable multi-topic orchestration. By tuning batching, compression, and retry policies, teams can achieve reliable and efficient Kafka integration in the BI Analysis Platform.

Appendices

Configuration Reference

  • ProducerConfig fields: brokers, topic, client_id, batch_size, batch_bytes, batch_timeout, required_acks, max_attempts, async, compression, balancer, write_timeout, read_timeout, sasl, tls, logger, error_logger.
  • ConsumerConfig fields: brokers, topic, group_id, client_id, partition, min_bytes, max_bytes, start_offset, max_wait, commit_interval, heartbeat_interval, session_timeout, rebalance_timeout, retention_time, read_lag_interval, watch_partition_changes, partition_watch_interval, isolation_level, sasl, tls, logger, error_logger.
  • Environment variables: prefixed KAFKAX_* for brokers, client_id, SASL/TLS, producer and consumer settings.
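Assembled from the field names above, a YAML config for the two sides might look like the sketch below; the actual nesting and default values in kafkax-default.yaml may differ:

```yaml
# Illustrative kafkax config using the documented field names.
producer:
  brokers: ["broker-1:9092", "broker-2:9092"]
  topic: bi.events
  client_id: bi-analytics
  batch_size: 100
  batch_timeout: 50ms
  required_acks: all
  compression: snappy
consumer:
  brokers: ["broker-1:9092", "broker-2:9092"]
  topic: bi.events
  group_id: bi-consumers
  min_bytes: 1
  max_bytes: 1048576
  commit_interval: 1s
```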
