实时数仓与维表时态Join实战
下单用户是新客还是老客用户当前的等级、城市、渠道商品所属品类、类目层级这些信息通常存放在维度表维表中例如 MySQL 的dim_user、dim_product等。我们希望在实时计算时能把「事实流」和「维表」在时间维度上正确地关联起来构建一张带有完整业务属性的明细宽表。这就是维表时态 JoinTemporal Table Join要解决的问题。本文我们就以「订单事实流 用户维表」为例完成一个从 Kafka 到 MySQL 的简易实时数仓 Demo并重点理解 Flink SQL 中维表时态 Join 的语法和注意事项。一、业务场景与数仓目标设想一个简化的电商业务场景Kafka 中有实时写入的orders订单事实流MySQL 中维护一张dim_user用户维表包含用户等级、所属城市、注册渠道等信息我们想要在 Flink 中构建一张「订单明细宽表」字段大致包括订单信息订单号、下单用户、下单金额、下单时间用户属性用户昵称、等级、城市、注册渠道并且要求当我们回看 10 分钟前的某条订单时看到的是当时用户的等级和城市而不是被后续变更“冲掉”的最新值这正是时态 Join和「实时数仓」的关键按事件发生时刻回放维度视图。二、环境前提与依赖准备1. 基础组件本篇默认你已经完成前几篇中的环境准备Flink 1.20.1WSL2 Ubuntu 下部署Kafka 集群已启动且能正常写入 / 读取 TopicFlink SQL Client 可以正常连接集群在此基础上我们还需要一套可访问的 MySQL本地或远程均可Flink 的 JDBC Connector JAR 包2. 安装 Flink JDBC Connector和 Kafka 一样JDBC 连接器也需要以 JAR 包形式放到 Flink 的lib目录中。以 Flink 1.20.x 对应的flink-connector-jdbc为例确认 Flink 安装目录假设为/opt/flinkexport FLINK_HOME/opt/flink下载 JDBC Connector JAR 到 Flink 的lib目录cd $FLINK_HOME/lib wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.3.0-1.20/flink-connector-jdbc-3.3.0-1.20.jar如果你使用的是独立集群或远程集群需要重启 Flink 集群让新 JAR 在 JobManager/TaskManager 上生效cd $FLINK_HOME bin/stop-cluster.sh bin/start-cluster.sh重启 Flink SQL Client使用新 Connectorcd $FLINK_HOME bin/sql-client.sh如果你在 Windows WSL2 上部署只需在 WSL2 内执行上述命令即可或者手动下载 JAR 后拷贝到lib目录步骤完全一致。三、准备 MySQL 用户维度表 dim_user首先在 MySQL 中准备一张简单的用户维度表用来存用户的基础属性。在 MySQL 中执行CREATE DATABASE IF NOT EXISTS realtime_dwh; USE realtime_dwh; CREATE TABLE dim_user ( user_id VARCHAR(32) PRIMARY KEY, user_name VARCHAR(64), user_level VARCHAR(16), city VARCHAR(64), register_time DATETIME ); INSERT INTO dim_user (user_id, user_name, user_level, city, register_time) VALUES (u_1, 张三, VIP1, 北京, 2025-12-01 10:00:00), (u_2, 李四, VIP2, 上海, 2025-12-05 11:00:00), (u_3, 王五, VIP1, 广州, 2025-12-10 12:00:00);为了演示「时态」效果你可以在后续实验中手动更新某个用户的等级或城市例如UPDATE dim_user SET user_level VIP3 WHERE user_id u_2;这样我们在 Flink 里做时态 Join 时就能观察“变更前后”的区别。四、在 Flink 中注册事实流与维表接下来回到 Flink SQL Client把 Kafka 中的订单事实流和 MySQL 中的维表都注册成 Flink 表。1. Kafka 订单事实表 orders和上一篇双流 JOIN 类似我们假设 Kafka 中有一个ordersTopic写入订单事实数据。在 Flink SQL Client 中执行CREATE TABLE orders ( order_id STRING, user_id STRING, order_amount DECIMAL(10, 2), order_time TIMESTAMP_LTZ(3), WATERMARK FOR order_time AS order_time - INTERVAL 5 SECOND, proc_time AS PROCTIME() ) WITH ( connector kafka, topic orders, properties.bootstrap.servers 127.0.0.1:9092, properties.group.id flink-orders-dim, scan.startup.mode earliest-offset, format json, json.timestamp-format.standard ISO-8601 );你可以沿用上一篇中 Kafka 造数的方式用kafka-console-producer.sh发送 JSON 订单数据只需要保证字段名一致。2. MySQL 用户维表 dim_userJDBC Lookup 表然后把刚才在 MySQL 中建好的dim_user注册为 Flink 的 JDBC 表CREATE TABLE dim_user ( user_id STRING, user_name STRING, user_level STRING, city STRING, register_time TIMESTAMP(3), PRIMARY KEY (user_id) NOT ENFORCED ) WITH ( connector jdbc, url jdbc:mysql://127.0.0.1:3306/realtime_dwh, table-name dim_user, driver com.mysql.cj.jdbc.Driver, username root, password 1qazWSX );注意几点PRIMARY KEY (user_id) NOT ENFORCED告诉 Flink 这是一张以user_id为主键的表是做时态 Join 的前提这里使用的是典型的 JDBC Lookup 模式Flink 会在 Join 时按需去 MySQL 查维度信息在生产环境中你可以把 MySQL 作为维度存储或者通过 CDC 把维表变更同步到 Kafka构造成 changelog 流这些都可以和 Temporal Join 结合使用。五、维表时态 Join把订单打上用户维度有了订单事实表orders和维度表dim_user就可以通过时态 Join 来构建订单明细宽表。1. 基础时态 Join 语法Flink SQL 中的 Temporal Table Join 对于 JDBC 这类外部维表通常采用「处理时间Processing Time」语义来做 Lookup Join典型写法如下SELECT o.order_id, o.user_id, d.user_name, d.user_level, d.city, o.order_amount, o.order_time FROM orders AS o LEFT JOIN dim_user FOR SYSTEM_TIME AS OF o.proc_time AS d ON o.user_id d.user_id;这里有几个关键点proc_time AS PROCTIME()是在orders上定义的处理时间字段FOR SYSTEM_TIME AS OF o.proc_time表示“以 Flink 处理这条订单记录的当前时间去查维表的一个快照”这是 JDBC Lookup 支持的典型用法Join 条件依然是user_id等值关联使用LEFT JOIN可以保留找不到维度的订单并用空值来表示“维度缺失”在 SQL Client 中执行这段查询会看到实时流式刷新的结果每一行订单都带上了对应的用户属性。2. 验证时态效果修改维表再观察 Join为了验证这是“时态 Join”而不是“始终查最新维度”可以按下面步骤操作先往 Kafka 的ordersTopic 写入几条订单数据例如用户u_2下单的记录观察 Flink SQL 中 Join 后的结果此时u_2的等级是VIP2回到 MySQL执行UPDATE dim_user SET user_level VIP3 WHERE user_id u_2;再写入一批新的订单仍然是用户u_2bin/kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic orders在命令行中输入一条 JSON 数据按回车发送一条{order_id:o_3,user_id:u_2,order_amount:200.00,order_time:2026-02-19T14:42:00Z}这时你会看到变更前的订单维度字段仍然显示VIP2变更后的订单维度字段变成了VIP3这就说明 Flink 的时态 Join 确实是“按订单发生时刻去回放维度视图”的而不是简单查当前最新值。六、把结果写回 Kafka 或 MySQL形成实时数仓明细层在真实项目中我们不会只在 SQL Client 里SELECT一下就结束而是要把 Join 后的订单明细宽表写回到下游存储形成实时数仓的一个层级。例如可以把结果写回 Kafka作为 DWD 层的订单宽表CREATE TABLE dwd_order_user_wide ( order_id STRING, user_id STRING, user_name STRING, user_level STRING, city STRING, order_amount DECIMAL(10, 2), order_time TIMESTAMP_LTZ(3), WATERMARK FOR order_time AS order_time - INTERVAL 5 SECOND ) WITH ( connector kafka, topic dwd_order_user_wide, properties.bootstrap.servers 127.0.0.1:9092, properties.group.id flink-dwd-order-wide, scan.startup.mode earliest-offset, format json, json.timestamp-format.standard ISO-8601 ); INSERT INTO dwd_order_user_wide SELECT o.order_id, o.user_id, d.user_name, d.user_level, d.city, o.order_amount, o.order_time FROM orders AS o LEFT JOIN dim_user FOR SYSTEM_TIME AS OF o.proc_time AS d ON o.user_id d.user_id;这样下游的实时应用或 BI 查询就可以直接订阅dwd_order_user_wide这个 Topic拿到已经打好用户标签的订单明细数据。你也可以把结果同步到 MySQL、ClickHouse 等分析型数据库中构建实时明细表为报表和可视化提供数据。七、小结与下一步建议通过这篇文章我们完成了这样一件事在 Kafka 中维护订单事实流orders在 MySQL 中维护用户维度表dim_user使用 Flink SQL 的 JDBC Connector 把 MySQL 注册为维表利用FOR SYSTEM_TIME AS OF语法做维表时态 Join将 Join 结果写回 Kafka形成实时数仓中的一张订单明细宽表这背后有几个非常重要的实时数仓设计理念事实流是不断追加的事件序列维表是相对缓慢变更的业务视图时态 Join 让你能够“按事件发生的时间点”回看当时的维度快照实时数仓的 DWD 层往往就是「事实表 多个维表时态 Join」后形成的明细宽表在后续的文章中我们可以继续沿着这个方向深入在一个任务里同时关联多张维表构建更宽的明细表引入 CDC把维表变更实时同步到 Kafka再在 Flink 中构建 changelog 维表把实时数仓的明细层、汇总层DWS、指标主题层ADS串起来做一个端到端的实时数仓小项目如果你已经跑通了本文的 Demo不妨试着自己设计一张商品维表dim_product再给订单打上商品品类维度体验一下“事实 多维表时态 Join”在 Flink SQL 里的完整味道。