From: pnowojski
Subject: [GitHub] flink pull request #6387: [FLINK-9846] [table] Add a Kafka table sink factor...
Date: Mon, 23 Jul 2018 10:45:05 +0000 (UTC)

Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6387#discussion_r204348635

--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java ---
@@ -82,49 +129,97 @@ public KafkaTableSink(
 	 *
 	 * @param rowSchema the schema of the row to serialize.
 	 * @return Instance of serialization schema
+	 * @deprecated Use the constructor to pass a serialization schema instead.
 	 */
-	protected abstract SerializationSchema<Row> createSerializationSchema(RowTypeInfo rowSchema);
+	@Deprecated
+	protected SerializationSchema<Row> createSerializationSchema(RowTypeInfo rowSchema) {
+		throw new UnsupportedOperationException("This method only exists for backwards compatibility.");
+	}

 	/**
 	 * Create a deep copy of this sink.
 	 *
 	 * @return Deep copy of this sink
 	 */
-	protected abstract KafkaTableSink createCopy();
+	@Deprecated
+	protected KafkaTableSink createCopy() {
+		throw new UnsupportedOperationException("This method only exists for backwards compatibility.");
+	}

 	@Override
 	public void emitDataStream(DataStream<Row> dataStream) {
-		FlinkKafkaProducerBase<Row> kafkaProducer = createKafkaProducer(topic, properties, serializationSchema, partitioner);
-		// always enable flush on checkpoint to achieve at-least-once if query runs with checkpointing enabled.
-		kafkaProducer.setFlushOnCheckpoint(true);
+		SinkFunction<Row> kafkaProducer = createKafkaProducer(
+			topic,
+			properties,
+			serializationSchema.orElseThrow(() -> new IllegalStateException("No serialization schema defined.")),
+			partitioner);
 		dataStream.addSink(kafkaProducer).name(TableConnectorUtil.generateRuntimeName(this.getClass(), fieldNames));
 	}

 	@Override
 	public TypeInformation<Row> getOutputType() {
-		return new RowTypeInfo(getFieldTypes());
+		return schema
+			.map(TableSchema::toRowType)
+			.orElseGet(() -> new RowTypeInfo(getFieldTypes()));
 	}

 	public String[] getFieldNames() {
-		return fieldNames;
+		return schema.map(TableSchema::getColumnNames).orElse(fieldNames);
 	}

 	@Override
 	public TypeInformation<?>[] getFieldTypes() {
-		return fieldTypes;
+		return schema.map(TableSchema::getTypes).orElse(fieldTypes);
 	}

 	@Override
 	public KafkaTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+		// a fixed schema is defined so reconfiguration is not supported
--- End diff --

Move this comment to the exception description.
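For illustration, the suggestion is to fold the inline comment in configure() into the message of the exception the method throws when a fixed schema is already set. A minimal sketch of such a change, assuming the method guards on schema.isPresent(); the exception type and message wording here are illustrative, since the rest of the method body is cut off in the quoted diff:

    @Override
    public KafkaTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        if (schema.isPresent()) {
            // the rationale moves from a code comment into the exception message,
            // so callers see it at runtime instead of only in the source
            throw new IllegalStateException(
                "Reconfiguration of this sink is not supported: a fixed schema is already defined.");
        }
        // ... legacy reconfiguration path for sinks constructed without a TableSchema ...
    }

Whether IllegalStateException (matching the orElseThrow in emitDataStream above) or a table-API validation exception fits better is a judgment call, depending on how other table sinks in the PR report unsupported reconfiguration.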