# FlinkTableAPI&SQL功能

在 5.1 节中对 Flink Table API & SQL 的概述和常见 API 都做了介绍,这篇文章先来看下其与 DataStream 和 DataSet API 的集成。

两个 planner 都可以与 DataStream API 集成,只有以前的 planner 才可以集成 DataSet API,所以下面讨论 DataSet API 都是和以前的 planner 有关。

Table API & SQL 查询与 DataStream 和 DataSet 程序集成是非常简单的,比如可以通过 Table API 或者 SQL 查询外部表数据,进行一些预处理后,然后使用 DataStream 或 DataSet API 继续处理一些复杂的计算,另外也可以将 DataStream 或 DataSet 处理后的数据利用 Table API 或者 SQL 写入到外部表去。总而言之,它们之间互相转换或者集成比较容易。

# Scala 的隐式转换

Scala Table API 提供了 DataSet、DataStream 和 Table 类的隐式转换,可以通过导入 org.apache.flink.table.api.scala._ 或者 org.apache.flink.api.scala._ 包来启用这些转换。

# 将 DataStream 或 DataSet 注册为 Table

DataStream 或者 DataSet 可以注册为 Table,结果表的 schema 取决于已经注册的 DataStream 和 DataSet 的数据类型。你可以像下面这种方式转换:

StreamTableEnvironment tableEnv = ...;
    
    DataStream<Tuple2<Long, String>> stream = ...
    
    //将 DataStream 注册为 myTable 表
    tableEnv.registerDataStream("myTable", stream);
    
    //将 DataStream 注册为 myTable2 表(表中的字段为 myLong、myString)
    tableEnv.registerDataStream("myTable2", stream, "myLong, myString");
    

# 将 DataStream 或 DataSet 转换为 Table

除了可以将 DataStream 或 DataSet 注册为 Table,还可以将它们转换为 Table,转换之后再去使用 Table API 查询就比较方便了。

 StreamTableEnvironment tableEnv = ...;
    
    DataStream<Tuple2<Long, String>> stream = ...
    
    //将 DataStream 转换成 Table
    Table table1 = tableEnv.fromDataStream(stream);
    
    //将 DataStream 转换成 Table
    Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");
    

# 将 Table 转换成 DataStream 或 DataSet

Table 可以转换为 DataStream 或 DataSet,这样就可以在 Table API 或 SQL 查询的结果上运行自定义的 DataStream 或 DataSet 程序。当将一个 Table 转换成 DataStream 或 DataSet 时,需要指定结果 DataStream 或 DataSet 的数据类型,最方便的数据类型是 Row,下面几个数据类型表示不同的功能:

  • Row:字段按位置映射,任意数量的字段,支持 null 值,没有类型安全访问。
  • POJO:字段按名称映射,POJO 属性必须按照 Table 中的属性来命名,任意数量的字段,支持 null 值,类型安全访问。
  • Case Class:字段按位置映射,不支持 null 值,类型安全访问。
  • Tuple:按位置映射字段,限制为 22(Scala)或 25(Java)字段,不支持 null 值,类型安全访问。
  • 原子类型:Table 必须具有单个字段,不支持 null 值,类型安全访问。
# 将 Table 转换成 DataStream

流查询的结果表会动态更新,即每个新的记录到达输入流时结果就会发生变化。所以在将 Table 转换成 DataStream 就需要对表的更新进行编码,有两种将 Table 转换为 DataStream 的模式:

  • 追加模式(Append Mode):这种模式只能在动态表仅通过 INSERT 更改修改时才能使用,即仅追加,之前发出的结果不会更新。
  • 撤回模式(Retract Mode):任何时刻都可以使用此模式,它使用一个 boolean 标志来编码 INSERT 和 DELETE 的更改。

    StreamTableEnvironment tableEnv = ...;
    
    //有两个字段(name、age) 的 Table
    Table table = ...
    
    //通过指定类,将表转换为一个 append DataStream
    DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);
    
    //将表转换为 Tuple2<String, Integer> 的 append DataStream
    TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(Types.STRING(), Types.INT());
    DataStream<Tuple2<String, Integer>> dsTuple = tableEnv.toAppendStream(table, tupleType);
    
    //将表转换为一个 Retract DataStream Row
    DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(table, Row.class);
