Multi-Consumer and Parallel Processing

**Referenced Files in This Document**

  • [multi_consumer.go](file/bi-common/mq/kafkax/multi-consumer.go)
  • [consumer_manager.go](file/bi-common/mq/kafkax/consumer-manager.go)
  • [worker_pool.go](file/bi-common/mq/kafkax/worker-pool.go)
  • [consumer.go](file/bi-common/mq/kafkax/consumer.go)
  • [consume.go](file/bi-common/mq/kafkax/consume.go)
  • [retry.go](file/bi-common/mq/kafkax/retry.go)
  • [multi_consumer_option.go](file/bi-common/mq/kafkax/multi-consumer-option.go)
  • [config.go](file/bi-common/mq/kafkax/config.go)
  • [kafkax-default.yaml](file/bi-common/mq/kafkax/kafkax-default.yaml)
  • [option.go](file/bi-common/mq/kafkax/option.go)
  • [balancer.go](file/bi-common/mq/kafkax/balancer.go)
  • [dialer.go](file/bi-common/mq/kafkax/dialer.go)
  • [errors.go](file/bi-common/mq/kafkax/errors.go)
  • [multi_consumer_test.go](file/bi-common/mq/kafkax/multi-consumer-test.go)

Table of Contents

  1. Introduction
  2. Project Structure
  3. Core Components
  4. Architecture Overview
  5. Detailed Component Analysis
  6. Dependency Analysis
  7. Performance Considerations
  8. Troubleshooting Guide
  9. Conclusion
  10. Appendices

Introduction

This document explains multi-consumer patterns and parallel processing in the Kafka pipeline. It focuses on:

  • MultiTopicConsumer for consuming multiple topics (and their partitions) simultaneously
  • Consumer group management, partition assignment strategies, and load balancing
  • ConsumerManager for coordinating multiple consumers and managing their lifecycle
  • Parallel processing patterns: batch processing, concurrent message handling, and resource management
  • Configuration examples for scaling consumers, managing offsets, and handling rebalancing
  • Performance considerations, memory management, and error propagation in multi-consumer setups
  • Practical consumer group configurations and troubleshooting guides for partition distribution issues

Project Structure

The Kafka utilities reside under bi-common/mq/kafkax and expose:

  • Consumer and MultiTopicConsumer for topic consumption
  • ConsumerManager for orchestrating multiple consumers
  • WorkerPool for concurrent message handling
  • Retry and DLQ mechanisms
  • Configuration and defaults for brokers, groups, timeouts, and security

Core Components

  • MultiTopicConsumer: Creates and runs one Consumer per topic, enabling simultaneous consumption across topics. Supports automatic and manual commit modes, graceful shutdown, and error propagation.
  • ConsumerManager: Manages multiple consumers defined by topic/group/workers, registers handlers globally or per-instance, and coordinates lifecycle via start/stop.
  • Consumer: Wraps kafka.Reader with configuration, logging, graceful shutdown, and DLQ support. Provides single-message and batch consumption with retry/DLQ.
  • WorkerPool: Starts N workers consuming from a channel fed by a dispatcher loop; supports configurable concurrency, buffering, retry/backoff, and optional manual commit.
  • RetryConfig: Defines exponential backoff with jitter and max attempts; integrates with Consumer and WorkerPool.
  • Config and Options: Provide default values and flexible overrides for brokers, group/session timeouts, isolation level, partition watching, SASL/TLS, and more.
  • Balancer and Dialer: Dialer builds the SASL/TLS transports that readers and writers use for secure connections; Balancer selects the partitioning strategy for producers.

Architecture Overview

The system supports two primary patterns:

  • Multi-consumer per topic: Each topic gets its own Consumer; MultiTopicConsumer starts goroutines per topic and propagates the first error or context cancellation.
  • Worker pool per topic: Single Consumer per topic with N concurrent workers sharing a buffered channel; supports retries, DLQ, and optional manual commit.

Detailed Component Analysis

MultiTopicConsumer

  • Responsibilities:
    • Build per-topic Consumers with shared/global settings
    • Start each Consumer concurrently
    • Propagate first error or context cancellation
    • Support manual commit mode
    • Graceful shutdown with signal handling
    • Expose stats and lifecycle helpers
  • Key behaviors:
    • Uses TopicConfig to associate a handler per topic
    • Falls back to global GroupID if per-topic GroupID is empty
    • Starts goroutines per topic and waits on WaitGroup
    • Returns immediately on first error; cancels context to stop others

ConsumerManager

  • Responsibilities:
    • Define multiple consumers via ConsumerConfigItem (topic, group_id, workers)
    • Register handlers globally or per-instance
    • Start all consumers with WorkerPool
    • Stop all consumers gracefully
    • Track running count and manage cleanup functions
  • Key behaviors:
    • Copies global handlers into instance registry
    • Applies ConsumeConfig to WorkerPool when present
    • Starts each consumer with WatchPartitionChanges enabled by default

WorkerPool (Parallel Processing)

  • Responsibilities:
    • Launch N workers consuming from a buffered channel
    • Dispatch messages from kafka.Reader to workers
    • Apply retry/backoff and optional DLQ
    • Optional auto/manual commit
    • Provide OnError callback and ConsumeConfig integration
  • Key behaviors:
    • Channel buffer defaults to Workers * 2
    • Retry uses exponential backoff with jitter
    • Supports a DLQ topic and an explicitly supplied DLQ writer; otherwise falls back to a cached writer

Consumer (Single Topic)

  • Responsibilities:
    • Wrap kafka.Reader with configuration and logging
    • Provide ReadMessage/FetchMessage/CommitMessages
    • Support manual commit mode and DLQ
    • Graceful shutdown and health checks
  • Key behaviors:
    • Supports single-partition mode vs consumer group mode
    • Maps the configured isolation level onto the reader (e.g. read_committed, which skips records from aborted transactions)
    • DLQ writer caching and reuse
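The two reader modes and the isolation-level mapping can be sketched as a small validation step. IsolationLevel, ReaderMode, ParseIsolation, and ResolveMode are local, hypothetical names. They mirror the GroupID, Partition, and IsolationLevel concepts of kafka-go's ReaderConfig without importing it. The accepted config strings and the empty-string default are assumptions.

```go
package main

import (
	"fmt"
	"strings"
)

// IsolationLevel is a local stand-in for kafka-go's ReadUncommitted/ReadCommitted.
type IsolationLevel int8

const (
	ReadUncommitted IsolationLevel = iota
	ReadCommitted
)

// ReaderMode is a hypothetical summary of how the reader would be configured.
type ReaderMode struct {
	GroupMode bool // true: join the consumer group; false: read one fixed partition
	Partition int
	Isolation IsolationLevel
}

// ParseIsolation maps a config string to a level; "" defaulting to
// read_uncommitted is an assumption of this sketch.
func ParseIsolation(s string) (IsolationLevel, error) {
	switch strings.ToLower(strings.TrimSpace(s)) {
	case "", "read_uncommitted":
		return ReadUncommitted, nil
	case "read_committed":
		return ReadCommitted, nil
	default:
		return 0, fmt.Errorf("unknown isolation level %q", s)
	}
}

// ResolveMode selects group mode when groupID is set and single-partition mode
// otherwise. kafka-go rejects a config that sets both GroupID and a non-zero
// Partition, so the sketch rejects that combination too.
func ResolveMode(groupID string, partition int, isolation string) (ReaderMode, error) {
	lvl, err := ParseIsolation(isolation)
	if err != nil {
		return ReaderMode{}, err
	}
	if partition < 0 {
		return ReaderMode{}, fmt.Errorf("partition must be >= 0, got %d", partition)
	}
	if groupID != "" {
		if partition != 0 {
			return ReaderMode{}, fmt.Errorf("set either a group ID or a partition, not both")
		}
		return ReaderMode{GroupMode: true, Isolation: lvl}, nil
	}
	return ReaderMode{Partition: partition, Isolation: lvl}, nil
}
```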

Configuration and Defaults

  • Default values for consumers and producers are embedded in kafkax-default.yaml and loaded at runtime.
  • Consumer defaults emphasize partition watching and sensible timeouts for robustness.
  • Options provide granular overrides for brokers, group/session timeouts, isolation level, SASL/TLS, and batching.

Dependency Analysis

  • MultiTopicConsumer depends on Consumer and uses per-topic handlers.
  • ConsumerManager depends on Consumer and WorkerPool; it also uses global handler registrations.
  • WorkerPool depends on Consumer’s dispatch loop and retry/DLQ logic.
  • Dialer builds SASL and TLS transports for Reader/Writer.
  • Balancer selects partitioning strategy for producers.

Performance Considerations

  • Concurrency:
    • WorkerPool workers scale throughput per topic; tune Workers and ChannelBuffer to match CPU and IO capacity.
    • MultiTopicConsumer runs one goroutine (and one Consumer) per topic; goroutines are cheap, so size the host for the combined handler load across topics rather than for the topic count alone.
  • Backpressure:
    • ChannelBuffer absorbs short bursts so the dispatcher does not block on every slow handler call; it defaults to Workers * 2 and should be adjusted to your latency and memory targets.
  • Retries and DLQ:
    • Exponential backoff with jitter reduces thundering herd; configure MaxRetries and MaxBackoff to balance latency vs reliability.
    • DLQ topic should be separate to avoid reprocessing loops.
  • Offsets and commits:
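    • AutoCommit is simpler to use but gives weaker guarantees: offsets can be committed before a handler finishes, and processed messages can be redelivered after a crash. ManualCommit lets you commit only after processing succeeds.
    • CommitInterval controls how often offsets are committed: shorter intervals reduce how many messages are reprocessed after a restart but send more commit requests to the broker.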
  • Network and security:
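    • TLS adds CPU and handshake overhead; budget for it in capacity planning rather than disabling it on untrusted networks.
    • SASL mechanisms differ mainly in authentication cost (for example, SCRAM performs iterated hashing during the handshake); this cost is paid per connection, not per message.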
  • Partition watching:
    • WatchPartitionChanges lets the group pick up partitions added at runtime, but each detected change triggers a rebalance.

Troubleshooting Guide

Common issues and resolutions:

  • No consumers configured or brokers missing:
    • Ensure ConsumerManager has brokers and consumers; verify MultiTopicConsumer has topics and brokers.
  • Handler not registered:
    • Register handlers globally or per-instance before starting consumers.
  • No consumption on first start:
    • If the topic is created after the consumer starts, enable WatchPartitionChanges so the group rebalances once the topic's partitions appear.
  • Rebalancing storms:
    • Tune HeartbeatInterval, SessionTimeout, and RebalanceTimeout; avoid frequent consumer churn.
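  • Memory spikes during bursts:
    • Every message waiting in ChannelBuffer is held in memory, so lower the buffer if memory is the constraint and add Workers if handlers cannot keep up; monitor ReaderStats (lag, queue length) and adjust backpressure.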
  • DLQ not receiving messages:
    • Verify DLQTopic and DLQWriter; ensure security settings match producer transport.
  • Offsets not committing:
    • In manual commit mode, commit only after the handler returns nil, and check the error returned by the commit call.
  • Authentication failures:
    • Validate SASL mechanism and credentials; confirm TLS CA/cert/key paths.

Conclusion

The Kafka utilities provide robust primitives for multi-consumer and parallel processing:

  • MultiTopicConsumer enables simultaneous consumption across topics with shared configuration and coordinated lifecycle.
  • ConsumerManager streamlines orchestration of multiple consumers with worker pools and centralized handler registration.
  • WorkerPool delivers efficient concurrent processing with retry/backoff and DLQ integration.
  • Comprehensive configuration and defaults, combined with strong error handling and graceful shutdown, support scalable and resilient deployments.

Appendices

Configuration Examples and Best Practices

  • Scaling consumers:
    • Use ConsumerManager with multiple ConsumerConfigItem entries for different topics and groups.
    • Adjust Workers per topic based on throughput needs; monitor lag and CPU saturation.
  • Managing offsets:
    • Prefer manual commit when you need at-least-once processing, and make handlers idempotent because messages can be redelivered; set CommitInterval to balance latency and throughput.
  • Handling rebalancing:
    • Keep HeartbeatInterval well below SessionTimeout (Kafka recommends no more than one third of it); avoid overly aggressive timeouts.
    • Enable WatchPartitionChanges for dynamic topics.
  • Security:
    • Configure SASL and TLS via options; ensure certificates and mechanisms match cluster settings.
  • Testing:
    • Use unit tests, such as those in multi_consumer_test.go, to verify configuration composition and option application.
