当前位置: 首页 > >

Flink JDBCSink使用及源码解析

发布时间:

以下所有都是基于Flink 1.12.0版本


Flink JDBCSink的使用

flink提供了JDBCSink方便我们写入数据库,以下是使用案例:


pom依赖

需要引入flink-connector-jdbc的依赖。另外,我这里是写入mysql,所以还引入了mysql的驱动包



org.apache.flink
flink-connector-jdbc_2.12
1.12.0



mysql
mysql-connector-java
5.1.45


案例代码

package com.upupfeng.sink;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
* https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/jdbc.html
*
* @author mawf
*/
public class JDBCSinkDemo {

public static void main(String[] args) throws Exception {

LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

// 从配置文件中读取配置信息
ParameterTool parameterTool = ParameterTool.fromPropertiesFile("D:\upupfeng\my_code\flink-learning\resources\application.properties");
String url = parameterTool.get("url");
String driver = parameterTool.get("driver");
String user = parameterTool.get("user");
String password = parameterTool.get("password");

// sql语句,用问号做占位符
String sql = "insert into tb_traffic_statistic_min(starttime, city_name, distinct_user_count, total_traffic) values(?, ?, ?, ?)";
// 伪造数据
Tuple4 bjTp = Tuple4.of("2020-12-01 00:00:00", "北京", 10, 2.3d);

env
.fromElements(bjTp)
.returns(Types.TUPLE(Types.STRING, Types.STRING, Types.INT, Types.DOUBLE))
// 添加JDBCSink
.addSink(
JdbcSink.sink(
sql, // sql语句
// 设置占位符对应的字段值
(ps, tp) -> {
ps.setString(1, tp.f0);
ps.setString(2, tp.f1);
ps.setInt(3, tp.f2);
ps.setDouble(4, tp.f3);
},
// 传递jdbc的连接属性
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withDriverName(driver)
.withUrl(url)
.withUsername(user)
.withPassword(password)
.build()
)
);

// 执行
env.execute();
}
}


JDBCSink源码解析

先从JdbcSink.sink来看,


JdbcSink

这是一个类,提供了静态方法供我们创建JDBC的SinkFunction。


提供了两个重载的sink方法。


public static SinkFunction sink(String sql, JdbcStatementBuilder statementBuilder, JdbcConnectionOptions connectionOptions);

public static SinkFunction sink(
String sql,
JdbcStatementBuilder statementBuilder,
JdbcExecutionOptions executionOptions,
JdbcConnectionOptions connectionOptions);

其中一个方法是对另一个方法的executionOptions参数提供了默认实现。


我们就看一下参数最全的sink方法:


/**
* 创建JDBC Sink
* @param sql 任意DML查询(例如插入,update,upsert)
* @param statementBuilder 根据每一个查询在java.sql.PreparedStatement上设置参数
* @param 数据类型
* @param executionOptions 执行时配置的参数,如批大小、重试等
* @param connectionOptions 连接参数,如jdbc url等
*/
public static SinkFunction sink(
String sql,
JdbcStatementBuilder statementBuilder,
JdbcExecutionOptions executionOptions,
JdbcConnectionOptions connectionOptions) {
// 创建了一个GenericJdbcSinkFunction对象,这个类就是JDBC对应的SinkFunction实现类
// JdbcBatchingOutputFormat类是批处理OutputFormat的一个实现,封装了攒批处理的逻辑
return new GenericJdbcSinkFunction<>(new JdbcBatchingOutputFormat<>(
// 简单的JDBC连接提供者,可以创建JDBC连接
new SimpleJdbcConnectionProvider(connectionOptions),
// 执行参数
executionOptions,
// 执行PreparedStatement的方式
context -> {
Preconditions.checkState(!context.getExecutionConfig().isObjectReuseEnabled(),
"objects can not be reused with JDBC sink function");
// 使用一个静态方法,创建了简单的JdbcBatchStatementExecutor类:SimpleBatchStatementExecutor
return JdbcBatchStatementExecutor.simple(sql, statementBuilder, Function.identity());
},
// 记录的提取方式
JdbcBatchingOutputFormat.RecordExtractor.identity()
));
}

