# 如何使用DataStreamAPI来处理数据?

在 3.3 节中讲了数据转换常用的 Operators(算子),然后在 3.2 节中也讲了 Flink 中窗口的概念和原理,那么我们这篇文章再来细讲一下 Flink 中的各种 DataStream API。

我们先来看下源码里面的 DataStream 大概有哪些类呢?

images 可以发现其实还是有很多的类,只有熟练掌握了这些 API,我们才能在做数据转换和计算的时候足够灵活的运用开来(知道何时该选用哪种 DataStream?选用哪个 Function?)。那么我们先从 DataStream 开始吧!

# DataStream 如何使用及分析

首先我们来看下 DataStream 这个类的定义吧:


A DataStream represents a stream of elements of the same type. A DataStreamcan be transformed into another DataStream by applying a transformation as DataStream#map or DataStream#filter}

大概意思是:DataStream 表示相同类型的元素组成的数据流,一个数据流可以通过 map/filter 等算子转换成另一个数据流。

然后 DataStream 的类结构图如下:

images

它的继承类有 KeyedStream、SingleOutputStreamOperator 和SplitStream。这几个类本文后面都会一一给大家讲清楚。下面我们来看看 DataStream 这个类中的属性和方法吧。

它的属性就只有两个:

protected final StreamExecutionEnvironment environment;

protected final StreamTransformation<T> transformation;

但是它的方法却有很多,并且我们平时写的 Flink Job几乎离不开这些方法,这也注定了这个类的重要性,所以得好好看下这些方法该如何使用,以及是如何实现的。

# union

通过合并相同数据类型的数据流,然后创建一个新的数据流,union 方法代码实现如下:

public final DataStream<T> union(DataStream<T>... streams) {
    List<StreamTransformation<T>> unionedTransforms = new ArrayList<>();
    unionedTransforms.add(this.transformation);

    for (DataStream<T> newStream : streams) {
        if (!getType().equals(newStream.getType())) {   //判断数据类型是否一致
            throw new IllegalArgumentException("Cannot union streams of different types: " + getType() + " and " + newStream.getType());
        }
        unionedTransforms.add(newStream.getTransformation());
    }
    //构建新的数据流
    return new DataStream<>(this.environment, new UnionTransformation<>(unionedTransforms));//通过使用 UnionTransformation 将多个 StreamTransformation 合并起来
}

那么我们该如何去使用 union 呢(不止连接一个数据流,也可以连接多个数据流)?

//数据流 1 和 2
final DataStream<Integer> stream1 = env.addSource(...);
final DataStream<Integer> stream2 = env.addSource(...);
//union
stream1.union(stream2)

# split

该方法可以将两个数据流进行拆分,拆分后的数据流变成了 SplitStream(在下文会详细介绍这个类的内部实现),该 split 方法通过传入一个 OutputSelector 参数进行数据选择,方法内部实现就是构造一个 SplitStream 对象然后返回:

public SplitStream<T> split(OutputSelector<T> outputSelector) {
    return new SplitStream<>(this, clean(outputSelector));
}

然后我们该如何使用这个方法呢?

dataStream.split(new OutputSelector<Integer>() {
    private static final long serialVersionUID = 8354166915727490130L;

    @Override
    public Iterable<String> select(Integer value) {
        List<String> s = new ArrayList<String>();
        if (value > 4) {    //大于 4 的数据放到 > 这个 tag 里面去
            s.add(">");
        } else {    //小于等于 4 的数据放到 < 这个 tag 里面去
            s.add("<");
        }
        return s;
    }
});

注意:该方法已经不推荐使用了!在 1.7 版本以后建议使用 Side Output 来实现分流操作。

# connect

通过连接不同或相同数据类型的数据流,然后创建一个新的连接数据流,如果连接的数据流也是一个 DataStream 的话,那么连接后的数据流为 ConnectedStreams(会在下文介绍这个类的具体实现),它的具体实现如下:

public <R> ConnectedStreams<T, R> connect(DataStream<R> dataStream) {
    return new ConnectedStreams<>(environment, this, dataStream);
}

