flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gyf...@apache.org
Subject [1/3] flink git commit: [FLINK-2324] [streaming] Added test for different StateHandle wrappers
Date Thu, 30 Jul 2015 14:45:38 GMT
Repository: flink
Updated Branches:
  refs/heads/master 1b3bdce5c -> 83e14cb15


[FLINK-2324] [streaming] Added test for different StateHandle wrappers

Closes #937


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/83e14cb1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/83e14cb1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/83e14cb1

Branch: refs/heads/master
Commit: 83e14cb15a41c417fd7d024c383a4f4d50ec5a19
Parents: 58cd4ea
Author: Gyula Fora <gyfora@apache.org>
Authored: Thu Jul 30 07:26:40 2015 +0200
Committer: Gyula Fora <gyfora@apache.org>
Committed: Thu Jul 30 16:44:53 2015 +0200

----------------------------------------------------------------------
 .../operators/AbstractUdfStreamOperator.java    |   8 +-
 .../api/state/OperatorStateHandle.java          |   4 +
 .../streaming/api/state/StateHandleTest.java    | 134 +++++++++++++++++++
 .../StreamCheckpointingITCase.java              |  14 +-
 4 files changed, 156 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/83e14cb1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index f21aacc..585b4ce 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -79,7 +79,7 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function & Serial
 	@SuppressWarnings({ "unchecked", "rawtypes" })
 	public void restoreInitialState(Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>> snapshots) throws Exception {
 		// Restore state using the Checkpointed interface
-		if (userFunction instanceof Checkpointed) {
+		if (userFunction instanceof Checkpointed && snapshots.f0 != null) {
 			((Checkpointed) userFunction).restoreState(snapshots.f0.getState());
 		}
 		
@@ -122,8 +122,10 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function & Serial
 		// if the UDF implements the Checkpointed interface we draw a snapshot
 		if (userFunction instanceof Checkpointed) {
 			StateHandleProvider<Serializable> provider = runtimeContext.getStateHandleProvider();
-			checkpointedSnapshot = provider.createStateHandle(((Checkpointed) userFunction)
-					.snapshotState(checkpointId, timestamp));
+			Serializable state = ((Checkpointed) userFunction).snapshotState(checkpointId, timestamp);
+			if (state != null) {
+				checkpointedSnapshot = provider.createStateHandle(state);
+			}
 		}
 		
 		// if we have either operator or checkpointed state we store it in a

http://git-wip-us.apache.org/repos/asf/flink/blob/83e14cb1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/OperatorStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/OperatorStateHandle.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/OperatorStateHandle.java
index 87536ed..f308ba8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/OperatorStateHandle.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/OperatorStateHandle.java
@@ -46,5 +46,9 @@ public class OperatorStateHandle implements StateHandle<Serializable> {
 	public void discardState() throws Exception {
 		handle.discardState();
 	}
+	
+	public StateHandle<Serializable> getHandle() {
+		return handle;
+	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/83e14cb1/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StateHandleTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StateHandleTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StateHandleTest.java
new file mode 100644
index 0000000..38117e8
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StateHandleTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.state;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.util.InstantiationUtil;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+public class StateHandleTest {
+
+	@Test
+	public void operatorStateHandleTest() throws Exception {
+
+		MockHandle<Serializable> h1 = new MockHandle<Serializable>(1);
+
+		OperatorStateHandle opHandle = new OperatorStateHandle(h1, true);
+		assertEquals(1, opHandle.getState());
+
+		OperatorStateHandle dsHandle = serializeDeserialize(opHandle);
+		MockHandle<Serializable> h2 = (MockHandle<Serializable>) dsHandle.getHandle();
+		assertFalse(h2.discarded);
+		assertNotNull(h1.state);
+		assertNull(h2.state);
+
+		dsHandle.discardState();
+
+		assertTrue(h2.discarded);
+	}
+
+	@Test
+	public void wrapperStateHandleTest() throws Exception {
+
+		MockHandle<Serializable> h1 = new MockHandle<Serializable>(1);
+		MockHandle<Serializable> h2 = new MockHandle<Serializable>(2);
+		StateHandle<Serializable> h3 = new MockHandle<Serializable>(3);
+
+		OperatorStateHandle opH1 = new OperatorStateHandle(h1, true);
+		OperatorStateHandle opH2 = new OperatorStateHandle(h2, false);
+
+		Map<String, OperatorStateHandle> opHandles = ImmutableMap.of("h1", opH1, "h2", opH2);
+
+		Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>> fullState = Tuple2.of(h3,
+				opHandles);
+
+		List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>> chainedStates = ImmutableList
+				.of(fullState);
+
+		WrapperStateHandle wrapperHandle = new WrapperStateHandle(chainedStates);
+
+		WrapperStateHandle dsWrapper = serializeDeserialize(wrapperHandle);
+
+		@SuppressWarnings("unchecked")
+		Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>> dsFullState = ((List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>>) dsWrapper
+				.getState()).get(0);
+
+		Map<String, OperatorStateHandle> dsOpHandles = dsFullState.f1;
+
+		assertNull(dsFullState.f0.getState());
+		assertFalse(((MockHandle<?>) dsFullState.f0).discarded);
+		assertFalse(((MockHandle<?>) dsOpHandles.get("h1").getHandle()).discarded);
+		assertNull(dsOpHandles.get("h1").getState());
+		assertFalse(((MockHandle<?>) dsOpHandles.get("h2").getHandle()).discarded);
+		assertNull(dsOpHandles.get("h2").getState());
+
+		dsWrapper.discardState();
+
+		assertTrue(((MockHandle<?>) dsFullState.f0).discarded);
+		assertTrue(((MockHandle<?>) dsOpHandles.get("h1").getHandle()).discarded);
+		assertTrue(((MockHandle<?>) dsOpHandles.get("h2").getHandle()).discarded);
+
+	}
+
+	@SuppressWarnings("unchecked")
+	private <X extends StateHandle<?>> X serializeDeserialize(X handle) throws IOException,
+			ClassNotFoundException {
+		byte[] serialized = InstantiationUtil.serializeObject(handle);
+		return (X) InstantiationUtil.deserializeObject(serialized, Thread.currentThread()
+				.getContextClassLoader());
+	}
+
+	@SuppressWarnings("serial")
+	private static class MockHandle<T> implements StateHandle<T> {
+
+		boolean discarded = false;
+		transient T state;
+
+		public MockHandle(T state) {
+			this.state = state;
+		}
+
+		@Override
+		public void discardState() {
+			state = null;
+			discarded = true;
+		}
+
+		@Override
+		public T getState() {
+			return state;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/83e14cb1/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
index 3f99fa0..93dda5f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
@@ -372,7 +372,8 @@ public class StreamCheckpointingITCase {
 		}
 	}
 
-	private static class StringPrefixCountRichMapFunction extends RichMapFunction<String, PrefixCount> {
+	private static class StringPrefixCountRichMapFunction extends RichMapFunction<String, PrefixCount>
+			implements Checkpointed<Integer> {
 
 		OperatorState<Long> count;
 		static final long[] counts = new long[PARALLELISM];
@@ -392,5 +393,16 @@ public class StreamCheckpointingITCase {
 		public void close() throws IOException {
 			counts[getRuntimeContext().getIndexOfThisSubtask()] = count.value();
 		}
+
+		@Override
+		public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+			return null;
+		}
+
+		@Override
+		public void restoreState(Integer state) {
+			// verify that we never store/restore null state
+			fail();
+		}
 	}
 }


Mime
View raw message