flink-commits mailing list archives

From tzuli...@apache.org
Subject [1/2] flink git commit: [FLINK-3679] [kafka] Allow Kafka consumer to skip corrupted messages
Date Thu, 09 Mar 2017 06:06:48 GMT
Repository: flink
Updated Branches:
  refs/heads/master adbf846f2 -> c39ad31f3


[FLINK-3679] [kafka] Allow Kafka consumer to skip corrupted messages


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/afb4c5e0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/afb4c5e0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/afb4c5e0

Branch: refs/heads/master
Commit: afb4c5e02c513a82d2ad7f7816065fdd93665e0e
Parents: adbf846
Author: Haohui Mai <wheat9@apache.org>
Authored: Thu Mar 2 13:33:13 2017 -0800
Committer: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Committed: Thu Mar 9 14:05:37 2017 +0800

----------------------------------------------------------------------
 docs/dev/connectors/kafka.md                    |  4 +
 .../connectors/kafka/Kafka09FetcherTest.java    | 84 ++++++++++++++++++++
 .../kafka/internals/AbstractFetcher.java        |  4 +
 3 files changed, 92 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/afb4c5e0/docs/dev/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index 0f700ab..331c9c7 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -146,6 +146,10 @@ The Flink Kafka Consumer needs to know how to turn the binary data in Kafka into
 `DeserializationSchema` allows users to specify such a schema. The `T deserialize(byte[] message)`
 method gets called for each Kafka message, passing the value from Kafka.
 
+There are two possible design choices when the `DeserializationSchema` encounters a corrupted message. It can
+either throw an `IOException`, which causes the pipeline to be restarted, or it can return `null`, in which case the Flink
+Kafka consumer will silently skip the corrupted message.
+
 It is usually helpful to start from the `AbstractDeserializationSchema`, which takes care of describing the
 produced Java/Scala type to Flink's type system. Users that implement a vanilla `DeserializationSchema` need
 to implement the `getProducedType(...)` method themselves.

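For illustration, here is a minimal sketch of the second choice described in the documentation change above. The class name is hypothetical and not part of this commit; it simply returns `null` for payloads it cannot parse, which the Flink Kafka consumer will now skip:

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.streaming.util.serialization.AbstractDeserializationSchema;

// Hypothetical example schema (not part of this commit): treats any
// payload that is not a parseable integer as corrupted and returns null,
// which makes the Flink Kafka consumer skip the record.
public class SkippingIntSchema extends AbstractDeserializationSchema<Integer> {

	@Override
	public Integer deserialize(byte[] message) throws IOException {
		try {
			return Integer.parseInt(new String(message, StandardCharsets.UTF_8));
		} catch (NumberFormatException e) {
			// corrupted payload: returning null skips this record
			return null;
		}
	}
}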
http://git-wip-us.apache.org/repos/asf/flink/blob/afb4c5e0/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
index 49144e6..61a8855 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.core.testutils.MultiShotLatch;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
@@ -29,6 +31,7 @@ import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.streaming.util.CollectingSourceContext;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
@@ -49,6 +52,8 @@ import org.mockito.stubbing.Answer;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -419,6 +424,85 @@ public class Kafka09FetcherTest {
 		assertFalse("fetcher threads did not properly finish", sourceContext.isStillBlocking());
 	}
 
+	@Test
+	public void testSkipCorruptedMessage() throws Exception {
+
+		// ----- some test data -----
+
+		final String topic = "test-topic";
+		final int partition = 3;
+		final byte[] payload = new byte[] {1, 2, 3, 4};
+
+		final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
+			new ConsumerRecord<>(topic, partition, 15, payload, payload),
+			new ConsumerRecord<>(topic, partition, 16, payload, payload),
+			new ConsumerRecord<>(topic, partition, 17, payload, "end".getBytes()));
+
+		final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = new HashMap<>();
+		data.put(new TopicPartition(topic, partition), records);
+
+		final ConsumerRecords<byte[], byte[]> consumerRecords = new ConsumerRecords<>(data);
+
+		// ----- the test consumer -----
+
+		final KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
+		when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
+			@Override
+			public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) {
+				return consumerRecords;
+			}
+		});
+
+		whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+		// ----- build a fetcher -----
+
+		ArrayList<String> results = new ArrayList<>();
+		SourceContext<String> sourceContext = new CollectingSourceContext<>(results, results);
+		Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
+			Collections.singletonMap(new KafkaTopicPartition(topic, partition), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+		KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchema<String>() {
+
+			@Override
+			public String deserialize(byte[] messageKey, byte[] message,
+									  String topic, int partition, long offset) throws IOException {
+				return offset == 15 ? null : new String(message);
+			}
+
+			@Override
+			public boolean isEndOfStream(String nextElement) {
+				return "end".equals(nextElement);
+			}
+
+			@Override
+			public TypeInformation<String> getProducedType() {
+				return BasicTypeInfo.STRING_TYPE_INFO;
+			}
+		};
+
+		final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
+			sourceContext,
+			partitionsWithInitialOffsets,
+			null, /* periodic watermark extractor */
+			null, /* punctuated watermark extractor */
+			new TestProcessingTimeService(),
+			10, /* watermark interval */
+			this.getClass().getClassLoader(),
+			true, /* checkpointing */
+			"task_name",
+			new UnregisteredMetricsGroup(),
+			schema,
+			new Properties(),
+			0L,
+			false);
+
+
+		// ----- run the fetcher -----
+
+		fetcher.runFetchLoop();
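+		// only one element is expected: offset 15 deserializes to null and is
+		// skipped, and "end" (offset 17) only marks the end of the stream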
+		assertEquals(1, results.size());
+	}
+
 	// ------------------------------------------------------------------------
 	//  test utilities
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/afb4c5e0/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index e021881..76ce1a0 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -213,6 +213,10 @@ public abstract class AbstractFetcher<T, KPH> {
 	 * @param offset The offset of the record
 	 */
 	protected void emitRecord(T record, KafkaTopicPartitionState<KPH> partitionState, long offset) throws Exception {
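+		// a null record signals a corrupted message that the deserializer
+		// chose to skip (see FLINK-3679)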
+		if (record == null) {
+			return;
+		}
+
 		if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
 			// fast path logic, in case there are no watermarks
 

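For context, a hedged usage sketch showing how the change surfaces to users (hypothetical, not part of this commit; it reuses the SkippingIntSchema sketched above, and the topic name and connection settings are placeholders). A job wired this way drops corrupted records instead of restarting:

import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;

// Hypothetical usage sketch, not part of this commit.
public class SkipCorruptedJob {
	public static void main(String[] args) throws Exception {
		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
		props.setProperty("group.id", "demo");                    // placeholder

		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.addSource(new FlinkKafkaConsumer09<>("test-topic", new SkippingIntSchema(), props))
			.print();
		env.execute("skip-corrupted-demo");
	}
}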
