Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id DF505105B3 for ; Mon, 14 Sep 2015 15:59:27 +0000 (UTC) Received: (qmail 65323 invoked by uid 500); 14 Sep 2015 15:59:27 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 65291 invoked by uid 500); 14 Sep 2015 15:59:27 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 65282 invoked by uid 99); 14 Sep 2015 15:59:27 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 14 Sep 2015 15:59:27 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A85A0E0414; Mon, 14 Sep 2015 15:59:27 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mjsax@apache.org To: commits@flink.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: flink git commit: [FLINK-2658] fieldsGrouping for multiple output streams fails - added SplitStreamTypeKeySelector and JUnit tests Date: Mon, 14 Sep 2015 15:59:27 +0000 (UTC) Repository: flink Updated Branches: refs/heads/master 8acd15d82 -> ce68cbd91 [FLINK-2658] fieldsGrouping for multiple output streams fails - added SplitStreamTypeKeySelector and JUnit tests This closes #1122 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ce68cbd9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ce68cbd9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ce68cbd9 Branch: refs/heads/master Commit: 
ce68cbd91621b1a58cb34b33ab27762c6525cfa0 Parents: 8acd15d Author: mjsax Authored: Fri Sep 11 13:52:04 2015 +0200 Committer: mjsax Committed: Mon Sep 14 17:52:27 2015 +0200 ---------------------------------------------------------------------- .../api/FlinkTopologyBuilder.java | 22 ++++--- .../api/SplitStreamTypeKeySelector.java | 47 ++++++++++++++ .../api/FlinkTopologyBuilderTest.java | 28 +++++++++ .../stormcompatibility/util/TestDummyBolt.java | 55 ++++++++++++++++ .../stormcompatibility/util/TestDummySpout.java | 66 ++++++++++++++++++++ .../flink/stormcompatibility/util/TestSink.java | 48 ++++++++++++++ 6 files changed, 259 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ce68cbd9/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java index a739c23..e4d880f 100644 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java +++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java @@ -78,8 +78,8 @@ public class FlinkTopologyBuilder { */ @SuppressWarnings({"rawtypes", "unchecked"}) public FlinkTopology createTopology() { - final StormTopology stormTopolgoy = this.stormBuilder.createTopology(); - final FlinkTopology env = new FlinkTopology(stormTopolgoy); + final StormTopology stormTopology = 
this.stormBuilder.createTopology(); + final FlinkTopology env = new FlinkTopology(stormTopology); env.setParallelism(1); final HashMap> availableInputs = new HashMap>(); @@ -121,7 +121,7 @@ public class FlinkTopologyBuilder { availableInputs.put(spoutId, outputStreams); int dop = 1; - final ComponentCommon common = stormTopolgoy.get_spouts().get(spoutId).get_common(); + final ComponentCommon common = stormTopology.get_spouts().get(spoutId).get_common(); if (common.is_set_parallelism_hint()) { dop = common.get_parallelism_hint(); source.setParallelism(dop); @@ -155,7 +155,7 @@ public class FlinkTopologyBuilder { final String boltId = bolt.getKey(); final IRichBolt userBolt = bolt.getValue(); - final ComponentCommon common = stormTopolgoy.get_bolts().get(boltId).get_common(); + final ComponentCommon common = stormTopology.get_bolts().get(boltId).get_common(); Set> unprocessedInputs = unprocessdInputsPerBolt.get(boltId); if (unprocessedInputs == null) { @@ -194,9 +194,17 @@ public class FlinkTopologyBuilder { final List fields = grouping.get_fields(); if (fields.size() > 0) { FlinkOutputFieldsDeclarer prodDeclarer = this.declarers.get(producerId); - inputStream = inputStream.groupBy(prodDeclarer - .getGroupingFieldIndexes(inputStreamId, - grouping.get_fields())); + if (producer.size() == 1) { + inputStream = inputStream.groupBy(prodDeclarer + .getGroupingFieldIndexes(inputStreamId, + grouping.get_fields())); + } else { + inputStream = inputStream + .groupBy(new SplitStreamTypeKeySelector( + prodDeclarer.getGroupingFieldIndexes( + inputStreamId, + grouping.get_fields()))); + } } else { inputStream = inputStream.global(); } http://git-wip-us.apache.org/repos/asf/flink/blob/ce68cbd9/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/SplitStreamTypeKeySelector.java ---------------------------------------------------------------------- diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/SplitStreamTypeKeySelector.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/SplitStreamTypeKeySelector.java new file mode 100644 index 0000000..30227b8 --- /dev/null +++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/SplitStreamTypeKeySelector.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.stormcompatibility.api; + +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.stormcompatibility.util.SplitStreamType; +import org.apache.flink.streaming.util.keys.KeySelectorUtil; +import org.apache.flink.streaming.util.keys.KeySelectorUtil.ArrayKeySelector; + +/** + * {@link SplitStreamTypeKeySelector} is a specific grouping key selector for streams that are selected via + * {@link FlinkStormStreamSelector} from a Spout or Bolt that declares multiple output streams. 
+ * + * It extracts the wrapped {@link Tuple} type from the {@link SplitStreamType} tuples and applies a regular + * {@link ArrayKeySelector} on it. + */ +public class SplitStreamTypeKeySelector implements KeySelector<SplitStreamType<Tuple>, Tuple> { + private static final long serialVersionUID = 4672434660037669254L; + + private final ArrayKeySelector selector; + + public SplitStreamTypeKeySelector(int... fields) { + this.selector = new KeySelectorUtil.ArrayKeySelector(fields); + } + + @Override + public Tuple getKey(SplitStreamType<Tuple> value) throws Exception { + return selector.getKey(value.value); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/ce68cbd9/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilderTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilderTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilderTest.java index 0187020..0dd9b1c 100644 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilderTest.java +++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilderTest.java @@ -16,8 +16,13 @@ */ package org.apache.flink.stormcompatibility.api; +import org.apache.flink.stormcompatibility.util.TestDummyBolt; +import org.apache.flink.stormcompatibility.util.TestDummySpout; +import org.apache.flink.stormcompatibility.util.TestSink; import org.junit.Test; +import backtype.storm.tuple.Fields; + public class FlinkTopologyBuilderTest { @Test(expected = RuntimeException.class) @@ -45,4 +50,27 @@ public class 
FlinkTopologyBuilderTest { builder.createTopology(); } + @Test + public void testFieldsGroupingOnMultipleSpoutOutputStreams() { + FlinkTopologyBuilder flinkBuilder = new FlinkTopologyBuilder(); + + flinkBuilder.setSpout("spout", new TestDummySpout()); + flinkBuilder.setBolt("sink", new TestSink()).fieldsGrouping("spout", + TestDummySpout.spoutStreamId, new Fields("id")); + + flinkBuilder.createTopology(); + } + + @Test + public void testFieldsGroupingOnMultipleBoltOutputStreams() { + FlinkTopologyBuilder flinkBuilder = new FlinkTopologyBuilder(); + + flinkBuilder.setSpout("spout", new TestDummySpout()); + flinkBuilder.setBolt("bolt", new TestDummyBolt()).shuffleGrouping("spout"); + flinkBuilder.setBolt("sink", new TestSink()).fieldsGrouping("bolt", + TestDummyBolt.groupingStreamId, new Fields("id")); + + flinkBuilder.createTopology(); + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/ce68cbd9/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummyBolt.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummyBolt.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummyBolt.java new file mode 100644 index 0000000..ec48719 --- /dev/null +++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummyBolt.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.stormcompatibility.util; + +import java.util.Map; + +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.IRichBolt; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Tuple; + +public class TestDummyBolt implements IRichBolt { + private static final long serialVersionUID = 6893611247443121322L; + + public final static String shuffleStreamId = "shuffleStream"; + public final static String groupingStreamId = "groupingStream"; + + @SuppressWarnings("rawtypes") + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {} + + @Override + public void execute(Tuple input) {} + + @Override + public void cleanup() {} + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declareStream(shuffleStreamId, new Fields("data")); + declarer.declareStream(groupingStreamId, new Fields("id", "data")); + } + + @Override + public Map getComponentConfiguration() { + return null; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/ce68cbd9/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummySpout.java ---------------------------------------------------------------------- diff 
--git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummySpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummySpout.java new file mode 100644 index 0000000..62705b8 --- /dev/null +++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummySpout.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.stormcompatibility.util; + +import java.util.Map; + +import backtype.storm.spout.SpoutOutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.IRichSpout; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.tuple.Fields; +import backtype.storm.utils.Utils; + +public class TestDummySpout implements IRichSpout { + private static final long serialVersionUID = -5190945609124603118L; + + public final static String spoutStreamId = "spout-stream"; + + @SuppressWarnings("rawtypes") + @Override + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {} + + @Override + public void close() {} + + @Override + public void activate() {} + + @Override + public void deactivate() {} + + @Override + public void nextTuple() {} + + @Override + public void ack(Object msgId) {} + + @Override + public void fail(Object msgId) {} + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declareStream(Utils.DEFAULT_STREAM_ID, new Fields("data")); + declarer.declareStream(spoutStreamId, new Fields("id", "data")); + } + + @Override + public Map getComponentConfiguration() { + return null; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/ce68cbd9/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestSink.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestSink.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestSink.java new file mode 100644 index 0000000..5699219 --- /dev/null +++ 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestSink.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.stormcompatibility.util; + +import java.util.Map; + +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.IRichBolt; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.tuple.Tuple; + +public class TestSink implements IRichBolt { + private static final long serialVersionUID = 4314871456719370877L; + + @SuppressWarnings("rawtypes") + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {} + + @Override + public void execute(Tuple input) {} + + @Override + public void cleanup() {} + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) {} + + @Override + public Map getComponentConfiguration() { + return null; + } + +}