flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pnowojski <...@git.apache.org>
Subject [GitHub] flink pull request #6387: [FLINK-9846] [table] Add a Kafka table sink factor...
Date Mon, 23 Jul 2018 10:45:05 GMT
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6387#discussion_r204348635
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
---
    @@ -82,49 +129,97 @@ public KafkaTableSink(
     	 *
     	 * @param rowSchema the schema of the row to serialize.
     	 * @return Instance of serialization schema
    +	 * @deprecated Use the constructor to pass a serialization schema instead.
     	 */
    -	protected abstract SerializationSchema<Row> createSerializationSchema(RowTypeInfo
rowSchema);
    +	@Deprecated
    +	protected SerializationSchema<Row> createSerializationSchema(RowTypeInfo rowSchema)
{
    +		throw new UnsupportedOperationException("This method only exists for backwards compatibility.");
    +	}
     
     	/**
     	 * Create a deep copy of this sink.
     	 *
     	 * @return Deep copy of this sink
     	 */
    -	protected abstract KafkaTableSink createCopy();
    +	@Deprecated
    +	protected KafkaTableSink createCopy() {
    +		throw new UnsupportedOperationException("This method only exists for backwards compatibility.");
    +	}
     
     	@Override
     	public void emitDataStream(DataStream<Row> dataStream) {
    -		FlinkKafkaProducerBase<Row> kafkaProducer = createKafkaProducer(topic, properties,
serializationSchema, partitioner);
    -		// always enable flush on checkpoint to achieve at-least-once if query runs with checkpointing
enabled.
    -		kafkaProducer.setFlushOnCheckpoint(true);
    +		SinkFunction<Row> kafkaProducer = createKafkaProducer(
    +			topic,
    +			properties,
    +			serializationSchema.orElseThrow(() -> new IllegalStateException("No serialization
schema defined.")),
    +			partitioner);
     		dataStream.addSink(kafkaProducer).name(TableConnectorUtil.generateRuntimeName(this.getClass(),
fieldNames));
     	}
     
     	@Override
     	public TypeInformation<Row> getOutputType() {
    -		return new RowTypeInfo(getFieldTypes());
    +		return schema
    +			.map(TableSchema::toRowType)
    +			.orElseGet(() -> new RowTypeInfo(getFieldTypes()));
     	}
     
     	public String[] getFieldNames() {
    -		return fieldNames;
    +		return schema.map(TableSchema::getColumnNames).orElse(fieldNames);
     	}
     
     	@Override
     	public TypeInformation<?>[] getFieldTypes() {
    -		return fieldTypes;
    +		return schema.map(TableSchema::getTypes).orElse(fieldTypes);
     	}
     
     	@Override
     	public KafkaTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes)
{
    +		// a fixed schema is defined so reconfiguration is not supported
    --- End diff --
    
    Move this comment to exception description.


---

Mime
View raw message