A Look at Flink's JDBCOutputFormat
Published: 2019-06-19



This article takes a closer look at Flink's JDBCOutputFormat.

JDBCOutputFormat

flink-jdbc_2.11-1.7.0-sources.jar!/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java

/**
 * OutputFormat to write Rows into a JDBC database.
 * The OutputFormat has to be configured using the supplied OutputFormatBuilder.
 *
 * @see Row
 * @see DriverManager
 */
public class JDBCOutputFormat extends RichOutputFormat<Row> {
    private static final long serialVersionUID = 1L;
    static final int DEFAULT_BATCH_INTERVAL = 5000;

    private static final Logger LOG = LoggerFactory.getLogger(JDBCOutputFormat.class);

    private String username;
    private String password;
    private String drivername;
    private String dbURL;
    private String query;
    private int batchInterval = DEFAULT_BATCH_INTERVAL;

    private Connection dbConn;
    private PreparedStatement upload;

    private int batchCount = 0;

    private int[] typesArray;

    public JDBCOutputFormat() {
    }

    @Override
    public void configure(Configuration parameters) {
    }

    /**
     * Connects to the target database and initializes the prepared statement.
     *
     * @param taskNumber The number of the parallel instance.
     * @throws IOException Thrown, if the output could not be opened due to an
     * I/O problem.
     */
    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        try {
            establishConnection();
            upload = dbConn.prepareStatement(query);
        } catch (SQLException sqe) {
            throw new IllegalArgumentException("open() failed.", sqe);
        } catch (ClassNotFoundException cnfe) {
            throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
        }
    }

    private void establishConnection() throws SQLException, ClassNotFoundException {
        Class.forName(drivername);
        if (username == null) {
            dbConn = DriverManager.getConnection(dbURL);
        } else {
            dbConn = DriverManager.getConnection(dbURL, username, password);
        }
    }

    /**
     * Adds a record to the prepared statement.
     *
     * When this method is called, the output format is guaranteed to be opened.
     *
     * WARNING: this may fail when no column types specified (because a best effort approach is attempted in order to
     * insert a null value but it's not guaranteed that the JDBC driver handles PreparedStatement.setObject(pos, null))
     *
     * @param row The records to add to the output.
     * @see PreparedStatement
     * @throws IOException Thrown, if the records could not be added due to an I/O problem.
     */
    @Override
    public void writeRecord(Row row) throws IOException {
        if (typesArray != null && typesArray.length > 0 && typesArray.length != row.getArity()) {
            LOG.warn("Column SQL types array doesn't match arity of passed Row! Check the passed array...");
        }
        try {
            if (typesArray == null) {
                // no types provided
                for (int index = 0; index < row.getArity(); index++) {
                    LOG.warn("Unknown column type for column {}. Best effort approach to set its value: {}.", index + 1, row.getField(index));
                    upload.setObject(index + 1, row.getField(index));
                }
            } else {
                // types provided
                for (int index = 0; index < row.getArity(); index++) {
                    if (row.getField(index) == null) {
                        upload.setNull(index + 1, typesArray[index]);
                    } else {
                        // casting values as suggested by http://docs.oracle.com/javase/1.5.0/docs/guide/jdbc/getstart/mapping.html
                        switch (typesArray[index]) {
                            case java.sql.Types.NULL:
                                upload.setNull(index + 1, typesArray[index]);
                                break;
                            case java.sql.Types.BOOLEAN:
                            case java.sql.Types.BIT:
                                upload.setBoolean(index + 1, (boolean) row.getField(index));
                                break;
                            case java.sql.Types.CHAR:
                            case java.sql.Types.NCHAR:
                            case java.sql.Types.VARCHAR:
                            case java.sql.Types.LONGVARCHAR:
                            case java.sql.Types.LONGNVARCHAR:
                                upload.setString(index + 1, (String) row.getField(index));
                                break;
                            case java.sql.Types.TINYINT:
                                upload.setByte(index + 1, (byte) row.getField(index));
                                break;
                            case java.sql.Types.SMALLINT:
                                upload.setShort(index + 1, (short) row.getField(index));
                                break;
                            case java.sql.Types.INTEGER:
                                upload.setInt(index + 1, (int) row.getField(index));
                                break;
                            case java.sql.Types.BIGINT:
                                upload.setLong(index + 1, (long) row.getField(index));
                                break;
                            case java.sql.Types.REAL:
                                upload.setFloat(index + 1, (float) row.getField(index));
                                break;
                            case java.sql.Types.FLOAT:
                            case java.sql.Types.DOUBLE:
                                upload.setDouble(index + 1, (double) row.getField(index));
                                break;
                            case java.sql.Types.DECIMAL:
                            case java.sql.Types.NUMERIC:
                                upload.setBigDecimal(index + 1, (java.math.BigDecimal) row.getField(index));
                                break;
                            case java.sql.Types.DATE:
                                upload.setDate(index + 1, (java.sql.Date) row.getField(index));
                                break;
                            case java.sql.Types.TIME:
                                upload.setTime(index + 1, (java.sql.Time) row.getField(index));
                                break;
                            case java.sql.Types.TIMESTAMP:
                                upload.setTimestamp(index + 1, (java.sql.Timestamp) row.getField(index));
                                break;
                            case java.sql.Types.BINARY:
                            case java.sql.Types.VARBINARY:
                            case java.sql.Types.LONGVARBINARY:
                                upload.setBytes(index + 1, (byte[]) row.getField(index));
                                break;
                            default:
                                upload.setObject(index + 1, row.getField(index));
                                LOG.warn("Unmanaged sql type ({}) for column {}. Best effort approach to set its value: {}.",
                                    typesArray[index], index + 1, row.getField(index));
                                // case java.sql.Types.SQLXML
                                // case java.sql.Types.ARRAY:
                                // case java.sql.Types.JAVA_OBJECT:
                                // case java.sql.Types.BLOB:
                                // case java.sql.Types.CLOB:
                                // case java.sql.Types.NCLOB:
                                // case java.sql.Types.DATALINK:
                                // case java.sql.Types.DISTINCT:
                                // case java.sql.Types.OTHER:
                                // case java.sql.Types.REF:
                                // case java.sql.Types.ROWID:
                                // case java.sql.Types.STRUC
                        }
                    }
                }
            }
            upload.addBatch();
            batchCount++;
        } catch (SQLException e) {
            throw new RuntimeException("Preparation of JDBC statement failed.", e);
        }

        if (batchCount >= batchInterval) {
            // execute batch
            flush();
        }
    }

    void flush() {
        try {
            upload.executeBatch();
            batchCount = 0;
        } catch (SQLException e) {
            throw new RuntimeException("Execution of JDBC statement failed.", e);
        }
    }

    int[] getTypesArray() {
        return typesArray;
    }

    /**
     * Executes prepared statement and closes all resources of this instance.
     *
     * @throws IOException Thrown, if the input could not be closed properly.
     */
    @Override
    public void close() throws IOException {
        if (upload != null) {
            flush();
            // close the connection
            try {
                upload.close();
            } catch (SQLException e) {
                LOG.info("JDBC statement could not be closed: " + e.getMessage());
            } finally {
                upload = null;
            }
        }

        if (dbConn != null) {
            try {
                dbConn.close();
            } catch (SQLException se) {
                LOG.info("JDBC connection could not be closed: " + se.getMessage());
            } finally {
                dbConn = null;
            }
        }
    }

    public static JDBCOutputFormatBuilder buildJDBCOutputFormat() {
        return new JDBCOutputFormatBuilder();
    }

    //......
}

  • JDBCOutputFormat extends RichOutputFormat; the type parameter here is org.apache.flink.types.Row
  • open() calls establishConnection to load the JDBC driver and initialize dbConn, then calls dbConn.prepareStatement(query) to obtain upload (a PreparedStatement)
  • writeRecord first checks whether a typesArray has been supplied; if not, it sets each value via setObject, otherwise it casts each field according to its declared type, covering most of the types in java.sql.Types
  • writeRecord uses PreparedStatement.addBatch; once batchCount reaches batchInterval (default 5000) it calls flush(), which invokes PreparedStatement.executeBatch and resets batchCount; to make sure records that never reach batchInterval still get committed, close() runs flush() one more time before closing the PreparedStatement and the Connection
  • JDBCOutputFormat also provides a JDBCOutputFormatBuilder for conveniently constructing a JDBCOutputFormat; a usage sketch follows this list
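To make this concrete, here is a minimal usage sketch that builds a JDBCOutputFormat via the builder and uses it as a DataSet sink. The MySQL driver, connection URL, credentials, and the books table are assumptions made purely for illustration; any JDBC-compatible target would work the same way.

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

import java.util.Arrays;

public class JdbcOutputFormatExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Row is not strongly typed, so give Flink the field types explicitly via RowTypeInfo.
        DataSet<Row> rows = env.fromCollection(
                Arrays.asList(Row.of(1, "Flink in Action"), Row.of(2, "Stream Processing")),
                new RowTypeInfo(Types.INT, Types.STRING));

        // All connection details below are illustrative assumptions.
        JDBCOutputFormat outputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
                .setDrivername("com.mysql.jdbc.Driver")
                .setDBUrl("jdbc:mysql://localhost:3306/test")
                .setUsername("root")
                .setPassword("root")
                .setQuery("INSERT INTO books (id, title) VALUES (?, ?)")
                .setBatchInterval(100) // flush every 100 rows instead of the default 5000
                .finish();

        // open() connects and prepares the statement, writeRecord() batches each Row,
        // and close() flushes whatever is left before releasing the connection.
        rows.output(outputFormat);

        env.execute("JDBCOutputFormat example");
    }
}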

Row

flink-core-1.7.0-sources.jar!/org/apache/flink/types/Row.java

/**
 * A Row can have arbitrary number of fields and contain a set of fields, which may all be
 * different types. The fields in Row can be null. Due to Row is not strongly typed, Flink's
 * type extraction mechanism can't extract correct field types. So that users should manually
 * tell Flink the type information via creating a {@link RowTypeInfo}.
 *
 * The fields in the Row can be accessed by position (zero-based) {@link #getField(int)}. And can
 * set fields by {@link #setField(int, Object)}.
 *
 * Row is in principle serializable. However, it may contain non-serializable fields,
 * in which case serialization will fail.
 */
@PublicEvolving
public class Row implements Serializable {

    private static final long serialVersionUID = 1L;

    /** The array to store actual values. */
    private final Object[] fields;

    /**
     * Create a new Row instance.
     * @param arity The number of fields in the Row
     */
    public Row(int arity) {
        this.fields = new Object[arity];
    }

    /**
     * Get the number of fields in the Row.
     * @return The number of fields in the Row.
     */
    public int getArity() {
        return fields.length;
    }

    /**
     * Gets the field at the specified position.
     * @param pos The position of the field, 0-based.
     * @return The field at the specified position.
     * @throws IndexOutOfBoundsException Thrown, if the position is negative, or equal to, or larger than the number of fields.
     */
    public Object getField(int pos) {
        return fields[pos];
    }

    /**
     * Sets the field at the specified position.
     *
     * @param pos The position of the field, 0-based.
     * @param value The value to be assigned to the field at the specified position.
     * @throws IndexOutOfBoundsException Thrown, if the position is negative, or equal to, or larger than the number of fields.
     */
    public void setField(int pos, Object value) {
        fields[pos] = value;
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < fields.length; i++) {
            if (i > 0) {
                sb.append(",");
            }
            sb.append(StringUtils.arrayAwareToString(fields[i]));
        }
        return sb.toString();
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        Row row = (Row) o;
        return Arrays.deepEquals(fields, row.fields);
    }

    @Override
    public int hashCode() {
        return Arrays.deepHashCode(fields);
    }

    /**
     * Creates a new Row and assigns the given values to the Row's fields.
     * This is more convenient than using the constructor.
     *
     * For example:
     *
     *     Row.of("hello", true, 1L);
     *
     * instead of
     *
     *     Row row = new Row(3);
     *     row.setField(0, "hello");
     *     row.setField(1, true);
     *     row.setField(2, 1L);
     */
    public static Row of(Object... values) {
        Row row = new Row(values.length);
        for (int i = 0; i < values.length; i++) {
            row.setField(i, values[i]);
        }
        return row;
    }

    /**
     * Creates a new Row which copied from another row.
     * This method does not perform a deep copy.
     *
     * @param row The row being copied.
     * @return The cloned new Row
     */
    public static Row copy(Row row) {
        final Row newRow = new Row(row.fields.length);
        System.arraycopy(row.fields, 0, newRow.fields, 0, row.fields.length);
        return newRow;
    }

    /**
     * Creates a new Row with projected fields from another row.
     * This method does not perform a deep copy.
     *
     * @param fields fields to be projected
     * @return the new projected Row
     */
    public static Row project(Row row, int[] fields) {
        final Row newRow = new Row(fields.length);
        for (int i = 0; i < fields.length; i++) {
            newRow.fields[i] = row.fields[fields[i]];
        }
        return newRow;
    }
}
  • Row is the record type consumed by JDBCOutputFormat.writeRecord; internally it stores the field values in an Object[] array and also offers static helpers such as of, copy, and project (see the sketch below)
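A small sketch of the two equivalent construction styles from the Javadoc above, plus project(); the printed output follows Row.toString(), which joins the fields with commas.

import org.apache.flink.types.Row;

public class RowExample {
    public static void main(String[] args) {
        // shorthand factory method ...
        Row viaOf = Row.of("hello", true, 1L);

        // ... equivalent to creating the Row and setting each field by position
        Row viaSetField = new Row(3);
        viaSetField.setField(0, "hello");
        viaSetField.setField(1, true);
        viaSetField.setField(2, 1L);

        System.out.println(viaOf);                     // hello,true,1
        System.out.println(viaOf.equals(viaSetField)); // true (deep equality over the field array)

        // project() keeps only the given positions; like copy(), it is a shallow copy
        Row projected = Row.project(viaOf, new int[]{0, 2});
        System.out.println(projected);                 // hello,1
    }
}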

JDBCOutputFormatBuilder

flink-jdbc_2.11-1.7.0-sources.jar!/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java

/**
 * Builder for a {@link JDBCOutputFormat}.
 */
public static class JDBCOutputFormatBuilder {
    private final JDBCOutputFormat format;

    protected JDBCOutputFormatBuilder() {
        this.format = new JDBCOutputFormat();
    }

    public JDBCOutputFormatBuilder setUsername(String username) {
        format.username = username;
        return this;
    }

    public JDBCOutputFormatBuilder setPassword(String password) {
        format.password = password;
        return this;
    }

    public JDBCOutputFormatBuilder setDrivername(String drivername) {
        format.drivername = drivername;
        return this;
    }

    public JDBCOutputFormatBuilder setDBUrl(String dbURL) {
        format.dbURL = dbURL;
        return this;
    }

    public JDBCOutputFormatBuilder setQuery(String query) {
        format.query = query;
        return this;
    }

    public JDBCOutputFormatBuilder setBatchInterval(int batchInterval) {
        format.batchInterval = batchInterval;
        return this;
    }

    public JDBCOutputFormatBuilder setSqlTypes(int[] typesArray) {
        format.typesArray = typesArray;
        return this;
    }

    /**
     * Finalizes the configuration and checks validity.
     *
     * @return Configured JDBCOutputFormat
     */
    public JDBCOutputFormat finish() {
        if (format.username == null) {
            LOG.info("Username was not supplied.");
        }
        if (format.password == null) {
            LOG.info("Password was not supplied.");
        }
        if (format.dbURL == null) {
            throw new IllegalArgumentException("No database URL supplied.");
        }
        if (format.query == null) {
            throw new IllegalArgumentException("No query supplied.");
        }
        if (format.drivername == null) {
            throw new IllegalArgumentException("No driver supplied.");
        }
        return format;
    }
}
  • JDBCOutputFormatBuilder provides builder methods for username, password, dbURL, query, drivername, batchInterval, and typesArray; a short sketch of the typed variant follows
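As a hedged sketch of the typesArray path: supplying one java.sql.Types constant per Row field via setSqlTypes makes writeRecord use the typed setters (and setNull for null fields) instead of the best-effort setObject fallback. The Derby driver, in-memory URL, and the books columns here are assumptions for illustration.

import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;

public class TypedJdbcOutputFormatExample {

    public static JDBCOutputFormat buildTypedFormat() {
        // one entry per field of the Rows that will be written, in positional order
        int[] sqlTypes = new int[]{
                java.sql.Types.INTEGER,  // id
                java.sql.Types.VARCHAR,  // title
                java.sql.Types.DOUBLE    // price
        };

        // driver, URL, and table are illustrative assumptions
        return JDBCOutputFormat.buildJDBCOutputFormat()
                .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
                .setDBUrl("jdbc:derby:memory:example;create=true")
                .setQuery("INSERT INTO books (id, title, price) VALUES (?, ?, ?)")
                .setSqlTypes(sqlTypes)
                .finish();
    }
}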

JDBCAppendTableSink

flink-jdbc_2.11-1.7.0-sources.jar!/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java

/**
 * An at-least-once Table sink for JDBC.
 *
 * The mechanisms of Flink guarantees delivering messages at-least-once to this sink (if
 * checkpointing is enabled). However, one common use case is to run idempotent queries
 * (e.g., REPLACE or INSERT OVERWRITE) to upsert into the database and
 * achieve exactly-once semantic.
 */
public class JDBCAppendTableSink implements AppendStreamTableSink<Row>, BatchTableSink<Row> {

    private final JDBCOutputFormat outputFormat;

    private String[] fieldNames;
    private TypeInformation[] fieldTypes;

    JDBCAppendTableSink(JDBCOutputFormat outputFormat) {
        this.outputFormat = outputFormat;
    }

    public static JDBCAppendTableSinkBuilder builder() {
        return new JDBCAppendTableSinkBuilder();
    }

    @Override
    public void emitDataStream(DataStream<Row> dataStream) {
        dataStream
            .addSink(new JDBCSinkFunction(outputFormat))
            .name(TableConnectorUtil.generateRuntimeName(this.getClass(), fieldNames));
    }

    @Override
    public void emitDataSet(DataSet<Row> dataSet) {
        dataSet.output(outputFormat);
    }

    @Override
    public TypeInformation<Row> getOutputType() {
        return new RowTypeInfo(fieldTypes, fieldNames);
    }

    @Override
    public String[] getFieldNames() {
        return fieldNames;
    }

    @Override
    public TypeInformation<?>[] getFieldTypes() {
        return fieldTypes;
    }

    @Override
    public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        int[] types = outputFormat.getTypesArray();

        String sinkSchema =
            String.join(", ", IntStream.of(types).mapToObj(JDBCTypeUtil::getTypeName).collect(Collectors.toList()));
        String tableSchema =
            String.join(", ", Stream.of(fieldTypes).map(JDBCTypeUtil::getTypeName).collect(Collectors.toList()));
        String msg = String.format("Schema of output table is incompatible with JDBCAppendTableSink schema. " +
            "Table schema: [%s], sink schema: [%s]", tableSchema, sinkSchema);

        Preconditions.checkArgument(fieldTypes.length == types.length, msg);
        for (int i = 0; i < types.length; ++i) {
            Preconditions.checkArgument(
                JDBCTypeUtil.typeInformationToSqlType(fieldTypes[i]) == types[i],
                msg);
        }

        JDBCAppendTableSink copy;
        try {
            copy = new JDBCAppendTableSink(InstantiationUtil.clone(outputFormat));
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }

        copy.fieldNames = fieldNames;
        copy.fieldTypes = fieldTypes;
        return copy;
    }

    @VisibleForTesting
    JDBCOutputFormat getOutputFormat() {
        return outputFormat;
    }
}
  • JDBCAppendTableSink uses a JDBCOutputFormat internally and implements both the AppendStreamTableSink and BatchTableSink interfaces
  • its emitDataStream method attaches a JDBCSinkFunction sink to the given dataStream, while emitDataSet sets the outputFormat as the output of the dataSet
  • it also implements the getOutputType, getFieldNames, getFieldTypes, and configure methods declared by TableSink (which BatchTableSink and AppendStreamTableSink extend); configure essentially creates a new JDBCAppendTableSink from a clone of the JDBCOutputFormat after checking that the field types match the configured SQL types (a usage sketch follows this list)
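The sketch below shows one way this sink is typically wired into the Table API on Flink 1.7 via JDBCAppendTableSinkBuilder; the connection details, the jdbcSink name, and the id/title schema are assumptions for illustration.

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class JdbcAppendTableSinkExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // connection details are illustrative assumptions
        JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
                .setDrivername("com.mysql.jdbc.Driver")
                .setDBUrl("jdbc:mysql://localhost:3306/test")
                .setUsername("root")
                .setPassword("root")
                .setQuery("INSERT INTO books (id, title) VALUES (?, ?)")
                .setParameterTypes(Types.INT, Types.STRING)
                .build();

        // register the sink under a name; any Table with a matching schema can then
        // be written with table.insertInto("jdbcSink")
        tEnv.registerTableSink(
                "jdbcSink",
                new String[]{"id", "title"},
                new TypeInformation[]{Types.INT, Types.STRING},
                sink);

        // ... define a source table, call insertInto("jdbcSink"), then env.execute(...)
    }
}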

JDBCSinkFunction

flink-jdbc_2.11-1.7.0-sources.jar!/org/apache/flink/api/java/io/jdbc/JDBCSinkFunction.java

class JDBCSinkFunction extends RichSinkFunction<Row> implements CheckpointedFunction {
    final JDBCOutputFormat outputFormat;

    JDBCSinkFunction(JDBCOutputFormat outputFormat) {
        this.outputFormat = outputFormat;
    }

    @Override
    public void invoke(Row value) throws Exception {
        outputFormat.writeRecord(value);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        outputFormat.flush();
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        RuntimeContext ctx = getRuntimeContext();
        outputFormat.setRuntimeContext(ctx);
        outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
    }

    @Override
    public void close() throws Exception {
        outputFormat.close();
        super.close();
    }
}
  • JDBCSinkFunction extends RichSinkFunction and implements the CheckpointedFunction interface; invoke delegates to JDBCOutputFormat.writeRecord, while snapshotState calls JDBCOutputFormat.flush so that buffered records are committed at every checkpoint (see the sketch below)
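Because snapshotState delegates to flush, enabling checkpointing bounds how long rows can sit in a partially filled batch. A minimal sketch (the interval below is an arbitrary example value):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointedJdbcSinkExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Every checkpoint triggers JDBCSinkFunction.snapshotState(), which calls
        // JDBCOutputFormat.flush(), so rows buffered below batchInterval are still
        // committed at least once per checkpoint interval.
        env.enableCheckpointing(10_000L); // checkpoint every 10 seconds

        // ... add sources and register the JDBC sink as in the sketches above,
        // then call env.execute(...)
    }
}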

Summary

  • JDBCOutputFormat extends RichOutputFormat; open() calls establishConnection to load the driver and initialize dbConn, then calls dbConn.prepareStatement(query) to obtain upload (a PreparedStatement); writeRecord uses PreparedStatement.addBatch, and once batchCount reaches batchInterval (default 5000) it calls flush(), i.e. PreparedStatement.executeBatch, and resets batchCount; to make sure records that never reach batchInterval still get committed, close() runs flush() once more before closing the PreparedStatement and the Connection
  • Row is the record type consumed by JDBCOutputFormat.writeRecord; internally it stores the field values in an Object[] array
  • JDBCOutputFormatBuilder provides builder methods for username, password, dbURL, query, drivername, batchInterval, and typesArray
  • JDBCAppendTableSink uses a JDBCOutputFormat internally; its emitDataStream method attaches a JDBCSinkFunction sink to the given dataStream, while emitDataSet sets the outputFormat as the output of the dataSet
  • JDBCSinkFunction extends RichSinkFunction and implements the CheckpointedFunction interface; invoke delegates to JDBCOutputFormat.writeRecord, while snapshotState calls JDBCOutputFormat.flush so that buffered records are committed promptly


Reprinted from: https://my.oschina.net/go4it/blog/2967397
