# 如何自定义FlinkConnectors(Source和Sink)?

在前面文章 3.6 节中讲解了 Flink 中的 Data Source 和 Data Sink,然后介绍了 Flink 中自带的一些 Source 和 Sink 的 Connector,接着我们还有几篇实战会讲解了如何从 Kafka 处理数据写入到 Kafka、ElasticSearch 等,当然 Flink 还有一些其他的 Connector,我们这里就不一一介绍了,大家如果感兴趣的话可以去官网查看一下,如果对其代码实现比较感兴趣的话,也可以去看看其源码的实现。我们这篇文章来讲解一下如何自定义 Source 和 Sink Connector?这样我们后面再遇到什么样的需求都难不倒我们了。

# 如何自定义 Source Connector?

这里就演示一下如何自定义 Source 从 MySQL 中读取数据。

# 添加依赖

在 pom.xml 中添加 MySQL 依赖:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.34</version>
</dependency>

# 数据库建表

数据库建表如下:


DROP TABLE IF EXISTS student; CREATE TABLE student ( id int(11) unsigned NOT NULL AUTO_INCREMENT, name varchar(25) COLLATE utf8_bin DEFAULT NULL, password varchar(25) COLLATE utf8_bin DEFAULT NULL, age int(10) DEFAULT NULL, PRIMARY KEY (id) ) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

# 数据库插入数据


INSERT INTO student VALUES ('1', 'zhisheng01', '123456', '18'), ('2', 'zhisheng02', '123', '17'), ('3', 'zhisheng03', '1234', '18'), ('4', 'zhisheng04', '12345', '16'); COMMIT;

# 新建实体类

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Student {
    public int id;
    public String name;
    public String password;
    public int age;
}

# 自定义 Source 类

SourceFromMySQL 是自定义的 Source 类,该类继承 RichSourceFunction,实现里面的 open、close、run、cancel 方法:

public class SourceFromMySQL extends RichSourceFunction<Student> {
    PreparedStatement ps;
    private Connection connection;

    /**
     * open() 方法中建立连接,这样不用每次 invoke 的时候都要建立连接和释放连接。
     *
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = getConnection();
        String sql = "select * from Student;";
        ps = this.connection.prepareStatement(sql);
    }

    /**
     * 程序执行完毕就可以进行,关闭连接和释放资源的动作了
     *
     * @throws Exception
     */
    @Override
    public void close() throws Exception {
        super.close();
        if (connection != null) { //关闭连接和释放资源
            connection.close();
        }
        if (ps != null) {
            ps.close();
        }
    }

    /**
     * DataStream 调用一次 run() 方法用来获取数据
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void run(SourceContext<Student> ctx) throws Exception {
        ResultSet resultSet = ps.executeQuery();
        while (resultSet.next()) {
            Student student = new Student(
                    resultSet.getInt("id"),
                    resultSet.getString("name").trim(),
                    resultSet.getString("password").trim(),
                    resultSet.getInt("age"));
            ctx.collect(student);
        }
    }

    @Override
    public void cancel() {
    }

    private static Connection getConnection() {
        Connection con = null;
            try {
                Class.forName("com.mysql.jdbc.Driver");
                con = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8", "root", "123456");
            } catch (Exception e) {
                System.out.println("mysql get connection has exception , msg = " + e.getMessage());
            }
        return con;
    }
}


public class Main2 { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.addSource(new SourceFromMySQL()).print();

        env.execute("Flink add data sourc");
    }
}

运行 Flink 程序,控制台日志中可以看见打印的 student 信息。

images

# RichSourceFunction 使用及源码分析

从上面自定义的 Source 可以看到我们继承的就是这个 RichSourceFunction 类,其实也是可以使用 SourceFunction 函数来自定义 Source。 RichSourceFunction 函数比 SourceFunction 多了 open 方法(可以用来初始化)和获取应用上下文的方法,那么来了解一下该类。

images 它是一个抽象类,继承自 AbstractRichFunction,实现了 SourceFunction 接口,其子类有三个,两个是抽象类,在此基础上提供了更具体的实现,另一个是 ContinuousFileMonitoringFunction。

images

  • MessageAcknowledgingSourceBase :它针对的是数据源是消息队列的场景并且提供了基于 ID 的应答机制。
  • MultipleIdsMessageAcknowledgingSourceBase : 在 MessageAcknowledgingSourceBase 的基础上针对 ID 应答机制进行了更为细分的处理,支持两种 ID 应答模型:session id 和 unique message id。
  • ContinuousFileMonitoringFunction:这是单个(非并行)监视任务,它接受 FileInputFormat,并且根据 FileProcessingMode 和 FilePathFilter,它负责监视用户提供的路径;决定应该进一步读取和处理哪些文件;创建与这些文件对应的 FileInputSplit 拆分,将它们分配给下游任务以进行进一步处理。

除了上面使用 RichSourceFunction 和 SourceFunction 来自定义 Source,还可以继承 RichParallelSourceFunction 抽象类或实现 ParallelSourceFunction 接口来实现自定义 Source 函数。

# 如何自定义 Sink Connector?

下面将写一个 demo 教大家将从 Kafka Source 的数据 Sink 到 MySQL 中去

# 工具类

写了一个工具类往 Kafka 的 topic 中发送数据。

/**
 * 往kafka中写数据,可以使用这个main函数进行测试一下
 */
public class KafkaUtils2 {
    public static final String broker_list = "localhost:9092";
    public static final String topic = "student";  //kafka topic 需要和 flink 程序用同一个 topic

