bi-api-jushuitan 聚水潭数据同步
**本文引用的文件** - [[go.mod]](../file/bi-api-jushuitan/go.mod) - [[main.go]](../file/bi-api-jushuitan/cmd/bi-api-jushuitan/main.go) - [[client.go]](../file/bi-api-jushuitan/common/sdkhelper/client.go) - [[types.go]](../file/bi-api-jushuitan/common/sdkhelper/types.go) - [[jushuitan.go]](../file/bi-api-jushuitan/internal/data/client/jushuitan.go) - [[README.md(数据层)]](../file/bi-api-jushuitan/internal/data/readme.md) - [[README.md(服务层)]](../file/bi-api-jushuitan/internal/service/readme.md) - [[README.md(业务层)]](../file/bi-api-jushuitan/internal/biz/readme.md) - [[application-dev.yaml]](../file/bi-api-jushuitan/configs/application-dev.yaml) - [[auth.go]](../file/bi-api-jushuitan/internal/data/repo/auth.go) - [[inventory.go]](../file/bi-api-jushuitan/internal/data/repo/inventory.go) - [[inventory.go(消费者处理)]](../file/bi-api-jushuitan/internal/service/consumer-handlers/inventory.go) - [[grpc_client.go]](../file/bi-api-jushuitan/internal/data/client/grpc-client.go) - [[grpc.go]](../file/bi-api-jushuitan/internal/server/grpc.go) - [[http.go]](../file/bi-api-jushuitan/internal/server/http.go) - [[auth.go(服务层)]](../file/bi-api-jushuitan/internal/service/auth.go) - [[inventory.go(服务层)]](../file/bi-api-jushuitan/internal/service/inventory.go)
更新摘要
所做更改
- 更新了依赖分析部分,反映对 bi-proto 模块的依赖关系
- 新增了对 gRPC 客户端和服务端实现的详细分析
- 补充了服务层接口实现的具体内容
目录
简介
本项目为 bi-api-jushuitan 聚水潭数据同步服务,围绕聚水潭开放平台的仓储、采购、销售等业务数据,提供 SDK 集成、API 调用、消息队列异步处理与 StarRocks 写入的完整链路。系统支持实时同步、定时同步与手动触发三种模式,并通过令牌桶限流、并发控制与重试策略保障高可用与稳定性。
项目结构
- 二进制入口与依赖注入:cmd/bi-api-jushuitan/main.go 负责启动、读取 Nacos 配置、初始化日志、注册服务、启动 Kafka 消费者并运行 Kratos 服务。
- SDK 封装:common/sdkhelper 提供聚水潭开放平台 API 的统一客户端与签名、分页、错误类型等能力。
- 数据层:internal/data 负责外部客户端(Kafka、聚水潭 SDK、gRPC、StreamLoad)、仓库实现(授权、库存)、限流器与模型。
- 业务层:internal/biz 定义业务用例与仓库接口,采用依赖倒置由数据层实现。
- 服务层:internal/service 提供 gRPC/HTTP 接口与 Kafka 消费处理器。
- 配置:configs 下的 application-*.yaml 从 Nacos 加载配置。
图表来源
章节来源
核心组件
- SDK 客户端与请求封装:统一签名、请求与响应解析,提供分页与错误类型。
- 聚水潭客户端工厂:基于 Bootstrap 配置创建 SDK 客户端。
- 数据仓库(授权/库存):封装 gRPC 获取店铺授权、聚水潭店铺校验、库存查询与成本价合并、批量写入 StarRocks。
- 限流与并发控制:商家级令牌桶限流(100次/分钟)、并发许可(每商家最多5并发)。
- Kafka 消费者:按 Topic 消费库存任务,逐条处理并写入。
- gRPC 客户端:封装 bi-proto 定义的服务接口,支持 Kafka、异步处理和店铺授权服务。
- 日志与监控:基于 Kratos 日志与 Nacos 注册发现。
章节来源
- [client.go]
- [types.go]
- [jushuitan.go]
- [grpc_client.go]
- [auth.go]
- [inventory.go]
- [README.md(数据层)]
- [README.md(服务层)]
架构总览
系统采用"配置中心 + SDK + 数据层仓库 + 服务层接口/消费者 + 消息队列 + 存储"的分层架构。Kratos 作为运行时容器,负责服务注册与启停;Nacos 提供配置与服务发现;Kafka 作为异步处理通道;StarRocks 作为最终存储。
图表来源
详细组件分析
SDK 客户端与请求封装
- 统一配置项:AppKey、AppSecret、AccessToken、BaseURL、Timeout、Debug、HTTPClient、Logger。
- 请求流程:组装 biz 参数、生成签名、构造表单、发起 HTTP 请求、记录日志、解析响应。
- 响应与错误:统一 Response 结构,区分成功/失败;提供 APIError 类型与常见错误码常量。
- 分页与泛型:PageResult 支持任意数据类型分页返回。
图表来源
章节来源
聚水潭客户端工厂
- 依据 Bootstrap 配置创建 SDK 客户端,设置 AppKey/AppSecret/BaseURL/超时/日志。
- 若无配置则创建空客户端,便于本地开发或降级场景。
章节来源
gRPC 客户端集合
- 封装多个 bi-proto 服务的 gRPC 客户端:KafkaServiceClient、BiBaseAsyncClient、ShopAuthServiceClient。
- 支持服务发现、超时配置和中间件链。
- 提供统一的连接管理和客户端创建方法。
章节来源
授权仓库(AuthRepo)
- 通过 SDK 的 AuthClient 构建授权 URL,返回给前端进行 OAuth 授权。
- 当未配置 SDK 参数时,返回空 URL 以避免异常。
章节来源
库存仓库(InventoryRepo)
- 异步拉取:从 bi-basic 通过 gRPC 获取店铺授权列表,遍历每个商户/店铺,校验聚水潭店铺存在性后,批量向 Kafka 发送库存查询任务消息。
- 同步拉取:对指定商户与 SKU 列表进行同步查询,合并成本价,写入 StarRocks。
- 限流与并发:商家级令牌桶限流(100次/分钟),并发许可(每商家最多5并发)。
- 缓存:聚水潭店铺校验结果缓存 15 天,减少重复查询。
图表来源
章节来源
Kafka 消费者处理(库存)
- 解析消息:反序列化库存查询任务,设置 AccessToken。
- 限流与并发:商家级令牌桶限流(100次/分钟)与并发许可(每商家最多5并发)。
- 调用聚水潭接口:库存查询与成本价查询。
- 数据写入:将库存与成本价合并写入 StarRocks;同时通过 gRPC 异步更新 SKU 信息。
- 错误处理:解析失败、API 调用失败、写入失败均记录错误并可进入死信队列。
图表来源
章节来源
服务层接口实现
- gRPC 服务:InventoryService 和 AuthService 实现 bi-proto 定义的接口。
- HTTP 服务:提供 RESTful API 接口,支持 JSON 格式请求。
- 中间件:集成请求 ID、日志记录、错误恢复和链路追踪中间件。
- 服务注册:自动注册到 Kratos 服务器,支持网络配置和超时设置。
章节来源
业务层与依赖注入
- 业务层仅定义接口,实现由数据层完成,遵循依赖倒置原则。
- 通过 Wire ProviderSet 组织依赖注入,确保模块解耦与可测试性。
章节来源
依赖分析
- 运行时框架:Kratos v2,提供服务注册、HTTP/GRPC 服务器与配置加载。
- 消息队列:kafka-go,用于异步任务编排与解耦。
- 配置与注册:Nacos,提供配置中心与服务注册发现。
- 数据库:GORM + MySQL(间接依赖),StarRocks 通过 StreamLoad 写入。
- 工具库:automaxprocs、OpenTelemetry、Prometheus 等(间接依赖)。
- 接口定义:bi-proto 模块,提供跨服务通信的标准接口规范。
图表来源
章节来源
性能考虑
- 限流策略:商家级令牌桶限流(100次/分钟),避免聚水潭接口限频;并发许可(每商家最多5并发)控制瞬时压力。
- 批量处理:异步路径批量发送 Kafka 消息,减少网络往返;同步路径批量写入 StarRocks。
- 缓存优化:聚水潭店铺校验结果缓存 15 天,降低重复查询开销。
- 超时与重试:SDK 默认超时与 Kafka 生产者/消费者配置需结合实际环境调优;消费者具备指数退避重试与死信队列兜底。
- gRPC 优化:连接池复用、超时配置、中间件链优化,提升服务间通信效率。
故障排查指南
- 授权与鉴权
- 现象:获取授权 URL 为空或调用失败。
- 排查:确认 SDK 参数配置、授权页面 URL、网络连通性。
- 认证失败/签名错误
- 现象:返回认证错误或签名验证失败。
- 排查:核对 AppKey/AppSecret、时间戳、biz 参数、签名算法与顺序。
- 令牌过期/权限不足
- 现象:返回令牌过期或权限不足。
- 排查:重新获取 AccessToken 或申请相应 API 权限。
- 限流与并发
- 现象:请求被限流或并发许可获取失败。
- 排查:降低并发或提升限流阈值;检查 Redis 限流状态。
- Kafka 消费异常
- 现象:消息堆积、重复消费、写入失败。
- 排查:检查消费者组、Topic 分区、死信队列、重试策略与消费者日志。
- 写入失败
- 现象:StarRocks 导入失败。
- 排查:检查表结构、分区键、数据格式、网络连通性与磁盘空间。
- gRPC 通信问题
- 现象:服务调用超时或连接失败。
- 排查:检查服务发现配置、网络连通性、目标服务状态与日志。
章节来源
结论
本项目通过清晰的分层架构与完善的异步处理链路,实现了对聚水潭多类业务数据的稳定同步。SDK 封装、限流与并发控制、Kafka 消费与死信队列、以及与 StarRocks 的高效写入共同构成了高可用的数据同步体系。新增的 bi-proto 接口定义进一步规范了服务间的通信标准,提升了系统的可维护性和扩展性。建议在生产环境中结合监控指标持续优化限流阈值与重试策略,并完善告警与回滚机制。
附录
配置与部署要点
- 配置来源:Nacos,通过 application-*.yaml 指定配置项与命名空间。
- 服务注册:启用 Kratos Registrar 与 Nacos 注册中心。
- Kafka:消费者组与 Topic 需与服务侧配置一致,确保消息路由正确。
章节来源
同步模式与触发方式
- 实时同步:消费者从 Kafka 拉取消息后立即处理并写入。
- 定时同步:通过独立 CronJob 或调度器触发异步拉取任务。
- 手动触发:提供接口或命令行入口,按需拉取指定商户/店铺的库存数据。
章节来源
接口定义与使用
- bi-proto 接口:提供标准化的 gRPC 接口定义,支持跨服务通信。
- 自动代码生成:基于 proto 文件生成 Go 代码和桩代码。
- Swagger 文档:自动生成 OpenAPI v3 文档,便于接口测试与集成。
章节来源