# 将 Table 转换成 DataSet

将 Table 转换成 DataSet 的样例如下:

BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
    
    //有两个字段(name、age) 的 Table
    Table table = ...
    
    //通过指定一个类将表转换为一个 Row DataSet
    DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class);
    
    //将表转换为 Tuple2<String, Integer> 的 DataSet
    TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(Types.STRING(), Types.INT());
    DataSet<Tuple2<String, Integer>> dsTuple = tableEnv.toDataSet(table, tupleType);
    

# 查询优化

Flink 使用 Calcite 来优化和翻译查询,以前的 planner 不会去优化 join 的顺序,而是按照查询中定义的顺序去执行。通过提供一个 CalciteConfig 对象来调整在不同阶段应用的优化规则集,这个可以通过调用 CalciteConfig.createBuilder() 获得的 builder 来创建,并且可以通过调用tableEnv.getConfig.setCalciteConfig(calciteConfig) 来提供给 TableEnvironment。而在 Blink planner 中扩展了 Calcite 来执行复杂的查询优化,这包括一系列基于规则和成本的优化,比如:

  • 基于 Calcite 的子查询去相关性
  • Project pruning
  • Partition pruning
  • Filter push-down
  • 删除子计划中的重复数据以避免重复计算
  • 重写特殊的子查询,包括两部分:
    • 将 IN 和 EXISTS 转换为 left semi-joins
    • 将 NOT IN 和 NOT EXISTS 转换为 left anti-join
  • 重排序可选的 join
    • 通过启用 table.optimizer.join-reorder-enabled

注意:IN/EXISTS/NOT IN/NOT EXISTS 目前只支持子查询重写中的连接条件。

# 解释 Table

Table API 提供了一种机制来解释计算 Table 的逻辑和优化查询计划。你可以通过 TableEnvironment.explain(table) 或者 TableEnvironment.explain() 方法来完成。explain(table) 会返回给定计划的 Table,explain() 会返回多路 Sink 计划的结果(主要用于 Blink planner)。它返回一个描述三个计划的字符串:

  • 关系查询的抽象语法树,即未优化的逻辑查询计划
  • 优化的逻辑查询计划
  • 实际执行计划

以下代码演示了一个 Table 示例:

 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
    
    DataStream<Tuple2<Integer, String>> stream1 = env.fromElements(new Tuple2<>(1, "hello"));
    DataStream<Tuple2<Integer, String>> stream2 = env.fromElements(new Tuple2<>(1, "hello"));
    
    Table table1 = tEnv.fromDataStream(stream1, "count, word");
    Table table2 = tEnv.fromDataStream(stream2, "count, word");
    Table table = table1.where("LIKE(word, 'F%')").unionAll(table2);
    
    System.out.println(tEnv.explain(table));

通过 explain(table) 方法返回的结果:


== Abstract Syntax Tree == LogicalUnion(all=[true]) LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')]) FlinkLogicalDataStreamScan(id=[1], fields=[count, word]) FlinkLogicalDataStreamScan(id=[2], fields=[count, word])

== Optimized Logical Plan ==
DataStreamUnion(all=[true], union all=[count, word])
  DataStreamCalc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
    DataStreamScan(id=[1], fields=[count, word])
  DataStreamScan(id=[2], fields=[count, word])

== Physical Execution Plan ==
Stage 1 : Data Source
    content : collect elements with CollectionInputFormat

Stage 2 : Data Source
    content : collect elements with CollectionInputFormat

    Stage 3 : Operator
        content : from: (count, word)
        ship_strategy : REBALANCE

        Stage 4 : Operator
            content : where: (LIKE(word, _UTF-16LE'F%')), select: (count, word)
            ship_strategy : FORWARD

            Stage 5 : Operator
                content : from: (count, word)
                ship_strategy : REBALANCE

# 数据类型

在 Flink 1.9 之前,Flink 的 Table API&SQL 的数据类型与 Flink 中的 TypeInformation 紧密相关。TypeInformation 在 DataStream 和 DataSet API 中使用,另外它还可以描述在分布式中序列化和反序列化基于 JVM 对象所需的所有信息。从 1.9 版本之后,Table API&SQL 会引入一种新类型来作为 API 稳定性和标准的长期解决方案。在以前的 planner 和 Blink planner 的数据类型有点不一致,具体差别可以参考官网。

