flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srich...@apache.org
Subject [01/26] flink git commit: [hotfix] Rename OperatorSnapshotResult to OperatorSnapshotFutures.
Date Sun, 25 Feb 2018 16:11:35 GMT
Repository: flink
Updated Branches:
  refs/heads/master 4e7f03e41 -> f9a583b72


[hotfix] Rename OperatorSnapshotResult to OperatorSnapshotFutures.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ea0d16d4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ea0d16d4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ea0d16d4

Branch: refs/heads/master
Commit: ea0d16d4bac639dc4858a6c5cef209e904e655ef
Parents: 617e67c
Author: Stefan Richter <s.richter@data-artisans.com>
Authored: Thu Feb 22 16:14:39 2018 +0100
Committer: Stefan Richter <s.richter@data-artisans.com>
Committed: Sun Feb 25 15:10:28 2018 +0100

----------------------------------------------------------------------
 .../api/operators/AbstractStreamOperator.java   |   4 +-
 .../api/operators/OperatorSnapshotFutures.java  | 134 +++++++++++++++++++
 .../api/operators/OperatorSnapshotResult.java   | 134 -------------------
 .../streaming/api/operators/StreamOperator.java |   2 +-
 .../streaming/runtime/tasks/StreamTask.java     |  18 +--
 .../operators/AbstractStreamOperatorTest.java   |   4 +-
 .../operators/OperatorSnapshotFuturesTest.java  |  82 ++++++++++++
 .../operators/OperatorSnapshotResultTest.java   |  82 ------------
 .../streaming/runtime/tasks/StreamTaskTest.java |  18 +--
 .../util/AbstractStreamOperatorTestHarness.java |   4 +-
 10 files changed, 241 insertions(+), 241 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ea0d16d4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 42b6923..4f16259 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -347,7 +347,7 @@ public abstract class AbstractStreamOperator<OUT>
 	}
 
 	@Override