看下GenericJdbcSinkFunction类


GenericJdbcSinkFunction

JDBC的通用SinkFunction。


继承了RichSinkFunction接口,是一个标准的sink实现类。


// outputFormat内部封装了真正的处理逻辑
private final AbstractJdbcOutputFormat outputFormat;

// 构造器只有AbstractJdbcOutputFormat一个参数
public GenericJdbcSinkFunction(@Nonnull AbstractJdbcOutputFormat outputFormat) {
this.outputFormat = Preconditions.checkNotNull(outputFormat);
}

// open方法
public void open(Configuration parameters) throws Exception {
super.open(parameters);
RuntimeContext ctx = getRuntimeContext();
// 将context传递给outputFormat
outputFormat.setRuntimeContext(ctx);
// 调用outputFormat的open方法
outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
}

// 每条记录进来的时候调用outputFormat的writeRecord方法
@Override
public void invoke(T value, Context context) throws IOException {
outputFormat.writeRecord(value);
}

// 状态相关的
@Override
public void initializeState(FunctionInitializationContext context) {
}

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
outputFormat.flush();
}

// clost方法中调用了outputFormat的close方法
public void close() {
outputFormat.close();
}

从以上代码可以看出,基本上处理逻辑都封装了AbstractJdbcOutputFormat类中。


AbstractJdbcOutputFormat / JdbcBatchingOutputFormat

AbstractJdbcOutputFormat 是OutputFormat基础的抽象类,提供了一些方法的默认实现。


我们这里用到的是批提交对应的实现类JdbcBatchingOutputFormat,就直接看JdbcBatchingOutputFormat类了


JdbcBatchingOutputFormat源码:


// JDBC outputFormat支持在将记录写入数据库之前批处理记录
// JdbcBatchingOutputFormat继承了AbstractJdbcOutputFormat类。
public class JdbcBatchingOutputFormat> extends AbstractJdbcOutputFormat {

// 从给定参数中提取值的接口。
public interface RecordExtractor extends Function, Serializable {
// 直接原样返回
static RecordExtractor identity() {
return x -> x;
}
}

// 创建Statement执行类的工厂
public interface StatementExecutorFactory> extends Function, Serializable {
}

// 执行参数
private final JdbcExecutionOptions executionOptions;
// 创建执行类的工厂
private final StatementExecutorFactory statementExecutorFactory;
// 记录提取
private final RecordExtractor jdbcRecordExtractor;
// 执行statement的类
private transient JdbcExec jdbcStatementExecutor;
// 批大小
private transient int batchCount = 0;
private transient volatile boolean closed = false;
// 用于定时提交的定时器
private transient ScheduledExecutorService scheduler;
private transient ScheduledFuture scheduledFuture;
private transient volatile Exception flushException;

// 构造函数
public JdbcBatchingOutputFormat(
// JDBC连接提供者
@Nonnull JdbcConnectionProvider connectionProvider,
// 执行参数
@Nonnull JdbcExecutionOptions executionOptions,
// 创建statement执行类的工厂
@Nonnull StatementExecutorFactory statementExecutorFactory,
// 记录提取
@Nonnull RecordExtractor recordExtractor) {
super(connectionProvider);
this.executionOptions = checkNotNull(executionOptions);
this.statementExecutorFactory = checkNotNull(statementExecutorFactory);
this.jdbcRecordExtractor = checkNotNull(recordExtractor);
}

/**
* 连接到目标数据库,并初始化准备好的语句
*
* @param taskNumber 并行实例数
*/
@Override
public void open(int taskNumber, int numTasks) throws IOException {
super.open(taskNumber, numTasks);
// 根据给定的statement执行类工厂来创建 statement执行类
jdbcStatementExecutor = createAndOpenStatementExecutor(statementExecutorFactory);
// 设置一个定时器,来定时提交数据到数据库。
// 相当于就是有两种方式可以触发提交:一种是到达批大小,一种是到了定时的时间。
if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) {
// 创建一个调度线程池
this.scheduler = Executors.newScheduledThreadPool(1, new ExecutorThreadFactory("jdbc-upsert-output-format"));
// 注册定时调度
this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
synchronized (JdbcBatchingOutputFormat.this) {
if (!closed) {
try {
// 调用flush方法,将数据刷入数据库
flush();
} catch (Exception e) {
flushException = e;
}
}
}
}, executionOptions.getBatchIntervalMs(), executionOptions.getBatchIntervalMs(), TimeUnit.MILLISECONDS);
}
}