如果连接的数据流是一个 BroadcastStream(广播数据流),那么连接后的数据流是一个 BroadcastConnectedStream(会在下文详细介绍该类的内部实现),它的具体实现如下:

public <R> BroadcastConnectedStream<T, R> connect(BroadcastStream<R> broadcastStream) {
    return new BroadcastConnectedStream<>(
            environment, this, Preconditions.checkNotNull(broadcastStream), 
            broadcastStream.getBroadcastStateDescriptor());
}

使用如下:

//1、连接 DataStream
DataStream<Tuple2<Long, Long>> src1 = env.fromElements(new Tuple2<>(0L, 0L));
DataStream<Tuple2<Long, Long>> src2 = env.fromElements(new Tuple2<>(0L, 0L));
ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connected = src1.connect(src2);

//2、连接 BroadcastStream
DataStream<Tuple2<Long, Long>> src1 = env.fromElements(new Tuple2<>(0L, 0L));
final BroadcastStream<String> broadcast = srcTwo.broadcast(utterDescriptor);
BroadcastConnectedStream<Tuple2<Long, Long>, String> connect = src1.connect(broadcast);

# keyBy

keyBy 方法是用来将数据进行分组的,通过该方法可以将具有相同 key 的数据划分在一起组成新的数据流,该方法有四种(它们的参数各不一样):

//1、参数是 KeySelector 对象
public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key) {
    ...
    return new KeyedStream<>(this, clean(key));//构造 KeyedStream 对象
}

//2、参数是 KeySelector 对象和 TypeInformation 对象
public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key, TypeInformation<K> keyType) {
    ...
    return new KeyedStream<>(this, clean(key), keyType);//构造 KeyedStream 对象
}

//3、参数是 1 至多个字段(用 0、1、2... 表示)
public KeyedStream<T, Tuple> keyBy(int... fields) {
    if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) {
        return keyBy(KeySelectorUtil.getSelectorForArray(fields, getType()));
    } else {
        return keyBy(new Keys.ExpressionKeys<>(fields, getType()));//调用 private 的 keyBy 方法
    }
}

//4、参数是 1 至多个字符串
public KeyedStream<T, Tuple> keyBy(String... fields) {
    return keyBy(new Keys.ExpressionKeys<>(fields, getType()));//调用 private 的 keyBy 方法
}

//真正调用的方法
private KeyedStream<T, Tuple> keyBy(Keys<T> keys) {
    return new KeyedStream<>(this, clean(KeySelectorUtil.getSelectorForKeys(keys,
            getType(), getExecutionConfig())));
}

如何使用呢:

DataStream<Event> dataStream = env.fromElements(
    new Event(1, "zhisheng01", 1.0),
    new Event(2, "zhisheng02", 2.0),
    new Event(3, "zhisheng03", 2.1),
    new Event(3, "zhisheng04", 3.0),
    new SubEvent(4, "zhisheng05", 4.0, 1.0),
);

//第1种
dataStream.keyBy(new KeySelector<Event, Integer>() {

    @Override
    public Integer getKey(Event value) throws Exception {
        return value.getId();
    }
});

//第2种
dataStream.keyBy(new KeySelector<Event, Integer>() {

    @Override
    public Integer getKey(Event value) throws Exception {
        return value.getId();
    }
}, Types.STRING);

//第3种
dataStream.keyBy(0);

//第4种
dataStream.keyBy("zhisheng01", "zhisheng02");

# partitionCustom

使用自定义分区器在指定的 key 字段上将 DataStream 分区,这个 partitionCustom 有 3个不同参数的方法,分别要传入的参数有自定义分区 Partitioner 对象、位置、字符和 KeySelector。它们内部也都是调用了私有的partitionCustom 方法。

# broadcast

broadcast 是将数据流进行广播,然后让下游的每个并行 Task中都可以获取到这份数据流,通常这些数据是一些配置,一般这些配置数据的数据量不能太大,否则资源消耗会比较大。这个 broadcast方法也有两个,一个是无参数,它返回的数据是 DataStream;另一种的参数是 MapStateDescriptor,它返回的参数是 BroadcastStream(这个也会在下文详细介绍)。

使用方法:

//1、第一种
DataStream<Tuple2<Integer, String>> source = env.addSource(...).broadcast();

