Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1196B181A1 for ; Tue, 30 Jun 2015 12:39:47 +0000 (UTC) Received: (qmail 87879 invoked by uid 500); 30 Jun 2015 12:39:47 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 87842 invoked by uid 500); 30 Jun 2015 12:39:47 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 87820 invoked by uid 99); 30 Jun 2015 12:39:46 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 30 Jun 2015 12:39:46 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B9238E04B3; Tue, 30 Jun 2015 12:39:46 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: gyfora@apache.org To: commits@flink.apache.org Message-Id: <11180e04915341f9b75e2e89b2024255@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: flink git commit: [FLINK-2294] [streaming] Fix partitioned state next-input setting for copying chained collectors Date: Tue, 30 Jun 2015 12:39:46 +0000 (UTC) Repository: flink Updated Branches: refs/heads/master df4216083 -> fef9f1158 [FLINK-2294] [streaming] Fix partitioned state next-input setting for copying chained collectors Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fef9f115 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fef9f115 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fef9f115 Branch: refs/heads/master Commit: fef9f115838b3ba3d3769f8669ee251c2cd403c6 Parents: df42160 Author: Gyula Fora Authored: Tue Jun 30 13:48:17 2015 +0200 Committer: Gyula Fora Committed: Tue Jun 30 14:29:11 2015 +0200 ---------------------------------------------------------------------- .../streaming/runtime/tasks/OutputHandler.java | 1 + .../api/state/StatefulOperatorTest.java | 35 ++++++++++++++++++++ 2 files changed, 36 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/fef9f115/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java index 2d2f29b..73f0a89 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java @@ -274,6 +274,7 @@ public class OutputHandler { @Override public void collect(T record) { try { + operator.getRuntimeContext().setNextInput(record); operator.processElement(serializer.copy(record)); } catch (Exception e) { if (LOG.isErrorEnabled()) { http://git-wip-us.apache.org/repos/asf/flink/blob/fef9f115/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java index af719f3..774b431 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java @@ -109,6 +109,11 @@ public class StatefulOperatorTest { public void invoke(String value) throws Exception {} }); + keyedStream.map(new StatefulMapper2()).setParallelism(1).addSink(new SinkFunction() { + private static final long serialVersionUID = 1L; + public void invoke(String value) throws Exception {} + }); + try { keyedStream.shuffle(); fail(); @@ -224,6 +229,36 @@ public class StatefulOperatorTest { } } + public static class StatefulMapper2 extends RichMapFunction { + private static final long serialVersionUID = 1L; + OperatorState groupCounter; + + @Override + public String map(Integer value) throws Exception { + groupCounter.updateState(groupCounter.getState() + 1); + + return value.toString(); + } + + @Override + public void open(Configuration conf) throws IOException { + groupCounter = getRuntimeContext().getOperatorState("groupCounter", 0, true); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Override + public void close() throws Exception { + Map states = ((StreamingRuntimeContext) getRuntimeContext()).getOperatorStates(); + PartitionedStreamOperatorState groupCounter = (PartitionedStreamOperatorState) states.get("groupCounter"); + for (Entry count : groupCounter.getPartitionedState().entrySet()) { + Integer key = (Integer) count.getKey(); + Integer expected = key < 3 ? 2 : 1; + assertEquals(expected, count.getValue()); + } + } + + } + public static class ModKey implements KeySelector { private static final long serialVersionUID = 4193026742083046736L;