Skip to content

bi-api-leke 乐客数据同步

**本文引用的文件** - [[[[[README.md]]]]](../file/bi-api-leke/readme.md) - [[[[[go.mod]]]]](../file/bi-api-leke/go.mod) - [[[[[main.go]]]]](../file/bi-api-leke/cmd/bi-api-leke/main.go) - [[[[[APITYPES_INTEGRATION.md]]]]](../file/bi-api-leke/apitypes-integration.md) - [[[[[shop.go]]]]](../file/bi-api-leke/internal/service/shop.go) - [[[[[goods.go]]]]](../file/bi-api-leke/internal/service/goods.go) - [[[[[order.go]]]]](../file/bi-api-leke/internal/service/order.go) - [[[[[client.go]]]]](../file/bi-api-leke/internal/data/client/client.go) - [[[[[database.go]]]]](../file/bi-api-leke/internal/data/store/database.go) - [[[[[shop.go]]]]](../file/bi-api-leke/internal/biz/usecase/shop.go) - [[[[[goods.go]]]]](../file/bi-api-leke/internal/biz/usecase/goods.go) - [[[[[order.go]]]]](../file/bi-api-leke/internal/biz/usecase/order.go) - [[[[[kafkax.go]]]]](../file/bi-api-leke/internal/data/client/kafkax.go) - [[[[[order.go]]]]](../file/bi-api-leke/internal/service/consumer-handlers/order.go) - [[[[[grpc_client.go]]]]](../file/bi-api-leke/internal/data/client/grpc-client.go)

更新摘要

所做更改

  • 更新了依赖分析部分,反映移除了对 bi-proto 模块的依赖
  • 新增了服务间通信机制的说明
  • 更新了架构图以体现新的通信方式

目录

  1. 简介
  2. 项目结构
  3. 核心组件
  4. 架构总览
  5. 详细组件分析
  6. 依赖分析
  7. 性能考虑
  8. 故障排查指南
  9. 结论
  10. 附录

简介

bi-api-leke 是基于 Kratos 框架的乐客(Taobao)数据同步服务,负责对接淘宝商家数据(订单、商品、退款等),并通过 Kafka 实现异步数据同步。项目采用分层架构(服务层、业务层、数据层),结合 gRPC/HTTP 提供 API,使用 Wire 进行依赖注入,并通过 Nacos 进行配置管理。

项目结构

  • 应用入口与依赖注入
    • cmd/bi-api-leke/main.go:应用启动、配置加载、服务注册、Kafka 消费者管理
    • internal/data/client/client.go:客户端提供者集合(Kafka、gRPC、StreamLoad、Kafka 生产者工厂)
  • 服务层
    • internal/service/shop.go:店铺信息服务(获取淘宝用户信息)
    • internal/service/goods.go:商品信息服务(商品详情、在售商品同步)
    • internal/service/order.go:订单信息服务(全量/增量订单同步)
    • internal/service/consumer_handlers/order.go:Kafka 订单消费处理器(全量/增量/退款)
  • 业务层
    • internal/biz/usecase/shop.go:店铺业务逻辑(调用 leke_helper、记录请求日志)
    • internal/biz/usecase/goods.go:商品业务逻辑(按商户/店铺异步构建 Kafka 消息并发送)
    • internal/biz/usecase/order.go:订单业务逻辑(按商户/店铺异步构建 Kafka 消息并发送)
  • 数据层
    • internal/data/store/database.go:数据库连接初始化
    • internal/data/client/kafkax.go:Kafka 客户端按需创建与复用
  • 配置与集成
    • go.mod:依赖模块(bi-common、Kratos、Wire、Kafka 等)
    • APITYPES_INTEGRATION.md:统一 HTTP 响应格式与错误码管理

图表来源

章节来源

核心组件

  • 店铺服务(ShopService)
    • TaobaoGetUserInfo:获取淘宝商铺用户信息,调用 usecase 层并返回标准化响应
  • 商品服务(GoodsService)
    • TaobaoItemSellerGet:获取单个商品详情
    • TaobaoItemsOnsale:触发在售商品异步同步(批量构建 Kafka 消息)
  • 订单服务(OrderService)
    • TaobaoSyncFullOrder:触发全量订单异步同步(近三个月)
    • TaobaoSyncIncrementOrder:触发增量订单异步同步(当日时间段)
  • Kafka 消费处理器(ConsumerHandlers)
    • HandleFullOrder:拉取全量订单,解析子订单与退款,转换为统一消息并写入 bi-basic gRPC 接口
    • HandleIncrementOrder:拉取增量订单,解析子订单与退款,转换为统一消息并写入 bi-basic gRPC 接口

章节来源

架构总览

  • 服务注册与配置
    • 通过 Nacos 加载配置,启动 HTTP/gRPC 服务,注册服务发现
  • 异步同步流程
    • 服务层接收同步请求后,调用业务层
    • 业务层按商户/店铺翻页获取授权信息,构建 Kafka 消息并发送
    • Kafka 消费处理器拉取消息,调用乐客 API 获取订单/商品数据,解析并转换为统一消息,写入 bi-basic gRPC 接口
  • 统一响应与错误码
    • HTTP 服务器集成 apitypes 中间件,统一响应格式与错误码

