Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id AC77018B1E for ; Thu, 30 Jul 2015 14:45:38 +0000 (UTC) Received: (qmail 50252 invoked by uid 500); 30 Jul 2015 14:45:38 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 50214 invoked by uid 500); 30 Jul 2015 14:45:38 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 50204 invoked by uid 99); 30 Jul 2015 14:45:38 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 30 Jul 2015 14:45:38 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6F679E7139; Thu, 30 Jul 2015 14:45:38 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: gyfora@apache.org To: commits@flink.apache.org Date: Thu, 30 Jul 2015 14:45:38 -0000 Message-Id: <315fa4c6bfc34f608ab6d77b503aa427@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/3] flink git commit: [FLINK-2324] [streaming] Added test for different StateHandle wrappers Repository: flink Updated Branches: refs/heads/master 1b3bdce5c -> 83e14cb15 [FLINK-2324] [streaming] Added test for different StateHandle wrappers Closes #937 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/83e14cb1 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/83e14cb1 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/83e14cb1 Branch: refs/heads/master Commit: 83e14cb15a41c417fd7d024c383a4f4d50ec5a19 Parents: 58cd4ea Author: Gyula Fora Authored: Thu Jul 30 07:26:40 2015 +0200 Committer: Gyula Fora Committed: Thu Jul 30 16:44:53 2015 +0200 ---------------------------------------------------------------------- .../operators/AbstractUdfStreamOperator.java | 8 +- .../api/state/OperatorStateHandle.java | 4 + .../streaming/api/state/StateHandleTest.java | 134 +++++++++++++++++++ .../StreamCheckpointingITCase.java | 14 +- 4 files changed, 156 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/83e14cb1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java index f21aacc..585b4ce 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java @@ -79,7 +79,7 @@ public abstract class AbstractUdfStreamOperator, Map> snapshots) throws Exception { // Restore state using the Checkpointed interface - if (userFunction instanceof Checkpointed) { + if (userFunction instanceof Checkpointed && snapshots.f0 != null) { ((Checkpointed) userFunction).restoreState(snapshots.f0.getState()); } @@ -122,8 +122,10 @@ public abstract class AbstractUdfStreamOperator provider = runtimeContext.getStateHandleProvider(); - checkpointedSnapshot = provider.createStateHandle(((Checkpointed) userFunction) - .snapshotState(checkpointId, timestamp)); + Serializable state = ((Checkpointed) userFunction).snapshotState(checkpointId, timestamp); + if (state != null) { + checkpointedSnapshot = provider.createStateHandle(state); + } } // if we have either operator or checkpointed state we store it in a http://git-wip-us.apache.org/repos/asf/flink/blob/83e14cb1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/OperatorStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/OperatorStateHandle.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/OperatorStateHandle.java index 87536ed..f308ba8 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/OperatorStateHandle.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/OperatorStateHandle.java @@ -46,5 +46,9 @@ public class OperatorStateHandle implements StateHandle { public void discardState() throws Exception { handle.discardState(); } + + public StateHandle getHandle() { + return handle; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/83e14cb1/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StateHandleTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StateHandleTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StateHandleTest.java new file mode 100644 index 0000000..38117e8 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StateHandleTest.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.state; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.util.InstantiationUtil; +import org.junit.Test; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +public class StateHandleTest { + + @Test + public void operatorStateHandleTest() throws Exception { + + MockHandle h1 = new MockHandle(1); + + OperatorStateHandle opHandle = new OperatorStateHandle(h1, true); + assertEquals(1, opHandle.getState()); + + OperatorStateHandle dsHandle = serializeDeserialize(opHandle); + MockHandle h2 = (MockHandle) dsHandle.getHandle(); + assertFalse(h2.discarded); + assertNotNull(h1.state); + assertNull(h2.state); + + dsHandle.discardState(); + + assertTrue(h2.discarded); + } + + @Test + public void wrapperStateHandleTest() throws Exception { + + MockHandle h1 = new MockHandle(1); + MockHandle h2 = new MockHandle(2); + StateHandle h3 = new MockHandle(3); + + OperatorStateHandle opH1 = new OperatorStateHandle(h1, true); + OperatorStateHandle opH2 = new OperatorStateHandle(h2, false); + + Map opHandles = ImmutableMap.of("h1", opH1, "h2", opH2); + + Tuple2, Map> fullState = Tuple2.of(h3, + opHandles); + + List, Map>> chainedStates = ImmutableList + .of(fullState); + + WrapperStateHandle wrapperHandle = new WrapperStateHandle(chainedStates); + + WrapperStateHandle dsWrapper = serializeDeserialize(wrapperHandle); + + @SuppressWarnings("unchecked") + Tuple2, Map> dsFullState = ((List, Map>>) dsWrapper + .getState()).get(0); + + Map dsOpHandles = dsFullState.f1; + + assertNull(dsFullState.f0.getState()); + assertFalse(((MockHandle) dsFullState.f0).discarded); + assertFalse(((MockHandle) dsOpHandles.get("h1").getHandle()).discarded); + assertNull(dsOpHandles.get("h1").getState()); + assertFalse(((MockHandle) dsOpHandles.get("h2").getHandle()).discarded); + assertNull(dsOpHandles.get("h2").getState()); + + dsWrapper.discardState(); + + assertTrue(((MockHandle) dsFullState.f0).discarded); + assertTrue(((MockHandle) dsOpHandles.get("h1").getHandle()).discarded); + assertTrue(((MockHandle) dsOpHandles.get("h2").getHandle()).discarded); + + } + + @SuppressWarnings("unchecked") + private > X serializeDeserialize(X handle) throws IOException, + ClassNotFoundException { + byte[] serialized = InstantiationUtil.serializeObject(handle); + return (X) InstantiationUtil.deserializeObject(serialized, Thread.currentThread() + .getContextClassLoader()); + } + + @SuppressWarnings("serial") + private static class MockHandle implements StateHandle { + + boolean discarded = false; + transient T state; + + public MockHandle(T state) { + this.state = state; + } + + @Override + public void discardState() { + state = null; + discarded = true; + } + + @Override + public T getState() { + return state; + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/83e14cb1/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java index 3f99fa0..93dda5f 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java @@ -372,7 +372,8 @@ public class StreamCheckpointingITCase { } } - private static class StringPrefixCountRichMapFunction extends RichMapFunction { + private static class StringPrefixCountRichMapFunction extends RichMapFunction + implements Checkpointed { OperatorState count; static final long[] counts = new long[PARALLELISM]; @@ -392,5 +393,16 @@ public class StreamCheckpointingITCase { public void close() throws IOException { counts[getRuntimeContext().getIndexOfThisSubtask()] = count.value(); } + + @Override + public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + return null; + } + + @Override + public void restoreState(Integer state) { + // verify that we never store/restore null state + fail(); + } } }