Flink Table & SQL 时态表Temporal Table
举个栗子,假设你在Mysql中有两张表: browse_event、product_history_info。
browse_event: 事件表,某个用户在某个时刻浏览了某个商品,以及商品的价值。如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17SELECT * FROM browse_event;
+--------+---------------------+-----------+-----------+--------------+
| userID | eventTime | eventType | productID | productPrice |
+--------+---------------------+-----------+-----------+--------------+
| user_1 | 2016-01-01 00:00:00 | browse | product_5 | 20 |
| user_1 | 2016-01-01 00:00:01 | browse | product_5 | 20 |
| user_1 | 2016-01-01 00:00:02 | browse | product_5 | 20 |
| user_1 | 2016-01-01 00:00:03 | browse | product_5 | 20 |
| user_1 | 2016-01-01 00:00:04 | browse | product_5 | 20 |
| user_1 | 2016-01-01 00:00:05 | browse | product_5 | 20 |
| user_1 | 2016-01-01 00:00:06 | browse | product_5 | 20 |
| user_2 | 2016-01-01 00:00:01 | browse | product_3 | 20 |
| user_2 | 2016-01-01 00:00:02 | browse | product_3 | 20 |
| user_2 | 2016-01-01 00:00:05 | browse | product_3 | 20 |
| user_2 | 2016-01-01 00:00:06 | browse | product_3 | 20 |
+--------+---------------------+-----------+-----------+--------------+product_history_info:商品基础信息表,记录了商品历史以来的基础信息。如下:
1
2
3
4
5
6
7
8
9
10SELECT * FROM product_history_info;
+-----------+-------------+-----------------+---------------------+
| productID | productName | productCategory | updatedAt |
+-----------+-------------+-----------------+---------------------+
| product_5 | name50 | category50 | 2016-01-01 00:00:00 |
| product_5 | name52 | category52 | 2016-01-01 00:00:02 |
| product_5 | name55 | category55 | 2016-01-01 00:00:05 |
| product_3 | name32 | category32 | 2016-01-01 00:00:02 |
| product_3 | name35 | category35 | 2016-01-01 00:00:05 |
+-----------+-------------+-----------------+---------------------+
此刻,你想获取事件发生时,对应的最新的商品基础信息。可能需要借助以下SQL实现:
1 | SELECT l.userID, |
在Flink中,为了处理类似场景,从1.7开始,提出了时态表(即Temporal Table)的概念。Temporal Table可以简化和加速此类查询,并减少对状态的使用。Temporal Table是将一个Append-Only表(如上product_history_info)中追加的行,根据设置的主键和时间(如上productID、updatedAt),解释成Chanlog,并在特定时间提供数据的版本。
以下,在Flink中,实现如上逻辑,并总结在使用Flink Temporal Table时得注意点。
测试数据
自己造的测试数据,browse log和product history info,如下:
1 | // browse log |
示例
1 | package com.bigdata.flink.tableSqlTemporalTable; |
结果
在对应Kafka Topic中发送如上测试数据后,得到结果。
1 | // 可以看到,获取到了,事件发生时,对应的历史最新的商品基础信息 |
总结
在使用时态表(Temporal Table)时,要注意以下问题。
- Temporal Table可提供历史某个时间点上的数据。
- Temporal Table根据时间来跟踪版本。
- Temporal Table需要提供时间属性和主键。
- emporal Table一般和关键词
LATERAL TABLE
结合使用。 - Temporal Table在基于ProcessingTime时间属性处理时,每个主键只保存最新版本的数据。
- Temporal Table在基于EventTime时间属性处理时,每个主键保存从上个Watermark到当前系统时间的所有版本。
- 侧Append-Only表Join右侧Temporal Table,本质上还是左表驱动Join,即从左表拿到Key,根据Key和时间(可能是历史时间)去右侧Temporal Table表中查询。
- Temporal Table Join目前只支持Inner Join & Left Join。
- Temporal Table Join时,右侧Temporal Table表返回最新一个版本的数据。举个栗子,左侧事件时间如是2016-01-01 00:00:01秒,Join时,只会从右侧Temporal Table中选取<=2016-01-01 00:00:01的最新版本的数据。