# 时间属性

在 3.1 节中介绍过 Flink 的多种时间语义,常用的比如 Event time 和 Processing time,那么在 Table API&SQL 中怎么去定义时间语义呢?

# Processing Time

因为处理时间是额外的数据字段,在原始的事件中是不存在该字段的,那么在将数据流转换成 Table 的时候就需要将这个 Processing time 当作 Table 的一个字段,以供后面需要,比如定义窗口。你可以像下面这样定义:

    DataStream<Tuple2<String, String>> stream = ...;
    
    //将附加的逻辑字段声明为 Processing time 属性
    Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.proctime");
    
    WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));

如果是直接使用 TableSource 的话,那么需要实现 DefinedProctimeAttribute 接口,然后去重写 getProctimeAttribute 方法,返回的字符串表示 Processing time 在 Table 中的字段名。

# Event time

Event time 是在采集上来的事件中就有的,将数据流转换成 Table 的时候需要像下面这样定义:

  //第一种方法:
    //提取流数据时间戳并分配水印
    DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
    //将附加的逻辑字段声明为 Event time 属性,和 Processing time 不同的是这里使用 rowtime
    Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.rowtime");
    
    //第二种方法:
    //从第一个字段提取时间戳,并分配水印
    DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
    Table table = tEnv.fromDataStream(stream, "UserActionTime.rowtime, Username, Data");
    
    //使用方式:
    WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
    

使用 TableSource 的话则需要实现 DefinedRowtimeAttributes 接口,重写 getRowtimeAttributeDescriptors 方法,该方法返回一个 RowtimeAttributeDescriptor 列表,其用于描述时间属性的最终名称、时间提取器以及该属性关联的水印策略。

# SQL Connector

在第三部分中介绍了大量的 Flink Connectors 的使用,但是那些都是通过 DataStream API 是去使用,放在 Table API&SQL 中其实不再适合,其实 Flink Table API&SQL 是可以直接连接到外部系统的,然后读取和写入批处理表和流处理表。TableSource 提供从外部系统(数据库、MQ、文件系统等)读取数据,TableSink 将结果存储到数据库中。这里讲解一下该如何去定义 TableSource 和 TableSink 并将它们注册。在官网,它提供了如下这些 Connectors 和 Formats 的下载。

images 从 Flink 1.6 开始,不仅可以使用编程的方式指定 Connector,还可以使用声明式去定义。下面举个例子(读取 Kafka 中 Avro 格式的数据)来讲解这两种区别。

# 使用代码


    
    tableEnvironment
      //声明要连接的外部系统
      .connect(
        new Kafka()
          .version("0.10")
          .topic("zhisheng_user")
          .startFromEarliest()
          .property("zookeeper.connect", "localhost:2181")
          .property("bootstrap.servers", "localhost:9092")
      )
      //定义数据格式
      .withFormat(
        new Avro()
          .avroSchema(
            "{" +
            "  \"namespace\": \"com.zhisheng\"," +
            "  \"type\": \"record\"," +
            "  \"name\": \"UserMessage\"," +
            "    \"fields\": [" +
            "      {\"name\": \"timestamp\", \"type\": \"string\"}," +
            "      {\"name\": \"user\", \"type\": \"long\"}," +
            "      {\"name\": \"message\", \"type\": [\"string\", \"null\"]}" +
            "    ]" +
            "}"
          )
      )
      //定义 Table schema
      .withSchema(
        new Schema()
          .field("rowtime", Types.SQL_TIMESTAMP)
            .rowtime(new Rowtime()
              .timestampsFromField("timestamp")
              .watermarksPeriodicBounded(60000)
            )
          .field("user", Types.LONG)
          .field("message", Types.STRING)
      )
      .inAppendMode()  //指定流表的 update-mode
      .registerTableSource("zhisheng");    //注册表的名字