//2、第二种
final MapStateDescriptor<Long, String> utterDescriptor = new MapStateDescriptor<>(
        "broadcast-state", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
);
final DataStream<String> srcTwo = env.fromCollection(expected.values());

final BroadcastStream<String> broadcast = srcTwo.broadcast(utterDescriptor);

# map

map 方法需要传入的参数是一个 MapFunction,当然传入 RichMapFunction 也是可以的,它返回的是SingleOutputStreamOperator(这个类在会在下文详细介绍),该 map 方法里面的实现如下:

public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {

    TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
            Utils.getCallLocationName(), true);
    //调用 transform 方法
    return transform("Map", outType, new StreamMap<>(clean(mapper)));
}

该方法平时使用的非常频繁,然后我们该如何使用这个方法呢:

dataStream.map(new MapFunction<Integer, String>() {
    private static final long serialVersionUID = 1L;

    @Override
    public String map(Integer value) throws Exception {
        return value.toString();
    }
})

# flatMap

flatMap 方法需要传入一个 FlatMapFunction 参数,当然传入 RichFlatMapFunction 也是可以的,如果你的 FlinkJob 里面有连续的 filter 和 map 算子在一起,可以考虑使用 flatMap 一个算子来完成两个算子的工作,它返回的是SingleOutputStreamOperator,该 flatMap 方法里面的实现如下:

public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {

    TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
            getType(), Utils.getCallLocationName(), true);
    //调用 transform 方法
    return transform("Flat Map", outType, new StreamFlatMap<>(clean(flatMapper)));

}

该方法平时使用的非常频繁,使用方式如下:

dataStream.flatMap(new FlatMapFunction<Integer, Integer>() {
    @Override
    public void flatMap(Integer value, Collector<Integer> out) throws Exception {
        out.collect(value);
    }
})

# process

在输入流上应用给定的 ProcessFunction,从而创建转换后的输出流,通过该方法返回的是SingleOutputStreamOperator,具体代码实现如下:

public <R> SingleOutputStreamOperator<R> process(ProcessFunction<T, R> processFunction) {

    TypeInformation<R> outType = TypeExtractor.getUnaryOperatorReturnType(
        processFunction, ProcessFunction.class, 0, 1,
        TypeExtractor.NO_INDEX, getType(), Utils.getCallLocationName(), true);
    //调用下面的 process 方法
    return process(processFunction, outType);
}

public <R> SingleOutputStreamOperator<R> process(
        ProcessFunction<T, R> processFunction,
        TypeInformation<R> outputType) {

    ProcessOperator<T, R> operator = new ProcessOperator<>(clean(processFunction));
    //调用 transform 方法
    return transform("Process", outputType, operator);
}

使用方法:

DataStreamSource<Long> data = env.generateSequence(0, 0);

//定义的 ProcessFunction
ProcessFunction<Long, Integer> processFunction = new ProcessFunction<Long, Integer>() {
    private static final long serialVersionUID = 1L;

    @Override
    public void processElement(Long value, Context ctx,
            Collector<Integer> out) throws Exception {
        //具体逻辑
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
            Collector<Integer> out) throws Exception {
        //具体逻辑
    }
};

DataStream<Integer> processed = data.keyBy(new IdentityKeySelector<Long>()).process(processFunction);

# filter

filter 用来过滤数据的,它需要传入一个 FilterFunction,然后返回的数据也是 SingleOutputStreamOperator,该方法的实现是:

public SingleOutputStreamOperator<T> filter(FilterFunction<T> filter) {
    return transform("Filter", getType(), new StreamFilter<>(clean(filter)));
}

该方法平时使用非常多:

DataStream<String> filter1 = src
    .filter(new FilterFunction<String>() {
        @Override
        public boolean filter(String value) throws Exception {
            return "zhisheng".equals(value);
        }
    })

上面这些方法是平时写代码时用的非常多的方法,我们这里讲解了它们的实现原理和使用方式,当然还有其他方法,比如assignTimestampsAndWatermarks、join、shuffle、forward、addSink、rebalance、iterate、coGroup、project、timeWindowAll、countWindowAll、windowAll、print等,这里由于篇幅的问题就不一一展开来讲了。

