In the DataStream API, a stream-to-dimension-table join can be implemented with a function such as MapFunction, FlatMapFunction, or ProcessFunction, or via Async I/O.
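
For context, here is a minimal sketch of the function-based DataStream approach: a RichFlatMapFunction that enriches each event against an in-memory dimension map loaded in open(). All class, field, and sample names below are illustrative and are not part of the example later in this post.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.Map;

public class UserDimFlatMapFunction extends RichFlatMapFunction<String, String> {

    private transient Map<String, String> userDim;

    @Override
    public void open(Configuration parameters) {
        // In a real job the dimension table would be loaded from MySQL/HBase
        // here (and refreshed periodically); a static map keeps the sketch simple.
        userDim = new HashMap<>();
        userDim.put("user_1", "name1");
    }

    @Override
    public void flatMap(String userID, Collector<String> out) {
        String userName = userDim.get(userID);
        if (userName != null) {
            out.collect(userID + "," + userName); // emit enriched record
        }
    }
}

The Async I/O variant follows the same idea but issues a non-blocking per-record lookup instead of reading a preloaded map.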

Starting with Flink 1.9.0, Flink provides LookupableTableSource: register the lookup data source (e.g., a MySQL or HBase table) as a LookupableTableSource, and the stream-dimension join can be expressed directly in SQL.

Note:

  1. LookupableTableSource is only supported by the Blink planner.
  2. To register a lookup data source as a LookupableTableSource, it must implement the LookupableTableSource interface.

The LookupableTableSource Interface

public interface LookupableTableSource<T> extends TableSource<T> {

    TableFunction<T> getLookupFunction(String[] lookupKeys);

    AsyncTableFunction<T> getAsyncLookupFunction(String[] lookupKeys);

    boolean isAsyncEnabled();
}

As shown above, LookupableTableSource defines three methods: getLookupFunction, getAsyncLookupFunction, and isAsyncEnabled. A minimal implementation sketch follows the list below.

  • getLookupFunction: returns a TableFunction. Combined with the LATERAL TABLE keyword, it synchronously looks up the rows matching the given keys.
  • getAsyncLookupFunction: returns an AsyncTableFunction. Combined with the LATERAL TABLE keyword, it looks up the rows matching the given keys asynchronously (via Async I/O).
  • isAsyncEnabled: should return true if asynchronous lookup is enabled. When it returns true, the getAsyncLookupFunction(String[] lookupKeys) method must be implemented.
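
Here is that sketch: a minimal synchronous-only LookupableTableSource backed by an in-memory map, written against the Flink 1.9 APIs used in this post. The class names, the uid/name schema, and the sample row are made up for illustration; getProducedDataType is derived from the schema so the source is self-describing.

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.sources.LookupableTableSource;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;

import java.util.HashMap;
import java.util.Map;

public class InMemoryLookupableTableSource implements LookupableTableSource<Row> {

    @Override
    public TableFunction<Row> getLookupFunction(String[] lookupKeys) {
        return new InMemoryLookupFunction();
    }

    @Override
    public AsyncTableFunction<Row> getAsyncLookupFunction(String[] lookupKeys) {
        // Not needed while isAsyncEnabled() returns false.
        throw new UnsupportedOperationException("async lookup not implemented");
    }

    @Override
    public boolean isAsyncEnabled() {
        return false; // the planner will only call getLookupFunction
    }

    @Override
    public TableSchema getTableSchema() {
        return TableSchema.builder()
                .field("uid", DataTypes.STRING())
                .field("name", DataTypes.STRING())
                .build();
    }

    @Override
    public DataType getProducedDataType() {
        return getTableSchema().toRowDataType();
    }

    /** Synchronous lookup: for each key, emit the matching dimension row, if any. */
    public static class InMemoryLookupFunction extends TableFunction<Row> {

        private transient Map<String, String> dim;

        @Override
        public void open(FunctionContext context) {
            dim = new HashMap<>();
            dim.put("user_1", "name1"); // sample dimension data
        }

        public void eval(String uid) {
            String name = dim.get(uid);
            if (name != null) {
                collect(Row.of(uid, name)); // join hit: emit the dimension row
            }
        }
    }
}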

Joining a Kafka Stream with HBase/MySQL Dimension Data via LookupableTableSource

Example

package com.bigdata.flink.tableSqlLookableTableSource;

import com.alibaba.fastjson.JSON;
import com.bigdata.flink.beans.table.UserBrowseLog;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.addons.hbase.HBaseTableSource;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.io.jdbc.JDBCLookupOptions;
import org.apache.flink.api.java.io.jdbc.JDBCOptions;
import org.apache.flink.api.java.io.jdbc.JDBCTableSource;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.hadoop.conf.Configuration;

import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

/**
 * Summary:
 * Lookup Table Source
 */
@Slf4j
public class Test {
    public static void main(String[] args) throws Exception {

        args = new String[]{"--application", "flink/src/main/java/com/bigdata/flink/tableSqlLookableTableSource/application.properties"};

        // 1. Parse command-line arguments
        ParameterTool fromArgs = ParameterTool.fromArgs(args);
        ParameterTool parameterTool = ParameterTool.fromPropertiesFile(fromArgs.getRequired("application"));

        String kafkaBootstrapServers = parameterTool.getRequired("kafkaBootstrapServers");
        String browseTopic = parameterTool.getRequired("browseTopic");
        String browseTopicGroupID = parameterTool.getRequired("browseTopicGroupID");

        String hbaseZookeeperQuorum = parameterTool.getRequired("hbaseZookeeperQuorum");
        String hbaseZnode = parameterTool.getRequired("hbaseZnode");
        String hbaseTable = parameterTool.getRequired("hbaseTable");

        String mysqlDBUrl = parameterTool.getRequired("mysqlDBUrl");
        String mysqlUser = parameterTool.getRequired("mysqlUser");
        String mysqlPwd = parameterTool.getRequired("mysqlPwd");
        String mysqlTable = parameterTool.getRequired("mysqlTable");

        // 2. Set up the execution environment
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
        streamEnv.setParallelism(1);

        // 3. Register the Kafka source
        // Hand-made test data: a user clicks a product at some point in time, with the product price, e.g.
        // {"userID": "user_1", "eventTime": "2016-01-01 10:02:00", "eventType": "browse", "productID": "product_1", "productPrice": 20}
        Properties browseProperties = new Properties();
        browseProperties.put("bootstrap.servers", kafkaBootstrapServers);
        browseProperties.put("group.id", browseTopicGroupID);
        DataStream<UserBrowseLog> browseStream = streamEnv
                .addSource(new FlinkKafkaConsumer010<>(browseTopic, new SimpleStringSchema(), browseProperties))
                .process(new BrowseKafkaProcessFunction());
        tableEnv.registerDataStream("kafka", browseStream, "userID,eventTime,eventTimeTimestamp,eventType,productID,productPrice");
        //tableEnv.toAppendStream(tableEnv.scan("kafka"), Row.class).print();

        // 4. Register the HBase source (lookup table source)
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", hbaseZookeeperQuorum);
        conf.set("zookeeper.znode.parent", hbaseZnode);
        HBaseTableSource hBaseTableSource = new HBaseTableSource(conf, hbaseTable);
        hBaseTableSource.setRowKey("uid", String.class);
        hBaseTableSource.addColumn("f1", "name", String.class);
        hBaseTableSource.addColumn("f1", "age", Integer.class);
        tableEnv.registerTableSource("hbase", hBaseTableSource);
        // Register the lookup TableFunction
        tableEnv.registerFunction("hbaseLookup", hBaseTableSource.getLookupFunction(new String[]{"uid"}));

        // 5. Register the MySQL source (lookup table source)
        String[] mysqlFieldNames = {"pid", "productName", "productCategory", "updatedAt"};
        DataType[] mysqlFieldTypes = {DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()};
        TableSchema mysqlTableSchema = TableSchema.builder().fields(mysqlFieldNames, mysqlFieldTypes).build();
        JDBCOptions jdbcOptions = JDBCOptions.builder()
                .setDriverName("com.mysql.jdbc.Driver")
                .setDBUrl(mysqlDBUrl)
                .setUsername(mysqlUser)
                .setPassword(mysqlPwd)
                .setTableName(mysqlTable)
                .build();

        JDBCLookupOptions jdbcLookupOptions = JDBCLookupOptions.builder()
                .setCacheExpireMs(10 * 1000) // cache TTL
                .setCacheMaxSize(10) // max number of cached rows
                .setMaxRetryTimes(3) // max retry attempts
                .build();

        JDBCTableSource jdbcTableSource = JDBCTableSource.builder()
                .setOptions(jdbcOptions)
                .setLookupOptions(jdbcLookupOptions)
                .setSchema(mysqlTableSchema)
                .build();
        tableEnv.registerTableSource("mysql", jdbcTableSource);
        // Register the lookup TableFunction
        tableEnv.registerFunction("mysqlLookup", jdbcTableSource.getLookupFunction(new String[]{"pid"}));


        // 6. Query
        // Fetch user details from the HBase table by userID
        // Fetch product details from the MySQL table by productID
        String sql = ""
                + "SELECT "
                + "  userID, "
                + "  eventTime, "
                + "  eventType, "
                + "  productID, "
                + "  productPrice, "
                + "  f1.name AS userName, "
                + "  f1.age AS userAge, "
                + "  productName, "
                + "  productCategory "
                + "FROM "
                + "  kafka, "
                + "  LATERAL TABLE(hbaseLookup(userID)), "
                + "  LATERAL TABLE(mysqlLookup(productID))";

        tableEnv.toAppendStream(tableEnv.sqlQuery(sql), Row.class).print();

        // 7. Execute
        tableEnv.execute(Test.class.getSimpleName());
    }

    /**
     * Parses Kafka records.
     */
    static class BrowseKafkaProcessFunction extends ProcessFunction<String, UserBrowseLog> {
        @Override
        public void processElement(String value, Context ctx, Collector<UserBrowseLog> out) throws Exception {
            try {

                UserBrowseLog browseLog = JSON.parseObject(value, UserBrowseLog.class);

                // Add a long timestamp field:
                // eventTime is Beijing time (UTC+8) in yyyy-MM-dd HH:mm:ss format
                DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
                OffsetDateTime eventTime = LocalDateTime.parse(browseLog.getEventTime(), format).atOffset(ZoneOffset.of("+08:00"));
                // Convert to epoch milliseconds
                long eventTimeTimestamp = eventTime.toInstant().toEpochMilli();
                browseLog.setEventTimeTimestamp(eventTimeTimestamp);

                out.collect(browseLog);
            } catch (Exception ex) {
                log.error("Failed to parse Kafka record...", ex);
            }
        }
    }

}

Result

//Feed test data into the Kafka topic
{"userID": "user_1", "eventTime": "2016-01-01 10:02:00", "eventType": "browse", "productID": "product_1", "productPrice": 20}
{"userID": "user_1", "eventTime": "2016-01-01 10:02:02", "eventType": "browse", "productID": "product_1", "productPrice": 20}
{"userID": "user_1", "eventTime": "2016-01-01 10:02:06", "eventType": "browse", "productID": "product_2", "productPrice": 20}
{"userID": "user_2", "eventTime": "2016-01-01 10:02:10", "eventType": "browse", "productID": "product_2", "productPrice": 20}
{"userID": "user_2", "eventTime": "2016-01-01 10:02:15", "eventType": "browse", "productID": "product_3", "productPrice": 20}

//The following results are produced
//In the first row, name10,10 comes from HBase, and productName1,productCategory1 comes from MySQL
//The remaining rows follow the same pattern
user_1,2016-01-01 10:02:02,browse,product_1,20,name10,10,productName1,productCategory1
user_2,2016-01-01 10:02:15,browse,product_3,20,name2,20,productName3,productCategory3
user_1,2016-01-01 10:02:06,browse,product_2,20,name10,10,productName2,productCategory2
user_1,2016-01-01 10:02:00,browse,product_1,20,name10,10,productName1,productCategory1
user_2,2016-01-01 10:02:10,browse,product_2,20,name2,20,productName2,productCategory2

Note: The built-in HBaseTableSource and JDBCTableSource only implement the synchronous lookup method, i.e., getLookupFunction. If you need asynchronous lookup, you can implement the getAsyncLookupFunction method yourself, as sketched below.
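
A hedged sketch of what such an AsyncTableFunction might look like: with the Blink planner, eval receives a CompletableFuture over the collection of result rows as its first argument and completes it when the lookup finishes. The supplyAsync call below merely simulates the backend and is a stand-in for a real non-blocking client (e.g., an async HBase or JDBC client); all names here are illustrative.

import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.types.Row;

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

public class AsyncDimLookupFunction extends AsyncTableFunction<Row> {

    // The planner calls eval with a future to complete once the
    // (non-blocking) lookup for the given key has finished.
    public void eval(CompletableFuture<Collection<Row>> result, String key) {
        CompletableFuture
                .supplyAsync(() -> "name_of_" + key) // placeholder for a real async client call
                .whenComplete((name, err) -> {
                    if (err != null) {
                        result.completeExceptionally(err);
                    } else if (name == null) {
                        result.complete(Collections.emptyList()); // no match
                    } else {
                        result.complete(Collections.singletonList(Row.of(key, name)));
                    }
                });
    }
}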