跳到主要内容

项目三:商业数据

实时数据流

前端埋点(浏览器)
HTTP POST / Image beacon / JS SDK
网关服务 / 伏羲日志接收服务(如 Nginx + Lua、Node proxy、Agent 等)

Kafka(标准埋点入口,格式通常为 JSON / CSV

Flink / Spark Streaming 等(处理、清洗、聚合、分流)

落地存储(ClickHouse / Elasticsearch / HDFS / Hive / OLAP
  1. 数据源头: 前端埋点 SDK 上报数据;
  2. 数据采集: 伏羲日志平台,统一采集埋点数据;
    • 伏羲发送 Kafka 消息,但不是直接原始 JSON,转为了 CSV 格式的 Kafka 消息;
      • 节省 Kafka 存储和贷款开销:CSV 更紧凑,字节数比 JSON 小很多(不带字段名);
      • Kafka 设计为高吞吐顺序写入的消息系统,使用结构扁平、紧凑的格式可以极大减少 I/O 和网络消耗
  3. 数据处理: Flink 数据处理,订阅 Kafka 消息,CSV → JSON 格式化处理,处理后再次发送 Kafka 消息;
    • 上传 jar 包,在星河实时计算平台,运行这个 Flink 任务。
    • 实时任务,使用 Java 编写的 Flink Job,从 Kafka topic 中消费数据;
    • 将 CSV 数据解析为结构化对象,转换为 JSON 格式字符串;
  4. 数据转发: Flink 处理数据后,通过 Flink 的 KafkaSink 将 JSON 写入到另一个 Kafka topic;
  5. 数据入库: 星河平台,支持 Kafka 数据接入服务,自动同步到 ClickHouse;
    • 配置 Kafka topic → ClickHouse 表的字段映射;
    • JSON 格式数据就会被实时落地到 ClickHouse 表中;
    • ClickHouse 是通过 ClickHouse Kafka Engine 存储数据的;

如果是落到 es 数据库中:

  1. 数据源头、数据采集:流程不变
  2. 数据入库:无需 Flink 做数据格式处理,直觉通过 Logstash 入库操作,格式化 CSV。

数据库对比

特性\数据库ClickHouseMySQLMongoDBHiveElasticsearchRedis
类型列式存储,OLAP行式存储,OLTP文档数据库(NoSQL)数据仓库(基于 HDFS)分布式搜索引擎内存 KV 缓存数据库
存储模型列存行存BSON 文档HDFS 表(Hive 表)倒排索引Key-Value
查询语言SQL-likeSQLMongo Query LanguageHiveQL (SQL-like)DSL / SQL自有命令/模块支持
实时性高(近实时)低(批处理)高(近实时)极高(毫秒级)
查询性能高(聚合类优秀)高(点查、范围查)中(聚合性能差)高(全文检索强)高(KV 查找极快)
索引机制稀疏索引 + MergingB+Tree二级索引无(依赖 HDFS)倒排索引哈希表 / 跳表
事务支持基本不支持支持 ACID 事务支持事务(4.0+)不支持事务不支持(最终一致性)支持部分事务(Lua)
应用场景数据分析报表传统 Web 应用JSON 存储、日志离线大数据分析搜索、日志、监控缓存、排行榜、限流等

数据源

(1)后端订单数据

  1. 数据产生: 前端调用 order/buy 下单统一接口,下单订单信息同步至商业后端;
  2. 实时转发: 商业后端扣费成功,订单信息通过 kafka 同步至数仓;
  3. 数据推送: 数仓信息同步后,通过 kafka 再次发送数据;
  4. 数据入库:
    1. 离线:每日 8.30 定时任务,从云窗获取昨日订单数据:
      • 服务器调云窗 API,将查询 sql 传递给云窗,云窗在查询到结果后返回数据。
    2. 实时:商业产品管理平台订阅数据,落表到 mongodb;

(2)商业产品 source 数据

  1. 数据产生: 数据源是各商业产品的页面曝光埋点;
  2. 数据获取:
    1. 每日 10 点定时任务,服务器拿着所有项目的曝光埋点 id,去查这些曝光埋点的昨日数据;
    2. 获得埋点数据后,匹配本地数据库,是否有新增 source;
    3. 如果有新增,则登记该 source;

(3)服务器日志数据

  1. 数据源头: 性能 / 异常 sdk 上报数据;
  2. 数据保存: Node.js 服务器中,使用 bunyan 将数据保存在本地日志文件中;
  3. 日志收割: 接入公司 Kafka 日志实时采集服务,将日志收割并实时转发;
  4. 数据入库: 数据实时落库
    • 云 DB 系统:Kafka 数据接入,通过 Logstash 落库到 es 中;
    • 星河:Kafka 数据接入,落库到 ck 中;

Kafka 使用结构

Topic(主题):用于组织消息流。

  • name:主题名,唯一;
  • partitions: 分区数,决定并发度和吞吐;
    • 每个 partitions 独立维护一个 offset;
  • retention_ms:数据保留时间(3 天);

Producer(生产者):用于发送消息的客户端;

  • bootstrap.servers:Kafka 集群地址列表
  • 其他:压缩方法、失败重试次数、发送前等待时间、批量发送时大小

Consumer(消费者):用于从 Kafka 消费消息的客户端

  • group_id:消费组 ID,用于协调分区和 offset 管理;
    • Kafka 将 offset 与 group_id + topic + partition 关联
  • offset:新消费者如何选择 offset 起点;
  • fetch_min_bytes:拉取的最小数据量;
  • max_poll_records:每次 poll 拉取的最大消息数;

如果两个消费者使用不同的 group_id:

  • 每个 group 会独立维护自己的 offset
  • 同一个 partition 中的数据会分别提供给两个组
  • A 消费了不会影响 B,两者都能完整消费所有数据。

如果两个消费者的 group_id 相同:

  • 一个 partition 只能被 group 内的 一个 consumer 消费;
  • 多个消费者之间是竞争关系,不是顺序;

Kafka 的消息删除机制

Kafka 删除消息是基于 topic 的 保留策略(retention policy),而不是是否被消费。 不关心数据是否被所有 group 消费完,它只关心消息在 topic 中是否达到了保留时间或大小的阈值。

  • Kafka 是日志系统,不是 MQ(严格意义的消息队列),数据存在磁盘上是“追加日志”,不做消费者的强依赖管理
  • 方法一 默认策略:按照时间、按照大小到达阈值,自动清理旧数据;
  • 方法二 日志压缩模式:滚动更新,只保留固定量的日志总数;

权限管理

RBAC(基于角色的访问控制)权限管理系统

  1. 数据模型设计
    • User:用户实体、Role:角色实体、Permission:权限实体
    • 用户和角色是多对多关系,角色和权限也是多对多关系
  2. 核心组件
    • PermissionModule:权限模块,注册相关实体和服务
    • PermissionGuard:权限守卫,负责权限验证
    • PermissionService:提供权限管理相关功能
  3. 基础内容:
    • 使用 session 存储用户认证信息;
    • 需要验证的接口增加装饰器:@SetMetadata('grant-permission', ['权限名称'])
    • PermissionGuard 守卫在请求处理前进行权限验证:
      1. 获取路由上标记的权限名称,获取 session 中的用户信息;
      2. 根据用户信息,查询所属角色,根据角色,查询具体权限;
      3. 匹配当前接口权限是否拥有,拥有则继续执行接口逻辑,否则返回无权限。

还有一个 ACL(基于访问控制表)

  • 相比角色控制,相当于直接对每个用户定义可以访问的接口和操作权限,少了角色层。
  • 缺点:角色可批量管理,减少接入成本;人员变动时,仅需调整角色即可。

登录/鉴权

登录服务通常使用全局中间件实现,它会在服务端接口到前额请求后,进行统一的入口处理。

JWT Token

JWT 是在用户登录成功后,服务器签发给客户端的一段加密字符串,用来验证用户的登录身份

使用流程:

  1. 服务器生成 JWT 后发给前端,前端将其存储(通常放在 localStorage 或 cookie 中);
  2. 每次请求时,前端带上 JWT(HTTP Header:Authorization: Bearer <token>);
  3. 后端通过验证 JWT 来确认用户身份,无需在服务端维护 session 状态(无状态);

组成结构 'xxxx.yyyy.zzzz',分别为:

  • Header(头部):声明类型(JWT)和算法(如 HS256);
  • Payload(载荷):存放用户身份信息(如用户 ID、过期时间等) ;
  • Signature(签名):对前两部分进行加密签名,防止篡改;

优点:

  1. 无需服务器保存会话信息,无状态,适合分布式架构(无需不同服务器间分享状态信息);
  2. 格式标准、跨语言支持好,可以结合 Refresh Token 实现长期登录;

缺点:

  1. 无法主动失效。无状态令牌,一旦签发就不再受控。如果用户退出登录,Token 依然有效;
  2. Token 泄露,攻击者可伪装用户登录,所以 exp 过期设置需要较短(15min);
  3. Payload 可解码读取,不适合存储敏感数据。

Session

服务端的“会话状态管理机制”,在服务端记录前端用户的登录状态。

缺点:

  1. 对于分布式的多服务器,需要存储在 Redis 等共享内存中;
  2. 保持共享状态,占用存储空间。

CAS 单点登录

CAS 是一个开源的单点登录协议(Single Sign-On,简称 SSO),一个用户登录一次,就可以访问多个受信任的系统,无需重复登录。

  • 统一认证中心(CAS Server):企业提供统一的,负责验证用户身份的服务器;

SSO 单点登录仅提供了多点登录服务。具体服务器和前端的登录态校验,依然是服务器自己的 Session 或 JWT 维护。

使用流程:

  1. 假设有两个系统:A 系统 和 B 系统,接入了 CAS;
    1. 用户访问系统 A,系统 A 发现用户未登录 → 重定向用户到 CAS Server。
    2. CAS Server 让用户登录(账号密码)
    3. 用户登录成功,CAS Server:
      • 设置一个 TGC(Ticket Granting Cookie) 到浏览器;
      • 并返回一个一次性使用的票据 ST(Service Ticket);
      • 自动跳转回系统 A;
    4. 系统 A 收到 ST,拿去问 CAS Server 合法性:“这个票据是不是你发的?对应的是谁?”
      • CAS Server 验证 ST 合法 → 告诉系统 A 用户是谁
    5. 系统 A 登录成功
      • 系统 A 给用户返回一个自己服务器简历的 Session 或 JWT,保持后续登录态;
  2. 当用户访问系统 B,
    1. 系统 B 发现用户未登录 → 重定用户到 CAS Server。
      • 此时浏览器自动携带 CAS 的 Cookie(TGC);
      • CAS Server 发现已经登录 → 直接给系统 B 一个新的 ST → 跳转回系统 B
    2. 系统 B 收到 ST,和刚才一样去验证合法性;
    3. 系统 B 登录成功
      • 系统 B 给用户返回一个自己服务器简历的 Session 或 JWT,保持后续登录态;

MCP 登录流程

第三方客户端(cursor等)没有浏览器跳转能力,需要先在浏览器生成token:

  1. MCP Server 暴露一个登录入口页面;
  2. 用户根据页面进行公司CAS单点登录,获取本服务 JWT Token;
  3. 用户将获取到的访问令牌,粘贴到 Cursor 中;
  4. 客户端之后请求本 MCP server,携带 Authorization: Bearer