flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [2/8] flink git commit: [FLINK-2420] [streaming] StreamRecordWriter properly reports exceptions on flush.
Date Tue, 28 Jul 2015 22:15:44 GMT
[FLINK-2420] [streaming] StreamRecordWriter properly reports exceptions on flush.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8ba32133
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8ba32133
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8ba32133

Branch: refs/heads/master
Commit: 8ba321332b994579f387add8bd0855bd29cb33ec
Parents: a0556ef
Author: Stephan Ewen <sewen@apache.org>
Authored: Tue Jul 28 15:38:20 2015 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Tue Jul 28 22:58:06 2015 +0200

----------------------------------------------------------------------
 .../runtime/io/RecordWriterOutput.java          |  13 +-
 .../runtime/io/StreamRecordWriter.java          | 142 ++++++++++++++-----
 .../runtime/io/DummyBufferRecycler.java         |  34 +++++
 .../runtime/io/SpillingBufferOrEventTest.java   |   4 +-
 .../runtime/io/StreamRecordWriterTest.java      | 132 +++++++++++++++++
 .../checkpointing/StateCheckpoinedITCase.java   |   4 +-
 .../StreamCheckpointingITCase.java              |  28 ++--
 7 files changed, 296 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8ba32133/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
index b656bb5..f7d8d47 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
@@ -94,15 +94,16 @@ public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>>
{
 
 	@Override
 	public void close() {
-		if (recordWriter instanceof StreamRecordWriter) {
-			((StreamRecordWriter<?>) recordWriter).close();
-		} else {
-			try {
+		try { // flush failures are now reported to the caller instead of only being printed
+			if (recordWriter instanceof StreamRecordWriter) { // its close() also stops the output flusher thread
+				((StreamRecordWriter<?>) recordWriter).close();
+			} else {
 				recordWriter.flush();
-			} catch (IOException e) {
-				e.printStackTrace();
 			}
 		}
+		catch (IOException e) { // this close() does not declare IOException, so rethrow it unchecked
+			throw new RuntimeException("Failed to flush final output", e);
+		}
 	}
 
 	public void clearBuffers() {

http://git-wip-us.apache.org/repos/asf/flink/blob/8ba32133/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
index abae9a4..b0e2532 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
@@ -23,38 +23,61 @@ import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector;
 
-public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWriter<T>
{
-
-	private long timeout;
-	private boolean flushAlways = false;
+import static com.google.common.base.Preconditions.checkArgument;
 
-	private OutputFlusher outputFlusher;
+/**
+ * This record writer keeps data in buffers at most for a certain timeout. It spawns a separate
thread
+ * that flushes the outputs in a defined interval, to make sure data does not linger in the
buffers for too long.
+ * 
+ * @param <T> The type of elements written.
+ */
+public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWriter<T>
{
 
-	public StreamRecordWriter(ResultPartitionWriter writer) {
-		this(writer, new RoundRobinChannelSelector<T>(), 1000);
-	}
+	/** Default name for the output flush thread, if no name with a task reference is given */
+	private static final String DEFAULT_OUTPUT_FLUSH_THREAD_NAME = "OutputFlusher";
+	
+	
+	/** The thread that periodically flushes the output, to give an upper latency bound */
+	private final OutputFlusher outputFlusher;
+	
+	/** Flag indicating whether the output should be flushed after every element */
+	private final boolean flushAlways;
 
-	public StreamRecordWriter(ResultPartitionWriter writer, ChannelSelector<T> channelSelector) {
-		this(writer, channelSelector, 1000);
+	/** The exception encountered in the flushing thread. Volatile, because the flusher thread sets it and the writing thread reads it */
+	private volatile Throwable flusherException;
+	
+	
+	/** Creates a writer whose flusher thread (if any) gets the default thread name. */
+	public StreamRecordWriter(ResultPartitionWriter writer, ChannelSelector<T> channelSelector, long timeout) {
+		this(writer, channelSelector, timeout, null);
 	}
-
+	
 	public StreamRecordWriter(ResultPartitionWriter writer, ChannelSelector<T> channelSelector,
-			long timeout) {
+								long timeout, String taskName) {
+		// timeout: flush interval in milliseconds, 0 flushes after every record; taskName may be null
 		super(writer, channelSelector);
-
-		this.timeout = timeout;
+		
+		checkArgument(timeout >= 0, "The output flush timeout must not be negative");
+		
 		if (timeout == 0) {
 			flushAlways = true;
-		} else {
-			this.outputFlusher = new OutputFlusher();
+			outputFlusher = null;
+		}
+		else {
+			flushAlways = false;
+
+			String threadName = taskName == null ?
+								DEFAULT_OUTPUT_FLUSH_THREAD_NAME : "Output Timeout Flusher - " + taskName;
+			
+			outputFlusher = new OutputFlusher(threadName, timeout);
 			outputFlusher.start();
 		}
 	}
 	
 	@Override
 	public void emit(T record) throws IOException, InterruptedException {
+		checkErroneous();
 		super.emit(record);
 		if (flushAlways) {
 			flush();
@@ -63,46 +86,109 @@ public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWrit
 
 	@Override
 	public void broadcastEmit(T record) throws IOException, InterruptedException {
+		checkErroneous();
 		super.broadcastEmit(record);
 		if (flushAlways) {
 			flush();
 		}
 	}
 
-	public void close() {
-		try {
-			if (outputFlusher != null) {
+	/**
+	 * Closes the writer. This flushes all pending outputs and then stops the flushing thread (if there is one).
+	 * The flushing thread is stopped even if the final flush fails, so that it does not keep running.
+	 * 
+	 * @throws IOException I/O errors may happen during the final flush of the buffers, or may have
+	 *                     been reported asynchronously by the flushing thread.
+	 */
+	public void close() throws IOException {
+		try {
+			// propagate exceptions
+			flush();
+		}
+		finally {
+			if (outputFlusher != null) {
 				outputFlusher.terminate();
-				outputFlusher.join();
+				try {
+					outputFlusher.join();
+				}
+				catch (InterruptedException e) {
+					// stop waiting, but restore the interrupt flag so the caller can react to it
+					Thread.currentThread().interrupt();
+				}
 			}
+		}
 
-			flush();
-		} catch (IOException e) {
-			throw new RuntimeException(e);
-		} catch (InterruptedException e) {
-			// Do nothing here
+		// final check for asynchronous errors, before we exit with a green light
+		checkErroneous();
+	}
+
+	/**
+	 * Notifies the writer that the output flusher thread encountered an exception.
+	 * Only the first reported exception is kept.
+	 * 
+	 * @param t The exception to report.
+	 */
+	void notifyFlusherException(Throwable t) {
+		if (this.flusherException == null) {
+			this.flusherException = t;
+		}
+	}
+	
+	/** Throws an IOException wrapping the flusher thread's exception, if one has been reported. */
+	private void checkErroneous() throws IOException {
+		if (flusherException != null) {
+			throw new IOException("An exception happened while flushing the outputs", flusherException);
 		}
 	}
 
+	// ------------------------------------------------------------------------
+	
+	/**
+	 * A dedicated thread that periodically flushes the output buffers, to set upper latency bounds.
+	 * 
+	 * The thread is daemonic, because it is only a utility thread.
+	 */
 	private class OutputFlusher extends Thread {
+		
+		private final long timeout;
+		
 		private volatile boolean running = true;
 
+		
+		OutputFlusher(String name, long timeout) {
+			super(name);
+			setDaemon(true);
+			this.timeout = timeout;
+		}
+		
 		public void terminate() {
 			running = false;
+			interrupt();
 		}
 
 		@Override
 		public void run() {
-			while (running) {
-				try {
+			try {
+				while (running) {
+					try {
+						Thread.sleep(timeout);
+					}
+					catch (InterruptedException e) {
+						// propagate this if we are still running, because it should not happen
+						// in that case
+						if (running) {
+							throw new Exception(e);
+						}
+					}
+					
+					// any errors here should let the thread come to a halt and be
+					// recognized by the writer 
 					flush();
-					Thread.sleep(timeout);
-				} catch (InterruptedException e) {
-					// Do nothing here
-				} catch (IOException e) {
-					throw new RuntimeException(e);
 				}
 			}
+			catch (Throwable t) {
+				notifyFlusherException(t);
+			}
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ba32133/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/DummyBufferRecycler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/DummyBufferRecycler.java
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/DummyBufferRecycler.java
new file mode 100644
index 0000000..23ca86d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/DummyBufferRecycler.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+
+/**
+ * A BufferRecycler that does nothing.
+ */
+public class DummyBufferRecycler implements BufferRecycler {
+	/** Shared instance; this recycler keeps no state, so a single instance can be reused. */
+	public static final BufferRecycler INSTANCE = new DummyBufferRecycler();
+
+	/** Does nothing: the given memory segment is not returned to any pool. */
+	@Override
+	public void recycle(MemorySegment memorySegment) {}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/8ba32133/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEventTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEventTest.java
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEventTest.java
index e0fab17..9934bd9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEventTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEventTest.java
@@ -26,9 +26,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.streaming.runtime.io.BufferSpiller;
-import org.apache.flink.streaming.runtime.io.SpillReader;
-import org.apache.flink.streaming.runtime.io.SpillingBufferOrEvent;
+
 import org.junit.Test;
 
 public class SpillingBufferOrEventTest {

http://git-wip-us.apache.org/repos/asf/flink/blob/8ba32133/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
new file mode 100644
index 0000000..b5bece7
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.types.LongValue;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.IOException;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * This test uses the PowerMockRunner runner to work around the fact that the 
+ * {@link ResultPartitionWriter} class is final.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ResultPartitionWriter.class)
+public class StreamRecordWriterTest {
+
+	/**
+	 * Verifies that exceptions during flush from the output flush thread are
+	 * recognized in the writer.
+	 */
+	@Test
+	public void testPropagateAsyncFlushError() {
+		FailingWriter<LongValue> testWriter = null;
+		try {
+			ResultPartitionWriter mockResultPartitionWriter = getMockWriter(5);
+			
+			// test writer that flushes every 5ms and fails after 3 flushes
+			testWriter = new FailingWriter<LongValue>(mockResultPartitionWriter,
+					new RoundRobinChannelSelector<LongValue>(), 5, 3);
+			
+			try {
+				long deadline = System.currentTimeMillis() + 20000; // in max 20 seconds (conservative)
+				long l = 0L;
+				
+				while (System.currentTimeMillis() < deadline) {
+					testWriter.emit(new LongValue(l++));
+				}
+				
+				fail("This should have failed with an exception");
+			}
+			catch (IOException e) {
+				assertTrue(e.getCause() instanceof IOException);
+				assertEquals("Test Exception", e.getCause().getMessage());
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			if (testWriter != null) {
+				try {
+					testWriter.close();
+				}
+				catch (IOException e) {
+					// ignore in tests
+				}
+			}
+		}
+	}
+	
+	private static ResultPartitionWriter getMockWriter(int numPartitions) throws Exception {
+		BufferProvider mockProvider = mock(BufferProvider.class);
+		when(mockProvider.requestBufferBlocking()).thenAnswer(new Answer<Buffer>() {
+			@Override
+			public Buffer answer(InvocationOnMock invocation) {
+				return new Buffer(new MemorySegment(new byte[4096]), DummyBufferRecycler.INSTANCE);
+			}
+		});
+		
+		ResultPartitionWriter mockWriter = mock(ResultPartitionWriter.class);
+		when(mockWriter.getBufferProvider()).thenReturn(mockProvider);
+		when(mockWriter.getNumberOfOutputChannels()).thenReturn(numPartitions);
+		
+		
+		return mockWriter;
+	}
+	
+	
+	// ------------------------------------------------------------------------
+	
+	private static class FailingWriter<T extends IOReadableWritable> extends StreamRecordWriter<T> {
+		// volatile: decremented by the flusher thread, and by close() on the test thread
+		private volatile int flushesBeforeException;
+		
+		private FailingWriter(ResultPartitionWriter writer, ChannelSelector<T> channelSelector,
+								long timeout, int flushesBeforeException) {
+			super(writer, channelSelector, timeout);
+			this.flushesBeforeException = flushesBeforeException;
+		}
+
+		@Override
+		public void flush() throws IOException {
+			if (flushesBeforeException-- <= 0) {
+				throw new IOException("Test Exception");
+			}
+			super.flush();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ba32133/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
index 0693665..39ff2e5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
@@ -31,8 +31,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-
 import org.apache.flink.util.Collector;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -110,7 +110,7 @@ public class StateCheckpoinedITCase {
 																	"localhost", cluster.getJobManagerRPCPort());
 			env.setParallelism(PARALLELISM);
 			env.enableCheckpointing(500);
-			env.getConfig().enableSysoutLogging();
+			env.getConfig().disableSysoutLogging();
 
 			DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/8ba32133/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
index 1730c63..438e980 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
@@ -18,40 +18,32 @@
 
 package org.apache.flink.test.checkpointing;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Random;
-
 import org.apache.flink.api.common.functions.RichFilterFunction;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-import org.apache.flink.util.Collector;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * A simple test that runs a streaming topology with checkpointing enabled.


Mime
View raw message