# SingleOutputStreamOperator 如何使用及分析

SingleOutputStreamOperator 这个类继承自 DataStream,所以 DataStream中有的方法在这里也都有,那么这里就讲解下额外的方法的作用,如下。

  • name():该方法可以设置当前数据流的名称,如果设置了该值,则可以在 Flink UI 上看到该值;uid() 方法可以为算子设置一个指定的 ID,该 ID 有个作用就是如果想从 savepoint 恢复 Job 时是可以根据这个算子的 ID 来恢复到它之前的运行状态;
  • setParallelism() :该方法是为每个算子单独设置并行度的,这个设置优先于你通过 env 设置的全局并行度;
  • setMaxParallelism() :该为算子设置最大的并行度;
  • setResources():该方法有两个(参数不同),设置算子的资源,但是这两个方法对外还没开放(是私有的,暂时功能性还不全);
  • forceNonParallel():该方法强行将并行度和最大并行度都设置为 1;
  • setChainingStrategy():该方法对给定的算子设置 ChainingStrategy;
  • disableChaining():该这个方法设置后将禁止该算子与其他的算子 chain 在一起;
  • getSideOutput():该方法通过给定的 OutputTag 参数从 side output 中来筛选出对应的数据流。

# KeyedStream 如何使用及分析

KeyedStream 是 DataStream 在根据 KeySelector 分区后的数据流,DataStream 中常用的方法在KeyedStream 后也可以用(除了 shuffle、forward 和 keyBy 等分区方法),在该类中的属性分别是 KeySelector 和TypeInformation。

DataStream 中的窗口方法只有 timeWindowAll、countWindowAll 和 windowAll 这三种全局窗口方法,但是在KeyedStream 类中的种类就稍微多了些,新增了 timeWindow、countWindow 方法,并且是还支持滑动窗口。

除了窗口方法的新增外,还支持大量的聚合操作方法,比如 reduce、fold、sum、min、max、minBy、maxBy、aggregate等方法(列举的这几个方法都支持多种参数的)。

最后就是它还有 asQueryableState() 方法,能够将 KeyedStream 发布为可查询的 ValueState 实例。

# SplitStream 如何使用及分析

SplitStream 这个类比较简单,它代表着数据分流后的数据流了,它有一个 select 方法可以选择分流后的哪种数据流了,通常它是结合 split使用的,对于单次分流来说还挺方便的。但是它是一个被废弃的类(Flink 1.7 后被废弃的,可以看下笔者之前写的一篇文章 Flink 从0到1学习—— Flink 不可以连续 Split(分流)? (opens new window) ),其实可以用 side output 来代替这种 split,后面文章中我们也会讲通过简单的案例来讲解一下该如何使用 side output做数据分流操作。

因为这个类的源码比较少,我们可以看下这个类的实现:

public class SplitStream<OUT> extends DataStream<OUT> {

    //构造方法
    protected SplitStream(DataStream<OUT> dataStream, OutputSelector<OUT> outputSelector) {
        super(dataStream.getExecutionEnvironment(), new SplitTransformation<OUT>(dataStream.getTransformation(), outputSelector));
    }

    //选择要输出哪种数据流
    public DataStream<OUT> select(String... outputNames) {
        return selectOutput(outputNames);
    }

    //上面那个 public 方法内部调用的就是这个方法,该方法是个 private 方法,对外隐藏了它是如何去找到特定的数据流。
    private DataStream<OUT> selectOutput(String[] outputNames) {
        for (String outName : outputNames) {
            if (outName == null) {
                throw new RuntimeException("Selected names must not be null");
            }
        }
        //构造了一个 SelectTransformation 对象
        SelectTransformation<OUT> selectTransform = new SelectTransformation<OUT>(this.getTransformation(), Lists.newArrayList(outputNames));
        //构造了一个 DataStream 对象
        return new DataStream<OUT>(this.getExecutionEnvironment(), selectTransform);
    }
}

# WindowedStream 如何使用及分析

虽然 WindowedStream 不是继承自 DataStream,并且我们在 3.1 节中也做了一定的讲解,但是当时没讲里面的Function,所以在这里刚好一起做一个补充。

