bi-api-leke 乐客数据同步
**本文引用的文件** - [[README.md]](../file/bi-api-leke/readme.md) - [[go.mod]](../file/bi-api-leke/go.mod) - [[main.go]](../file/bi-api-leke/cmd/bi-api-leke/main.go) - [[APITYPES_INTEGRATION.md]](../file/bi-api-leke/apitypes-integration.md) - [[shop.go]](../file/bi-api-leke/internal/service/shop.go) - [[goods.go]](../file/bi-api-leke/internal/service/goods.go) - [[order.go]](../file/bi-api-leke/internal/service/order.go) - [[client.go]](../file/bi-api-leke/internal/data/client/client.go) - [[database.go]](../file/bi-api-leke/internal/data/store/database.go) - [[shop.go]](../file/bi-api-leke/internal/biz/usecase/shop.go) - [[goods.go]](../file/bi-api-leke/internal/biz/usecase/goods.go) - [[order.go]](../file/bi-api-leke/internal/biz/usecase/order.go) - [[kafkax.go]](../file/bi-api-leke/internal/data/client/kafkax.go) - [[order.go]](../file/bi-api-leke/internal/service/consumer-handlers/order.go) - [[grpc_client.go]](../file/bi-api-leke/internal/data/client/grpc-client.go)
更新摘要
所做更改
- 更新了依赖分析部分,反映移除了对 bi-proto 模块的依赖
- 新增了服务间通信机制的说明
- 更新了架构图以体现新的通信方式
目录
简介
bi-api-leke 是基于 Kratos 框架的乐客(Taobao)数据同步服务,负责对接淘宝商家数据(订单、商品、退款等),并通过 Kafka 实现异步数据同步。项目采用分层架构(服务层、业务层、数据层),结合 gRPC/HTTP 提供 API,使用 Wire 进行依赖注入,并通过 Nacos 进行配置管理。
项目结构
- 应用入口与依赖注入
- cmd/bi-api-leke/main.go:应用启动、配置加载、服务注册、Kafka 消费者管理
- internal/data/client/client.go:客户端提供者集合(Kafka、gRPC、StreamLoad、Kafka 生产者工厂)
- 服务层
- internal/service/shop.go:店铺信息服务(获取淘宝用户信息)
- internal/service/goods.go:商品信息服务(商品详情、在售商品同步)
- internal/service/order.go:订单信息服务(全量/增量订单同步)
- internal/service/consumer_handlers/order.go:Kafka 订单消费处理器(全量/增量/退款)
- 业务层
- internal/biz/usecase/shop.go:店铺业务逻辑(调用 leke_helper、记录请求日志)
- internal/biz/usecase/goods.go:商品业务逻辑(按商户/店铺异步构建 Kafka 消息并发送)
- internal/biz/usecase/order.go:订单业务逻辑(按商户/店铺异步构建 Kafka 消息并发送)
- 数据层
- internal/data/store/database.go:数据库连接初始化
- internal/data/client/kafkax.go:Kafka 客户端按需创建与复用
- 配置与集成
- go.mod:依赖模块(bi-common、Kratos、Wire、Kafka 等)
- APITYPES_INTEGRATION.md:统一 HTTP 响应格式与错误码管理
图表来源
章节来源
核心组件
- 店铺服务(ShopService)
- TaobaoGetUserInfo:获取淘宝商铺用户信息,调用 usecase 层并返回标准化响应
- 商品服务(GoodsService)
- TaobaoItemSellerGet:获取单个商品详情
- TaobaoItemsOnsale:触发在售商品异步同步(批量构建 Kafka 消息)
- 订单服务(OrderService)
- TaobaoSyncFullOrder:触发全量订单异步同步(近三个月)
- TaobaoSyncIncrementOrder:触发增量订单异步同步(当日时间段)
- Kafka 消费处理器(ConsumerHandlers)
- HandleFullOrder:拉取全量订单,解析子订单与退款,转换为统一消息并写入 bi-basic gRPC 接口
- HandleIncrementOrder:拉取增量订单,解析子订单与退款,转换为统一消息并写入 bi-basic gRPC 接口
章节来源
架构总览
- 服务注册与配置
- 通过 Nacos 加载配置,启动 HTTP/gRPC 服务,注册服务发现
- 异步同步流程
- 服务层接收同步请求后,调用业务层
- 业务层按商户/店铺翻页获取授权信息,构建 Kafka 消息并发送
- Kafka 消费处理器拉取消息,调用乐客 API 获取订单/商品数据,解析并转换为统一消息,写入 bi-basic gRPC 接口
- 统一响应与错误码
- HTTP 服务器集成 apitypes 中间件,统一响应格式与错误码
图表来源
章节来源
详细组件分析
店铺服务(ShopService)
- 职责
- 对外提供获取淘宝用户信息的接口
- 调用 usecase 层获取数据并返回标准化响应
- 关键流程
- 记录请求日志
- 调用 leke_helper 客户端
- 校验响应状态
- 保存请求日志
- 构造并返回响应
图表来源
章节来源
商品服务(GoodsService)
- 职责
- 获取单个商品详情
- 触发在售商品异步同步(批量构建 Kafka 消息)
- 关键流程
- 获取授权详情(分页)
- 校验 DataKey
- 计算总页数并构建消息批次
- 发送到 Kafka(topic: items-onsale)
图表来源
章节来源
订单服务(OrderService)
- 职责
- 触发全量订单同步(近三个月)
- 触发增量订单同步(当日时间段)
- 关键流程
- 获取授权详情(分页)
- 校验 DataKey
- 计算总页数并构建消息批次
- 发送到 Kafka(topic: full-order-sync / increment-order-sync)
图表来源
章节来源
Kafka 消费处理器(ConsumerHandlers)
- 职责
- 处理全量/增量订单消息
- 调用乐客 API 获取订单详情与退款信息
- 转换为统一消息并写入 bi-basic gRPC 接口
- 关键流程
- 解析消息、校验 DataKey
- 调用订单/商品 API 获取数据
- 解析子订单与退款
- 转换为统一消息并批量发送
图表来源
章节来源
数据层与客户端
- 数据库
- 通过 gormx 初始化数据库连接,提供 DB 客户端
- Kafka 客户端
- 按需创建 Producer/Consumer,自动复用
- 支持压缩、分区键、清理函数
章节来源
依赖分析
更新 移除了对 bi-proto 模块的依赖,采用直接 gRPC 通信方式
- 外部依赖
- bi-common:公共组件(Nacos、GORM、Redis、Kafka、日志、指标)
- Kratos:框架
- Wire:依赖注入
- SegmentIO Kafka:消息队列
- 内部模块
- bi-api-leke 内部模块依赖 bi-common
- 服务间通信
- 直接通过 gRPC 客户端调用 bi-basic 服务的 ShopAuthService
- 使用服务发现机制进行服务定位
图表来源
章节来源
性能考虑
- 并发控制
- 商品/订单同步使用信号量限制并发(最大并发 10),避免对上游 API 与 Kafka 造成压力
- 批量处理
- 构建消息时按页计算总页数,分批发送 Kafka,减少网络开销
- Kafka 生产者复用
- Kafkax 按主题缓存 Producer,避免重复创建带来的性能损耗
- 超时与重试
- gRPC 调用使用独立上下文,避免受 HTTP 请求上下文影响;建议在上层增加指数退避重试策略(参考 bi-common 的重试组件)
章节来源
故障排查指南
- HTTP 统一响应与错误码
- 所有 HTTP 响应统一为 {code,message,data},HTTP 状态码固定为 200
- 通过 code 字段判断成功/失败,便于前端与监控系统识别
- 请求日志
- 业务层在调用外部 API 后记录请求日志(请求参数、响应参数、处理耗时、错误信息),便于问题定位
- 常见问题
- DataKey 校验失败:检查 Leke 配置与授权信息
- Kafka 发送失败:检查 Broker 地址、主题权限与压缩配置
- gRPC 调用超时:检查 bi-basic 服务可用性与网络连通性
- 建议
- 结合 OpenTelemetry 与指标系统进行链路追踪与告警
- 对 Kafka 发送失败的消息进行死信队列或延迟重试
章节来源
结论
bi-api-leke 通过清晰的分层架构与 Kafka 异步处理,实现了对乐客(淘宝)订单、商品、退款数据的高效同步。服务层提供简洁的 API,业务层负责与上游系统交互与消息构建,消费处理器完成数据转换与写入下游系统。移除 bi-proto 依赖后,采用直接 gRPC 通信方式,简化了服务间调用,提升了系统的灵活性与可维护性。配合统一响应格式与请求日志,整体具备良好的可观测性与可维护性。
附录
- 快速开始
- 环境要求:Go 1.22+、MySQL 8.0+、Redis 6.0+、Kafka 2.8+、Nacos 2.0+
- 配置文件:configs/application-{dev,test,prod}.yaml
- 运行方式:./bin/app -env
- API 端口
- HTTP: 8000
- gRPC: 9000
- 部署建议
- 使用 Kubernetes 部署,配置 HPA/PDB 保障弹性与高可用
- 为 Kafka 消费者设置合理的并发与分区策略
- 结合 Nacos 进行配置热更新与灰度发布
章节来源