Message Processing and Transformation Pipeline

**Referenced Files in This Document** - [consumer.go](file/bi-common/mq/kafkax/consumer.go) - [producer.go](file/bi-common/mq/kafkax/producer.go) - [consume.go](file/bi-common/mq/kafkax/consume.go) - [multi_consumer.go](file/bi-common/mq/kafkax/multi-consumer.go) - [retry.go](file/bi-common/mq/kafkax/retry.go) - [config.go](file/bi-common/mq/kafkax/config.go) - [kafkax-default.yaml](file/bi-common/mq/kafkax/kafkax-default.yaml) - [errors.go](file/bi-common/mq/kafkax/errors.go) - [consumer.go](file/bi-basic/internal/kafka/consumer/consumer.go) - [producer.go](file/bi-basic/internal/kafka/producer/producer.go) - [config.go](file/bi-basic/internal/kafka/config/config.go) - [consumer.go](file/bi-api-jushuitan/internal/service/consumer-handlers/consumer.go) - [consumer.go](file/bi-api-leke/internal/service/consumer-handlers/consumer.go)

Table of Contents

  1. Introduction
  2. Project Structure
  3. Core Components
  4. Architecture Overview
  5. Detailed Component Analysis
  6. Dependency Analysis
  7. Performance Considerations
  8. Troubleshooting Guide
  9. Conclusion

Introduction

This document describes the complete message processing and transformation pipeline in the Kafka data synchronization system. It explains how external data sources enter the system via Kafka, how messages are routed and validated, transformed into internal formats, and delivered to internal processing services. It also covers error handling, dead letter queues (DLQ), message ordering guarantees, idempotent processing, duplicate detection, and monitoring/debugging approaches.

Project Structure

The Kafka pipeline spans two primary layers:

  • Common Kafka utilities and abstractions under bi-common/mq/kafkax
  • Application-specific producers/consumers under bi-basic and API services

Core Components

  • Producer: Sends messages to Kafka topics with batching, compression, and optional async writes.
  • Consumer: Reads messages from Kafka topics, supports manual/automatic commit, retries, and DLQ.
  • MultiTopicConsumer: Manages multiple topic subscriptions with per-topic handlers.
  • RetryConfig: Defines exponential backoff with jitter and max attempts.
  • Config: Centralized producer/consumer configuration with YAML defaults and environment overrides.
  • Application-level Producers/Consumers: bi-basic provides topic-specific producers/consumers for order, refund, and SKU events.
  • Handler Registration: API services register topic-specific handlers with retry and DLQ policies.

Architecture Overview

The pipeline begins with external systems emitting events to Kafka topics. Application services subscribe to topics, transform messages into internal formats, and deliver them to downstream systems (e.g., StarRocks via StreamLoad). Errors are retried with exponential backoff; failures are routed to DLQ topics for later inspection.

Detailed Component Analysis

Kafka Producer (bi-common)

  • Responsibilities:
    • Build transport with SASL/TLS
    • Configure batching, compression, acks, timeouts
    • Send JSON or raw messages with optional headers
  • Key behaviors:
    • Async vs synchronous writes controlled by configuration
    • JSON helpers marshal payload before sending
    • Topic override supported for cross-topic routing
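The JSON helper and topic-override behaviors can be illustrated with a minimal sketch. Note that `Message`, `Header`, and `buildJSONMessage` here are simplified stand-ins invented for illustration, not the actual types or function names in `producer.go`:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Header and Message are simplified stand-ins for the underlying
// Kafka client types (hypothetical, for illustration only).
type Header struct {
	Key   string
	Value []byte
}

type Message struct {
	Topic   string
	Key     []byte
	Value   []byte
	Headers []Header
}

// buildJSONMessage marshals the payload before sending and applies an
// optional topic override for cross-topic routing, mirroring the
// producer behaviors described above.
func buildJSONMessage(defaultTopic, overrideTopic, key string, payload any, headers ...Header) (Message, error) {
	value, err := json.Marshal(payload)
	if err != nil {
		return Message{}, err
	}
	topic := defaultTopic
	if overrideTopic != "" {
		topic = overrideTopic // cross-topic routing
	}
	return Message{Topic: topic, Key: []byte(key), Value: value, Headers: headers}, nil
}

func main() {
	msg, err := buildJSONMessage("order-combined", "", "order-1001",
		map[string]any{"orderId": 1001, "status": "paid"},
		Header{Key: "source", Value: []byte("jushuitan")})
	if err != nil {
		panic(err)
	}
	fmt.Println(msg.Topic, string(msg.Value))
}
```

Keeping marshaling separate from the write path means the same message construction works for both synchronous and async writes.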

Kafka Consumer (bi-common)

  • Responsibilities:
    • Create kafka.Reader with group/partition/session settings
    • Read/Fetch messages, commit offsets, expose stats
    • Graceful shutdown with signal handling
  • Modes:
    • Single partition vs consumer group
    • Manual vs automatic commit
    • Batch consumption with timers

Multi-Topic Consumer

  • Responsibilities:
    • Manage multiple Consumer instances for different topics
    • Per-topic handler registration
    • Shared logging and graceful shutdown
  • Behavior:
    • Starts one goroutine per topic
    • Aggregates errors and cancels on first failure

Retry and DLQ

  • RetryConfig:
    • Max retries, initial/backoff caps, multiplier, jitter
  • DLQ:
    • On max retries exceeded, message is sent to configured DLQ topic with original topic and error headers
    • Supports shared DLQ writer or cached per-topic writer

Application-Level Producer/Consumer (bi-basic)

  • Producer:
    • Sends to dedicated per-event topics (order-combined, after-sale, goods-sku, kafka_data)
    • Supports raw and typed messages
  • Consumer:
    • Subscribes to multiple topics with worker pools
    • Manual commit with dedicated offset committers
    • Batch processing with configurable batch sizes and byte limits
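The batch-processing path, with its count and byte limits, can be modeled with a small accumulator; `batcher` and its fields are a hypothetical sketch of the mechanism, not the consumer's real types:

```go
package main

import "fmt"

// batcher accumulates messages and flushes when the count limit or the
// byte limit is reached; in the real consumer a timer also triggers
// flushNow so a partially filled batch is not held indefinitely.
type batcher struct {
	maxCount int
	maxBytes int
	buf      [][]byte
	bytes    int
	flush    func(batch [][]byte)
}

// add appends a message and flushes if either limit is reached.
func (b *batcher) add(msg []byte) {
	b.buf = append(b.buf, msg)
	b.bytes += len(msg)
	if len(b.buf) >= b.maxCount || b.bytes >= b.maxBytes {
		b.flushNow()
	}
}

// flushNow drains the buffer, invoking the flush callback once.
func (b *batcher) flushNow() {
	if len(b.buf) == 0 {
		return
	}
	b.flush(b.buf)
	b.buf, b.bytes = nil, 0
}

func main() {
	var flushed [][][]byte
	b := &batcher{maxCount: 2, maxBytes: 1 << 20, flush: func(batch [][]byte) {
		flushed = append(flushed, batch)
	}}
	for _, m := range []string{"a", "b", "c"} {
		b.add([]byte(m))
	}
	b.flushNow() // final drain, as a flush timer would do
	fmt.Println(len(flushed), "batches")
}
```

Pairing this with the dedicated offset committers means offsets advance only after a whole batch is processed, not per message.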

Handler Registration (API Services)

  • Jushuitan and Leke services register topic-specific handlers with:
    • Retry policy (backoff, jitter, max retries)
    • DLQ topic naming convention
    • Error callbacks for logging
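Registration can be sketched as attaching per-topic options in a map. The `HandlerOptions` struct, the `register` helper, and the `"<topic>.dlq"` suffix are all assumptions made for illustration; the services' actual naming convention and API may differ:

```go
package main

import "fmt"

// HandlerOptions captures the per-topic policy attached at
// registration time (illustrative field names).
type HandlerOptions struct {
	MaxRetries int
	DLQTopic   string
	OnError    func(topic string, err error)
}

type registry map[string]HandlerOptions

// register wires a topic to its options, deriving the DLQ topic from
// an assumed "<topic>.dlq" naming convention when none is given.
func (r registry) register(topic string, opts HandlerOptions) {
	if opts.DLQTopic == "" {
		opts.DLQTopic = topic + ".dlq"
	}
	r[topic] = opts
}

func main() {
	r := registry{}
	r.register("order-combined", HandlerOptions{
		MaxRetries: 3,
		OnError: func(topic string, err error) {
			fmt.Println("handler error on", topic, ":", err)
		},
	})
	fmt.Println(r["order-combined"].DLQTopic)
}
```

Centralizing the policy at registration keeps retry, DLQ, and logging behavior uniform across topics while still allowing per-topic overrides.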

Dependency Analysis

Dependencies flow in one direction: bi-basic and the API services (Jushuitan, Leke) build their producers, consumers, and handler registrations on top of the shared bi-common/mq/kafkax abstractions (Producer, Consumer, MultiTopicConsumer, RetryConfig, Config), which in turn wrap the underlying Kafka client. Application code never touches the Kafka client directly.
Performance Considerations

  • Batching and Compression:
    • Tune batch size/bytes/timeouts to balance latency and throughput
    • Enable compression (snappy) to reduce network overhead
  • Backoff Strategy:
    • Use jitter to avoid thundering herds during retries
  • Worker Pools:
    • Adjust worker counts per topic to match CPU and downstream capacity
  • Manual Commit:
    • Committing offsets only after successful processing yields at-least-once delivery; paired with idempotent processing it achieves effectively-once results
  • Monitoring:
    • Use Stats() and Reader/Writer stats to track lag, bytes/sec, and error rates
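Since at-least-once delivery means retried messages can arrive twice, the idempotent-processing half of that guarantee can be sketched as key-based duplicate detection. This in-memory version is a minimal illustration; a production system would bound the seen-set or back it with a persistent store:

```go
package main

import "fmt"

// dedup tracks already-processed message keys so redelivered messages
// are skipped instead of reprocessed.
type dedup struct {
	seen map[string]struct{}
}

func newDedup() *dedup { return &dedup{seen: make(map[string]struct{})} }

// process runs handle only the first time a key is observed and
// reports whether the message was treated as a duplicate.
func (d *dedup) process(key string, handle func()) (duplicate bool) {
	if _, ok := d.seen[key]; ok {
		return true
	}
	d.seen[key] = struct{}{}
	handle()
	return false
}

func main() {
	d := newDedup()
	count := 0
	d.process("order-1001", func() { count++ })
	d.process("order-1001", func() { count++ }) // redelivery after a retry
	fmt.Println("processed", count, "time(s)")
}
```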

Troubleshooting Guide

  • Health Checks:
    • Use Ping() on Producer/Consumer to validate broker connectivity
  • Logging:
    • Set Logger/ErrorLogger on Producer/Consumer for operational visibility
  • DLQ Inspection:
    • Messages routed to DLQ include original topic and error headers
  • Common Issues:
    • No brokers/topic specified: errors returned by constructors
    • TLS/SASL misconfiguration: validation errors during transport build
    • Consumer group rebalancing: enable partition change watching for auto-created topics

Conclusion

The Kafka data synchronization system provides a robust, extensible pipeline for ingesting, transforming, and delivering messages to internal services. With configurable retries, DLQ, and manual commit support, it achieves reliable processing at scale. Application-specific producers and consumers encapsulate topic routing and transformation logic, while common utilities offer consistent configuration and operational controls.