在 WindowedStream 类中定义的属性有KeyedStream、WindowAssigner、Trigger、Evictor、allowedLateness 和lateDataOutputTag。

  • KeyedStream:代表着数据流,数据分组后再开 Window
  • WindowAssigner:Window 的组件之一
  • Trigger:Window 的组件之一
  • Evictor:Window 的组件之一(可选)
  • allowedLateness:用户指定的允许迟到时间长
  • lateDataOutputTag:数据延迟到达的 Side output,如果延迟数据没有设置任何标记,则会被丢弃

在 3.1 节中我们讲了上面的三个窗口组件 WindowAssigner、Trigger、Evictor,并教大家该如何使用,那么在这篇文章我就不再重复,那么接下来就来分析下其他几个的使用方式和其实现原理。

先来看下 allowedLateness 这个它可以在窗口后指定允许迟到的时间长,使用如下:

dataStream.keyBy(0)
    .timeWindow(Time.milliseconds(20))
    .allowedLateness(Time.milliseconds(2))

lateDataOutputTag 这个它将延迟到达的数据发送到由给定 OutputTag 标识的 side output(侧输出),当水印经过窗口末尾(并加上了允许的延迟后),数据就被认为是延迟了。

对于 keyed windows 有五个不同参数的 reduce 方法可以使用,如下:

//1、参数为 ReduceFunction
public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> function) {
    ...
    return reduce(function, new PassThroughWindowFunction<K, W, T>());
}

//2、参数为 ReduceFunction 和 WindowFunction
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function) {
    ...
    return reduce(reduceFunction, function, resultType);
}

//3、参数为 ReduceFunction、WindowFunction 和 TypeInformation
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
    ...
    return input.transform(opName, resultType, operator);
}

//4、参数为 ReduceFunction 和 ProcessWindowFunction
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> function) {
    ...
    return reduce(reduceFunction, function, resultType);
}

//5、参数为 ReduceFunction、ProcessWindowFunction 和 TypeInformation
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
    ... 
    return input.transform(opName, resultType, operator);
}

除了 reduce 方法,还有六个不同参数的 fold 方法、aggregate 方法;两个不同参数的 apply 方法、process方法(其中你会发现这两个 apply 方法和 process 方法内部其实都隐式的调用了一个私有的 apply 方法);其实除了前面说的两个不同参数的apply 方法外,还有四个其他的 apply 方法,这四个方法也是参数不同,但是其实最终的是利用了 transform方法;还有的就是一些预定义的聚合方法比如sum、min、minBy、max、maxBy,它们的方法参数的个数不一致,这些预聚合的方法内部调用的其实都是私有的 aggregate方法,该方法允许你传入一个 AggregationFunction 参数。我们来看一个具体的实现:

//max
public SingleOutputStreamOperator<T> max(String field) {
    //内部调用私有的的 aggregate 方法
    return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MAX, false, input.getExecutionConfig()));
}

//私有的 aggregate 方法
private SingleOutputStreamOperator<T> aggregate(AggregationFunction<T> aggregator) {
    //继续调用的是 reduce 方法
    return reduce(aggregator);
}

//该 reduce 方法内部其实又是调用了其他多个参数的 reduce 方法
public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> function) {
    ...
    function = input.getExecutionEnvironment().clean(function);
    return reduce(function, new PassThroughWindowFunction<K, W, T>());
}

从上面的方法调用过程,你会发现代码封装的很深,得需要你自己好好跟一下源码才可以了解更深些。

上面讲了这么多方法,你会发现 reduce 方法其实是用的蛮多的之一,那么就来看看该如何使用:

dataStream.keyBy(0)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2)  {
            return value1;
        }
    })
    .print();

# AllWindowedStream 如何使用及分析

前面讲完了 WindowedStream,再来看看这个 AllWindowedStream 你会发现它的实现其实无太大区别,该类中的属性和方法都和前面WindowedStream 是一样的,然后我们就不再做过多的介绍,直接来看看该如何使用呢?

