Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 0C681200BCE for ; Fri, 2 Dec 2016 14:34:56 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 0B439160B4A; Fri, 2 Dec 2016 13:34:56 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 3B506160B32 for ; Fri, 2 Dec 2016 14:34:53 +0100 (CET) Received: (qmail 70020 invoked by uid 500); 2 Dec 2016 13:34:52 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 69228 invoked by uid 99); 2 Dec 2016 13:34:51 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 02 Dec 2016 13:34:51 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8780FF177F; Fri, 2 Dec 2016 13:34:51 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: fhueske@apache.org To: commits@flink.apache.org Date: Fri, 02 Dec 2016 13:35:03 -0000 Message-Id: In-Reply-To: <627ed550fa9e4aed8e3752516869c22e@git.apache.org> References: <627ed550fa9e4aed8e3752516869c22e@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [13/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors] Merge batch and streaming connectors into common Maven module. archived-at: Fri, 02 Dec 2016 13:34:56 -0000 http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-flume/pom.xml ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-flume/pom.xml b/flink-streaming-connectors/flink-connector-flume/pom.xml deleted file mode 100644 index 1b1b810..0000000 --- a/flink-streaming-connectors/flink-connector-flume/pom.xml +++ /dev/null @@ -1,175 +0,0 @@ - - - - - 4.0.0 - - - org.apache.flink - flink-streaming-connectors - 1.2-SNAPSHOT - .. 
- - - flink-connector-flume_2.10 - flink-connector-flume - - jar - - - - 1.5.0 - - - - - - org.apache.flink - flink-streaming-java_2.10 - ${project.version} - provided - - - - org.apache.flume - flume-ng-core - ${flume-ng.version} - - - org.slf4j - slf4j-log4j12 - - - log4j - log4j - - - commons-io - commons-io - - - commons-codec - commons-codec - - - commons-cli - commons-cli - - - commons-lang - commons-lang - - - org.apache.avro - avro - - - org.codehaus.jackson - jackson-core-asl - - - org.codehaus.jackson - jackson-mapper-asl - - - com.thoughtworks.paranamer - paranamer - - - org.xerial.snappy - snappy-java - - - org.tukaani - xz - - - org.apache.velocity - velocity - - - commons-collections - commons-collections - - - org.mortbay.jetty - servlet-api - - - org.mortbay.jetty - jetty-util - - - org.mortbay.jetty - jetty - - - com.google.code.gson - gson - - - org.apache.thrift - libthrift - - - - - - - - - - org.apache.maven.plugins - maven-jar-plugin - - - - test-jar - - - - - - - - org.apache.maven.plugins - maven-shade-plugin - - - shade-flink - - - - - org.apache.flume:* - - - - - - - - - - - - http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java deleted file mode 100644 index 2dc043b..0000000 --- a/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.connectors.flume; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; -import org.apache.flink.streaming.util.serialization.SerializationSchema; -import org.apache.flume.Event; -import org.apache.flume.EventDeliveryException; -import org.apache.flume.FlumeException; -import org.apache.flume.api.RpcClient; -import org.apache.flume.api.RpcClientFactory; -import org.apache.flume.event.EventBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class FlumeSink extends RichSinkFunction { - private static final long serialVersionUID = 1L; - - private static final Logger LOG = LoggerFactory.getLogger(FlumeSink.class); - - private transient FlinkRpcClientFacade client; - boolean initDone = false; - String host; - int port; - SerializationSchema schema; - - public FlumeSink(String host, int port, SerializationSchema schema) { - this.host = host; - this.port = port; - this.schema = schema; - } - - /** - * Receives tuples from the Apache Flink {@link DataStream} and forwards - * them to Apache Flume. - * - * @param value - * The tuple arriving from the datastream - */ - @Override - public void invoke(IN value) { - - byte[] data = schema.serialize(value); - client.sendDataToFlume(data); - - } - - private class FlinkRpcClientFacade { - private RpcClient client; - private String hostname; - private int port; - - /** - * Initializes the connection to Apache Flume. - * - * @param hostname - * The host - * @param port - * The port. - */ - public void init(String hostname, int port) { - // Setup the RPC connection - this.hostname = hostname; - this.port = port; - int initCounter = 0; - while (true) { - if (initCounter >= 90) { - throw new RuntimeException("Cannot establish connection with" + port + " at " - + host); - } - try { - this.client = RpcClientFactory.getDefaultInstance(hostname, port); - } catch (FlumeException e) { - // Wait one second if the connection failed before the next - // try - try { - Thread.sleep(1000); - } catch (InterruptedException e1) { - if (LOG.isErrorEnabled()) { - LOG.error("Interrupted while trying to connect {} at {}", port, host); - } - } - } - if (client != null) { - break; - } - initCounter++; - } - initDone = true; - } - - /** - * Sends byte arrays as {@link Event} series to Apache Flume. 
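// A minimal usage sketch for the FlumeSink defined in this file; the host, port, and
// schema are illustrative assumptions (a local Flume Avro source on port 4141), not
// part of the original commit.
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.flume.FlumeSink;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class FlumeSinkExample {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		DataStream<String> stream = env.fromElements("event-1", "event-2", "event-3");
		// Each record is serialized with the given SerializationSchema and forwarded
		// to the Flume agent through the RpcClient facade shown above.
		stream.addSink(new FlumeSink<String>("localhost", 4141, new SimpleStringSchema()));
		env.execute("flume-sink-example");
	}
}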
- * - * @param data - * The byte array to send to Apache FLume - */ - public void sendDataToFlume(byte[] data) { - Event event = EventBuilder.withBody(data); - - try { - client.append(event); - - } catch (EventDeliveryException e) { - // clean up and recreate the client - client.close(); - client = null; - client = RpcClientFactory.getDefaultInstance(hostname, port); - } - } - - } - - @Override - public void close() { - client.client.close(); - } - - @Override - public void open(Configuration config) { - client = new FlinkRpcClientFacade(); - client.init(host, port); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml b/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml deleted file mode 100644 index 04019f8..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml +++ /dev/null @@ -1,205 +0,0 @@ - - - - - 4.0.0 - - - org.apache.flink - flink-streaming-connectors - 1.2-SNAPSHOT - .. - - - flink-connector-kafka-0.10_2.10 - flink-connector-kafka-0.10 - - jar - - - - 0.10.0.1 - - - - - - - - org.apache.flink - flink-connector-kafka-0.9_2.10 - ${project.version} - - - org.apache.kafka - kafka_${scala.binary.version} - - - - - - - - org.apache.kafka - kafka-clients - ${kafka.version} - - - - org.apache.flink - flink-table_2.10 - ${project.version} - provided - - true - - - - - - org.apache.flink - flink-streaming-java_2.10 - ${project.version} - test - test-jar - - - - org.apache.flink - flink-connector-kafka-0.9_2.10 - ${project.version} - - - - org.apache.kafka - kafka_${scala.binary.version} - - - test-jar - test - - - - org.apache.flink - flink-connector-kafka-base_2.10 - ${project.version} - - - - org.apache.kafka - kafka_${scala.binary.version} - - - test-jar - test - - - - - org.apache.kafka - kafka_${scala.binary.version} - ${kafka.version} - test - - - - org.apache.flink - flink-tests_2.10 - ${project.version} - test-jar - test - - - - org.apache.flink - flink-test-utils_2.10 - ${project.version} - test - - - - org.apache.flink - flink-runtime_2.10 - ${project.version} - test-jar - test - - - - org.apache.flink - flink-metrics-jmx - ${project.version} - test - - - - - - - - - org.apache.maven.plugins - maven-jar-plugin - - - - test-jar - - - - **/KafkaTestEnvironmentImpl* - - - - - - - org.apache.maven.plugins - maven-source-plugin - - - attach-test-sources - - test-jar-no-fork - - - - **/KafkaTestEnvironmentImpl* - - - - - - - org.apache.maven.plugins - maven-surefire-plugin - - - 1 - -Xms256m -Xmx1000m -Dlog4j.configuration=${log4j.configuration} -Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit - - - - - - http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java deleted file mode 100644 index a9ce336..0000000 --- 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; -import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; -import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; -import org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher; -import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher; -import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; -import org.apache.flink.util.SerializedValue; - -import java.util.Collections; -import java.util.List; -import java.util.Properties; - - -/** - * The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from - * Apache Kafka 0.10.x. The consumer can run in multiple parallel instances, each of which will pull - * data from one or more Kafka partitions. - * - *
<p>The Flink Kafka Consumer participates in checkpointing and guarantees that no data is lost - * during a failure, and that the computation processes elements "exactly once". - * (Note: These guarantees naturally assume that Kafka itself does not lose any data.)</p>
- * - * <p>Please note that Flink snapshots the offsets internally as part of its distributed checkpoints. The offsets - * committed to Kafka / ZooKeeper are only to bring the outside view of progress in sync with Flink's view - * of the progress. That way, monitoring and other jobs can get a view of how far the Flink Kafka consumer - * has consumed a topic.</p>
- * - * <p>Please refer to Kafka's documentation for the available configuration properties: - * http://kafka.apache.org/documentation.html#newconsumerconfigs</p>
- * - * <p>NOTE: The implementation currently accesses partition metadata when the consumer - * is constructed. That means that the client that submits the program needs to be able to - * reach the Kafka brokers or ZooKeeper.</p>
- */ -public class FlinkKafkaConsumer010 extends FlinkKafkaConsumer09 { - - private static final long serialVersionUID = 2324564345203409112L; - - - // ------------------------------------------------------------------------ - - /** - * Creates a new Kafka streaming source consumer for Kafka 0.10.x - * - * @param topic - * The name of the topic that should be consumed. - * @param valueDeserializer - * The de-/serializer used to convert between Kafka's byte messages and Flink's objects. - * @param props - * The properties used to configure the Kafka consumer client, and the ZooKeeper client. - */ - public FlinkKafkaConsumer010(String topic, DeserializationSchema valueDeserializer, Properties props) { - this(Collections.singletonList(topic), valueDeserializer, props); - } - - /** - * Creates a new Kafka streaming source consumer for Kafka 0.10.x - * - * This constructor allows passing a {@see KeyedDeserializationSchema} for reading key/value - * pairs, offsets, and topic names from Kafka. - * - * @param topic - * The name of the topic that should be consumed. - * @param deserializer - * The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects. - * @param props - * The properties used to configure the Kafka consumer client, and the ZooKeeper client. - */ - public FlinkKafkaConsumer010(String topic, KeyedDeserializationSchema deserializer, Properties props) { - this(Collections.singletonList(topic), deserializer, props); - } - - /** - * Creates a new Kafka streaming source consumer for Kafka 0.10.x - * - * This constructor allows passing multiple topics to the consumer. - * - * @param topics - * The Kafka topics to read from. - * @param deserializer - * The de-/serializer used to convert between Kafka's byte messages and Flink's objects. - * @param props - * The properties that are used to configure both the fetcher and the offset handler. - */ - public FlinkKafkaConsumer010(List topics, DeserializationSchema deserializer, Properties props) { - this(topics, new KeyedDeserializationSchemaWrapper<>(deserializer), props); - } - - /** - * Creates a new Kafka streaming source consumer for Kafka 0.10.x - * - * This constructor allows passing multiple topics and a key/value deserialization schema. - * - * @param topics - * The Kafka topics to read from. - * @param deserializer - * The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects. - * @param props - * The properties that are used to configure both the fetcher and the offset handler. 
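// A usage sketch for the consumer defined in this file; the broker address, group id,
// and topic name are illustrative assumptions. Checkpointing is enabled so the
// "exactly once" guarantee described in the class Javadoc applies.
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class Kafka010ConsumerExample {
	public static void main(String[] args) throws Exception {
		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092");
		props.setProperty("group.id", "flink-example-group");

		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.enableCheckpointing(5000); // offsets are snapshotted as part of each checkpoint

		DataStream<String> stream = env.addSource(
				new FlinkKafkaConsumer010<String>("example-topic", new SimpleStringSchema(), props));
		stream.print();
		env.execute("kafka-010-consumer-example");
	}
}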
- */ - public FlinkKafkaConsumer010(List topics, KeyedDeserializationSchema deserializer, Properties props) { - super(topics, deserializer, props); - } - - @Override - protected AbstractFetcher createFetcher( - SourceContext sourceContext, - List thisSubtaskPartitions, - SerializedValue> watermarksPeriodic, - SerializedValue> watermarksPunctuated, - StreamingRuntimeContext runtimeContext) throws Exception { - - boolean useMetrics = !Boolean.valueOf(properties.getProperty(KEY_DISABLE_METRICS, "false")); - - return new Kafka010Fetcher<>( - sourceContext, - thisSubtaskPartitions, - watermarksPeriodic, - watermarksPunctuated, - runtimeContext.getProcessingTimeService(), - runtimeContext.getExecutionConfig().getAutoWatermarkInterval(), - runtimeContext.getUserCodeClassLoader(), - runtimeContext.isCheckpointingEnabled(), - runtimeContext.getTaskNameWithSubtasks(), - runtimeContext.getMetricGroup(), - deserializer, - properties, - pollTimeout, - useMetrics); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java deleted file mode 100644 index cc0194b..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java +++ /dev/null @@ -1,398 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.api.common.functions.IterationRuntimeContext; -import org.apache.flink.api.common.functions.RichFunction; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.api.java.typeutils.GenericTypeInfo; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSink; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; -import org.apache.flink.streaming.api.operators.StreamSink; -import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner; -import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; -import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; -import org.apache.flink.streaming.util.serialization.SerializationSchema; -import org.apache.kafka.clients.producer.ProducerRecord; - -import java.util.Properties; - -import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getPropertiesFromBrokerList; - - -/** - * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.10.x - * - * Implementation note: This producer is a hybrid between a regular regular sink function (a) - * and a custom operator (b). - * - * For (a), the class implements the SinkFunction and RichFunction interfaces. - * For (b), it extends the StreamTask class. - * - * Details about approach (a): - * - * Pre Kafka 0.10 producers only follow approach (a), allowing users to use the producer using the - * DataStream.addSink() method. - * Since the APIs exposed in that variant do not allow accessing the the timestamp attached to the record - * the Kafka 0.10 producer has a second invocation option, approach (b). - * - * Details about approach (b): - * Kafka 0.10 supports writing the timestamp attached to a record to Kafka. When adding the - * FlinkKafkaProducer010 using the FlinkKafkaProducer010.writeToKafkaWithTimestamps() method, the Kafka producer - * can access the internal record timestamp of the record and write it to Kafka. - * - * All methods and constructors in this class are marked with the approach they are needed for. - */ -public class FlinkKafkaProducer010 extends StreamSink implements SinkFunction, RichFunction { - - /** - * Flag controlling whether we are writing the Flink record's timestamp into Kafka. - */ - private boolean writeTimestampToKafka = false; - - // ---------------------- "Constructors" for timestamp writing ------------------ - - /** - * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to - * the topic. - * - * This constructor allows writing timestamps to Kafka, it follow approach (b) (see above) - * - * @param inStream The stream to write to Kafka - * @param topicId ID of the Kafka topic. - * @param serializationSchema User defined serialization schema supporting key/value messages - * @param producerConfig Properties with the producer configuration. 
- */ - public static FlinkKafkaProducer010Configuration writeToKafkaWithTimestamps(DataStream inStream, - String topicId, - KeyedSerializationSchema serializationSchema, - Properties producerConfig) { - return writeToKafkaWithTimestamps(inStream, topicId, serializationSchema, producerConfig, new FixedPartitioner()); - } - - - /** - * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to - * the topic. - * - * This constructor allows writing timestamps to Kafka, it follow approach (b) (see above) - * - * @param inStream The stream to write to Kafka - * @param topicId ID of the Kafka topic. - * @param serializationSchema User defined (keyless) serialization schema. - * @param producerConfig Properties with the producer configuration. - */ - public static FlinkKafkaProducer010Configuration writeToKafkaWithTimestamps(DataStream inStream, - String topicId, - SerializationSchema serializationSchema, - Properties producerConfig) { - return writeToKafkaWithTimestamps(inStream, topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner()); - } - - /** - * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to - * the topic. - * - * This constructor allows writing timestamps to Kafka, it follow approach (b) (see above) - * - * @param inStream The stream to write to Kafka - * @param topicId The name of the target topic - * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages - * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. - * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. - */ - public static FlinkKafkaProducer010Configuration writeToKafkaWithTimestamps(DataStream inStream, - String topicId, - KeyedSerializationSchema serializationSchema, - Properties producerConfig, - KafkaPartitioner customPartitioner) { - - GenericTypeInfo objectTypeInfo = new GenericTypeInfo<>(Object.class); - FlinkKafkaProducer010 kafkaProducer = new FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, customPartitioner); - SingleOutputStreamOperator transformation = inStream.transform("FlinKafkaProducer 0.10.x", objectTypeInfo, kafkaProducer); - return new FlinkKafkaProducer010Configuration<>(transformation, kafkaProducer); - } - - // ---------------------- Regular constructors w/o timestamp support ------------------ - - /** - * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to - * the topic. - * - * @param brokerList - * Comma separated addresses of the brokers - * @param topicId - * ID of the Kafka topic. - * @param serializationSchema - * User defined (keyless) serialization schema. - */ - public FlinkKafkaProducer010(String brokerList, String topicId, SerializationSchema serializationSchema) { - this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FixedPartitioner()); - } - - /** - * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to - * the topic. - * - * @param topicId - * ID of the Kafka topic. - * @param serializationSchema - * User defined (keyless) serialization schema. - * @param producerConfig - * Properties with the producer configuration. 
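// A sketch of the two attachment styles described in the class Javadoc above; the topic
// name and broker list are illustrative assumptions, and the stream parameter is any
// DataStream<String> (for example the one produced by the consumer sketch earlier).
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class Kafka010ProducerExample {
	public static void attachProducers(DataStream<String> stream) {
		// Approach (a): attach the producer as a plain sink via addSink();
		// record timestamps are not written to Kafka.
		stream.addSink(new FlinkKafkaProducer010<String>(
				"localhost:9092", "example-topic", new SimpleStringSchema()));

		// Approach (b): attach through the static helper, which returns a configuration
		// object exposing further setters (shown later in this file).
		FlinkKafkaProducer010.FlinkKafkaProducer010Configuration<String> config =
				FlinkKafkaProducer010.writeToKafkaWithTimestamps(
						stream, "example-topic", new SimpleStringSchema(),
						FlinkKafkaProducerBase.getPropertiesFromBrokerList("localhost:9092"));
		config.setWriteTimestampToKafka(true);
	}
}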
- */ - public FlinkKafkaProducer010(String topicId, SerializationSchema serializationSchema, Properties producerConfig) { - this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner()); - } - - /** - * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to - * the topic. - * - * @param topicId The topic to write data to - * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[] - * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. - * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner) - */ - public FlinkKafkaProducer010(String topicId, SerializationSchema serializationSchema, Properties producerConfig, KafkaPartitioner customPartitioner) { - this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner); - } - - // ------------------- Key/Value serialization schema constructors ---------------------- - - /** - * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to - * the topic. - * - * @param brokerList - * Comma separated addresses of the brokers - * @param topicId - * ID of the Kafka topic. - * @param serializationSchema - * User defined serialization schema supporting key/value messages - */ - public FlinkKafkaProducer010(String brokerList, String topicId, KeyedSerializationSchema serializationSchema) { - this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FixedPartitioner()); - } - - /** - * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to - * the topic. - * - * @param topicId - * ID of the Kafka topic. - * @param serializationSchema - * User defined serialization schema supporting key/value messages - * @param producerConfig - * Properties with the producer configuration. - */ - public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema serializationSchema, Properties producerConfig) { - this(topicId, serializationSchema, producerConfig, new FixedPartitioner()); - } - - /** - * Create Kafka producer - * - * This constructor does not allow writing timestamps to Kafka, it follow approach (a) (see above) - */ - public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema serializationSchema, Properties producerConfig, KafkaPartitioner customPartitioner) { - // We create a Kafka 09 producer instance here and only "override" (by intercepting) the - // invoke call. 
- super(new FlinkKafkaProducer09<>(topicId, serializationSchema, producerConfig, customPartitioner)); - } - - - // ----------------------------- Generic element processing --------------------------- - - private void invokeInternal(T next, long elementTimestamp) throws Exception { - - final FlinkKafkaProducerBase internalProducer = (FlinkKafkaProducerBase) userFunction; - - internalProducer.checkErroneous(); - - byte[] serializedKey = internalProducer.schema.serializeKey(next); - byte[] serializedValue = internalProducer.schema.serializeValue(next); - String targetTopic = internalProducer.schema.getTargetTopic(next); - if (targetTopic == null) { - targetTopic = internalProducer.defaultTopicId; - } - - Long timestamp = null; - if(this.writeTimestampToKafka) { - timestamp = elementTimestamp; - } - - ProducerRecord record; - if (internalProducer.partitioner == null) { - record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue); - } else { - record = new ProducerRecord<>(targetTopic, internalProducer.partitioner.partition(next, serializedKey, serializedValue, internalProducer.partitions.length), timestamp, serializedKey, serializedValue); - } - if (internalProducer.flushOnCheckpoint) { - synchronized (internalProducer.pendingRecordsLock) { - internalProducer.pendingRecords++; - } - } - internalProducer.producer.send(record, internalProducer.callback); - } - - - // ----------------- Helper methods implementing methods from SinkFunction and RichFunction (Approach (a)) ---- - - - // ---- Configuration setters - - /** - * Defines whether the producer should fail on errors, or only log them. - * If this is set to true, then exceptions will be only logged, if set to false, - * exceptions will be eventually thrown and cause the streaming program to - * fail (and enter recovery). - * - * Method is only accessible for approach (a) (see above) - * - * @param logFailuresOnly The flag to indicate logging-only on exceptions. - */ - public void setLogFailuresOnly(boolean logFailuresOnly) { - final FlinkKafkaProducerBase internalProducer = (FlinkKafkaProducerBase) userFunction; - internalProducer.setLogFailuresOnly(logFailuresOnly); - } - - /** - * If set to true, the Flink producer will wait for all outstanding messages in the Kafka buffers - * to be acknowledged by the Kafka producer on a checkpoint. - * This way, the producer can guarantee that messages in the Kafka buffers are part of the checkpoint. 
- * - * Method is only accessible for approach (a) (see above) - * - * @param flush Flag indicating the flushing mode (true = flush on checkpoint) - */ - public void setFlushOnCheckpoint(boolean flush) { - final FlinkKafkaProducerBase internalProducer = (FlinkKafkaProducerBase) userFunction; - internalProducer.setFlushOnCheckpoint(flush); - } - - /** - * This method is used for approach (a) (see above) - * - */ - @Override - public void open(Configuration parameters) throws Exception { - final FlinkKafkaProducerBase internalProducer = (FlinkKafkaProducerBase) userFunction; - internalProducer.open(parameters); - } - - /** - * This method is used for approach (a) (see above) - */ - @Override - public IterationRuntimeContext getIterationRuntimeContext() { - final FlinkKafkaProducerBase internalProducer = (FlinkKafkaProducerBase) userFunction; - return internalProducer.getIterationRuntimeContext(); - } - - /** - * This method is used for approach (a) (see above) - */ - @Override - public void setRuntimeContext(RuntimeContext t) { - final FlinkKafkaProducerBase internalProducer = (FlinkKafkaProducerBase) userFunction; - internalProducer.setRuntimeContext(t); - } - - /** - * Invoke method for using the Sink as DataStream.addSink() sink. - * - * This method is used for approach (a) (see above) - * - * @param value The input record. - */ - @Override - public void invoke(T value) throws Exception { - invokeInternal(value, Long.MAX_VALUE); - } - - - // ----------------- Helper methods and classes implementing methods from StreamSink (Approach (b)) ---- - - - /** - * Process method for using the sink with timestamp support. - * - * This method is used for approach (b) (see above) - */ - @Override - public void processElement(StreamRecord element) throws Exception { - invokeInternal(element.getValue(), element.getTimestamp()); - } - - /** - * Configuration object returned by the writeToKafkaWithTimestamps() call. - */ - public static class FlinkKafkaProducer010Configuration extends DataStreamSink { - - private final FlinkKafkaProducerBase wrappedProducerBase; - private final FlinkKafkaProducer010 producer; - - private FlinkKafkaProducer010Configuration(DataStream stream, FlinkKafkaProducer010 producer) { - //noinspection unchecked - super(stream, producer); - this.producer = producer; - this.wrappedProducerBase = (FlinkKafkaProducerBase) producer.userFunction; - } - - /** - * Defines whether the producer should fail on errors, or only log them. - * If this is set to true, then exceptions will be only logged, if set to false, - * exceptions will be eventually thrown and cause the streaming program to - * fail (and enter recovery). - * - * @param logFailuresOnly The flag to indicate logging-only on exceptions. - */ - public void setLogFailuresOnly(boolean logFailuresOnly) { - this.wrappedProducerBase.setLogFailuresOnly(logFailuresOnly); - } - - /** - * If set to true, the Flink producer will wait for all outstanding messages in the Kafka buffers - * to be acknowledged by the Kafka producer on a checkpoint. - * This way, the producer can guarantee that messages in the Kafka buffers are part of the checkpoint. - * - * @param flush Flag indicating the flushing mode (true = flush on checkpoint) - */ - public void setFlushOnCheckpoint(boolean flush) { - this.wrappedProducerBase.setFlushOnCheckpoint(flush); - } - - /** - * If set to true, Flink will write the (event time) timestamp attached to each record into Kafka. - * Timestamps must be positive for Kafka to accept them. 
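// A small sketch of the configuration setters described above, applied to the object
// returned by writeToKafkaWithTimestamps() (approach (b)); the chosen values are
// illustrative, and the helper would live next to the producer example sketched earlier.
public static void configureTimestampedProducer(
		FlinkKafkaProducer010.FlinkKafkaProducer010Configuration<String> config) {
	config.setLogFailuresOnly(false);      // surface produce errors instead of only logging them
	config.setFlushOnCheckpoint(true);     // wait for in-flight Kafka records on each checkpoint
	config.setWriteTimestampToKafka(true); // requires positive record timestamps
}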
- * - * @param writeTimestampToKafka Flag indicating if Flink's internal timestamps are written to Kafka. - */ - public void setWriteTimestampToKafka(boolean writeTimestampToKafka) { - this.producer.writeTimestampToKafka = writeTimestampToKafka; - } - } - - -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java deleted file mode 100644 index ddf1ad3..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.table.Row; -import org.apache.flink.api.table.sources.StreamTableSource; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; - -import java.util.Properties; - -/** - * Kafka {@link StreamTableSource} for Kafka 0.10. - */ -public class Kafka010JsonTableSource extends Kafka09JsonTableSource { - - /** - * Creates a Kafka 0.10 JSON {@link StreamTableSource}. - * - * @param topic Kafka topic to consume. - * @param properties Properties for the Kafka consumer. - * @param fieldNames Row field names. - * @param fieldTypes Row field types. - */ - public Kafka010JsonTableSource( - String topic, - Properties properties, - String[] fieldNames, - TypeInformation[] fieldTypes) { - - super(topic, properties, fieldNames, fieldTypes); - } - - /** - * Creates a Kafka 0.10 JSON {@link StreamTableSource}. - * - * @param topic Kafka topic to consume. - * @param properties Properties for the Kafka consumer. - * @param fieldNames Row field names. - * @param fieldTypes Row field types. 
- */ - public Kafka010JsonTableSource( - String topic, - Properties properties, - String[] fieldNames, - Class[] fieldTypes) { - - super(topic, properties, fieldNames, fieldTypes); - } - - @Override - FlinkKafkaConsumerBase getKafkaConsumer(String topic, Properties properties, DeserializationSchema deserializationSchema) { - return new FlinkKafkaConsumer010<>(topic, deserializationSchema, properties); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java deleted file mode 100644 index 732440b..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.table.Row; -import org.apache.flink.api.table.sources.StreamTableSource; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; - -import java.util.Properties; - -/** - * Kafka {@link StreamTableSource} for Kafka 0.10. - */ -public class Kafka010TableSource extends Kafka09TableSource { - - /** - * Creates a Kafka 0.10 {@link StreamTableSource}. - * - * @param topic Kafka topic to consume. - * @param properties Properties for the Kafka consumer. - * @param deserializationSchema Deserialization schema to use for Kafka records. - * @param fieldNames Row field names. - * @param fieldTypes Row field types. - */ - public Kafka010TableSource( - String topic, - Properties properties, - DeserializationSchema deserializationSchema, - String[] fieldNames, - TypeInformation[] fieldTypes) { - - super(topic, properties, deserializationSchema, fieldNames, fieldTypes); - } - - /** - * Creates a Kafka 0.10 {@link StreamTableSource}. - * - * @param topic Kafka topic to consume. - * @param properties Properties for the Kafka consumer. - * @param deserializationSchema Deserialization schema to use for Kafka records. - * @param fieldNames Row field names. - * @param fieldTypes Row field types. 
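// An illustrative construction of the Kafka 0.10 JSON table source defined above; the
// topic, consumer properties, field names, and field types are assumptions.
import java.util.Properties;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.Kafka010JsonTableSource;

public class Kafka010JsonTableSourceExample {
	public static Kafka010JsonTableSource createTableSource() {
		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092");
		props.setProperty("group.id", "table-example-group");

		// Each JSON record is mapped to a Row with the declared field names and types;
		// the resulting source can then be registered with a StreamTableEnvironment.
		return new Kafka010JsonTableSource(
				"clicks",
				props,
				new String[] {"user", "url"},
				new TypeInformation<?>[] {BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO});
	}
}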
- */ - public Kafka010TableSource( - String topic, - Properties properties, - DeserializationSchema deserializationSchema, - String[] fieldNames, - Class[] fieldTypes) { - - super(topic, properties, deserializationSchema, fieldNames, fieldTypes); - } - - @Override - FlinkKafkaConsumerBase getKafkaConsumer(String topic, Properties properties, DeserializationSchema deserializationSchema) { - return new FlinkKafkaConsumer010<>(topic, deserializationSchema, properties); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java deleted file mode 100644 index 71dd29a..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.internal; - -import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; -import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; -import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; -import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; -import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState; -import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; -import org.apache.flink.util.SerializedValue; - -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.TopicPartition; - -import java.util.List; -import java.util.Properties; - -/** - * A fetcher that fetches data from Kafka brokers via the Kafka 0.10 consumer API. - * - *
<p>
This fetcher re-uses basically all functionality of the 0.9 fetcher. It only additionally - * takes the KafkaRecord-attached timestamp and attaches it to the Flink records. - * - * @param The type of elements produced by the fetcher. - */ -public class Kafka010Fetcher extends Kafka09Fetcher { - - public Kafka010Fetcher( - SourceContext sourceContext, - List assignedPartitions, - SerializedValue> watermarksPeriodic, - SerializedValue> watermarksPunctuated, - ProcessingTimeService processingTimeProvider, - long autoWatermarkInterval, - ClassLoader userCodeClassLoader, - boolean enableCheckpointing, - String taskNameWithSubtasks, - MetricGroup metricGroup, - KeyedDeserializationSchema deserializer, - Properties kafkaProperties, - long pollTimeout, - boolean useMetrics) throws Exception - { - super( - sourceContext, - assignedPartitions, - watermarksPeriodic, - watermarksPunctuated, - processingTimeProvider, - autoWatermarkInterval, - userCodeClassLoader, - enableCheckpointing, - taskNameWithSubtasks, - metricGroup, - deserializer, - kafkaProperties, - pollTimeout, - useMetrics); - } - - @Override - protected void emitRecord( - T record, - KafkaTopicPartitionState partition, - long offset, - ConsumerRecord consumerRecord) throws Exception { - - // we attach the Kafka 0.10 timestamp here - emitRecordWithTimestamp(record, partition, offset, consumerRecord.timestamp()); - } - - /** - * This method needs to be overridden because Kafka broke binary compatibility between 0.9 and 0.10, - * changing binary signatures - */ - @Override - protected KafkaConsumerCallBridge010 createCallBridge() { - return new KafkaConsumerCallBridge010(); - } - - @Override - protected String getFetcherName() { - return "Kafka 0.10 Fetcher"; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java deleted file mode 100644 index a81b098..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.connectors.kafka.internal; - -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.TopicPartition; - -import java.util.List; - -/** - * The ConsumerCallBridge simply calls the {@link KafkaConsumer#assign(java.util.Collection)} method. - * - * This indirection is necessary, because Kafka broke binary compatibility between 0.9 and 0.10, - * changing {@code assign(List)} to {@code assign(Collection)}. - * - * Because of that, we need two versions whose compiled code goes against different method signatures. - */ -public class KafkaConsumerCallBridge010 extends KafkaConsumerCallBridge { - - @Override - public void assignPartitions(KafkaConsumer consumer, List topicPartitions) throws Exception { - consumer.assign(topicPartitions); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/resources/log4j.properties b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/resources/log4j.properties deleted file mode 100644 index 6bdfb48..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/resources/log4j.properties +++ /dev/null @@ -1,29 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-################################################################################ - -log4j.rootLogger=INFO, testlogger - -log4j.appender.testlogger=org.apache.log4j.ConsoleAppender -log4j.appender.testlogger.target = System.err -log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout -log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n - -# suppress the irrelevant (wrong) warnings from the netty channel handler -log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger - - http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java deleted file mode 100644 index 6ee0429..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java +++ /dev/null @@ -1,484 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.core.testutils.MultiShotLatch; -import org.apache.flink.core.testutils.OneShotLatch; -import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; -import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; -import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.connectors.kafka.internal.Handover; -import org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher; -import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread; -import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; -import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; - -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.clients.consumer.OffsetCommitCallback; -import org.apache.kafka.common.TopicPartition; - -import org.junit.Test; -import org.junit.runner.RunWith; - -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; - -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Properties; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.ReentrantLock; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; - -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.anyLong; -import static org.powermock.api.mockito.PowerMockito.doAnswer; -import static org.powermock.api.mockito.PowerMockito.mock; -import static org.powermock.api.mockito.PowerMockito.when; -import static org.powermock.api.mockito.PowerMockito.whenNew; - -/** - * Unit tests for the {@link Kafka010Fetcher}. 
- */ -@RunWith(PowerMockRunner.class) -@PrepareForTest(KafkaConsumerThread.class) -public class Kafka010FetcherTest { - - @Test - public void testCommitDoesNotBlock() throws Exception { - - // test data - final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42); - final Map testCommitData = new HashMap<>(); - testCommitData.put(testPartition, 11L); - - // to synchronize when the consumer is in its blocking method - final OneShotLatch sync = new OneShotLatch(); - - // ----- the mock consumer with blocking poll calls ---- - final MultiShotLatch blockerLatch = new MultiShotLatch(); - - KafkaConsumer mockConsumer = mock(KafkaConsumer.class); - when(mockConsumer.poll(anyLong())).thenAnswer(new Answer>() { - - @Override - public ConsumerRecords answer(InvocationOnMock invocation) throws InterruptedException { - sync.trigger(); - blockerLatch.await(); - return ConsumerRecords.empty(); - } - }); - - doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocation) { - blockerLatch.trigger(); - return null; - } - }).when(mockConsumer).wakeup(); - - // make sure the fetcher creates the mock consumer - whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer); - - // ----- create the test fetcher ----- - - @SuppressWarnings("unchecked") - SourceContext sourceContext = mock(SourceContext.class); - List topics = Collections.singletonList(new KafkaTopicPartition("test", 42)); - KeyedDeserializationSchema schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema()); - - final Kafka010Fetcher fetcher = new Kafka010Fetcher<>( - sourceContext, - topics, - null, /* periodic assigner */ - null, /* punctuated assigner */ - new TestProcessingTimeService(), - 10, - getClass().getClassLoader(), - false, /* checkpointing */ - "taskname-with-subtask", - new UnregisteredMetricsGroup(), - schema, - new Properties(), - 0L, - false); - - // ----- run the fetcher ----- - - final AtomicReference error = new AtomicReference<>(); - final Thread fetcherRunner = new Thread("fetcher runner") { - - @Override - public void run() { - try { - fetcher.runFetchLoop(); - } catch (Throwable t) { - error.set(t); - } - } - }; - fetcherRunner.start(); - - // wait until the fetcher has reached the method of interest - sync.await(); - - // ----- trigger the offset commit ----- - - final AtomicReference commitError = new AtomicReference<>(); - final Thread committer = new Thread("committer runner") { - @Override - public void run() { - try { - fetcher.commitInternalOffsetsToKafka(testCommitData); - } catch (Throwable t) { - commitError.set(t); - } - } - }; - committer.start(); - - // ----- ensure that the committer finishes in time ----- - committer.join(30000); - assertFalse("The committer did not finish in time", committer.isAlive()); - - // ----- test done, wait till the fetcher is done for a clean shutdown ----- - fetcher.cancel(); - fetcherRunner.join(); - - // check that there were no errors in the fetcher - final Throwable fetcherError = error.get(); - if (fetcherError != null && !(fetcherError instanceof Handover.ClosedException)) { - throw new Exception("Exception in the fetcher", fetcherError); - } - final Throwable committerError = commitError.get(); - if (committerError != null) { - throw new Exception("Exception in the committer", committerError); - } - } - - @Test - public void ensureOffsetsGetCommitted() throws Exception { - - // test data - final KafkaTopicPartition testPartition1 = new KafkaTopicPartition("test", 42); - final KafkaTopicPartition 
testPartition2 = new KafkaTopicPartition("another", 99); - - final Map testCommitData1 = new HashMap<>(); - testCommitData1.put(testPartition1, 11L); - testCommitData1.put(testPartition2, 18L); - - final Map testCommitData2 = new HashMap<>(); - testCommitData2.put(testPartition1, 19L); - testCommitData2.put(testPartition2, 28L); - - final BlockingQueue> commitStore = new LinkedBlockingQueue<>(); - - - // ----- the mock consumer with poll(), wakeup(), and commit(A)sync calls ---- - - final MultiShotLatch blockerLatch = new MultiShotLatch(); - - KafkaConsumer mockConsumer = mock(KafkaConsumer.class); - - when(mockConsumer.poll(anyLong())).thenAnswer(new Answer>() { - @Override - public ConsumerRecords answer(InvocationOnMock invocation) throws InterruptedException { - blockerLatch.await(); - return ConsumerRecords.empty(); - } - }); - - doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocation) { - blockerLatch.trigger(); - return null; - } - }).when(mockConsumer).wakeup(); - - doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocation) { - @SuppressWarnings("unchecked") - Map offsets = - (Map) invocation.getArguments()[0]; - - OffsetCommitCallback callback = (OffsetCommitCallback) invocation.getArguments()[1]; - - commitStore.add(offsets); - callback.onComplete(offsets, null); - - return null; - } - }).when(mockConsumer).commitAsync( - Mockito.>any(), any(OffsetCommitCallback.class)); - - // make sure the fetcher creates the mock consumer - whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer); - - // ----- create the test fetcher ----- - - @SuppressWarnings("unchecked") - SourceContext sourceContext = mock(SourceContext.class); - List topics = Collections.singletonList(new KafkaTopicPartition("test", 42)); - KeyedDeserializationSchema schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema()); - StreamingRuntimeContext context = mock(StreamingRuntimeContext.class); - - final Kafka010Fetcher fetcher = new Kafka010Fetcher<>( - sourceContext, - topics, - null, /* periodic assigner */ - null, /* punctuated assigner */ - new TestProcessingTimeService(), - 10, - getClass().getClassLoader(), - false, /* checkpointing */ - "taskname-with-subtask", - new UnregisteredMetricsGroup(), - schema, - new Properties(), - 0L, - false); - - - // ----- run the fetcher ----- - - final AtomicReference error = new AtomicReference<>(); - final Thread fetcherRunner = new Thread("fetcher runner") { - - @Override - public void run() { - try { - fetcher.runFetchLoop(); - } catch (Throwable t) { - error.set(t); - } - } - }; - fetcherRunner.start(); - - // ----- trigger the first offset commit ----- - - fetcher.commitInternalOffsetsToKafka(testCommitData1); - Map result1 = commitStore.take(); - - for (Entry entry : result1.entrySet()) { - TopicPartition partition = entry.getKey(); - if (partition.topic().equals("test")) { - assertEquals(42, partition.partition()); - assertEquals(12L, entry.getValue().offset()); - } - else if (partition.topic().equals("another")) { - assertEquals(99, partition.partition()); - assertEquals(18L, entry.getValue().offset()); - } - } - - // ----- trigger the second offset commit ----- - - fetcher.commitInternalOffsetsToKafka(testCommitData2); - Map result2 = commitStore.take(); - - for (Entry entry : result2.entrySet()) { - TopicPartition partition = entry.getKey(); - if (partition.topic().equals("test")) { - assertEquals(42, partition.partition()); - assertEquals(20L, entry.getValue().offset()); - } - 
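A note on the expected values in these assertions (continued just below): for the partition this fetcher is actually assigned ("test", partition 42), the value handed to commitAsync is the checkpointed offset plus one, so 11 becomes 12 and 19 becomes 20, because a committed Kafka offset names the next record to read rather than the last record processed. The stubbed commitAsync above captures each offset map into a BlockingQueue and completes the callback immediately. The following sketch shows that capture-and-complete pattern against a hypothetical Committer interface, so it runs with Mockito and the JDK alone and makes no claim about Flink's own API.

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.junit.Test;

public class AsyncCommitCaptureSketch {

	/** Hypothetical asynchronous commit API, standing in for KafkaConsumer#commitAsync. */
	interface Committer {
		void commitAsync(Map<String, Long> offsets, Runnable onComplete);
	}

	@Test
	public void capturesCommittedOffsets() throws Exception {
		final BlockingQueue<Map<String, Long>> commitStore = new LinkedBlockingQueue<>();

		Committer committer = mock(Committer.class);

		// capture the offsets handed to commitAsync and complete the callback right away,
		// mirroring the stubbed consumer in the test above
		doAnswer(invocation -> {
			@SuppressWarnings("unchecked")
			Map<String, Long> offsets = (Map<String, Long>) invocation.getArguments()[0];
			Runnable onComplete = (Runnable) invocation.getArguments()[1];

			commitStore.add(offsets);
			onComplete.run();
			return null;
		}).when(committer).commitAsync(any(Map.class), any(Runnable.class));

		// a checkpointed offset of 11 is committed as 12, the next record to read
		long lastProcessedOffset = 11L;
		committer.commitAsync(Collections.singletonMap("test-42", lastProcessedOffset + 1), () -> { });

		assertEquals(Long.valueOf(12L), commitStore.take().get("test-42"));
	}
}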
else if (partition.topic().equals("another")) { - assertEquals(99, partition.partition()); - assertEquals(28L, entry.getValue().offset()); - } - } - - // ----- test done, wait till the fetcher is done for a clean shutdown ----- - fetcher.cancel(); - fetcherRunner.join(); - - // check that there were no errors in the fetcher - final Throwable caughtError = error.get(); - if (caughtError != null && !(caughtError instanceof Handover.ClosedException)) { - throw new Exception("Exception in the fetcher", caughtError); - } - } - - @Test - public void testCancellationWhenEmitBlocks() throws Exception { - - // ----- some test data ----- - - final String topic = "test-topic"; - final int partition = 3; - final byte[] payload = new byte[] {1, 2, 3, 4}; - - final List> records = Arrays.asList( - new ConsumerRecord(topic, partition, 15, payload, payload), - new ConsumerRecord(topic, partition, 16, payload, payload), - new ConsumerRecord(topic, partition, 17, payload, payload)); - - final Map>> data = new HashMap<>(); - data.put(new TopicPartition(topic, partition), records); - - final ConsumerRecords consumerRecords = new ConsumerRecords<>(data); - - // ----- the test consumer ----- - - final KafkaConsumer mockConsumer = mock(KafkaConsumer.class); - when(mockConsumer.poll(anyLong())).thenAnswer(new Answer>() { - @Override - public ConsumerRecords answer(InvocationOnMock invocation) { - return consumerRecords; - } - }); - - whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer); - - // ----- build a fetcher ----- - - BlockingSourceContext sourceContext = new BlockingSourceContext<>(); - List topics = Collections.singletonList(new KafkaTopicPartition(topic, partition)); - KeyedDeserializationSchema schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema()); - - final Kafka010Fetcher fetcher = new Kafka010Fetcher<>( - sourceContext, - topics, - null, /* periodic watermark extractor */ - null, /* punctuated watermark extractor */ - new TestProcessingTimeService(), - 10, /* watermark interval */ - this.getClass().getClassLoader(), - true, /* checkpointing */ - "task_name", - new UnregisteredMetricsGroup(), - schema, - new Properties(), - 0L, - false); - - - // ----- run the fetcher ----- - - final AtomicReference error = new AtomicReference<>(); - final Thread fetcherRunner = new Thread("fetcher runner") { - - @Override - public void run() { - try { - fetcher.runFetchLoop(); - } catch (Throwable t) { - error.set(t); - } - } - }; - fetcherRunner.start(); - - // wait until the thread started to emit records to the source context - sourceContext.waitTillHasBlocker(); - - // now we try to cancel the fetcher, including the interruption usually done on the task thread - // once it has finished, there must be no more thread blocked on the source context - fetcher.cancel(); - fetcherRunner.interrupt(); - fetcherRunner.join(); - - assertFalse("fetcher threads did not properly finish", sourceContext.isStillBlocking()); - } - - // ------------------------------------------------------------------------ - // test utilities - // ------------------------------------------------------------------------ - - private static final class BlockingSourceContext implements SourceContext { - - private final ReentrantLock lock = new ReentrantLock(); - private final OneShotLatch inBlocking = new OneShotLatch(); - - @Override - public void collect(T element) { - block(); - } - - @Override - public void collectWithTimestamp(T element, long timestamp) { - block(); - } - - @Override - public void 
emitWatermark(Watermark mark) { - block(); - } - - @Override - public Object getCheckpointLock() { - return new Object(); - } - - @Override - public void close() {} - - public void waitTillHasBlocker() throws InterruptedException { - inBlocking.await(); - } - - public boolean isStillBlocking() { - return lock.isLocked(); - } - - @SuppressWarnings({"InfiniteLoopStatement", "SynchronizationOnLocalVariableOrMethodParameter"}) - private void block() { - lock.lock(); - try { - inBlocking.trigger(); - - // put this thread to sleep indefinitely - final Object o = new Object(); - while (true) { - synchronized (o) { - o.wait(); - } - } - } - catch (InterruptedException e) { - // exit cleanly, simply reset the interruption flag - Thread.currentThread().interrupt(); - } - finally { - lock.unlock(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java deleted file mode 100644 index 08511c9..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java +++ /dev/null @@ -1,313 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
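The BlockingSourceContext that closes the fetcher test above parks every emitting thread inside a lock-guarded indefinite wait; testCancellationWhenEmitBlocks then cancels the fetcher, interrupts its thread, and asserts via ReentrantLock#isLocked() that no thread is left holding the emit lock. The sketch below isolates that property with only the JDK and JUnit; the class and method names are hypothetical.

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

import org.junit.Test;

public class InterruptReleasesLockSketch {

	private final ReentrantLock lock = new ReentrantLock();
	private final CountDownLatch entered = new CountDownLatch(1);

	private void block() {
		lock.lock();
		try {
			entered.countDown();                // signal that the worker is now blocking
			final Object o = new Object();
			synchronized (o) {
				o.wait();                       // parks until interrupted
			}
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt(); // restore the interruption flag and exit cleanly
		} finally {
			lock.unlock();                      // the lock must always be released
		}
	}

	@Test
	public void interruptLeavesNoLockedThread() throws Exception {
		Thread worker = new Thread(this::block, "blocking worker");
		worker.start();

		assertTrue("worker never started blocking", entered.await(10, TimeUnit.SECONDS));

		worker.interrupt();
		worker.join(10_000);

		assertFalse("worker is still holding the lock", lock.isLocked());
	}
}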
- */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.restartstrategy.RestartStrategies; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.typeutils.GenericTypeInfo; -import org.apache.flink.api.java.typeutils.TypeInfoParser; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataInputViewStreamWrapper; -import org.apache.flink.streaming.api.TimeCharacteristic; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.api.operators.StreamSink; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; -import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; -import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema; -import org.junit.Test; - -import javax.annotation.Nullable; -import java.io.ByteArrayInputStream; -import java.io.IOException; - - -public class Kafka010ITCase extends KafkaConsumerTestBase { - - // ------------------------------------------------------------------------ - // Suite of Tests - // ------------------------------------------------------------------------ - - - @Test(timeout = 60000) - public void testFailOnNoBroker() throws Exception { - runFailOnNoBrokerTest(); - } - - @Test(timeout = 60000) - public void testConcurrentProducerConsumerTopology() throws Exception { - runSimpleConcurrentProducerConsumerTopology(); - } - - @Test(timeout = 60000) - public void testKeyValueSupport() throws Exception { - runKeyValueTest(); - } - - // --- canceling / failures --- - - @Test(timeout = 60000) - public void testCancelingEmptyTopic() throws Exception { - runCancelingOnEmptyInputTest(); - } - - @Test(timeout = 60000) - public void testCancelingFullTopic() throws Exception { - runCancelingOnFullInputTest(); - } - - @Test(timeout = 60000) - public void testFailOnDeploy() throws Exception { - runFailOnDeployTest(); - } - - - // --- source to partition mappings and exactly once --- - - @Test(timeout = 60000) - public void testOneToOneSources() throws Exception { - runOneToOneExactlyOnceTest(); - } - - @Test(timeout = 60000) - public void testOneSourceMultiplePartitions() throws Exception { - runOneSourceMultiplePartitionsExactlyOnceTest(); - } - - @Test(timeout = 60000) - public void testMultipleSourcesOnePartition() throws Exception { - runMultipleSourcesOnePartitionExactlyOnceTest(); - } - - // --- broker failure --- - - @Test(timeout = 60000) - public void testBrokerFailure() throws Exception { - runBrokerFailureTest(); - } - - // --- special executions --- - - @Test(timeout = 60000) - public void testBigRecordJob() throws Exception { - runBigRecordTestTopology(); - } - - @Test(timeout = 60000) - public void testMultipleTopics() throws Exception { - runProduceConsumeMultipleTopics(); - } - - @Test(timeout = 60000) 
- public void testAllDeletes() throws Exception { - runAllDeletesTest(); - } - - @Test(timeout = 60000) - public void testMetricsAndEndOfStream() throws Exception { - runEndOfStreamTest(); - } - - // --- offset committing --- - - @Test(timeout = 60000) - public void testCommitOffsetsToKafka() throws Exception { - runCommitOffsetsToKafka(); - } - - @Test(timeout = 60000) - public void testStartFromKafkaCommitOffsets() throws Exception { - runStartFromKafkaCommitOffsets(); - } - - @Test(timeout = 60000) - public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception { - runAutoOffsetRetrievalAndCommitToKafka(); - } - - /** - * Kafka 0.10 specific test, ensuring Timestamps are properly written to and read from Kafka - */ - @Test(timeout = 60000) - public void testTimestamps() throws Exception { - - final String topic = "tstopic"; - createTestTopic(topic, 3, 1); - - // ---------- Produce an event time stream into Kafka ------------------- - - StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); - env.setParallelism(1); - env.getConfig().setRestartStrategy(RestartStrategies.noRestart()); - env.getConfig().disableSysoutLogging(); - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - - DataStream streamWithTimestamps = env.addSource(new SourceFunction() { - boolean running = true; - - @Override - public void run(SourceContext ctx) throws Exception { - long i = 0; - while(running) { - ctx.collectWithTimestamp(i, i*2); - if(i++ == 1000L) { - running = false; - } - } - } - - @Override - public void cancel() { - running = false; - } - }); - - final TypeInformationSerializationSchema longSer = new TypeInformationSerializationSchema<>(TypeInfoParser.parse("Long"), env.getConfig()); - FlinkKafkaProducer010.FlinkKafkaProducer010Configuration prod = FlinkKafkaProducer010.writeToKafkaWithTimestamps(streamWithTimestamps, topic, new KeyedSerializationSchemaWrapper<>(longSer), standardProps, new KafkaPartitioner() { - @Override - public int partition(Long next, byte[] serializedKey, byte[] serializedValue, int numPartitions) { - return (int)(next % 3); - } - }); - prod.setParallelism(3); - prod.setWriteTimestampToKafka(true); - env.execute("Produce some"); - - // ---------- Consume stream from Kafka ------------------- - - env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); - env.setParallelism(1); - env.getConfig().setRestartStrategy(RestartStrategies.noRestart()); - env.getConfig().disableSysoutLogging(); - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - - FlinkKafkaConsumer010 kafkaSource = new FlinkKafkaConsumer010<>(topic, new LimitedLongDeserializer(), standardProps); - kafkaSource.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks() { - @Nullable - @Override - public Watermark checkAndGetNextWatermark(Long lastElement, long extractedTimestamp) { - if(lastElement % 10 == 0) { - return new Watermark(lastElement); - } - return null; - } - - @Override - public long extractTimestamp(Long element, long previousElementTimestamp) { - return previousElementTimestamp; - } - }); - - DataStream stream = env.addSource(kafkaSource); - GenericTypeInfo objectTypeInfo = new GenericTypeInfo<>(Object.class); - stream.transform("timestamp validating operator", objectTypeInfo, new TimestampValidatingOperator()).setParallelism(1); - - env.execute("Consume again"); - - deleteTestTopic(topic); - } - - private static class TimestampValidatingOperator extends StreamSink { - - public 
TimestampValidatingOperator() { - super(new SinkFunction() { - @Override - public void invoke(Long value) throws Exception { - throw new RuntimeException("Unexpected"); - } - }); - } - - long elCount = 0; - long wmCount = 0; - long lastWM = Long.MIN_VALUE; - - @Override - public void processElement(StreamRecord element) throws Exception { - elCount++; - if(element.getValue() * 2 != element.getTimestamp()) { - throw new RuntimeException("Invalid timestamp: " + element); - } - } - - @Override - public void processWatermark(Watermark mark) throws Exception { - wmCount++; - - if(lastWM <= mark.getTimestamp()) { - lastWM = mark.getTimestamp(); - } else { - throw new RuntimeException("Received watermark higher than the last one"); - } - - if( mark.getTimestamp() % 10 != 0 && mark.getTimestamp() != Long.MAX_VALUE ) { - throw new RuntimeException("Invalid watermark: " + mark.getTimestamp()); - } - } - - @Override - public void close() throws Exception { - super.close(); - if(elCount != 1000L) { - throw new RuntimeException("Wrong final element count " + elCount); - } - - if(wmCount <= 2) { - throw new RuntimeException("Almost no watermarks have been sent " + wmCount); - } - } - } - - private static class LimitedLongDeserializer implements KeyedDeserializationSchema { - - private final TypeInformation ti; - private final TypeSerializer ser; - long cnt = 0; - - public LimitedLongDeserializer() { - this.ti = TypeInfoParser.parse("Long"); - this.ser = ti.createSerializer(new ExecutionConfig()); - } - @Override - public TypeInformation getProducedType() { - return ti; - } - - @Override - public Long deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException { - cnt++; - DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message)); - Long e = ser.deserialize(in); - return e; - } - - @Override - public boolean isEndOfStream(Long nextElement) { - return cnt > 1000L; - } - } - -}
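The timestamp round trip in testTimestamps() above hinges on the AssignerWithPunctuatedWatermarks registered on the FlinkKafkaConsumer010: it keeps the timestamp the consumer already attached from the Kafka 0.10 record and punctuates the stream with a watermark on every element divisible by ten, which is exactly what TimestampValidatingOperator later checks. The class below is a standalone restatement of that assigner against the Flink 1.2-era API used in this file; it is an illustrative sketch, not part of the deleted sources.

import javax.annotation.Nullable;

import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

/**
 * Keeps the Kafka-provided timestamp unchanged and emits a watermark whenever the
 * element value is a multiple of ten, mirroring the assigner in testTimestamps().
 */
public class EveryTenthElementWatermarks implements AssignerWithPunctuatedWatermarks<Long> {

	private static final long serialVersionUID = 1L;

	@Override
	public long extractTimestamp(Long element, long previousElementTimestamp) {
		// the consumer has already attached the Kafka 0.10 record timestamp
		return previousElementTimestamp;
	}

	@Nullable
	@Override
	public Watermark checkAndGetNextWatermark(Long lastElement, long extractedTimestamp) {
		return lastElement % 10 == 0 ? new Watermark(lastElement) : null;
	}
}

The only watermark the validating sink tolerates besides these multiples of ten is Long.MAX_VALUE, which is emitted once the finite source finishes.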