Multi-Consumer and Parallel Processing
**Referenced Files in This Document**
- [multi_consumer.go](file/bi-common/mq/kafkax/multi-consumer.go)
- [consumer_manager.go](file/bi-common/mq/kafkax/consumer-manager.go)
- [worker_pool.go](file/bi-common/mq/kafkax/worker-pool.go)
- [consumer.go](file/bi-common/mq/kafkax/consumer.go)
- [consume.go](file/bi-common/mq/kafkax/consume.go)
- [retry.go](file/bi-common/mq/kafkax/retry.go)
- [multi_consumer_option.go](file/bi-common/mq/kafkax/multi-consumer-option.go)
- [config.go](file/bi-common/mq/kafkax/config.go)
- [kafkax-default.yaml](file/bi-common/mq/kafkax/kafkax-default.yaml)
- [option.go](file/bi-common/mq/kafkax/option.go)
- [balancer.go](file/bi-common/mq/kafkax/balancer.go)
- [dialer.go](file/bi-common/mq/kafkax/dialer.go)
- [errors.go](file/bi-common/mq/kafkax/errors.go)
- [multi_consumer_test.go](file/bi-common/mq/kafkax/multi-consumer-test.go)
Table of Contents
- Introduction
- Project Structure
- Core Components
- Architecture Overview
- Detailed Component Analysis
- Dependency Analysis
- Performance Considerations
- Troubleshooting Guide
- Conclusion
- Appendices
Introduction
This document explains multi-consumer patterns and parallel processing in the Kafka pipeline. It focuses on:
- MultiConsumer for handling multiple topics and partitions simultaneously
- Consumer group management, partition assignment strategies, and load balancing
- Consumer Manager for coordinating multiple consumers and managing lifecycle
- Parallel processing patterns: batch processing, concurrent message handling, and resource management
- Configuration examples for scaling consumers, managing offsets, and handling rebalancing
- Performance considerations, memory management, and error propagation in multi-consumer setups
- Practical consumer group configurations and troubleshooting guides for partition distribution issues
Project Structure
The Kafka utilities reside under bi-common/mq/kafkax and expose:
- Consumer and MultiConsumer for topic consumption
- ConsumerManager for orchestrating multiple consumers
- WorkerPool for concurrent message handling
- Retry and DLQ mechanisms
- Configuration and defaults for brokers, groups, timeouts, and security
Section sources
- [multi_consumer.go]
- [consumer_manager.go]
- [worker_pool.go]
- [consume.go]
- [config.go]
- [option.go]
- [multi_consumer_option.go]
- [balancer.go]
- [dialer.go]
- [errors.go]
- [kafkax-default.yaml]
Core Components
- MultiTopicConsumer: Creates and runs one Consumer per topic, enabling simultaneous consumption across topics. Supports automatic and manual commit modes, graceful shutdown, and error propagation.
- ConsumerManager: Manages multiple consumers defined by topic/group/workers, registers handlers globally or per-instance, and coordinates lifecycle via start/stop.
- Consumer: Wraps kafka.Reader with configuration, logging, graceful shutdown, and DLQ support. Provides single-message and batch consumption with retry/DLQ.
- WorkerPool: Starts N workers consuming from a channel fed by a dispatcher loop; supports configurable concurrency, buffering, retry/backoff, and optional manual commit.
- RetryConfig: Defines exponential backoff with jitter and max attempts; integrates with Consumer and WorkerPool.
- Config and Options: Provide default values and flexible overrides for brokers, group/session timeouts, isolation level, partition watching, SASL/TLS, and more.
- Balancer and Dialer: Dialer builds the SASL and TLS transports used for secure broker connections; Balancer selects the partitioning strategy for producer writes.
Section sources
- [multi_consumer.go]
- [consumer_manager.go]
- [consumer.go]
- [worker_pool.go]
- [consume.go]
- [retry.go]
- [config.go]
- [option.go]
- [multi_consumer_option.go]
- [balancer.go]
- [dialer.go]
Architecture Overview
The system supports two primary patterns:
- One Consumer per topic (MultiTopicConsumer): each topic gets its own Consumer running in its own goroutine; the first error, or cancellation of the parent context, stops all of them.
- Worker pool per topic (ConsumerManager with WorkerPool): a single Consumer per topic feeds N concurrent workers through a shared buffered channel, with support for retries, DLQ, and optional manual commit.
Detailed Component Analysis
MultiTopicConsumer
- Responsibilities:
  - Build per-topic Consumers from shared global settings
  - Start each Consumer concurrently
  - Propagate the first error or context cancellation
  - Support manual commit mode
  - Shut down gracefully on OS signals
  - Expose stats and lifecycle helpers
- Key behaviors:
  - Uses TopicConfig to associate a handler with each topic
  - Falls back to the global GroupID when a topic's GroupID is empty
  - Starts one goroutine per topic and waits for them on a WaitGroup
  - On the first error, cancels the shared context so the remaining consumers stop, and returns that error
ConsumerManager
- Responsibilities:
  - Define multiple consumers via ConsumerConfigItem entries (topic, group_id, workers)
  - Register handlers globally or per instance
  - Start all consumers, each backed by a WorkerPool
  - Stop all consumers gracefully
  - Track the running count and manage cleanup functions
- Key behaviors:
  - Copies globally registered handlers into the instance registry
  - Applies ConsumeConfig to the WorkerPool when present
  - Starts each consumer with WatchPartitionChanges enabled by default
WorkerPool (Parallel Processing)
- Responsibilities:
  - Launch N workers consuming from a buffered channel
  - Dispatch messages from kafka.Reader to the workers
  - Apply retry/backoff and optional DLQ routing
  - Commit offsets automatically or manually, as configured
  - Provide an OnError callback and ConsumeConfig integration
- Key behaviors:
  - The channel buffer defaults to Workers * 2
  - Retries use exponential backoff with jitter
  - Supports an explicit DLQ topic and writer; when no writer is supplied, falls back to a cached writer
Consumer (Single Topic)
- Responsibilities:
  - Wrap kafka.Reader with configuration and logging
  - Provide ReadMessage/FetchMessage/CommitMessages
  - Support manual commit mode and DLQ
  - Provide graceful shutdown and health checks
- Key behaviors:
  - Supports both single-partition mode and consumer group mode
  - Maps the configured isolation level onto the reader (for example, read_committed)
  - Caches and reuses DLQ writers
Configuration and Defaults
- Default values for consumers and producers are embedded in YAML and loaded at runtime.
- Consumer defaults emphasize partition watching and sensible timeouts for robustness.
- Options provide granular overrides for brokers, group/session timeouts, isolation level, SASL/TLS, and batching.
Dependency Analysis
- MultiTopicConsumer depends on Consumer and uses per-topic handlers.
- ConsumerManager depends on Consumer and WorkerPool; it also uses global handler registrations.
- WorkerPool depends on Consumer’s dispatch loop and retry/DLQ logic.
- Dialer builds SASL and TLS transports for Reader/Writer.
- Balancer selects partitioning strategy for producers.
Section sources
- [multi_consumer.go]
- [consumer_manager.go]
- [worker_pool.go]
- [consume.go]
- [dialer.go]
- [balancer.go]
- [config.go]
- [kafkax-default.yaml]
Performance Considerations
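- Concurrency:
  - WorkerPool workers scale throughput within a topic; tune Workers and ChannelBuffer to the CPU and I/O cost of the handler.
  - MultiTopicConsumer runs one goroutine, with its own reader, per topic. Goroutines are cheap, so a large topic count mainly costs broker connections and reader buffers; CPU use is driven mostly by handler work.
- Backpressure:
  - ChannelBuffer absorbs short bursts so the dispatcher does not block on every slow handler; once the buffer fills, the dispatcher blocks and fetching pauses. It defaults to Workers * 2; raise it for bursty traffic or lower it to cap memory and queueing latency.
- Retries and DLQ:
  - Exponential backoff with jitter spreads retries out and avoids synchronized retry storms (the thundering herd problem); configure MaxRetries and MaxBackoff to balance latency against reliability.
  - Use a dedicated DLQ topic that the same consumer does not read, to avoid reprocessing loops.
- Offsets and commits:
  - AutoCommit is simpler, but offsets can be committed independently of handler success, which may lose or redeliver messages when a handler fails; ManualCommit lets you commit only after a message has been handled.
  - CommitInterval controls how far committed offsets trail processing: shorter intervals reduce reported lag and the redelivery window after a restart, at the cost of more commit requests to the group coordinator.
- Network and security:
  - TLS adds handshake and encryption overhead; budget for it where the cluster requires TLS.
  - SASL mechanisms differ in cost (for example, SCRAM performs iterated hashing during authentication); use the mechanism the cluster requires.
- Partition watching:
  - WatchPartitionChanges lets the group pick up new partitions on dynamic topics; each detected change triggers a rebalance, so expect more rebalances on topics whose partition count changes often.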
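With ManualCommit and several workers per partition, messages complete out of order, so committing "whatever just finished" can move the committed offset past work that has not finished. One common remedy, sketched below under the assumption that the dispatcher records each offset in fetch order, is to track the contiguous finished prefix per partition. Tracker, Dispatched, and Done are hypothetical names. Offsets are not assumed to be contiguous, because compaction and transaction markers can leave gaps.

```go
package main

import (
	"fmt"
	"sync"
)

// Tracker follows one partition. Call Dispatched in fetch order, then Done
// exactly once per dispatched offset when its handler succeeds.
type Tracker struct {
	mu       sync.Mutex
	inFlight []int64        // dispatched offsets, oldest first
	done     map[int64]bool // finished offsets not yet safe to commit
}

func NewTracker() *Tracker { return &Tracker{done: map[int64]bool{}} }

// Dispatched records an offset before the message is handed to a worker.
func (t *Tracker) Dispatched(off int64) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.inFlight = append(t.inFlight, off)
}

// Done marks off as handled. It returns (lastSafe, true) when the finished
// prefix grew, where lastSafe is the newest offset whose predecessors have
// all finished; otherwise it returns (-1, false).
func (t *Tracker) Done(off int64) (int64, bool) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.done[off] = true
	last, advanced := int64(-1), false
	for len(t.inFlight) > 0 && t.done[t.inFlight[0]] {
		last = t.inFlight[0]
		delete(t.done, last)
		t.inFlight = t.inFlight[1:]
		advanced = true
	}
	return last, advanced
}

func main() {
	t := NewTracker()
	for _, off := range []int64{10, 11, 13} { // 12 is absent, e.g. compacted away
		t.Dispatched(off)
	}
	fmt.Println(t.Done(11)) // -1 false: offset 10 is still running
	fmt.Println(t.Done(13)) // -1 false
	fmt.Println(t.Done(10)) // 13 true: 10, 11 and 13 have all finished
}
```

Kafka treats a committed offset as the next offset to read, so a raw commit would use lastSafe + 1; client calls that take the message itself, such as kafka-go's CommitMessages, typically make that adjustment for you.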
Troubleshooting Guide
Common issues and resolutions:
- No consumers configured or brokers missing:
  - Ensure ConsumerManager is configured with brokers and at least one consumer entry; for MultiTopicConsumer, verify that both topics and brokers are set.
- Handler not registered:
  - Register handlers globally or per instance before starting consumers.
- No consumption on first start:
  - If the consumer starts before the topic or its partitions exist, enable WatchPartitionChanges so the group rebalances once partitions appear.
- Rebalancing storms:
  - Tune HeartbeatInterval, SessionTimeout, and RebalanceTimeout, and avoid frequent consumer churn (restarts, crash loops, rapid scaling up and down).
- Memory spikes during bursts:
  - Every buffered message is held in memory, so cap ChannelBuffer rather than growing it; add Workers if handlers are the bottleneck, and monitor ReaderStats alongside memory usage.
- DLQ not receiving messages:
  - Verify that DLQTopic and DLQWriter are set, and that the DLQ writer uses the same SASL/TLS settings as the cluster connection.
- Offsets not committing:
  - In manual commit mode, offsets are committed only after the handler returns success; check that the handler returns nil and inspect the errors returned by CommitMessages.
- Authentication failures:
  - Validate the SASL mechanism and credentials, and confirm the TLS CA, certificate, and key paths.
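For partition distribution issues, it helps to know how a range assignment splits one topic's partitions. The sketch below reproduces the rule Kafka's range strategy applies per topic: members are sorted, each receives len(partitions)/len(members) partitions, and the first len(partitions)%len(members) members receive one more. The strategy a group actually uses depends on its configured group balancers (kafka-go offers range first and round-robin second by default). RangeAssign is an illustrative function, not part of kafkax.

```go
package main

import (
	"fmt"
	"sort"
)

// RangeAssign returns member -> partitions for a single topic using the
// range rule. Members beyond the partition count receive an empty slice.
func RangeAssign(members []string, partitions []int) map[string][]int {
	ms := append([]string(nil), members...)
	sort.Strings(ms)
	ps := append([]int(nil), partitions...)
	sort.Ints(ps)

	out := make(map[string][]int, len(ms))
	if len(ms) == 0 {
		return out
	}
	per, extra := len(ps)/len(ms), len(ps)%len(ms)
	next := 0
	for i, m := range ms {
		n := per
		if i < extra {
			n++ // the first `extra` members take one additional partition
		}
		out[m] = ps[next : next+n]
		next += n
	}
	return out
}

func main() {
	// Three group members, but the topic has only two partitions.
	a := RangeAssign([]string{"c-2", "c-1", "c-3"}, []int{1, 0})
	for _, m := range []string{"c-1", "c-2", "c-3"} {
		fmt.Println(m, a[m])
	}
	// c-1 [0]
	// c-2 [1]
	// c-3 []
}
```

Two consequences follow: group members beyond the partition count sit idle, and adding WorkerPool workers does not change the assignment, because all workers in a pool share one Consumer and therefore one group membership.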
Conclusion
The Kafka utilities provide robust primitives for multi-consumer and parallel processing:
- MultiTopicConsumer enables simultaneous consumption across topics with shared configuration and coordinated lifecycle.
- ConsumerManager streamlines orchestration of multiple consumers with worker pools and centralized handler registration.
- WorkerPool delivers efficient concurrent processing with retry/backoff and DLQ integration.
- Comprehensive configuration and defaults, combined with strong error handling and graceful shutdown, support scalable and resilient deployments.
Appendices
Configuration Examples and Best Practices
- Scaling consumers:
  - Use ConsumerManager with multiple ConsumerConfigItem entries for different topics and groups.
  - Adjust Workers per topic based on throughput needs, monitoring consumer lag and CPU saturation.
- Managing offsets:
  - Prefer manual commit when you need at-least-once delivery, and keep handlers idempotent so redelivered messages are harmless; set CommitInterval to balance commit overhead against the redelivery window.
- Handling rebalancing:
  - Keep HeartbeatInterval well below SessionTimeout (Kafka's guidance is no more than one third of it) and avoid overly aggressive timeouts.
  - Enable WatchPartitionChanges for dynamic topics.
- Security:
  - Configure SASL and TLS via options; ensure certificates and mechanisms match the cluster settings.
- Testing:
  - Use tests, as in multi_consumer_test.go, to validate configuration composition and option application.