SparkSQL内核解析之逻辑计划

SparkSQL逻辑计划概述

逻辑计划阶段被定义为LogicalPlan类,主要有三个阶段:

  1. 由SparkSqlParser中的AstBuilder将语法树的各个节点转换为对应LogicalPlan节点,组成未解析的逻辑算子树,不包含数据信息与列信息
  2. 由Analyzer将一系列规则作用在未解析逻辑算子树上,生成解析后的逻辑算子树
  3. 有Optimizer将一系列优化规则应用在逻辑算子树中,确保结果正确的前提下改进低效结构,生成优化后的逻辑算子树

LogicalPlan简介

概述

LogicalPlan的父类QueryPlan主要分为六个模块:

  • 输入输出 涉及QueryPlan内属性相关的输入输出
  • 基本属性 QueryPlan内的基本属性
  • 字符串 主要用于打印QueryPlan的树形结构信息
  • 规范化 类似Expression中的规范化
  • 表达式操作
  • 约束 本质上也是数据过滤条件的一种,同样是表达式类型。通过显式的过滤条件推导约束

基本操作和分类

SparkSQL内核解析-执行全过程概述

从SQL到RDD

1
2
3
4
5
6
7

// 创建SparkSession类。从2.0开始逐步替代SparkContext称为Spark应用入口
var spark = SparkSession.builder().appName("appName").master("local").getOrCreate()
//创建数据表并读取数据
spark.read.json("./test.json").createOrReplaceTempView("test_table")
//通过SQL进行数据分析。可输入任何满足语法的语句
spark.sql("select name from test_table where a > 1").show()

SQL转换步骤

实际转换过程

InternalRow体系

用来表示一行数据的类,根据下标来访问和操作元素,其中每一列都是Catalyst内部定义的数据类型;物理算子树产生和转换的RDD类型为RDD[InternalRow];

{3.png}

  • BaseGenericInternalRow 实现了InternalRow中所有定义的get类型方法,通过调用此类定义的genericGet虚函数进行,实现在下级子类中
    • GenericInternalRow 构造参数是Array[Any],采用对象数据进行底层存储,不允许通过set进行改变
    • SpecificInternalRow 构造函数是Array[MutableValue] ,运行通过set进行修改
    • MutableUnsafeRow 用来支持对特定列数据进行修改
  • JoinedRow 用户Join操作,将两个InternalRow放在一起形成新的InternalRow
  • UnsafeRow 不采用Java对象存储方式,避免GC的开销。同时对行数据进行特殊编码使得更高效(Tungsten计划)。

TreeNode体系

TreeNode是SparkSQL中所有树节点的基类,定义了通用集合操作和树遍历接口

  • Expression是Catalyst的表达式体系
  • QueryPlan下包含逻辑算子树和物理执行算子树两个子类

Catalyst还提供了节点位置功能,根据TreeNode定位到对应SQL字串中的位置,方便Debug

YAML 语言教程

编程免不了要写配置文件,怎么写配置也是一门学问。

YAML 是专门用来写配置文件的语言,非常简洁和强大,远比 JSON 格式方便。

本文介绍 YAML 的语法,以JS-YAML的实现为例。你可以去在线Demo验证下面的例子。

简介

YAML 语言(发音 /ˈjæməl/ )的设计目标,就是方便人类读写。它实质上是一种通用的数据串行化格式。

它的基本语法规则如下。

  • 大小写敏感
  • 使用缩进表示层级关系
  • 缩进时不允许使用Tab键,只允许使用空格。
  • 缩进的空格数目不重要,只要相同层级的元素左侧对齐即可

# 表示注释,从这个字符一直到行尾,都会被解析器忽略。

YAML 支持的数据结构有三种。

  • 对象:键值对的集合,又称为映射(mapping)/ 哈希(hashes) / 字典(dictionary)
  • 数组:一组按次序排列的值,又称为序列(sequence) / 列表(list)
  • 纯量(scalars):单个的、不可再分的值

对象

对象的一组键值对,使用冒号结构表示。

1
animal: pets

转为 JavaScript 如下。

1
{ animal: 'pets' }

Yaml 也允许另一种写法,将所有键值对写成一个行内对象。

1
hash: { name: Steve, foo: bar }

