flink-commits mailing list archives

From rmetz...@apache.org
Subject flink git commit: [FLINK-3188] Pass deletes to KeyedDeserializationSchema
Date Tue, 05 Jan 2016 09:02:46 GMT
Repository: flink
Updated Branches:
  refs/heads/master 47cd9fd8a -> dcf86c27b


[FLINK-3188] Pass deletes to KeyedDeserializationSchema

This closes #1484
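
For context: in a compacted Kafka topic, a delete is expressed as a message whose value payload is null. Before this change the fetcher silently skipped such messages (only advancing the offset); with it, they reach the KeyedDeserializationSchema with a null message so that clients can react to deletes. Below is a minimal sketch of a delete-aware schema, assuming the deserialize(messageKey, message, offset) signature documented in this commit's javadoc change; the class name and output format are purely illustrative and not part of the change:

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

// Hypothetical schema that surfaces Kafka deletes ("tombstones") to user code.
public class DeleteAwareStringSchema implements KeyedDeserializationSchema<String> {

	@Override
	public String deserialize(byte[] messageKey, byte[] message, long offset) {
		// keys may be absent as well (see the javadoc change below)
		String key = messageKey == null ? "(no key)" : new String(messageKey);
		if (message == null) {
			// since this commit, deleted messages arrive here with a null value
			return key + "=<deleted>";
		}
		return key + "=" + new String(message);
	}

	@Override
	public boolean isEndOfStream(String nextElement) {
		return false;
	}

	@Override
	public TypeInformation<String> getProducedType() {
		return BasicTypeInfo.STRING_TYPE_INFO;
	}
}

Such a schema can be passed directly to the consumer, which accepts a KeyedDeserializationSchema as shown in the test changes below.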


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dcf86c27
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dcf86c27
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dcf86c27

Branch: refs/heads/master
Commit: dcf86c27b91170f99c4d575950c72db8d14617dc
Parents: 47cd9fd
Author: Sebastian Klemke <sebastian.klemke@researchgate.net>
Authored: Mon Jan 4 14:52:52 2016 +0100
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Tue Jan 5 10:02:26 2016 +0100

----------------------------------------------------------------------
 .../kafka/internals/LegacyFetcher.java          | 17 ++---
 .../connectors/kafka/KafkaConsumerTestBase.java | 80 +++++++++++++++++++-
 .../streaming/connectors/kafka/KafkaITCase.java |  9 ++-
 .../KeyedDeserializationSchema.java             |  2 +-
 ...eInformationKeyValueSerializationSchema.java | 10 ++-
 5 files changed, 102 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dcf86c27/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
index 4233c18..b51ad61 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
@@ -448,19 +448,18 @@ public class LegacyFetcher implements Fetcher {
 
 								final long offset = msg.offset();
 
-								// put value into byte array
 								ByteBuffer payload = msg.message().payload();
+
+								// If the message value is null, this represents a delete command for the message key.
+								// Log this and pass it on to the client who might want to also receive delete messages.
+								byte[] valueBytes;
 								if (payload == null) {
-									// This message has no value (which means it has been deleted from the Kafka topic)
 									deletedMessages++;
-									// advance offset in state to avoid re-reading the message
-									synchronized (sourceContext.getCheckpointLock()) {
-										offsetsState.put(topicPartition, offset);
-									}
-									continue;
+									valueBytes = null;
+								} else {
+									valueBytes = new byte[payload.remaining()];
+									payload.get(valueBytes);
 								}
-								byte[] valueBytes = new byte[payload.remaining()];
-								payload.get(valueBytes);
 
 								// put key into byte array
 								byte[] keyBytes = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/dcf86c27/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 4f71384..d75a15c 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -41,12 +41,15 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.api.java.typeutils.runtime.ByteArrayInputView;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.util.DataOutputSerializer;
 import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -75,6 +78,7 @@ import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
 import org.apache.flink.testutils.junit.RetryOnException;
@@ -107,6 +111,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -122,10 +127,20 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 	// ------------------------------------------------------------------------
 
 	protected abstract <T> FlinkKafkaConsumer<T> getConsumer(
-			List<String> topics, DeserializationSchema<T> deserializationSchema, Properties props);
+			List<String> topics, KeyedDeserializationSchema<T> deserializationSchema, Properties props);
+
+	protected <T> FlinkKafkaConsumer<T> getConsumer(
+			List<String> topics, DeserializationSchema<T> deserializationSchema, Properties props) {
+		return getConsumer(topics, new KeyedDeserializationSchemaWrapper<T>(deserializationSchema), props);
+	}
 
 	protected <T> FlinkKafkaConsumer<T> getConsumer(
 			String topic, DeserializationSchema<T> deserializationSchema, Properties props) {
+		return getConsumer(Collections.singletonList(topic), new KeyedDeserializationSchemaWrapper<T>(deserializationSchema), props);
+	}
+
+	protected <T> FlinkKafkaConsumer<T> getConsumer(
+			String topic, KeyedDeserializationSchema<T> deserializationSchema, Properties props) {
 		return getConsumer(Collections.singletonList(topic), deserializationSchema, props);
 	}
 
@@ -832,7 +847,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		Assert.assertEquals((NUM_TOPICS * (NUM_TOPICS + 1))/2, topicPartitions.size());
 
 		KeyedDeserializationSchema<Tuple3<Integer, Integer, String>> readSchema = new Tuple2WithTopicDeserializationSchema(env.getConfig());
-		DataStreamSource<Tuple3<Integer, Integer, String>> stream = env.addSource(new FlinkKafkaConsumer082<>(topics, readSchema, standardProps));
+		DataStreamSource<Tuple3<Integer, Integer, String>> stream = env.addSource(getConsumer(topics, readSchema, standardProps));
 
 		stream.flatMap(new FlatMapFunction<Tuple3<Integer, Integer, String>, Integer>() {
 			Map<String, Integer> countPerTopic = new HashMap<>(NUM_TOPICS);
@@ -1120,7 +1135,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 		KeyedDeserializationSchema<Tuple2<Long, PojoValue>> readSchema = new TypeInformationKeyValueSerializationSchema<>(Long.class, PojoValue.class, env.getConfig());
 
-		DataStream<Tuple2<Long, PojoValue>> fromKafka = env.addSource(new FlinkKafkaConsumer082<>(topic, readSchema, standardProps));
+		DataStream<Tuple2<Long, PojoValue>> fromKafka = env.addSource(getConsumer(topic, readSchema, standardProps));
 		fromKafka.flatMap(new RichFlatMapFunction<Tuple2<Long,PojoValue>, Object>() {
 			long counter = 0;
 			@Override
@@ -1152,7 +1167,66 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		public PojoValue() {}
 	}
 
+	public void runAllDeletesTest() throws Exception {
+		final String topic = "alldeletestest";
+		createTestTopic(topic, 1, 1);
+		final int ELEMENT_COUNT = 300;
+
+		// ----------- Write some data into Kafka -------------------
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env.setParallelism(1);
+		env.setNumberOfExecutionRetries(3);
+		env.getConfig().disableSysoutLogging();
+
+		DataStream<Tuple2<byte[], PojoValue>> kvStream = env.addSource(new SourceFunction<Tuple2<byte[], PojoValue>>() {
+			@Override
+			public void run(SourceContext<Tuple2<byte[], PojoValue>> ctx) throws Exception {
+				Random rnd = new Random(1337);
+				for (long i = 0; i < ELEMENT_COUNT; i++) {
+					final byte[] key = new byte[200];
+					rnd.nextBytes(key);
+					ctx.collect(new Tuple2<>(key, (PojoValue) null));
+				}
+			}
+			@Override
+			public void cancel() {
+			}
+		});
+
+		TypeInformationKeyValueSerializationSchema<byte[], PojoValue> schema = new TypeInformationKeyValueSerializationSchema<>(byte[].class, PojoValue.class, env.getConfig());
 
+		kvStream.addSink(new FlinkKafkaProducer<>(topic, schema,
+				FlinkKafkaProducer.getPropertiesFromBrokerList(brokerConnectionStrings)));
+		env.execute("Write deletes to Kafka");
+
+		// ----------- Read the data again -------------------
+
+		env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env.setParallelism(1);
+		env.setNumberOfExecutionRetries(3);
+		env.getConfig().disableSysoutLogging();
+
+		DataStream<Tuple2<byte[], PojoValue>> fromKafka = env.addSource(getConsumer(topic, schema, standardProps));
+
+		fromKafka.flatMap(new RichFlatMapFunction<Tuple2<byte[], PojoValue>, Object>() {
+			long counter = 0;
+			@Override
+			public void flatMap(Tuple2<byte[], PojoValue> value, Collector<Object> out) throws Exception {
+				// ensure that deleted messages are passed as nulls
+				assertNull(value.f1);
+				counter++;
+				if (counter == ELEMENT_COUNT) {
+					// we got the right number of elements
+					throw new SuccessException();
+				}
+			}
+		});
+
+		tryExecute(env, "Read deletes from Kafka");
+
+		deleteTestTopic(topic);
+	}
 
 	// ------------------------------------------------------------------------
 	//  Reading writing test data sets

http://git-wip-us.apache.org/repos/asf/flink/blob/dcf86c27/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
index 3abbd71..07e650a 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 
 import org.junit.Test;
 
@@ -28,7 +28,7 @@ import java.util.Properties;
 public class KafkaITCase extends KafkaConsumerTestBase {
 	
 	@Override
-	protected <T> FlinkKafkaConsumer<T> getConsumer(List<String> topics, DeserializationSchema<T> deserializationSchema, Properties props) {
+	protected <T> FlinkKafkaConsumer<T> getConsumer(List<String> topics, KeyedDeserializationSchema<T> deserializationSchema, Properties props) {
 		return new FlinkKafkaConsumer082<>(topics, deserializationSchema, props);
 	}
 	
@@ -125,4 +125,9 @@ public class KafkaITCase extends KafkaConsumerTestBase {
 		runConsumeMultipleTopics();
 	}
 
+	@Test
+	public void testAllDeletes() throws Exception {
+		runAllDeletesTest();
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dcf86c27/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
index 917c2330..80bea8d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
@@ -35,7 +35,7 @@ public interface KeyedDeserializationSchema<T> extends Serializable, ResultTypeQ
 	 * Deserializes the byte message.
 	 *
 	 * @param messageKey the key as a byte array (null if no key has been set)
-	 * @param message The message, as a byte array.
+	 * @param message The message, as a byte array. (null if the message was empty or deleted)
 	 * @param offset the offset of the message in the original source (for example the Kafka offset)
 	 * @return The deserialized message as an object.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/dcf86c27/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
index ef9cde5..250012f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
@@ -84,7 +84,10 @@ public class TypeInformationKeyValueSerializationSchema<K, V> implements KeyedDe
 		if(messageKey != null) {
 			key = keySerializer.deserialize(new ByteArrayInputView(messageKey));
 		}
-		V value = valueSerializer.deserialize(new ByteArrayInputView(message));
+		V value = null;
+		if(message != null) {
+			value = valueSerializer.deserialize(new ByteArrayInputView(message));
+		}
 		return new Tuple2<>(key, value);
 	}
 
@@ -128,6 +131,11 @@ public class TypeInformationKeyValueSerializationSchema<K, V> implements KeyedDe
 
 	@Override
 	public byte[] serializeValue(Tuple2<K, V> element) {
+		// if the value is null, its serialized value is null as well.
+		if(element.f1 == null) {
+			return null;
+		}
+
 		if (valueOutputSerializer == null) {
 			valueOutputSerializer = new DataOutputSerializer(16);
 		}