# 使用 YAML 文件

  
    tables:
      - name: zhisheng      #表的名字
        type: source           #定义是 source,还是 sink,或者 both
        update-mode: append    #指定流表的 update-mode
        #定义要连接的系统
        connector:
          type: kafka
          version: "0.10"
          topic: zhisheng_user
          startup-mode: earliest-offset
          properties:
            - key: zookeeper.connect
              value: localhost:2181
            - key: bootstrap.servers
              value: localhost:9092
    
        #定义格式
        format:
          type: avro
          avro-schema: >
            {
              "namespace": "com.zhisheng",
              "type": "record",
              "name": "UserMessage",
                "fields": [
                  {"name": "ts", "type": "string"},
                  {"name": "user", "type": "long"},
                  {"name": "message", "type": ["string", "null"]}
                ]
            }
        #定义 table schema
        schema:
          - name: rowtime
            type: TIMESTAMP
            rowtime:
              timestamps:
                type: from-field
                from: ts
              watermarks:
                type: periodic-bounded
                delay: "60000"
          - name: user
            type: BIGINT
          - name: message
            type: VARCHAR

# 使用 DDL


    CREATE TABLE zhisheng (
      `user` BIGINT,
      message VARCHAR,
      ts VARCHAR
    ) WITH (
      'connector.type' = 'kafka',
      'connector.version' = '0.10',
      'connector.topic' = 'zhisheng_user',
      'connector.startup-mode' = 'earliest-offset',
      'connector.properties.0.key' = 'zookeeper.connect',
      'connector.properties.0.value' = 'localhost:2181',
      'connector.properties.1.key' = 'bootstrap.servers',
      'connector.properties.1.value' = 'localhost:9092',
      'update-mode' = 'append',
      'format.type' = 'avro',
      'format.avro-schema' = '{
                                "namespace": "com.zhisheng",
                                "type": "record",
                                "name": "UserMessage",
                                "fields": [
                                    {"name": "ts", "type": "string"},
                                    {"name": "user", "type": "long"},
                                    {"name": "message", "type": ["string", "null"]}
                                ]
                             }'
    )

上面演示了 Kafka Connector 和 avro 数据格式化在 Table API&SQL 中的使用方式,在官网中还有文件系统和 Elasticsearch Connector、CSV 和 JSON 等的使用说明。

# SQL Client

虽然 Flink Table API&SQL 让使用 SQL 去查询流数据有了可能,但是这些查询语句通常要嵌入在 Java 或者 Scala 程序中,最后在提交到集群运行之前还要通过构建工具打包,这就导致 Table API&SQL 的限制性很大,所以 SQL Client 就起到这么个作用,让用户不再编写任何 Java 或者 Scala 代码,直接编写 SQL 就可以去调试运行,并且可以通过其他命令行实时查看运行的结果,但是该功能目前还比较弱。

在启动 Flink 后可以通过运行 ./bin/sql-client.sh embedded 命令来启动 SQL Client CLI,如下图所示:

images 你可以运行下面的命令就可以知道名字和其出现的次数的结果。


SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;

另外它还支持传入 YAML 文件,你可以在 YAML 文件中如前面内容一样定义的 Kafka Connector 等信息,关于 SQL Client 的更多功能可以查阅官网。

# Hive

Hive 是建立在 Hadoop 上的数据仓库基础构架,它提供了一系列的工具,可以用来进行数据提取转化加载(ETL),这是一种可以存储、查询和分析存储在 Hadoop 中的大规模数据的机制。Hive 定义了简单的类 SQL 查询语言,称为 HQL,它允许熟悉 SQL 的用户查询数据。

Flink 在 1.9 版本中提供了与 Hive 的双重集成。首先是利用 Hive 的 Metastore 存储 Flink 特定元数据,另一个是 Flink 支持读取和写入 Hive 表。支持的 Hive 2.3.4 和 1.2.1 版本,如果你要使用的话,注意它们的依赖是有点不一样。

你可以通过 Java、Scala、YAML 连接 Hive,比如使用 Java 代码如下:


String name = "myhive"; String defaultDatabase = "mydatabase"; String hiveConfDir = "/opt/hive-conf"; String version = "2.3.4"; //或者 1.2.1

HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
tableEnv.registerCatalog("myhive", hive);

# 小结与反思

本节继续介绍了 Flink Table API&SQL 中的部分 API,然后讲解了 Flink 之前的 planner 和 Blink planner 在某些特性上面的区别,还讲解了 SQL Connector,最后介绍了 SQL Client 和 Hive。