转为 JavaScript 如下。

1
{ hash: { name: 'Steve', foo: 'bar' } }

数组

一组连词线开头的行,构成一个数组。

1
2
3
- Cat
- Dog
- Goldfish

转为 JavaScript 如下。

1
[ 'Cat', 'Dog', 'Goldfish' ]

数据结构的子成员是一个数组,则可以在该项下面缩进一个空格。

1
2
3
4
-
- Cat
- Dog
- Goldfish

转为 JavaScript 如下。

1
[ [ 'Cat', 'Dog', 'Goldfish' ] ]

数组也可以采用行内表示法。

1
animal: [Cat, Dog]

转为 JavaScript 如下。

1
{ animal: [ 'Cat', 'Dog' ] }

Kafka Connect简介与部署

什么是Kafka Connect

Kafka 0.9+增加了一个新的特性Kafka Connect,可以更方便的创建和管理数据流管道。它为Kafka和其它系统创建规模可扩展的、可信赖的流数据提供了一个简单的模型,通过connectors可以将大数据从其它系统导入到Kafka中,也可以从Kafka中导出到其它系统。Kafka Connect可以将完整的数据库注入到Kafka的Topic中,或者将服务器的系统监控指标注入到Kafka,然后像正常的Kafka流处理机制一样进行数据流处理。而导出工作则是将数据从Kafka Topic中导出到其它数据存储系统、查询系统或者离线分析系统等,比如数据库、Elastic Search、Apache Ignite等。

Kafka Connect特性包括:

  • Kafka connector通用框架,提供统一的集成API
  • 同时支持分布式模式和单机模式
  • REST 接口,用来查看和管理Kafka connectors
  • 自动化的offset管理,开发人员不必担心错误处理的影响
  • 分布式、可扩展
  • 流/批处理集成

KafkaCnnect有两个核心概念:Source和Sink。 Source负责导入数据到Kafka,Sink负责从Kafka导出数据,它们都被称为Connector。

Log Compacted Topics in Apache Kafka

当我开始阅读Kafka官方文档时,尽管看起来Log Compacted Topics是一个简单的概念,但是我并不是很清楚Kafka内部是如何在文件系统中保存他们的状态。通过详细了解这个feature相关的文档,形成这边文档。

在这篇文档中,我会首先描述介绍kafka的log compacted Topics,然后我会介绍Topics是一个简单的概念,但是我并不是很清楚Kafka内部是如何在文件系统中保存他们的状态。

Flink Table & SQL LookableTableSource

在DataStream中,要实现流维Join,可以用Function,如MapFunctionFlatMapFunctionProcessFunction等等; 或通过Async I/O实现。

从Flink 1.9.0开始,提供了LookableTableSource,只需将Lookup数据源(如Mysql、HBase表)注册成LookableTableSource,即可用SQL的方式,实现流维Join。

注意:

  1. LookableTableSource只支持Blink Planner。
  2. Lookup数据源要注册成LookableTableSource,需要实现LookableTableSource接口。

LookableTableSource接口

1
2
3
4
5
6
7
8
public interface LookupableTableSource<T> extends TableSource<T> {

TableFunction<T> getLookupFunction(String[] lookupKeys);

AsyncTableFunction<T> getAsyncLookupFunction(String[] lookupKeys);

boolean isAsyncEnabled();
}

可看到,LookupableTableSource主要有三个方法:getLookupFunctiongetAsyncLookupFunctionisAsyncEnabled

  • getLookupFunction: 返回一个TableFunction。该Function可和LATERAL TABLE关键字结合使用,根据指定的key同步查找匹配的行。
  • getAsyncLookupFunction: 返回一个TableFunction。该Function可和LATERAL TABLE关键字结合使用,根据指定的key异步(Async I/O的方式)查找匹配的行。
  • isAsyncEnabled: 如果启用了异步Lookup,则此方法应返回true。当返回true时,必须实现getAsyncLookupFunction(String[] lookupKeys)方法。

Flink Table & SQL 用户自定义函数: UDF、UDAF、UDTF

本文总结Flink Table & SQL中的用户自定义函数: UDF、UDAF、UDTF。

  • UDF: 自定义标量函数(User Defined Scalar Function)。一行输入一行输出。
  • UDAF: 自定义聚合函数。多行输入一行输出。
  • UDTF: 自定义表函数。一行输入多行输出或一列输入多列输出。