图表来源

章节来源

详细组件分析

店铺服务(ShopService)

  • 职责
    • 对外提供获取淘宝用户信息的接口
    • 调用 usecase 层获取数据并返回标准化响应
  • 关键流程
    • 记录请求日志
    • 调用 leke_helper 客户端
    • 校验响应状态
    • 保存请求日志
    • 构造并返回响应

图表来源

章节来源

商品服务(GoodsService)

  • 职责
    • 获取单个商品详情
    • 触发在售商品异步同步(批量构建 Kafka 消息)
  • 关键流程
    • 获取授权详情(分页)
    • 校验 DataKey
    • 计算总页数并构建消息批次
    • 发送到 Kafka(topic: items-onsale)

图表来源

章节来源

订单服务(OrderService)

  • 职责
    • 触发全量订单同步(近三个月)
    • 触发增量订单同步(当日时间段)
  • 关键流程
    • 获取授权详情(分页)
    • 校验 DataKey
    • 计算总页数并构建消息批次
    • 发送到 Kafka(topic: full-order-sync / increment-order-sync)

图表来源

章节来源

Kafka 消费处理器(ConsumerHandlers)

  • 职责
    • 处理全量/增量订单消息
    • 调用乐客 API 获取订单详情与退款信息
    • 转换为统一消息并写入 bi-basic gRPC 接口
  • 关键流程
    • 解析消息、校验 DataKey
    • 调用订单/商品 API 获取数据
    • 解析子订单与退款
    • 转换为统一消息并批量发送

图表来源

章节来源

数据层与客户端

  • 数据库
    • 通过 gormx 初始化数据库连接,提供 DB 客户端
  • Kafka 客户端
    • 按需创建 Producer/Consumer,自动复用
    • 支持压缩、分区键、清理函数

章节来源

依赖分析

更新 移除了对 bi-proto 模块的依赖,采用直接 gRPC 通信方式

  • 外部依赖
    • bi-common:公共组件(Nacos、GORM、Redis、Kafka、日志、指标)
    • Kratos:框架
    • Wire:依赖注入
    • SegmentIO Kafka:消息队列
  • 内部模块
    • bi-api-leke 内部模块依赖 bi-common
  • 服务间通信
    • 直接通过 gRPC 客户端调用 bi-basic 服务的 ShopAuthService
    • 使用服务发现机制进行服务定位

图表来源

章节来源

性能考虑

  • 并发控制
    • 商品/订单同步使用信号量限制并发(最大并发 10),避免对上游 API 与 Kafka 造成压力
  • 批量处理
    • 构建消息时按页计算总页数,分批发送 Kafka,减少网络开销
  • Kafka 生产者复用
    • Kafkax 按主题缓存 Producer,避免重复创建带来的性能损耗
  • 超时与重试
    • gRPC 调用使用独立上下文,避免受 HTTP 请求上下文影响;建议在上层增加指数退避重试策略(参考 bi-common 的重试组件)

章节来源

故障排查指南

  • HTTP 统一响应与错误码
    • 所有 HTTP 响应统一为 {code,message,data},HTTP 状态码固定为 200
    • 通过 code 字段判断成功/失败,便于前端与监控系统识别
  • 请求日志
    • 业务层在调用外部 API 后记录请求日志(请求参数、响应参数、处理耗时、错误信息),便于问题定位
  • 常见问题
    • DataKey 校验失败:检查 Leke 配置与授权信息
    • Kafka 发送失败:检查 Broker 地址、主题权限与压缩配置
    • gRPC 调用超时:检查 bi-basic 服务可用性与网络连通性
  • 建议
    • 结合 OpenTelemetry 与指标系统进行链路追踪与告警
    • 对 Kafka 发送失败的消息进行死信队列或延迟重试

章节来源

结论

bi-api-leke 通过清晰的分层架构与 Kafka 异步处理,实现了对乐客(淘宝)订单、商品、退款数据的高效同步。服务层提供简洁的 API,业务层负责与上游系统交互与消息构建,消费处理器完成数据转换与写入下游系统。移除 bi-proto 依赖后,采用直接 gRPC 通信方式,简化了服务间调用,提升了系统的灵活性与可维护性。配合统一响应格式与请求日志,整体具备良好的可观测性与可维护性。

附录

  • 快速开始
    • 环境要求:Go 1.22+、MySQL 8.0+、Redis 6.0+、Kafka 2.8+、Nacos 2.0+
    • 配置文件:configs/application-{dev,test,prod}.yaml
    • 运行方式:./bin/app -env
  • API 端口
    • HTTP: 8000
    • gRPC: 9000
  • 部署建议
    • 使用 Kubernetes 部署,配置 HPA/PDB 保障弹性与高可用
    • 为 Kafka 消费者设置合理的并发与分区策略
    • 结合 Nacos 进行配置热更新与灰度发布

章节来源