-	public final OperatorSnapshotResult snapshotState(
+	public final OperatorSnapshotFutures snapshotState(
 			long checkpointId,
 			long timestamp,
 			CheckpointOptions checkpointOptions,
@@ -356,7 +356,7 @@ public abstract class AbstractStreamOperator<OUT>
 		KeyGroupRange keyGroupRange = null != keyedStateBackend ?
 				keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
 
-		OperatorSnapshotResult snapshotInProgress = new OperatorSnapshotResult();
+		OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures();
 
 		try (StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
 				checkpointId,

http://git-wip-us.apache.org/repos/asf/flink/blob/ea0d16d4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFutures.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFutures.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFutures.java
new file mode 100644
index 0000000..bdaf64b
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFutures.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StateUtil;
+import org.apache.flink.util.ExceptionUtils;
+
+import java.util.concurrent.RunnableFuture;
+
+/**
+ * Result of {@link StreamOperator#snapshotState}.
+ */
+public class OperatorSnapshotFutures {
+
+	private RunnableFuture<KeyedStateHandle> keyedStateManagedFuture;
+	private RunnableFuture<KeyedStateHandle> keyedStateRawFuture;
+	private RunnableFuture<OperatorStateHandle> operatorStateManagedFuture;
+	private RunnableFuture<OperatorStateHandle> operatorStateRawFuture;
+
+	public OperatorSnapshotFutures() {
+		this(null, null, null, null);
+	}
+
+	public OperatorSnapshotFutures(
+			RunnableFuture<KeyedStateHandle> keyedStateManagedFuture,
+			RunnableFuture<KeyedStateHandle> keyedStateRawFuture,
+			RunnableFuture<OperatorStateHandle> operatorStateManagedFuture,
+			RunnableFuture<OperatorStateHandle> operatorStateRawFuture) {
+		this.keyedStateManagedFuture = keyedStateManagedFuture;
+		this.keyedStateRawFuture = keyedStateRawFuture;
+		this.operatorStateManagedFuture = operatorStateManagedFuture;
+		this.operatorStateRawFuture = operatorStateRawFuture;
+	}
+
+	public RunnableFuture<KeyedStateHandle> getKeyedStateManagedFuture() {
+		return keyedStateManagedFuture;
+	}
+
+	public void setKeyedStateManagedFuture(RunnableFuture<KeyedStateHandle> keyedStateManagedFuture) {
+		this.keyedStateManagedFuture = keyedStateManagedFuture;
+	}
+
+	public RunnableFuture<KeyedStateHandle> getKeyedStateRawFuture() {
+		return keyedStateRawFuture;
+	}
+
+	public void setKeyedStateRawFuture(RunnableFuture<KeyedStateHandle> keyedStateRawFuture) {
+		this.keyedStateRawFuture = keyedStateRawFuture;
+	}
+
+	public RunnableFuture<OperatorStateHandle> getOperatorStateManagedFuture() {
+		return operatorStateManagedFuture;
+	}
+
+	public void setOperatorStateManagedFuture(RunnableFuture<OperatorStateHandle> operatorStateManagedFuture) {
+		this.operatorStateManagedFuture = operatorStateManagedFuture;
+	}
+
+	public RunnableFuture<OperatorStateHandle> getOperatorStateRawFuture() {
+		return operatorStateRawFuture;
+	}
+
+	public void setOperatorStateRawFuture(RunnableFuture<OperatorStateHandle> operatorStateRawFuture) {
+		this.operatorStateRawFuture = operatorStateRawFuture;
+	}
+
+	public void cancel() throws Exception {
+		Exception exception = null;
+
+		try {
+			StateUtil.discardStateFuture(getKeyedStateManagedFuture());
+		} catch (Exception e) {
+			exception = new Exception("Could not properly cancel managed keyed state future.", e);
+		}
+
+		try {
+			StateUtil.discardStateFuture(getOperatorStateManagedFuture());
+		} catch (Exception e) {
+			exception = ExceptionUtils.firstOrSuppressed(
+				new Exception("Could not properly cancel managed operator state future.", e),
+				exception);
+		}
+
+		try {
+			StateUtil.discardStateFuture(getKeyedStateRawFuture());
+		} catch (Exception e) {
+			exception = ExceptionUtils.firstOrSuppressed(
+				new Exception("Could not properly cancel raw keyed state future.", e),
+				exception);
+		}
+
+		try {
+			StateUtil.discardStateFuture(getOperatorStateRawFuture());
+		} catch (Exception e) {
+			exception = ExceptionUtils.firstOrSuppressed(
+				new Exception("Could not properly cancel raw operator state future.", e),
+				exception);
+		}
+
+		if (exception != null) {
+			throw exception;
+		}
+	}
+
+	public boolean hasKeyedState() {
+		return keyedStateManagedFuture != null || keyedStateRawFuture != null;
+	}
+
+	public boolean hasOperatorState() {
+		return operatorStateManagedFuture != null || operatorStateRawFuture != null;
+	}
+
+	public boolean hasState() {
+		return hasKeyedState() || hasOperatorState();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ea0d16d4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
deleted file mode 100644
index 8c05ae9..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.runtime.state.KeyedStateHandle;
-import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.StateUtil;
-import org.apache.flink.util.ExceptionUtils;
-
-import java.util.concurrent.RunnableFuture;
-
-/**
- * Result of {@link StreamOperator#snapshotState}.
- */
-public class OperatorSnapshotResult {
-
-	private RunnableFuture<KeyedStateHandle> keyedStateManagedFuture;
-	private RunnableFuture<KeyedStateHandle> keyedStateRawFuture;
-	private RunnableFuture<OperatorStateHandle> operatorStateManagedFuture;
-	private RunnableFuture<OperatorStateHandle> operatorStateRawFuture;
-
-	public OperatorSnapshotResult() {
-		this(null, null, null, null);
-	}
-
-	public OperatorSnapshotResult(
-			RunnableFuture<KeyedStateHandle> keyedStateManagedFuture,
-			RunnableFuture<KeyedStateHandle> keyedStateRawFuture,
-			RunnableFuture<OperatorStateHandle> operatorStateManagedFuture,
-			RunnableFuture<OperatorStateHandle> operatorStateRawFuture) {
-		this.keyedStateManagedFuture = keyedStateManagedFuture;
-		this.keyedStateRawFuture = keyedStateRawFuture;
-		this.operatorStateManagedFuture = operatorStateManagedFuture;
-		this.operatorStateRawFuture = operatorStateRawFuture;
-	}
-
-	public RunnableFuture<KeyedStateHandle> getKeyedStateManagedFuture() {
-		return keyedStateManagedFuture;
-	}
-
-	public void setKeyedStateManagedFuture(RunnableFuture<KeyedStateHandle> keyedStateManagedFuture) {
-		this.keyedStateManagedFuture = keyedStateManagedFuture;
-	}
-
-	public RunnableFuture<KeyedStateHandle> getKeyedStateRawFuture() {
-		return keyedStateRawFuture;
-	}
-
-	public void setKeyedStateRawFuture(RunnableFuture<KeyedStateHandle> keyedStateRawFuture) {
-		this.keyedStateRawFuture = keyedStateRawFuture;
-	}
-
-	public RunnableFuture<OperatorStateHandle> getOperatorStateManagedFuture() {
-		return operatorStateManagedFuture;
-	}
-
-	public void setOperatorStateManagedFuture(RunnableFuture<OperatorStateHandle> operatorStateManagedFuture) {
-		this.operatorStateManagedFuture = operatorStateManagedFuture;
-	}
-
-	public RunnableFuture<OperatorStateHandle> getOperatorStateRawFuture() {
-		return operatorStateRawFuture;
-	}
-
-	public void setOperatorStateRawFuture(RunnableFuture<OperatorStateHandle> operatorStateRawFuture) {
-		this.operatorStateRawFuture = operatorStateRawFuture;
-	}
-
-	public void cancel() throws Exception {
-		Exception exception = null;
-
-		try {
-			StateUtil.discardStateFuture(getKeyedStateManagedFuture());
-		} catch (Exception e) {
-			exception = new Exception("Could not properly cancel managed keyed state future.", e);
-		}
-
-		try {
-			StateUtil.discardStateFuture(getOperatorStateManagedFuture());
-		} catch (Exception e) {
-			exception = ExceptionUtils.firstOrSuppressed(
-				new Exception("Could not properly cancel managed operator state future.", e),
-				exception);
-		}
-
-		try {
-			StateUtil.discardStateFuture(getKeyedStateRawFuture());
-		} catch (Exception e) {
-			exception = ExceptionUtils.firstOrSuppressed(
-				new Exception("Could not properly cancel raw keyed state future.", e),
-				exception);
-		}
-
-		try {
-			StateUtil.discardStateFuture(getOperatorStateRawFuture());
-		} catch (Exception e) {
-			exception = ExceptionUtils.firstOrSuppressed(
-				new Exception("Could not properly cancel raw operator state future.", e),
-				exception);
-		}
-
-		if (exception != null) {
-			throw exception;
-		}
-	}
-
-	public boolean hasKeyedState() {
-		return keyedStateManagedFuture != null || keyedStateRawFuture != null;
-	}
-
-	public boolean hasOperatorState() {
-		return operatorStateManagedFuture != null || operatorStateRawFuture != null;
-	}
-
-	public boolean hasState() {
-		return hasKeyedState() || hasOperatorState();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ea0d16d4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index 8450396..c3254f6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -99,7 +99,7 @@ public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Ser
 	 *
 	 * @throws Exception exception that happened during snapshotting.
 	 */
-	OperatorSnapshotResult snapshotState(
+	OperatorSnapshotFutures snapshotState(
 		long checkpointId,
 		long timestamp,
 		CheckpointOptions checkpointOptions,

http://git-wip-us.apache.org/repos/asf/flink/blob/ea0d16d4/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 03c23a5..06cb18b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -46,7 +46,7 @@ import org.apache.flink.runtime.util.OperatorSubtaskDescriptionText;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamEdge;
-import org.apache.flink.streaming.api.operators.OperatorSnapshotResult;
+import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
@@ -804,7 +804,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 		private final StreamTask<?, ?> owner;
 
-		private final Map<OperatorID, OperatorSnapshotResult> operatorSnapshotsInProgress;
+		private final Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress;
 
 		private final CheckpointMetaData checkpointMetaData;
 		private final CheckpointMetrics checkpointMetrics;
@@ -816,7 +816,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 		AsyncCheckpointRunnable(
 			StreamTask<?, ?> owner,
-			Map<OperatorID, OperatorSnapshotResult> operatorSnapshotsInProgress,
+			Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress,
 			CheckpointMetaData checkpointMetaData,
 			CheckpointMetrics checkpointMetrics,
 			long asyncStartNanos) {
@@ -838,10 +838,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 				final TaskStateSnapshot taskOperatorSubtaskStates =
 					new TaskStateSnapshot(operatorSnapshotsInProgress.size());
 
-				for (Map.Entry<OperatorID, OperatorSnapshotResult> entry : operatorSnapshotsInProgress.entrySet()) {
+				for (Map.Entry<OperatorID, OperatorSnapshotFutures> entry : operatorSnapshotsInProgress.entrySet()) {
 
 					OperatorID operatorID = entry.getKey();
-					OperatorSnapshotResult snapshotInProgress = entry.getValue();
+					OperatorSnapshotFutures snapshotInProgress = entry.getValue();
 
 					OperatorSubtaskState operatorSubtaskState = new OperatorSubtaskState(
 						FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateManagedFuture()),
@@ -927,7 +927,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 				Exception exception = null;
 
 				// clean up ongoing operator snapshot results and non partitioned state handles
-				for (OperatorSnapshotResult operatorSnapshotResult : operatorSnapshotsInProgress.values()) {
+				for (OperatorSnapshotFutures operatorSnapshotResult : operatorSnapshotsInProgress.values()) {
 					if (operatorSnapshotResult != null) {
 						try {
 							operatorSnapshotResult.cancel();
@@ -971,7 +971,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 		// ------------------------
 
-		private final Map<OperatorID, OperatorSnapshotResult> operatorSnapshotsInProgress;
+		private final Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress;
 
 		public CheckpointingOperation(
 				StreamTask<?, ?> owner,
@@ -1026,7 +1026,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 				}
 			} catch (Exception ex) {
 				// Cleanup to release resources
-				for (OperatorSnapshotResult operatorSnapshotResult : operatorSnapshotsInProgress.values()) {
+				for (OperatorSnapshotFutures operatorSnapshotResult : operatorSnapshotsInProgress.values()) {
 					if (null != operatorSnapshotResult) {
 						try {
 							operatorSnapshotResult.cancel();
@@ -1052,7 +1052,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
 			if (null != op) {
 
-				OperatorSnapshotResult snapshotInProgress = op.snapshotState(
+				OperatorSnapshotFutures snapshotInProgress = op.snapshotState(
 						checkpointMetaData.getCheckpointId(),
 						checkpointMetaData.getTimestamp(),
 						checkpointOptions,

http://git-wip-us.apache.org/repos/asf/flink/blob/ea0d16d4/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
index 46cae27..85069b5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
@@ -576,10 +576,10 @@ public class AbstractStreamOperatorTest {
 		when(context.getKeyedStateStreamFuture()).thenReturn(futureKeyedStateHandle);
 		when(context.getOperatorStateStreamFuture()).thenReturn(futureOperatorStateHandle);
 
-		OperatorSnapshotResult operatorSnapshotResult = spy(new OperatorSnapshotResult());
+		OperatorSnapshotFutures operatorSnapshotResult = spy(new OperatorSnapshotFutures());
 
 		whenNew(StateSnapshotContextSynchronousImpl.class).withAnyArguments().thenReturn(context);
-		whenNew(OperatorSnapshotResult.class).withAnyArguments().thenReturn(operatorSnapshotResult);
+		whenNew(OperatorSnapshotFutures.class).withAnyArguments().thenReturn(operatorSnapshotResult);
 
 		CheckpointStreamFactory streamFactory = mock(CheckpointStreamFactory.class);
 		StreamTask<Void, AbstractStreamOperator<Void>> containingTask = mock(StreamTask.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/ea0d16d4/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFuturesTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFuturesTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFuturesTest.java
new file mode 100644
index 0000000..6da39af
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFuturesTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.concurrent.RunnableFuture;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Tests for {@link OperatorSnapshotFutures}.
+ */
+public class OperatorSnapshotFuturesTest extends TestLogger {
+
+	/**
+	 * Tests that all runnable futures in an OperatorSnapshotResult are properly cancelled and if
+	 * the StreamStateHandle result is retrievable that the state handle are discarded.
+	 */
+	@Test
+	public void testCancelAndCleanup() throws Exception {
+		OperatorSnapshotFutures operatorSnapshotResult = new OperatorSnapshotFutures();
+
+		operatorSnapshotResult.cancel();
+
+		KeyedStateHandle keyedManagedStateHandle = mock(KeyedStateHandle.class);
+		RunnableFuture<KeyedStateHandle> keyedStateManagedFuture = mock(RunnableFuture.class);
+		when(keyedStateManagedFuture.get()).thenReturn(keyedManagedStateHandle);
+
+		KeyedStateHandle keyedRawStateHandle = mock(KeyedStateHandle.class);
+		RunnableFuture<KeyedStateHandle> keyedStateRawFuture = mock(RunnableFuture.class);
+		when(keyedStateRawFuture.get()).thenReturn(keyedRawStateHandle);
+
+		OperatorStateHandle operatorManagedStateHandle = mock(OperatorStateHandle.class);
+		RunnableFuture<OperatorStateHandle> operatorStateManagedFuture = mock(RunnableFuture.class);
+		when(operatorStateManagedFuture.get()).thenReturn(operatorManagedStateHandle);
+
+		OperatorStateHandle operatorRawStateHandle = mock(OperatorStateHandle.class);
+		RunnableFuture<OperatorStateHandle> operatorStateRawFuture = mock(RunnableFuture.class);
+		when(operatorStateRawFuture.get()).thenReturn(operatorRawStateHandle);
+
+		operatorSnapshotResult = new OperatorSnapshotFutures(
+			keyedStateManagedFuture,
+			keyedStateRawFuture,
+			operatorStateManagedFuture,
+			operatorStateRawFuture);
+
+		operatorSnapshotResult.cancel();
+
+		verify(keyedStateManagedFuture).cancel(true);
+		verify(keyedStateRawFuture).cancel(true);
+		verify(operatorStateManagedFuture).cancel(true);
+		verify(operatorStateRawFuture).cancel(true);
+
+		verify(keyedManagedStateHandle).discardState();
+		verify(keyedRawStateHandle).discardState();
+		verify(operatorManagedStateHandle).discardState();
+		verify(operatorRawStateHandle).discardState();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ea0d16d4/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java
deleted file mode 100644
index 5a7e69e..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.runtime.state.KeyedStateHandle;
-import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-
-import java.util.concurrent.RunnableFuture;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.powermock.api.mockito.PowerMockito.when;
-
-/**
- * Tests for {@link OperatorSnapshotResult}.
- */
-public class OperatorSnapshotResultTest extends TestLogger {
-
-	/**
-	 * Tests that all runnable futures in an OperatorSnapshotResult are properly cancelled and if
-	 * the StreamStateHandle result is retrievable that the state handle are discarded.
-	 */
-	@Test
-	public void testCancelAndCleanup() throws Exception {
-		OperatorSnapshotResult operatorSnapshotResult = new OperatorSnapshotResult();
-
-		operatorSnapshotResult.cancel();
-
-		KeyedStateHandle keyedManagedStateHandle = mock(KeyedStateHandle.class);
-		RunnableFuture<KeyedStateHandle> keyedStateManagedFuture = mock(RunnableFuture.class);
-		when(keyedStateManagedFuture.get()).thenReturn(keyedManagedStateHandle);
-
-		KeyedStateHandle keyedRawStateHandle = mock(KeyedStateHandle.class);
-		RunnableFuture<KeyedStateHandle> keyedStateRawFuture = mock(RunnableFuture.class);
-		when(keyedStateRawFuture.get()).thenReturn(keyedRawStateHandle);
-
-		OperatorStateHandle operatorManagedStateHandle = mock(OperatorStateHandle.class);
-		RunnableFuture<OperatorStateHandle> operatorStateManagedFuture = mock(RunnableFuture.class);
-		when(operatorStateManagedFuture.get()).thenReturn(operatorManagedStateHandle);
-
-		OperatorStateHandle operatorRawStateHandle = mock(OperatorStateHandle.class);
-		RunnableFuture<OperatorStateHandle> operatorStateRawFuture = mock(RunnableFuture.class);
-		when(operatorStateRawFuture.get()).thenReturn(operatorRawStateHandle);
-
-		operatorSnapshotResult = new OperatorSnapshotResult(
-			keyedStateManagedFuture,
-			keyedStateRawFuture,
-			operatorStateManagedFuture,
-			operatorStateRawFuture);
-
-		operatorSnapshotResult.cancel();
-
-		verify(keyedStateManagedFuture).cancel(true);
-		verify(keyedStateRawFuture).cancel(true);
-		verify(operatorStateManagedFuture).cancel(true);
-		verify(operatorStateRawFuture).cancel(true);
-
-		verify(keyedManagedStateHandle).discardState();
-		verify(keyedRawStateHandle).discardState();
-		verify(operatorManagedStateHandle).discardState();
-		verify(operatorRawStateHandle).discardState();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ea0d16d4/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 52295fb..99d4e5b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -94,7 +94,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
-import org.apache.flink.streaming.api.operators.OperatorSnapshotResult;
+import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperatorStateContext;
@@ -341,8 +341,8 @@ public class StreamTaskTest extends TestLogger {
 		StreamOperator<?> streamOperator3 = mock(StreamOperator.class);
 
 		// mock the returned snapshots
-		OperatorSnapshotResult operatorSnapshotResult1 = mock(OperatorSnapshotResult.class);
-		OperatorSnapshotResult operatorSnapshotResult2 = mock(OperatorSnapshotResult.class);
+		OperatorSnapshotFutures operatorSnapshotResult1 = mock(OperatorSnapshotFutures.class);
+		OperatorSnapshotFutures operatorSnapshotResult2 = mock(OperatorSnapshotFutures.class);
 
 		final Exception testException = new Exception("Test exception");
 
@@ -410,9 +410,9 @@ public class StreamTaskTest extends TestLogger {
 		StreamOperator<?> streamOperator3 = mock(StreamOperator.class);
 
 		// mock the new state operator snapshots
-		OperatorSnapshotResult operatorSnapshotResult1 = mock(OperatorSnapshotResult.class);
-		OperatorSnapshotResult operatorSnapshotResult2 = mock(OperatorSnapshotResult.class);
-		OperatorSnapshotResult operatorSnapshotResult3 = mock(OperatorSnapshotResult.class);
+		OperatorSnapshotFutures operatorSnapshotResult1 = mock(OperatorSnapshotFutures.class);
+		OperatorSnapshotFutures operatorSnapshotResult2 = mock(OperatorSnapshotFutures.class);
+		OperatorSnapshotFutures operatorSnapshotResult3 = mock(OperatorSnapshotFutures.class);
 
 		RunnableFuture<OperatorStateHandle> failingFuture = mock(RunnableFuture.class);
 		when(failingFuture.get()).thenThrow(new ExecutionException(new Exception("Test exception")));
@@ -520,7 +520,7 @@ public class StreamTaskTest extends TestLogger {
 		OperatorStateHandle managedOperatorStateHandle = mock(OperatorStateHandle.class);
 		OperatorStateHandle rawOperatorStateHandle = mock(OperatorStateHandle.class);
 
-		OperatorSnapshotResult operatorSnapshotResult = new OperatorSnapshotResult(
+		OperatorSnapshotFutures operatorSnapshotResult = new OperatorSnapshotFutures(
 			new DoneFuture<>(managedKeyedStateHandle),
 			new DoneFuture<>(rawKeyedStateHandle),
 			new DoneFuture<>(managedOperatorStateHandle),
@@ -635,7 +635,7 @@ public class StreamTaskTest extends TestLogger {
 		OperatorStateHandle managedOperatorStateHandle = mock(OperatorStateHandle.class);
 		OperatorStateHandle rawOperatorStateHandle = mock(OperatorStateHandle.class);
 
-		OperatorSnapshotResult operatorSnapshotResult = new OperatorSnapshotResult(
+		OperatorSnapshotFutures operatorSnapshotResult = new OperatorSnapshotFutures(
 			new DoneFuture<>(managedKeyedStateHandle),
 			new DoneFuture<>(rawKeyedStateHandle),
 			new DoneFuture<>(managedOperatorStateHandle),
@@ -739,7 +739,7 @@ public class StreamTaskTest extends TestLogger {
 		when(statelessOperator.getOperatorID()).thenReturn(operatorID);
 
 		// mock the returned empty snapshot result (all state handles are null)
-		OperatorSnapshotResult statelessOperatorSnapshotResult = new OperatorSnapshotResult();
+		OperatorSnapshotFutures statelessOperatorSnapshotResult = new OperatorSnapshotFutures();
 		when(statelessOperator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class), any(CheckpointStreamFactory.class)))
 				.thenReturn(statelessOperatorSnapshotResult);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ea0d16d4/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index d38cb28..28ad930 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -47,7 +47,7 @@ import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperatorTest;
-import org.apache.flink.streaming.api.operators.OperatorSnapshotResult;
+import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
@@ -470,7 +470,7 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
 	 */
 	public OperatorSubtaskState snapshot(long checkpointId, long timestamp) throws Exception {
 
-		OperatorSnapshotResult operatorStateResult = operator.snapshotState(
+		OperatorSnapshotFutures operatorStateResult = operator.snapshotState(
 			checkpointId,
 			timestamp,
 			CheckpointOptions.forCheckpointWithDefaultLocation(),


Mime
View raw message