AllWindowedStream 这种场景下是不需要让数据流做 keyBy 分组操作,直接就进行 windowAll 操作,然后在 windowAll方法中传入 WindowAssigner 参数对象即可,然后返回的数据结果就是 AllWindowedStream 了,下面使用方式继续执行了AllWindowedStream 中的 reduce 方法来返回数据:

dataStream.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
    .reduce(new RichReduceFunction<Tuple2<String, Integer>>() {
        private static final long serialVersionUID = -6448847205314995812L;

        @Override
        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
                Tuple2<String, Integer> value2) throws Exception {
            return value1;
        }
    });

# ConnectedStreams 如何使用及分析

ConnectedStreams 这个类定义是表示(可能)两个不同数据类型的数据连接流,该场景如果对一个数据流进行操作会直接影响另一个数据流,因此可以通过流连接来共享状态。比较常见的一个例子就是一个数据流(随时间变化的规则数据流)通过连接其他的数据流,这样另一个数据流就可以利用这些连接的规则数据流。

ConnectedStreams 在概念上可以认为和 Union 数据流是一样的。

在 ConnectedStreams 类中有三个属性:environment、inputStream1 和 inputStream2,该类中的方法如下:

images 在 ConnectedStreams 中可以通过 getFirstInput 获取连接的第一个流、通过 getSecondInput 获取连接的第二个流,同时它还含有六个 keyBy 方法来将连接后的数据流进行分组,这六个 keyBy 方法的参数各有不同。另外它还含有 map、flatMap、process 方法来处理数据(其中 map 和 flatMap 方法的参数分别使用的是 CoMapFunction 和 CoFlatMapFunction),其实如果你细看其方法里面的实现就会发现都是调用的 transform 方法。

上面讲完了 ConnectedStreams 类的基础定义,接下来我们来看下该类如何使用呢?

DataStream<Tuple2<Long, Long>> src1 = env.fromElements(new Tuple2<>(0L, 0L));    //流 1
DataStream<Tuple2<Long, Long>> src2 = env.fromElements(new Tuple2<>(0L, 0L));    //流 2
ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connected = src1.connect(src2);    //连接流 1 和流 2

//使用连接流的六种 keyBy 方法
ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedGroup1 = connected.keyBy(0, 0);
ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedGroup2 = connected.keyBy(new int[]{0}, new int[]{0});
ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedGroup3 = connected.keyBy("f0", "f0");
ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedGroup4 = connected.keyBy(new String[]{"f0"}, new String[]{"f0"});
ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedGroup5 = connected.keyBy(new FirstSelector(), new FirstSelector());
ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedGroup5 = connected.keyBy(new FirstSelector(), new FirstSelector(), Types.STRING);

//使用连接流的 map 方法
connected.map(new CoMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Object>() {
    private static final long serialVersionUID = 1L;

    @Override
    public Object map1(Tuple2<Long, Long> value) {
        return null;
    }

    @Override
    public Object map2(Tuple2<Long, Long> value) {
        return null;
    }
});