// 根据给定的工厂类创建对应的statement执行类
private JdbcExec createAndOpenStatementExecutor(StatementExecutorFactory statementExecutorFactory) throws IOException {
// 创建一个执行类
// 对于batch output format来说是创建了一个SimpleBatchStatementExecutor类,我们稍后看
JdbcExec exec = statementExecutorFactory.apply(getRuntimeContext());
try {
// 从连接中创建一个statement
exec.prepareStatements(connection);
} catch (SQLException e) {
throw new IOException("unable to open JDBC writer", e);
}
return exec;
}

// 写每条记录。这个方法就是在GenericJdbcSinkFunction的invoke方法中被调用,处理每一条记录
@Override
public final synchronized void writeRecord(In record) throws IOException {
checkFlushException();

try {
// 将数据添加到batch中
addToBatch(record, jdbcRecordExtractor.apply(record));
// batch数量增加
batchCount++;
// 如果批大小够了,触发flush方法提交数据
if (executionOptions.getBatchSize() > 0 && batchCount >= executionOptions.getBatchSize()) {
flush();
}
} catch (Exception e) {
throw new IOException("Writing records to JDBC failed.", e);
}
}

// 添加记录到batch中
protected void addToBatch(In original, JdbcIn extracted) throws SQLException {
// 调用了statement执行类的addToBatch方法,我们稍后看
jdbcStatementExecutor.addToBatch(extracted);
}

// 提交方法
@Override
public synchronized void flush() throws IOException {
checkFlushException();

// 根据重试次数循环。如果失败,会重试。如果成功,就直接break了
for (int i = 0; i <= executionOptions.getMaxRetries(); i++) {
try {
// 执行提交
attemptFlush();
// batch count置为0
batchCount = 0;
break;
} catch (SQLException e) {
LOG.error("JDBC executeBatch error, retry times = {}", i, e);
if (i >= executionOptions.getMaxRetries()) {
throw new IOException(e);
}
try {
// 如果是因为连接失效导致的。则重新获取连接
if (!connection.isValid(CONNECTION_CHECK_TIMEOUT_SECONDS)) {
// 重新获取连接
connection = connectionProvider.reestablishConnection();
// 关闭执行器的statement
jdbcStatementExecutor.closeStatements();
// 根据连接创建statement
jdbcStatementExecutor.prepareStatements(connection);
}
} catch (Exception excpetion) {
LOG.error("JDBC connection is not valid, and reestablish connection failed.", excpetion);
throw new IOException("Reestablish JDBC connection failed", excpetion);
}
try {
Thread.sleep(1000 * i);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new IOException("unable to flush; interrupted while doing another attempt", e);
}
}
}
}

// 执行提交
protected void attemptFlush() throws SQLException {
jdbcStatementExecutor.executeBatch();
}


// close方法
public synchronized void close() {
if (!closed) {
closed = true;

if (this.scheduledFuture != null) {
// 关闭定时器
scheduledFuture.cancel(false);
this.scheduler.shutdown();
}
// 如果batch中还有数据,则提交
if (batchCount > 0) {
try {
flush();
} catch (Exception e) {
LOG.warn("Writing records to JDBC failed.", e);
throw new RuntimeException("Writing records to JDBC failed.", e);
}
}
// 关闭statement
try {
if (jdbcStatementExecutor != null) {
jdbcStatementExecutor.closeStatements();
}
} catch (SQLException e) {
LOG.warn("Close JDBC writer failed.", e);
}
}
super.close();
checkFlushException();
}