测试数据

1
2
3
4
5
6
7
8
9
10
11
12
13
// 某个用户在某个时刻浏览了某个商品,以及商品的价值
// eventTime: 北京时间,方便测试。如下,乱序数据:
{"userID": "user_5", "eventTime": "2019-12-01 10:02:00", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_4", "eventTime": "2019-12-01 10:02:02", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_5", "eventTime": "2019-12-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_4", "eventTime": "2019-12-01 10:02:10", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_5", "eventTime": "2019-12-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_5", "eventTime": "2019-12-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_4", "eventTime": "2019-12-01 10:02:12", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_5", "eventTime": "2019-12-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_5", "eventTime": "2019-12-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_4", "eventTime": "2019-12-01 10:02:15", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_4", "eventTime": "2019-12-01 10:02:16", "eventType": "browse", "productID": "product_5", "productPrice": 20}

Flink Table & SQL AppendStreamTableSink、RetractStreamTableSink、UpsertStreamTableSink

Flink Table & SQL StreamTableSink有三类接口: AppendStreamTableSinkUpsertStreamTableSinkRetractStreamTableSink

  • AppendStreamTableSink: 可将动态表转换为Append流。适用于动态表只有Insert的场景。
  • RetractStreamTableSink: 可将动态表转换为Retract流。适用于动态表有Insert、Delete、Update的场景。
  • UpsertStreamTableSink: 可将动态表转换为Upsert流。适用于动态表有Insert、Delete、Update的场景。

注意:

  • RetractStreamTableSink中: Insert被编码成一条Add消息; Delete被编码成一条Retract消息;Update被编码成两条消息(先是一条Retract消息,再是一条Add消息),即先删除再增加。
  • UpsertStreamTableSink: Insert和Update均被编码成一条消息(Upsert消息); Delete被编码成一条Delete消息。
  • UpsertStreamTableSink和RetractStreamTableSink最大的不同在于Update编码成一条消息,效率上比RetractStreamTableSink高。
  • 上述说的编码指的是动态表转换为DataStream时,表的增删改如何体现到DataStream上。

本文主要想总结在Update场景下,RetractStreamTableSink和UpsertStreamTableSink的消息编码。

测试数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 自己造的测试数据
// 某个用户在某个时刻浏览了某个商品,以及商品的价值
// eventTime: 北京时间,方便测试。如下,乱序数据:
{"userID": "user_1", "eventTime": "2016-01-01 10:02:00", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_1", "eventTime": "2016-01-01 10:02:02", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_1", "eventTime": "2016-01-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_2", "eventTime": "2016-01-01 10:02:10", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_2", "eventTime": "2016-01-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_3", "eventTime": "2016-01-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_3", "eventTime": "2016-01-01 10:02:12", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_4", "eventTime": "2016-01-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_5", "eventTime": "2016-01-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_1", "eventTime": "2016-01-01 10:02:15", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_1", "eventTime": "2016-01-01 10:02:16", "eventType": "browse", "productID": "product_5", "productPrice": 20}

Flink DataStream Window 窗口函数 ReduceFunction、AggregateFunction、ProcessWindowFunction

Window Function在窗口触发后,负责对窗口内的元素进行计算。Window Function分为两类: 增量聚合和全量聚合。

  • 增量聚合: 窗口不维护原始数据,只维护中间结果,每次基于中间结果和增量数据进行聚合。如: ReduceFunctionAggregateFunction
  • 全量聚合: 窗口需要维护全部原始数据,窗口触发进行全量聚合。如:ProcessWindowFunction

本文总结增量聚合函数(ReduceFunctionAggregateFunction)和全量聚合函数(ProcessWindowFunction)的使用。

注意:

  1. FoldFunction也是增量聚合函数,但在Flink 1.9.0中已被标为过时(可用AggregateFunction代替),这里不做总结。
  2. WindowFunction也是全量聚合函数,已被更高级的ProcessWindowFunction逐渐代替,这里也不做总结

增量聚合

ReduceFunction

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 测试数据: 某个用户在某个时刻浏览了某个商品,以及商品的价值
// {"userID": "user_4", "eventTime": "2019-11-09 10:41:32", "eventType": "browse", "productID": "product_1", "productPrice": 10}

// API
// T: 输入输出元素类型
public interface ReduceFunction<T> extends Function, Serializable {
T reduce(T value1, T value2) throws Exception;
}

// 示例: 获取一段时间内(Window Size)每个用户(KeyBy)浏览的商品的最大价值的那条记录(ReduceFunction)
kafkaStream
// 将从Kafka获取的JSON数据解析成Java Bean
.process(new KafkaProcessFunction())
// 提取时间戳生成水印
.assignTimestampsAndWatermarks(new MyCustomBoundedOutOfOrdernessTimestampExtractor(Time.seconds(maxOutOfOrdernessSeconds)))
// 按用户分组
.keyBy((KeySelector<UserActionLog, String>) UserActionLog::getUserID)
// 构造TimeWindow
.timeWindow(Time.seconds(windowLengthSeconds))
// 窗口函数: 获取这段窗口时间内每个用户浏览的商品的最大价值对应的那条记录
.reduce(new ReduceFunction<UserActionLog>() {
@Override
public UserActionLog reduce(UserActionLog value1, UserActionLog value2) throws Exception {
return value1.getProductPrice() > value2.getProductPrice() ? value1 : value2;
}
})
.print();

# 结果
UserActionLog{userID='user_4', eventTime='2019-11-09 12:51:25', eventType='browse', productID='product_3', productPrice=30}
UserActionLog{userID='user_2', eventTime='2019-11-09 12:51:29', eventType='browse', productID='product_2', productPrice=20}
UserActionLog{userID='user_1', eventTime='2019-11-09 12:51:22', eventType='browse', productID='product_3', productPrice=30}
UserActionLog{userID='user_5', eventTime='2019-11-09 12:51:21', eventType='browse', productID='product_3', productPrice=30}

注意: ReduceFunction输入输出元素类型相同。

Flink DataStream 提取Timestamp与生成Watermark

为了基于事件时间来处理每个元素,Flink需要知道每个元素(即事件)的事件时间(Timestamp)。为了衡量事件时间的处理进度,需要指定水印(Watermark)。

本文总结Flink DataStream中提取Timestamp与生成Watermark的两种方式。

在Source Function中直接指定Timestamp和生成Watermark

在源端(即SourceFunction)中直接指定Timestamp和生成Watermark。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package com.bigdata.flink.dataStreamExtractTimestampAndGenerateWatermark;

import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

import java.util.Random;

/**
* Summary:
* 在Source Function中直接指定Timestamp和生成Watermark
*/
public class ExtractTimestampAndGenerateWatermark {
public static void main(String[] args) throws Exception{

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 设定时间特征为EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// 在源端(即SourceFunction)中直接指定Timestamp和生成Watermark
DataStreamSource<Tuple4<String, Long, String, Integer>> source = env.addSource(new ExampleSourceFunction());

env.execute();
}

public static class ExampleSourceFunction implements SourceFunction<Tuple4<String,Long,String,Integer>>{

private volatile boolean isRunning = true;
private static int maxOutOfOrderness = 10 * 1000;

private static final String[] userIDSample={"user_1","user_2","user_3"};
private static final String[] eventTypeSample={"click","browse"};
private static final int[] productIDSample={1,2,3,4,5};

@Override
public void run(SourceContext<Tuple4<String,Long,String,Integer>> ctx) throws Exception {
while (isRunning){

// 构造测试数据
String userID=userIDSample[(new Random()).nextInt(userIDSample.length)];
long eventTime = System.currentTimeMillis();
String eventType=eventTypeSample[(new Random()).nextInt(eventTypeSample.length)];
int productID=productIDSample[(new Random()).nextInt(productIDSample.length)];

Tuple4<String, Long, String, Integer> record = Tuple4.of(userID, eventTime, eventType, productID);

// 发出一条数据以及数据对应的Timestamp
ctx.collectWithTimestamp(record,eventTime);

// 发出一条Watermark
ctx.emitWatermark(new Watermark(eventTime - maxOutOfOrderness));

Thread.sleep(1000);
}
}

@Override
public void cancel() {
isRunning = false;
}
}
}