//使用连接流的 flatMap 方法
connected.flatMap(new CoFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {

    @Override
    public void flatMap1(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {}

    @Override
    public void flatMap2(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {}

}).name("testCoFlatMap")

//使用连接流的 process 方法
connected.process(new CoProcessFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
    @Override
    public void processElement1(Tuple2<Long, Long> value, Context ctx, Collector<Tuple2<Long, Long>> out) throws Exception {
        if (value.f0 < 3) {
            out.collect(value);
            ctx.output(sideOutputTag, "sideout1-" + String.valueOf(value));
        }
    }

    @Override
    public void processElement2(Tuple2<Long, Long> value, Context ctx, Collector<Tuple2<Long, Long>> out) throws Exception {
        if (value.f0 >= 3) {
            out.collect(value);
            ctx.output(sideOutputTag, "sideout2-" + String.valueOf(value));
        }
    }
});

# BroadcastStream 如何使用及分析

BroadcastStream 这个类定义是表示 broadcast state(广播状态)组成的数据流。通常这个 BroadcastStream数据流是通过调用 DataStream 中的 broadcast 方法才返回的,注意 BroadcastStream后面不能使用算子去操作这些流,唯一可以做的就是使用 KeyedStream/DataStream 的 connect 方法去连接BroadcastStream,连接之后的话就会返回一个 BroadcastConnectedStream 数据流。

在 BroadcastStream 中我们该如何使用呢?通常是在 DataStream 中使用 broadcast 方法,该方法需要传入一个 MapStateDescriptor 对象,可以看下该方法的实现如下:

public BroadcastStream<T> broadcast(final MapStateDescriptor<?, ?>... broadcastStateDescriptors) {
    Preconditions.checkNotNull(broadcastStateDescriptors);  //检查是否为空
    final DataStream<T> broadcastStream = setConnectionType(new BroadcastPartitioner<>());
    return new BroadcastStream<>(environment, broadcastStream, broadcastStateDescriptors);  //构建 BroadcastStream 对象,传入 env 环境、broadcastStream 和 broadcastStateDescriptors
}

上面方法传入的参数 broadcastStateDescriptors,我们可以像下面这样去定义一个 MapStateDescriptor 对象:

final MapStateDescriptor<Long, String> utterDescriptor = new MapStateDescriptor<>(
    "broadcast-state", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
);

# BroadcastConnectedStream 如何使用及分析

BroadcastConnectedStream 这个类定义是表示 keyed 或者 non-keyed 数据流和 BroadcastStream 数据流进行连接后组成的数据流。比如在 DataStream 中执行 connect 方法就可以连接两个数据流了,那么在 DataStream 中 connect 方法实现如下:

public <R> BroadcastConnectedStream<T, R> connect(BroadcastStream<R> broadcastStream) {
    return new BroadcastConnectedStream<>( //构造 BroadcastConnectedStream 对象
            environment,
            this,
            Preconditions.checkNotNull(broadcastStream),
            broadcastStream.getBroadcastStateDescriptor());
}

在这个 BroadcastConnectedStream 类中主要的方法有:

images 从图中可以看到四个 process 方法和一个 transform 私有方法,其中四个 process 方法也是参数不同,最后实际调用的方法就是这个私有的 transform 方法。

# QueryableStateStream 如何使用及分析

QueryableStateStream 该类代表着可查询的状态流。该类的定义如下:

public class QueryableStateStream<K, V> {

    //要查询的状态名称
    private final String queryableStateName;

    //状态的 Key 序列化器
    private final TypeSerializer<K> keySerializer;

    //状态的 descriptor 
    private final StateDescriptor<?, V> stateDescriptor;

    //构造器
    public QueryableStateStream(String queryableStateName, StateDescriptor<?, V> stateDescriptor, TypeSerializer<K> keySerializer) {

    }

    //返回可以查询状态的名称
    public String getQueryableStateName() {
        return queryableStateName;
    }

    //返回 key 序列化器
    public TypeSerializer<K> getKeySerializer() {
        return keySerializer;
    }

    //返回状态的 descriptor 
    public StateDescriptor<?, V> getStateDescriptor() {
        return stateDescriptor;
    }
}

在 KeyedStream 你可以通过 asQueryableState() 方法返回一个 QueryableStateStream 数据流,这个方法可以通过传入不同的参数来实现,主要的参数就是 queryableStateName 和 StateDescriptor(这个参数你可以传入 ValueStateDescriptor、FoldingStateDescriptor 和 ReducingStateDescriptor 三种)。

具体如何使用呢,我们来看个 demo:

ValueStateDescriptor<Tuple2<Integer, Long>> valueState = new ValueStateDescriptor<>(
    "any", source.getType(),    null);

QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
    source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
        private static final long serialVersionUID = 7480503339992214681L;

        @Override
        public Integer getKey(Tuple2<Integer, Long> value) {
            return value.f0;
        }
    }).asQueryableState("zhisheng", valueState);

# 小结

本节算是对 Flink DataStream 包下的所有常用的 Stream 做了个讲解,不仅从使用方式来介绍这些 Stream API 该如何使用,而且还给出了部分 demo,此外还剖析了部分 Stream 的代码结构及其内部部分方法的源码实现,从而能够让大家不仅仅是从表面上去使用这些 DataStream API,还能够对它们的实现原理有了解,这样就可以做到活学活用,并且还可以自己去做扩展。