    public static void writeToKafka() throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", broker_list);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer producer = new KafkaProducer<String, String>(props);

        for (int i = 1; i <= 100; i++) {
            Student student = new Student(i, "zhisheng" + i, "password" + i, 18 + i);
            ProducerRecord record = new ProducerRecord<String, String>(topic, null, null, JSON.toJSONString(student));
            producer.send(record);
            System.out.println("发送数据: " + JSON.toJSONString(student));
        }
        producer.flush();
    }

    public static void main(String[] args) throws InterruptedException {
        writeToKafka();
    }
}

# SinkToMySQL

该类就是 Sink Function,继承了 RichSinkFunction ,然后重写了里面的方法,在 invoke 方法中将数据插入到 MySQL 中。

public class SinkToMySQL extends RichSinkFunction<Student> {
    PreparedStatement ps;
    private Connection connection;

    /**
     * open() 方法中建立连接,这样不用每次 invoke 的时候都要建立连接和释放连接
     *
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = getConnection();
        String sql = "insert into Student(id, name, password, age) values(?, ?, ?, ?);";
        ps = this.connection.prepareStatement(sql);
    }

    @Override
    public void close() throws Exception {
        super.close();
        //关闭连接和释放资源
        if (connection != null) {
            connection.close();
        }
        if (ps != null) {
            ps.close();
        }
    }

    /**
     * 每条数据的插入都要调用一次 invoke() 方法
     *
     * @param value
     * @param context
     * @throws Exception
     */
    @Override
    public void invoke(Student value, Context context) throws Exception {
        //组装数据,执行插入操作
        ps.setInt(1, value.getId());
        ps.setString(2, value.getName());
        ps.setString(3, value.getPassword());
        ps.setInt(4, value.getAge());
        ps.executeUpdate();
    }

    private static Connection getConnection() {
        Connection con = null;
        try {
            Class.forName("com.mysql.jdbc.Driver");
            con = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8", "root", "root123456");
        } catch (Exception e) {
            System.out.println("-----------mysql get connection has exception , msg = "+ e.getMessage());
        }
        return con;
    }
}

这里的 source 是从 Kafka 读取数据的,然后 Flink 从 Kafka 读取到数据(JSON)后用阿里 fastjson 来解析成 Student 对象,然后在 addSink 中使用我们创建的 SinkToMySQL,这样就可以把数据存储到 MySQL 了。

public class Main3 {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "metric-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "latest");

        SingleOutputStreamOperator<Student> student = env.addSource(new FlinkKafkaConsumer011<>(
                "student",   //这个 kafka topic 需要和上面的工具类的 topic 一致
                new SimpleStringSchema(),
                props)).setParallelism(1)
                .map(string -> JSON.parseObject(string, Student.class)); //Fastjson 解析字符串成 student 对象

        student.addSink(new SinkToMySQL()); //数据 sink 到 mysql

        env.execute("Flink add sink");
    }
}

# 结果

运行 Flink 程序,然后再运行 KafkaUtils2.java 工具类,这样就可以了。

如果数据插入成功了,那么查看下我们的数据库:

images 数据库中已经插入了 100 条我们从 Kafka 发送的数据了。证明我们的 SinkToMySQL 起作用了。

# RichSinkFunction 使用及源码分析

通过上面的 demo 可以发现继承 RichSinkFunction 类,然后实现内部的 open、close、invoke 方法就可以实现自定义 Sink 了,RichSinkFunction 的类图如下。

images 该类继承了 AbstractRichFunction 抽象类,实现了 SinkFunction 接口,同样该类也是一个 Rich 函数,它比 SinkFunction 多了 open(可以初始化数据) 和 getRuntimeContext(可以获取上下文)方法,如果不需要这两个方法,同样也是可以实现 SinkFunction 接口来自定义 Sink 的。

# 小结与反思

本节讲了 Flink 中该如何去自定义 Connector,包括 Source 和 Sink,每种也都有提供样例去教大家如何操作。

本节相关代码链接: