flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/2] flink git commit: [FLINK-2647] [streaming core] Distinguish between "close" (flushing buffered data) and "dispose" (cleanup resources) in streaming operators.
Date Fri, 11 Sep 2015 11:48:25 GMT
Repository: flink
Updated Branches:
  refs/heads/master aec4b1575 -> a492ed922


[FLINK-2647] [streaming core] Distinguish between "close" (flushing buffered data) and "dispose" (cleanup resources) in streaming operators.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/407d74fd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/407d74fd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/407d74fd

Branch: refs/heads/master
Commit: 407d74fd004bbd11febfa553c381e828115d8bfb
Parents: aec4b15
Author: Stephan Ewen <sewen@apache.org>
Authored: Wed Sep 9 20:12:09 2015 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Fri Sep 11 11:29:43 2015 +0200

----------------------------------------------------------------------
 .../wrappers/StormBoltWrapper.java              |  3 +-
 .../wrappers/StormBoltWrapperTest.java          |  6 +-
 .../api/operators/AbstractStreamOperator.java   | 29 +++++--
 .../operators/AbstractUdfStreamOperator.java    | 24 +++++-
 .../streaming/api/operators/StreamOperator.java | 57 ++++++++++----
 .../windowing/GroupedActiveDiscretizer.java     |  4 +-
 .../operators/windowing/StreamDiscretizer.java  |  7 ++
 .../streaming/runtime/tasks/StreamTask.java     | 81 ++++++++++++--------
 8 files changed, 152 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/407d74fd/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
index 05a4902..c4ba9ba 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
@@ -209,8 +209,7 @@ public class StormBoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> imple
 	}
 
 	@Override
-	public void close() throws Exception {
-		super.close();
+	public void dispose() {
 		this.bolt.cleanup();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/407d74fd/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
index 2491486..db34096 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.api.mockito.PowerMockito;
@@ -43,9 +44,6 @@ import org.powermock.modules.junit4.PowerMockRunner;
 import java.util.HashSet;
 import java.util.Map;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.isNull;
 import static org.mockito.Mockito.*;
 
 @RunWith(PowerMockRunner.class)
@@ -236,6 +234,8 @@ public class StormBoltWrapperTest extends AbstractTest {
 		wrapper.setup(mock(Output.class), taskContext);
 
 		wrapper.close();
+		wrapper.dispose();
+		
 		verify(bolt).cleanup();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/407d74fd/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 3956d75..07d8312 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -32,17 +32,22 @@ public abstract class AbstractStreamOperator<OUT> implements StreamOperator<OUT>
 
 	private static final long serialVersionUID = 1L;
 
+	
 	protected transient StreamingRuntimeContext runtimeContext;
 
 	protected transient ExecutionConfig executionConfig;
 
-	public transient Output<StreamRecord<OUT>> output;
+	protected transient Output<StreamRecord<OUT>> output;
 
 	protected boolean inputCopyDisabled = false;
 
 	// A sane default for most operators
 	protected ChainingStrategy chainingStrategy = ChainingStrategy.HEAD;
 
+	// ------------------------------------------------------------------------
+	//  Life Cycle
+	// ------------------------------------------------------------------------
+	
 	@Override
 	public void setup(Output<StreamRecord<OUT>> output, StreamingRuntimeContext runtimeContext) {
 		this.output = output;
@@ -50,14 +55,28 @@ public abstract class AbstractStreamOperator<OUT> implements StreamOperator<OUT>
 		this.runtimeContext = runtimeContext;
 	}
 
+	/**
+	 * This default implementation of the interface method does nothing.
+	 */
 	@Override
-	public void open(Configuration parameters) throws Exception {
-	}
+	public void open(Configuration parameters) throws Exception {}
 
+	/**
+	 * This default implementation of the interface method does nothing.
+	 */
 	@Override
-	public void close() throws Exception {
-	}
+	public void close() throws Exception {}
 
+	/**
+	 * This default implementation of the interface method does nothing.
+	 */
+	@Override
+	public void dispose() {}
+	
+	// ------------------------------------------------------------------------
+	//  Context and chaining properties
+	// ------------------------------------------------------------------------
+	
 	@Override
 	public final void setChainingStrategy(ChainingStrategy strategy) {
 		this.chainingStrategy = strategy;

http://git-wip-us.apache.org/repos/asf/flink/blob/407d74fd/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index dae1bf0..f679d5f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -36,6 +36,8 @@ import org.apache.flink.streaming.api.state.PartitionedStreamOperatorState;
 import org.apache.flink.streaming.api.state.StreamOperatorState;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This is used as the base class for operators that have a user-defined
@@ -46,11 +48,17 @@ import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
  * @param <F>
  *            The type of the user function
  */
-public abstract class AbstractUdfStreamOperator<OUT, F extends Function & Serializable> extends AbstractStreamOperator<OUT> implements StatefulStreamOperator<OUT> {
+public abstract class AbstractUdfStreamOperator<OUT, F extends Function & Serializable>
+		extends AbstractStreamOperator<OUT> implements StatefulStreamOperator<OUT> {
 
 	private static final long serialVersionUID = 1L;
+	
+	private static final Logger LOG = LoggerFactory.getLogger(AbstractUdfStreamOperator.class);
+	
 
 	protected final F userFunction;
+	
+	private boolean functionsClosed = false;
 
 	public AbstractUdfStreamOperator(F userFunction) {
 		this.userFunction = userFunction;
@@ -72,10 +80,24 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function & Serial
 	@Override
 	public void close() throws Exception {
 		super.close();
+		functionsClosed = true;
 		FunctionUtils.closeFunction(userFunction);
 	}
 
 	@Override
+	public void dispose() {
+		if (!functionsClosed) {
+			functionsClosed = true;
+			try {
+				FunctionUtils.closeFunction(userFunction);
+			}
+			catch (Throwable t) {
+				LOG.error("Exception while closing user function while failing or canceling task", t);
+			}
+		}
+	}
+
+	@Override
 	@SuppressWarnings({ "unchecked", "rawtypes" })
 	public void restoreInitialState(Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>> snapshots) throws Exception {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/407d74fd/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index 9dd18b2..0706c07 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -27,40 +27,71 @@ import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
  * Basic interface for stream operators. Implementers would implement one of
  * {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator} or
  * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator} to create operators
- * that process elements. You can use
- * {@link org.apache.flink.streaming.api.operators.AbstractStreamOperator} as a base class for
- * custom operators.
+ * that process elements.
+ * 
+ * The class {@link org.apache.flink.streaming.api.operators.AbstractStreamOperator}
+ * offers default implementation for the lifecycle and properties methods.
  * 
  * @param <OUT> The output type of the operator
  */
 public interface StreamOperator<OUT> extends Serializable {
 
+	// ------------------------------------------------------------------------
+	//  Life Cycle
+	// ------------------------------------------------------------------------
+	
 	/**
-	 * Initializes the {@link StreamOperator} for input and output handling.
+	 * Initializes the operator. Sets access to the context and the output.
 	 */
-	public void setup(Output<StreamRecord<OUT>> output, StreamingRuntimeContext runtimeContext);
+	void setup(Output<StreamRecord<OUT>> output, StreamingRuntimeContext runtimeContext);
 
 	/**
-	 * This method is called before any elements are processed.
+	 * This method is called immediately before any elements are processed, it should contain the
+	 * operator's initialization logic.
+	 * 
+	 * @throws java.lang.Exception An exception in this method causes the operator to fail.
 	 */
-	public void open(Configuration config) throws Exception;
+	void open(Configuration config) throws Exception;
 
 	/**
-	 * This method is called after no more elements for can arrive for processing.
+	 * This method is called after all records have been added to the operators via the methods
+	 * {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator#processElement(StreamRecord)}, or
+	 * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator#processElement1(StreamRecord)} and
+	 * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator#processElement2(StreamRecord)}.
+	 * <p>
+	 * The method is expected to flush all remaining buffered data. Exceptions during this flushing
+	 * of buffered data should be propagated, in order to cause the operation to be recognized as failed,
+	 * because the last data items are not processed properly.
+	 * 
+	 * @throws java.lang.Exception An exception in this method causes the operator to fail.
 	 */
-	public void close() throws Exception;
+	void close() throws Exception;
+
+	/**
+	 * This method is called at the very end of the operator's life, both in the case of a successful
+	 * completion of the operation, and in the case of a failure and canceling.
+	 * 
+	 * This method is expected to make a thorough effort to release all resources
+	 * that the operator has acquired.
+	 */
+	void dispose();
+	
+
+	// ------------------------------------------------------------------------
+	//  Context and chaining properties
+	// ------------------------------------------------------------------------
 	
-	public StreamingRuntimeContext getRuntimeContext();
+	StreamingRuntimeContext getRuntimeContext();
 
 	/**
 	 * An operator can return true here to disable copying of its input elements. This overrides
 	 * the object-reuse setting on the {@link org.apache.flink.api.common.ExecutionConfig}
 	 */
-	public boolean isInputCopyingDisabled();
+	boolean isInputCopyingDisabled();
 
-	public void setChainingStrategy(ChainingStrategy strategy);
+	void setChainingStrategy(ChainingStrategy strategy);
 
-	public ChainingStrategy getChainingStrategy();
+	ChainingStrategy getChainingStrategy();
 
 	/**
 	 * Defines the chaining scheme for the operator. By default <b>ALWAYS</b> is used,

http://git-wip-us.apache.org/repos/asf/flink/blob/407d74fd/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java
index 0de16b2..5141598 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java
@@ -79,9 +79,7 @@ public class GroupedActiveDiscretizer<IN> extends GroupedStreamDiscretizer<IN> {
 	}
 
 	@Override
-	public void close() throws Exception {
-		super.close();
-
+	public void dispose() {
 		try {
 			centralCheck.running = false;
 			centralThread.interrupt();

http://git-wip-us.apache.org/repos/asf/flink/blob/407d74fd/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java
index 47c2323..a5e00aa 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java
@@ -185,6 +185,13 @@ public class StreamDiscretizer<IN>
 		emitWindow();
 	}
 
+	@Override
+	public void dispose() {
+		if (activePolicyThread != null) {
+			activePolicyThread.interrupt();
+		}
+	}
+
 	/**
 	 * This class allows the active trigger thread to call back and push fake
 	 * elements at any time.

http://git-wip-us.apache.org/repos/asf/flink/blob/407d74fd/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index a70fb31..d5bdce2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -162,60 +162,56 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 	public final void invoke() throws Exception {
 		LOG.debug("Invoking {}", getName());
 		
-		boolean operatorOpen = false;
+		boolean disposed = false;
 		try {
 			openAllOperators();
-			operatorOpen = true;
 
 			// let the task do its work
 			isRunning = true;
 			run();
+			isRunning = false;
 			
 			if (LOG.isDebugEnabled()) {
 				LOG.debug("Finished task {}", getName());
 			}
-
-			// make sure no further checkpoint and notification actions happen
-			// for that we set this task as not running and make sure no other thread is
-			// currently in the locked scope before we close the operators
-			this.isRunning = false;
+			
+			// make sure no further checkpoint and notification actions happen.
+			// we make sure that no other thread is currently in the locked scope before
+			// we close the operators by trying to acquire the checkpoint scope lock
 			synchronized (checkpointLock) {}
 			
 			// this is part of the main logic, so if this fails, the task is considered failed
 			closeAllOperators();
-			operatorOpen = false;
 			
-			// make sure all data if flushed
+			// make sure all data is flushed
 			outputHandler.flushOutputs();
+
+			// make an attempt to dispose the operators such that failures in the dispose call
+			// still let the computation fail
+			tryDisposeAllOperators();
+			disposed = true;
 		}
 		finally {
-			this.isRunning = false;
+			isRunning = false;
 			
+			// release the output resources. this method should never fail.
+			if (outputHandler != null) {
+				outputHandler.releaseOutputs();
+			}
+
+			// we must! perform this cleanup
+
 			try {
-				if (operatorOpen) {
-					// we came here in a failure
-					closeAllOperators();
-				}
+				cleanup();
 			}
 			catch (Throwable t) {
-				LOG.error("Error closing stream operators after an exception.", t);
-				
+				// catch and log the exception to not replace the original exception
+				LOG.error("Error during cleanup of stream task.");
 			}
-			finally {
-				// we must! perform this cleanup
-				
-				// release the output resources
-				if (outputHandler != null) {
-					outputHandler.releaseOutputs();
-				}
-
-				// release this operator's resources
-				try {
-					cleanup();
-				}
-				catch (Throwable t) {
-					LOG.error("Error during cleanup of stream task.");
-				}
+			
+			// if the operators were not disposed before, do a hard dispose
+			if (!disposed) {
+				disposeAllOperators();
 			}
 		}
 	}
@@ -237,7 +233,7 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 	private void closeAllOperators() throws Exception {
 		// We need to close them first to last, since upstream operators in the chain might emit
 		// elements in their close methods.
-		for (int i = outputHandler.getChainedOperators().size()-1; i >= 0; i--) {
+		for (int i = outputHandler.getChainedOperators().size() - 1; i >= 0; i--) {
 			StreamOperator<?> operator = outputHandler.getChainedOperators().get(i);
 			if (operator != null) {
 				operator.close();
@@ -245,6 +241,27 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 		}
 	}
 
+	private void tryDisposeAllOperators() throws Exception {
+		for (StreamOperator<?> operator : outputHandler.getChainedOperators()) {
+			if (operator != null) {
+				operator.dispose();
+			}
+		}
+	}
+	
+	private void disposeAllOperators() {
+		for (StreamOperator<?> operator : outputHandler.getChainedOperators()) {
+			if (operator != null) {
+				try {
+					operator.dispose();
+				}
+				catch (Throwable t) {
+					LOG.error("Error during disposal of stream operator.", t);
+				}
+			}
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  Access to properties and utilities
 	// ------------------------------------------------------------------------


Mime
View raw message