Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 44B73200BF8 for ; Fri, 13 Jan 2017 11:26:18 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 4359F160B3F; Fri, 13 Jan 2017 10:26:18 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 8FCA1160B32 for ; Fri, 13 Jan 2017 11:26:17 +0100 (CET) Received: (qmail 66114 invoked by uid 500); 13 Jan 2017 10:26:16 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 66105 invoked by uid 99); 13 Jan 2017 10:26:16 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 13 Jan 2017 10:26:16 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 86FE4DFC1A; Fri, 13 Jan 2017 10:26:16 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: uce@apache.org To: commits@flink.apache.org Message-Id: <4968e56a9da14349831b4e0ea5333dc3@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: flink git commit: [FLINK-5467] Avoid legacy state for CheckpointedRestoring operators Date: Fri, 13 Jan 2017 10:26:16 +0000 (UTC) archived-at: Fri, 13 Jan 2017 10:26:18 -0000 Repository: flink Updated Branches: refs/heads/master 46423b9c7 -> 51a357351 [FLINK-5467] Avoid legacy state for CheckpointedRestoring operators This closes #3102. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/51a35735 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/51a35735 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/51a35735 Branch: refs/heads/master Commit: 51a357351b955844941edd9a9b1406cdc787b18a Parents: 46423b9 Author: Stefan Richter Authored: Thu Jan 12 12:24:34 2017 +0100 Committer: Ufuk Celebi Committed: Fri Jan 13 11:20:53 2017 +0100 ---------------------------------------------------------------------- .../streaming/api/operators/AbstractUdfStreamOperator.java | 6 +++--- .../org/apache/flink/test/checkpointing/RescalingITCase.java | 8 +++++++- 2 files changed, 10 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/51a35735/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java index 81f709b..15e26c9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java @@ -195,14 +195,13 @@ public abstract class AbstractUdfStreamOperator } catch (Exception e) { throw new Exception("Failed to draw state snapshot from function: " + e.getMessage(), e); } - } else if (userFunction instanceof CheckpointedRestoring) { - out.write(0); } } @Override public void restoreState(FSDataInputStream in) throws Exception { - if (userFunction instanceof CheckpointedRestoring) { + if (userFunction instanceof Checkpointed || + (userFunction instanceof CheckpointedRestoring && in instanceof Migration)) { @SuppressWarnings("unchecked") CheckpointedRestoring chkFunction = (CheckpointedRestoring) userFunction; @@ -219,6 +218,7 @@ public abstract class AbstractUdfStreamOperator } } } else if (in instanceof Migration) { + // absorb the introduced byte from the migration stream without too much further consequences int hasUdfState = in.read(); if (hasUdfState == 1) { throw new Exception("Found UDF state but operator is not instance of CheckpointedRestoring"); http://git-wip-us.apache.org/repos/asf/flink/blob/51a35735/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java index bc65abf..da4a01b 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java @@ -42,6 +42,7 @@ import org.apache.flink.runtime.testingUtils.TestingCluster; import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring; import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -903,7 +904,7 @@ public class RescalingITCase extends TestLogger { } } - private static class PartitionedStateSource extends StateSourceBase implements CheckpointedFunction { + private static class PartitionedStateSource extends StateSourceBase implements CheckpointedFunction, CheckpointedRestoring { private static final long serialVersionUID = -359715965103593462L; private static final int NUM_PARTITIONS = 7; @@ -945,5 +946,10 @@ public class RescalingITCase extends TestLogger { CHECK_CORRECT_RESTORE[getRuntimeContext().getIndexOfThisSubtask()] = counter; } } + + @Override + public void restoreState(Integer state) throws Exception { + counterPartitions.add(state); + } } }