Data Ingestion and Processing Pipeline
Table of Contents
- Introduction
- Project Structure
- Core Components
- Architecture Overview
- Detailed Component Analysis
- Dependency Analysis
- Performance Considerations
- Troubleshooting Guide
- Conclusion
Introduction
This document describes the data ingestion and processing pipeline that integrates external ERP systems (Leke ERP and Jushuitan ERP), streams real-time events via Apache Kafka, transforms and normalizes the data, and loads it into StarRocks for analytics. It covers authentication with external APIs; Kafka producer and consumer configuration; message serialization and transformation into normalized tables; batch historical loads and streaming real-time updates; and error handling, retries, validation, consistency guarantees, idempotency, and failure recovery.
Project Structure
The pipeline spans several modules:
- bi-api-jushuitan: HTTP/gRPC server, Kafka consumer manager, SDK helper for Jushuitan ERP, and StarRocks StreamLoad integration.
- bi-api-leke: Kratos application entrypoint with Kafka consumer manager.
- bi-common: Shared Kafka client library, StarRocks StreamLoad client, and common configuration structures.
- bi-basic: Business logic for ERP account management and related entities.
- bi-cron: Scheduled job runner for historical data loads.
Core Components
- Kafka Producer/Consumer Library: Provides typed configuration, batching, compression, retries, and worker pools for consumption.
- Jushuitan SDK Helper: Implements OAuth-like token exchange and authorization URL building for Jushuitan ERP.
- StarRocks StreamLoad Client: Handles HTTP-based StreamLoad with authentication, retry, and response parsing.
- bi-api-jushuitan: Orchestrates Kafka consumers and writes normalized records to StarRocks via StreamLoad.
- bi-api-leke: Kratos-based entrypoint wiring Kafka consumers.
- bi-basic: Manages ERP accounts and related metadata used by ingestion services.
- bi-cron: Scheduled runner for historical data loads.
Architecture Overview
The ingestion pipeline follows a publish-subscribe pattern:
- External ERP systems push change events to Kafka topics.
- Kafka consumers (per service) subscribe to topics and process messages.
- Messages are transformed into normalized StarRocks table rows.
- StarRocks StreamLoad ingests JSON payloads into target tables.
Detailed Component Analysis
Kafka-Based Ingestion Architecture
- Producer configuration supports batching, compression, acknowledgments, retries, timeouts, and SASL/TLS.
- Consumer configuration supports group-based partition assignment, isolation level, heartbeat/session timeouts, and partition change monitoring.
- Consumer Manager provisions multiple consumers with worker pools and optional global handler registration.
- Multi-Topic Consumer enables consuming from multiple topics concurrently with shared settings.
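The exact types in the shared bi-common library are not shown here, but a typed Kafka configuration with the knobs listed above (batching, retries, compression, acknowledgments) typically looks like the following sketch. All field names and defaults are illustrative assumptions, not the actual bi-common API:

```go
package main

import (
	"fmt"
	"time"
)

// ProducerConfig mirrors the kind of typed configuration a shared
// Kafka library exposes. Field names and defaults are illustrative.
type ProducerConfig struct {
	Brokers      []string
	Topic        string
	BatchSize    int           // messages per batch
	BatchBytes   int64         // bytes per batch
	BatchTimeout time.Duration // flush latency ceiling
	MaxAttempts  int           // retry budget
	Compression  string        // e.g. "snappy"
	RequiredAcks int           // -1 = wait for all in-sync replicas
}

// WithDefaults fills unset fields with conservative defaults so a
// partially specified config is still safe to use.
func (c ProducerConfig) WithDefaults() ProducerConfig {
	if c.BatchSize == 0 {
		c.BatchSize = 100
	}
	if c.BatchBytes == 0 {
		c.BatchBytes = 1 << 20 // 1 MiB
	}
	if c.BatchTimeout == 0 {
		c.BatchTimeout = 100 * time.Millisecond
	}
	if c.MaxAttempts == 0 {
		c.MaxAttempts = 3
	}
	if c.Compression == "" {
		c.Compression = "snappy"
	}
	if c.RequiredAcks == 0 {
		c.RequiredAcks = -1
	}
	return c
}

func main() {
	cfg := ProducerConfig{
		Brokers: []string{"broker-1:9092"}, // placeholder broker address
		Topic:   "erp.order.events",        // placeholder topic name
	}.WithDefaults()
	fmt.Println(cfg.BatchSize, cfg.Compression, cfg.RequiredAcks)
}
```

Centralizing defaults this way keeps per-service configuration minimal: each consumer or producer only overrides the fields that matter for its throughput target.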
API Authentication with Jushuitan ERP
- Builds authorization URLs and exchanges authorization codes for tokens.
- Supports refresh tokens and signing requests.
- Used by bi-api-jushuitan to obtain credentials for ERP API calls.
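Authorization URL building in an OAuth-style flow amounts to query-string assembly over the provider's authorize endpoint. The sketch below shows the shape of such a helper; the endpoint and parameter names are placeholders, not the real Jushuitan API (consult the Jushuitan open-platform documentation for the actual values):

```go
package main

import (
	"fmt"
	"net/url"
)

// BuildAuthorizeURL assembles an OAuth-style authorization URL.
// Endpoint and parameter names here are illustrative placeholders.
func BuildAuthorizeURL(base, appKey, redirectURI, state string) string {
	q := url.Values{}
	q.Set("app_key", appKey)
	q.Set("redirect_uri", redirectURI)
	q.Set("state", state) // round-tripped in the callback to correlate the account
	q.Set("response_type", "code")
	return base + "?" + q.Encode()
}

func main() {
	u := BuildAuthorizeURL(
		"https://example-erp/authorize", // placeholder endpoint
		"my-app-key",
		"https://bi.example.com/callback",
		"account-42",
	)
	fmt.Println(u)
}
```

The returned authorization code is then exchanged for an access/refresh token pair, which the service stores per ERP account and refreshes before expiry.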
Data Transformation and StarRocks StreamLoad
- Records are normalized and serialized to JSON.
- StreamLoad client posts JSON to StarRocks FE endpoint with Basic auth and label.
- Response includes status, row counts, and timing metrics; success determined by status field.
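StarRocks StreamLoad is an HTTP PUT against the FE endpoint `/api/{db}/{table}/_stream_load`, with the label, format, and `Expect: 100-continue` carried as headers. A minimal request-building sketch (host, database, table, and credentials are placeholders):

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
)

// NewStreamLoadRequest builds (but does not send) a StarRocks
// StreamLoad request. All identifiers below are placeholders.
func NewStreamLoadRequest(feHost, db, table, label string, payload []byte) (*http.Request, error) {
	url := fmt.Sprintf("http://%s/api/%s/%s/_stream_load", feHost, db, table)
	req, err := http.NewRequest(http.MethodPut, url, bytes.NewReader(payload))
	if err != nil {
		return nil, err
	}
	req.SetBasicAuth("loader", "secret")        // placeholder credentials
	req.Header.Set("label", label)              // dedup key: one label loads once
	req.Header.Set("format", "json")            // payload is JSON
	req.Header.Set("strip_outer_array", "true") // payload is a JSON array of rows
	req.Header.Set("Expect", "100-continue")    // lets the FE redirect to a BE first
	return req, nil
}

func main() {
	rows := []byte(`[{"id":1,"sku":"A"},{"id":2,"sku":"B"}]`)
	req, err := NewStreamLoadRequest("fe.example:8030", "bi", "orders", "orders-20240101-0001", rows)
	if err != nil {
		panic(err)
	}
	fmt.Println(req.Method, req.URL.Path, req.Header.Get("label"))
}
```

The JSON response body carries `Status`, loaded/filtered row counts, and timing fields; the client treats the load as successful only when the status field says so, not merely when HTTP 200 is returned.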
Batch Historical Loads and Streaming Updates
- Historical loads are orchestrated by bi-cron, scheduled via Kubernetes CronJob.
- Real-time updates are handled by Kafka consumers started in bi-api-jushuitan and bi-api-leke entrypoints.
Error Handling, Retries, and Validation
- Kafka producers support retries with exponential backoff and jitter.
- Consumers expose commit/ack controls and lag metrics; graceful shutdown supported.
- StreamLoad responses define success/failure semantics; tests simulate Nacos-backed configuration loading.
Data Consistency Guarantees and Idempotency
- Kafka consumer groups with committed offsets enable at-least-once delivery; idempotency depends on downstream deduplication.
- StreamLoad labels prevent duplicate ingestion of the same payload; success/failure semantics guide retry decisions.
- Snowflake IDs ensure uniqueness during normalization.
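One way to make at-least-once Kafka delivery effectively exactly-once at the StarRocks layer is to derive the StreamLoad label deterministically from the batch itself (its Kafka coordinates plus a content hash). A retried batch then reproduces the same label, and StarRocks rejects the duplicate load. This scheme is a sketch of the idea, not necessarily the labels the pipeline actually uses:

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

// BatchLabel derives a deterministic StreamLoad label from a batch's
// Kafka coordinates and a hash of its payload. Retrying the same batch
// yields the same label, so StarRocks treats the reload as a duplicate
// instead of ingesting the rows twice.
func BatchLabel(topic string, partition int, lastOffset int64, payload []byte) string {
	sum := sha256.Sum256(payload)
	return fmt.Sprintf("%s-%d-%d-%x", topic, partition, lastOffset, sum[:6])
}

func main() {
	rows := []byte(`[{"id":1},{"id":2}]`)
	a := BatchLabel("erp.order.events", 3, 1024, rows)
	b := BatchLabel("erp.order.events", 3, 1024, rows) // retry of the same batch
	fmt.Println(a == b, a)
}
```

Committing the consumer offset only after StreamLoad reports success closes the loop: a crash between load and commit causes a redelivery, which the repeated label then deduplicates.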
Dependency Analysis
- bi-api-jushuitan depends on bi-common for Kafka and StarRocks clients.
- Both bi-api services depend on bi-common for shared configuration and messaging primitives.
- bi-basic provides entity definitions and business logic for ERP account management.
Performance Considerations
- Producer batching and compression reduce network overhead; tune BatchSize/BatchBytes/BatchTimeout per throughput targets.
- Consumer worker pools parallelize message processing; adjust worker count based on CPU and IO capacity.
- StreamLoad timeouts and retry limits should align with SLAs; monitor response timings for bottlenecks.
- Partitioning strategy should match topic volume and consumer scaling needs.
Troubleshooting Guide
- Kafka connectivity: Use Ping helpers on producer/consumer to validate broker reachability.
- Consumer lag: Monitor Lag() and ReadLagInterval settings to detect backlog.
- StreamLoad failures: Inspect Response fields (Status, Message, ErrorURL) and re-ingest with unique labels.
- Configuration loading: Verify Nacos-backed configs for StarRocks and Kafka; fallback defaults are embedded.
Conclusion
The pipeline integrates external ERP systems with a robust Kafka-based ingestion architecture, enabling scalable real-time processing and reliable historical loads into StarRocks. With structured authentication, typed Kafka configurations, explicit transformation and validation, and comprehensive error handling/recovery, the system supports high-throughput, observable, and maintainable data operations.