数据流设计
**本文引用的文件** - [[bi-common/mq/kafkax/producer.go]](../file/bi-common/mq/kafkax/producer.go) - [[bi-common/mq/kafkax/consumer.go]](../file/bi-common/mq/kafkax/consumer.go) - [[bi-common/mq/kafkax/multi_consumer.go]](../file/bi-common/mq/kafkax/multi-consumer.go) - [[bi-common/mq/kafkax/producer_tx.go]](../file/bi-common/mq/kafkax/producer-tx.go) - [[bi-common/mq/kafkax/config.go]](../file/bi-common/mq/kafkax/config.go) - [[bi-common/mq/kafkax/consume.go]](../file/bi-common/mq/kafkax/consume.go) - [[bi-api-jushuitan/internal/data/client/streamload.go]](../file/bi-api-jushuitan/internal/data/client/streamload.go) - [[bi-basic/internal/data/client/streamload.go]](../file/bi-basic/internal/data/client/streamload.go) - [[bi-api-leke/internal/data/client/streamload.go]](../file/bi-api-leke/internal/data/client/streamload.go) - [[bi-common/mq/kafkax/kafkax-default.yaml]](../file/bi-common/mq/kafkax/kafkax-default.yaml) - [[bi-proto/kafka/v1/kafka.pb.go]](../file/bi-proto/kafka/v1/kafka.pb.go) - [[bi-server/api/bi_base/v1/kafka/kafka.pb.go]](../file/bi-server/api/bi-base/v1/kafka/kafka.pb.go)
目录
引言
本文件面向BI分析平台的数据流设计,系统性阐述从外部数据源到最终用户展示的完整数据流转路径,重点覆盖以下方面:
- 实时数据流与批处理数据流的差异与适用场景
- Kafka消息队列在数据流中的角色与消息传递机制
- 数据ETL流程(采集、清洗、转换、加载)
- 数据一致性与事务处理策略
- 数据延迟与数据丢失的处理策略
- 数据流图与关键处理节点说明
项目结构
围绕数据流的关键模块主要分布在以下子项目与公共组件中:
- 公共消息中间件库:bi-common/mq/kafkax(生产者、消费者、多主题消费者、事务生产者、配置与消费控制)
- 数据加载客户端:bi-api-jushuitan、bi-basic、bi-api-leke 的 streamload 客户端封装
- 协议与模型:bi-proto 与 bi-server 中的 Kafka 协议定义
- 默认配置:bi-common/mq/kafkax/kafkax-default.yaml
图表来源
- [bi-common/mq/kafkax/config.go]
- [bi-common/mq/kafkax/producer.go]
- [bi-common/mq/kafkax/consumer.go]
- [bi-common/mq/kafkax/multi_consumer.go]
- [bi-common/mq/kafkax/producer_tx.go]
- [bi-common/mq/kafkax/consume.go]
- [bi-api-jushuitan/internal/data/client/streamload.go]
- [bi-api-leke/internal/data/client/streamload.go]
- [bi-basic/internal/data/client/streamload.go]
- [bi-proto/kafka/v1/kafka.pb.go]
- [bi-server/api/bi_base/v1/kafka/kafka.pb.go]
章节来源
- [bi-common/mq/kafkax/config.go]
- [bi-common/mq/kafkax/producer.go]
- [bi-common/mq/kafkax/consumer.go]
- [bi-common/mq/kafkax/multi_consumer.go]
- [bi-common/mq/kafkax/producer_tx.go]
- [bi-common/mq/kafkax/consume.go]
- [bi-api-jushuitan/internal/data/client/streamload.go]
- [bi-api-leke/internal/data/client/streamload.go]
- [bi-basic/internal/data/client/streamload.go]
- [bi-proto/kafka/v1/kafka.pb.go]
- [bi-server/api/bi_base/v1/kafka/kafka.pb.go]
核心组件
- Kafka生产者:负责将结构化消息写入指定主题,支持同步/异步、批量、压缩、幂等与事务能力。
- Kafka消费者:提供单条读取、拉取、自动/手动提交offset、批量消费、重试与DLQ投递。
- 多主题消费者:为多个主题分别配置消费者组,统一管理生命周期与优雅关闭。
- 事务生产者:基于幂等与事务协调器实现“恰好一次”语义,适合跨步骤的原子性写入。
- StreamLoad客户端:封装StarRocks StreamLoad接口,实现JSON数据的高效批量写入与部分更新。
- 协议模型:Kafka消息结构与批量发送响应模型,确保跨服务一致的序列化/反序列化。
章节来源
- [bi-common/mq/kafkax/producer.go]
- [bi-common/mq/kafkax/consumer.go]
- [bi-common/mq/kafkax/multi_consumer.go]
- [bi-common/mq/kafkax/producer_tx.go]
- [bi-api-jushuitan/internal/data/client/streamload.go]
- [bi-basic/internal/data/client/streamload.go]
- [bi-proto/kafka/v1/kafka.pb.go]
架构总览
下图展示了从外部数据源到StarRocks的典型数据流路径,以及Kafka在其中的枢纽作用。
图表来源
- [bi-common/mq/kafkax/producer.go]
- [bi-common/mq/kafkax/consumer.go]
- [bi-common/mq/kafkax/multi_consumer.go]
- [bi-common/mq/kafkax/producer_tx.go]
- [bi-common/mq/kafkax/kafkax-default.yaml]
详细组件分析
Kafka生产者与消息传递机制
- 批量与异步:支持批量大小、字节阈值、批次超时与异步写入,提升吞吐。
- 压缩与负载均衡:可配置压缩算法与分区均衡策略,优化网络与分区利用。
- 安全与超时:支持SASL/TLS与读写超时,保障传输安全与稳定性。
- 重试与幂等:通过最大重试次数与幂等写入,降低重复与丢失风险。
图表来源
章节来源
Kafka消费者与消费模式
- 自动提交:适合低延迟、允许重复的场景;消费者组自动维护offset。
- 手动提交:严格一次语义,处理成功后再提交offset,避免重复或丢失。
- 批量消费:按批次聚合消息,减少处理开销,适合高吞吐场景。
- 重试与DLQ:指数退避重试,超过最大次数后投递死信队列,保障消息不丢失。
图表来源
章节来源
多主题消费者与消费者组管理
- 为不同业务主题配置独立消费者,共享安全与超时参数。
- 支持优雅关闭与信号监听,统一管理多个消费者生命周期。
- 提供健康检查与统计信息,便于运维观测。
图表来源
章节来源
事务生产者与一致性保证
- 事务ID与幂等写入:要求至少all确认,结合事务协调器实现原子提交/中止。
- 事务内执行:提供ExecuteInTransaction封装,自动Begin/Commit/Abort。
- 优雅关闭:若存在未提交事务,关闭前自动中止,避免悬挂状态。
图表来源
章节来源
数据ETL流程(采集-清洗-转换-加载)
- 采集:业务系统通过生产者将事件写入Kafka主题。
- 清洗/转换:消费者对消息进行解析、校验、映射与转换,必要时进行聚合或派生计算。
- 加载:转换后的数据通过StreamLoad客户端批量写入StarRocks,支持部分更新与列过滤。
图表来源
- [bi-common/mq/kafkax/producer.go]
- [bi-common/mq/kafkax/consume.go]
- [bi-api-jushuitan/internal/data/client/streamload.go]
- [bi-basic/internal/data/client/streamload.go]
章节来源
实时流与批处理流对比与场景
- 实时流(流式处理):低延迟、高吞吐,适用于监控告警、实时风控、即时报表。消费者采用手动提交与批量消费,配合重试与DLQ。
- 批处理流(定时/周期性):强调一致性与顺序,适用于离线数仓构建、每日汇总、月结报表。生产者可采用幂等+事务,消费者采用自动提交或严格一次。
章节来源
数据一致性与事务策略
- 幂等写入:生产者配置幂等,避免分区重放导致的重复。
- 事务写入:跨步骤或多主题的原子性写入,失败自动中止。
- 消费一致性:手动提交确保“处理成功再提交”,结合重试与DLQ,实现至少一次或恰好一次的语义选择。
章节来源
- [bi-common/mq/kafkax/producer.go]
- [bi-common/mq/kafkax/producer_tx.go]
- [bi-common/mq/kafkax/consume.go]
Kafka消息模型与协议
- KafkaMessage/KafkaData:统一的消息结构,支持键、值与头部。
- 批量发送响应:包含成功标记、发送数量与消息,便于上层统计与告警。
章节来源
依赖关系分析
- 生产者依赖配置模块与传输层(SASL/TLS),并通过Writer写入Broker。
- 消费者依赖Reader与重试/DLQ逻辑,支持单分区与消费者组两种模式。
- 多主题消费者聚合多个消费者,统一生命周期管理。
- StreamLoad客户端依赖StarRocks StreamLoad服务,负责最终落库。
图表来源
- [bi-common/mq/kafkax/config.go]
- [bi-common/mq/kafkax/producer.go]
- [bi-common/mq/kafkax/consumer.go]
- [bi-common/mq/kafkax/multi_consumer.go]
- [bi-api-jushuitan/internal/data/client/streamload.go]
- [bi-api-leke/internal/data/client/streamload.go]
- [bi-basic/internal/data/client/streamload.go]
章节来源
- [bi-common/mq/kafkax/config.go]
- [bi-common/mq/kafkax/producer.go]
- [bi-common/mq/kafkax/consumer.go]
- [bi-common/mq/kafkax/multi_consumer.go]
- [bi-api-jushuitan/internal/data/client/streamload.go]
- [bi-api-leke/internal/data/client/streamload.go]
- [bi-basic/internal/data/client/streamload.go]
性能考量
- 批量与压缩:合理设置BatchSize/BatchBytes/BatchTimeout与压缩算法,平衡延迟与吞吐。
- 分区与均衡:根据主题分区数与消费者组规模调整分区策略,避免热点与倾斜。
- 超时与重试:WriteTimeout/ReadTimeout与指数退避策略,避免阻塞放大。
- 负载隔离:多主题消费者按业务域拆分,避免相互影响。
故障排查指南
- 生产者/消费者健康检查:通过Ping方法验证Broker连通性与元数据可用性。
- 消费延迟观测:使用Lag()与Stats()获取延迟与统计信息,定位积压。
- 优雅关闭:RunWithGracefulShutdown监听系统信号,确保处理完当前消息后退出。
- DLQ与重试:检查最大重试次数与DLQ配置,定位异常消息与根因。
章节来源
结论
本设计以Kafka为核心枢纽,结合生产者/消费者/多主题消费者与事务生产者,形成高吞吐、可扩展、可治理的数据流体系;通过StreamLoad实现高效批量加载,支撑BI分析层的实时与离线需求。通过幂等、事务、手动提交与DLQ等机制,兼顾一致性与可用性,并提供完善的可观测性与故障恢复能力。
附录
- 默认配置参考:kafkax-default.yaml
- 协议模型参考:bi-proto与bi-server中的Kafka消息定义
章节来源