......
}

以上就是outputFormat的代码,提供了攒批、按批提交、定时提交的方法。


内部真正执行statement,还是调用JdbcBatchStatementExecutor来实现的。接下来看看JdbcBatchStatementExecutor


JdbcBatchStatementExecutor

这个接口是用于批量执行给定的JDBC语句以获取累积的记录。就是按批提交。


我们看下他的实现类SimpleBatchStatementExecutor


SimpleBatchStatementExecutor的源码:


class SimpleBatchStatementExecutor implements JdbcBatchStatementExecutor {

// sql
private final String sql;
// statement的build,用于将参数设置到占位符上
private final JdbcStatementBuilder parameterSetter;
// 值转换
private final Function valueTransformer;
// 使用List来存储批
private final List batch;

// 构造器
SimpleBatchStatementExecutor(String sql, JdbcStatementBuilder statementBuilder, Function valueTransformer) {
this.sql = sql;
this.parameterSetter = statementBuilder;
this.valueTransformer = valueTransformer;
this.batch = new ArrayList<>();
}

// 调用prepareStatement预编译sql
@Override
public void prepareStatements(Connection connection) throws SQLException {
this.st = connection.prepareStatement(sql);
}

// 将记录添加到批中。在outputFormat中调用这个方法攒批
@Override
public void addToBatch(T record) {
batch.add(valueTransformer.apply(record));
}

// 执行批、在outputFormat中调用,执行批提交
@Override
public void executeBatch() throws SQLException {
if (!batch.isEmpty()) {
// 遍历批
for (V r : batch) {
// 这里是用statement的引用,将值赋到statment上
parameterSetter.accept(st, r);
// 调用statament的addBatch方法
st.addBatch();
}
// 执行提交
st.executeBatch();
// 清空批
batch.clear();
}
}

// 关闭statement
@Override
public void closeStatements() throws SQLException {
if (st != null) {
st.close();
st = null;
}
}
}

从上面的所有代码,就可以了解JdbcSink的提交原理了。


接下来在看下两个配置的类:JdbcExecutionOptions和JdbcConnectionOptions。一个是执行的配置,一个是连接的配置。


JdbcConnectionOptions

有url、driverName、username、password四个配置


内部提供了builder类用于创建JdbcConnectionOptions


public class JdbcConnectionOptions implements Serializable {

protected final String url;
protected final String driverName;
@Nullable
protected final String username;
@Nullable
protected final String password;

public static class JdbcConnectionOptionsBuilder {
public JdbcConnectionOptions build() {
return new JdbcConnectionOptions(url, driverName, username, password);
}

}
}


JdbcExecutionOptions

执行配置。有batchIntervalMs、batchSize、maxRetries三个配置。


内部也提供了builder来创建JdbcExecutionOptions


public class JdbcExecutionOptions implements Serializable {
public static final int DEFAULT_MAX_RETRY_TIMES = 3;
private static final int DEFAULT_INTERVAL_MILLIS = 0;
public static final int DEFAULT_SIZE = 5000;

// 自动提交批的时间间隔。毫秒值。默认是0,默认不会定时提交
private final long batchIntervalMs;
// 批大小。默认的批大小为5000
private final int batchSize;
// 重试次数。默认为3
private final int maxRetries;

public static final class Builder {
public JdbcExecutionOptions build() {
return new JdbcExecutionOptions(intervalMs, size, maxRetries);
}
}
}

总结

以上就是JdbcSink的在Stream API中的使用和部分JdbcSink的源码。


这个JdbcSink写的很好,可以覆盖一部分场景。


但是有时候并不是很满足我们的要求,我们可以参考这个JdbcSink进行改造、二次开发。


参考

官方JDBC Connector文档https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/jdbc.html


flink-connector-jdbc_2.12-1.12.0.jar源代码



友情链接: