Date: Tue, 24 Nov 2015 12:27:11 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: issues@flink.apache.org
Reply-To: dev@flink.apache.org
Subject: [jira] [Commented] (FLINK-2837) FlinkTopologyBuilder cannot handle multiple input streams
MIME-Version: 1.0
Content-Type: text/plain; charset=utf-8

    [ https://issues.apache.org/jira/browse/FLINK-2837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15024387#comment-15024387 ]

ASF GitHub Bot commented on FLINK-2837:
---------------------------------------

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1398#discussion_r45728322

--- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java ---
@@ -15,75 +16,468 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.flink.storm.api;
+
+import backtype.storm.generated.ComponentCommon;
+import backtype.storm.generated.GlobalStreamId;
+import backtype.storm.generated.Grouping;
 import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.IRichStateSpout;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.storm.util.SplitStreamMapper;
+import org.apache.flink.storm.util.SplitStreamType;
+import org.apache.flink.storm.util.StormStreamSelector;
+import org.apache.flink.storm.wrappers.BoltWrapper;
+import org.apache.flink.storm.wrappers.BoltWrapperTwoInput;
+import org.apache.flink.storm.wrappers.SpoutWrapper;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.datastream.SplitStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 
 /**
- * {@link FlinkTopology} mimics a {@link StormTopology} and is implemented in terms of a {@link
- * StreamExecutionEnvironment} . In contrast to a regular {@link StreamExecutionEnvironment}, a {@link FlinkTopology}
- * cannot be executed directly, but must be handed over to a {@link FlinkLocalCluster}, {@link FlinkSubmitter}, or
- * {@link FlinkClient}.
+ * {@link FlinkTopology} translates a {@link TopologyBuilder} to a Flink program.
+ * CAUTION: {@link IRichStateSpout StateSpout}s are currently not supported.
  */
-public class FlinkTopology extends StreamExecutionEnvironment {
+public class FlinkTopology {
+
+	/** All declared streams and output schemas by operator ID */
+	private final HashMap<String, HashMap<String, Fields>> outputStreams = new HashMap<String, HashMap<String, Fields>>();
+	/** All spouts&bolts declarers by their ID */
+	private final HashMap<String, FlinkOutputFieldsDeclarer> declarers = new HashMap<String, FlinkOutputFieldsDeclarer>();
+
+	private final HashMap<String, Set<Entry<GlobalStreamId, Grouping>>> unprocessdInputsPerBolt =
+			new HashMap<String, Set<Entry<GlobalStreamId, Grouping>>>();
+
+	final HashMap<String, HashMap<String, DataStream<Tuple>>> availableInputs = new HashMap<>();
 
-	/** The number of declared tasks for the whole program (ie, sum over all dops) */
-	private int numberOfTasks = 0;
+	private final TopologyBuilder builder;
 
-	public FlinkTopology() {
-		// Set default parallelism to 1, to mirror Storm default behavior
-		super.setParallelism(1);
+	// needs to be a class member for internal testing purpose
+	private final StormTopology stormTopology;
+
+	private final Map<String, IRichSpout> spouts;
+	private final Map<String, IRichBolt> bolts;
+
+	private final StreamExecutionEnvironment env;
+
+	private FlinkTopology(TopologyBuilder builder) {
+		this.builder = builder;
+		this.stormTopology = builder.createTopology();
+		// extract the spouts and bolts
+		this.spouts = getPrivateField("_spouts");
+		this.bolts = getPrivateField("_bolts");
+
+		this.env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// Kick off the translation immediately
+		translateTopology();
 	}
 
 	/**
-	 * Is not supported. In order to execute use {@link FlinkLocalCluster}, {@link FlinkSubmitter}, or {@link
-	 * FlinkClient}.
 	 *
-	 * @throws UnsupportedOperationException
-	 * 		at every invocation
+	 * Creates a Flink program that uses the specified spouts and bolts.
+	 * @param stormBuilder The storm topology builder to use for creating the Flink topology.
+	 * @return A Flink Topology which may be executed.
 	 */
-	@Override
-	public JobExecutionResult execute() throws Exception {
-		throw new UnsupportedOperationException(
-				"A FlinkTopology cannot be executed directly. Use FlinkLocalCluster, FlinkSubmitter, or FlinkClient " +
-				"instead.");
+	public static FlinkTopology createTopology(TopologyBuilder stormBuilder) {
+		return new FlinkTopology(stormBuilder);
 	}
 
 	/**
-	 * Is not supported. In order to execute use {@link FlinkLocalCluster}, {@link FlinkSubmitter} or {@link
-	 * FlinkClient}.
-	 *
-	 * @throws UnsupportedOperationException
-	 * 		at every invocation
+	 * Returns the underlying Flink ExecutionEnvironment for the Storm topology.
+	 * @return The contextual environment.
 	 */
-	@Override
-	public JobExecutionResult execute(final String jobName) throws Exception {
-		throw new UnsupportedOperationException(
-				"A FlinkTopology cannot be executed directly. Use FlinkLocalCluster, FlinkSubmitter, or FlinkClient " +
-				"instead.");
+	public StreamExecutionEnvironment getExecutionEnvironment() {
+		return this.env;
 	}
 
 	/**
-	 * Increased the number of declared tasks of this program by the given value.
-	 *
-	 * @param dop
-	 * 		The dop of a new operator that increases the number of overall tasks.
+	 * Directly executes the Storm topology based on the current context (local when in IDE and
+	 * remote when executed through ./bin/flink).
+	 * @return The execution result
+	 * @throws Exception
 	 */
-	public void increaseNumberOfTasks(final int dop) {
-		assert (dop > 0);
-		this.numberOfTasks += dop;
+	public JobExecutionResult execute() throws Exception {
+		return env.execute();
+	}
+
+
+	@SuppressWarnings("unchecked")
+	private <T> Map<String, T> getPrivateField(String field) {
+		try {
+			Field f = builder.getClass().getDeclaredField(field);
+			f.setAccessible(true);
+			return copyObject((Map<String, T>) f.get(builder));
+		} catch (NoSuchFieldException | IllegalAccessException e) {
+			throw new RuntimeException("Couldn't get " + field + " from TopologyBuilder", e);
+		}
+	}
+
+	private <T> T copyObject(T object) {
+		try {
+			return InstantiationUtil.deserializeObject(
+					InstantiationUtil.serializeObject(object),
+					getClass().getClassLoader()
+			);
+		} catch (IOException | ClassNotFoundException e) {
+			throw new RuntimeException("Failed to copy object.", e);
+		}
 	}
 
 	/**
-	 * Return the number of required tasks to execute this program.
-	 *
-	 * @return the number of required tasks to execute this program
+	 * Creates a Flink program that uses the specified spouts and bolts.
 	 */
-	public int getNumberOfTasks() {
-		return this.numberOfTasks;
+	private void translateTopology() {
+
+		unprocessdInputsPerBolt.clear();
+		outputStreams.clear();
+		declarers.clear();
+		availableInputs.clear();
+
+		// Storm defaults to parallelism 1
+		env.setParallelism(1);
+
+		/* Translation of topology */
+
+
+		for (final Entry<String, IRichSpout> spout : spouts.entrySet()) {
+			final String spoutId = spout.getKey();
+			final IRichSpout userSpout = spout.getValue();
+
+			final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
+			userSpout.declareOutputFields(declarer);
+			final HashMap<String, Fields> sourceStreams = declarer.outputStreams;
+			this.outputStreams.put(spoutId, sourceStreams);
+			declarers.put(spoutId, declarer);
+
+
+			final HashMap<String, DataStream<Tuple>> outputStreams = new HashMap<String, DataStream<Tuple>>();
+			final DataStreamSource<?> source;
+
+			if (sourceStreams.size() == 1) {
+				final SpoutWrapper<Tuple> spoutWrapperSingleOutput = new SpoutWrapper<Tuple>(userSpout);
+				spoutWrapperSingleOutput.setStormTopology(stormTopology);
+
+				final String outputStreamId = (String) sourceStreams.keySet().toArray()[0];
+
+				DataStreamSource<Tuple> src = env.addSource(spoutWrapperSingleOutput, spoutId,
+						declarer.getOutputType(outputStreamId));
+
+				outputStreams.put(outputStreamId, src);
+				source = src;
+			} else {
+				final SpoutWrapper<SplitStreamType<Tuple>> spoutWrapperMultipleOutputs = new SpoutWrapper<SplitStreamType<Tuple>>(
+						userSpout);
+				spoutWrapperMultipleOutputs.setStormTopology(stormTopology);
+
+				@SuppressWarnings({ "unchecked", "rawtypes" })
+				DataStreamSource<SplitStreamType<Tuple>> multiSource = env.addSource(
+						spoutWrapperMultipleOutputs, spoutId,
+						(TypeInformation) TypeExtractor.getForClass(SplitStreamType.class));
+
+				SplitStream<SplitStreamType<Tuple>> splitSource = multiSource
+						.split(new StormStreamSelector<Tuple>());
+				for (String streamId : sourceStreams.keySet()) {
+					outputStreams.put(streamId, splitSource.select(streamId).map(new SplitStreamMapper<Tuple>()));
+				}
+				source = multiSource;
+			}
+			availableInputs.put(spoutId, outputStreams);
+
+			final ComponentCommon common = stormTopology.get_spouts().get(spoutId).get_common();
+			if (common.is_set_parallelism_hint()) {
+				int dop = common.get_parallelism_hint();
+				source.setParallelism(dop);
+			} else {
+				common.set_parallelism_hint(1);
+			}
+		}
+
+		/**
+		 * 1. Connect all spout streams with bolt streams.
+		 * 2. Then proceed with the bolt streams already connected.
+		 *
+		 * Because we do not know the order in which an iterator steps over a set, we might process a consumer before
+		 * its producer; thus, we might need to repeat multiple times.
+		 */
+		boolean makeProgress = true;
+		while (bolts.size() > 0) {
+			if (!makeProgress) {
+				throw new RuntimeException(
+						"Unable to build Topology. Could not connect the following bolts: "
+								+ bolts.keySet());
+			}
+			makeProgress = false;
+
+			final Iterator<Entry<String, IRichBolt>> boltsIterator = bolts.entrySet().iterator();
+			while (boltsIterator.hasNext()) {
+
+				final Entry<String, IRichBolt> bolt = boltsIterator.next();
+				final String boltId = bolt.getKey();
+				final IRichBolt userBolt = copyObject(bolt.getValue());
+
+				final ComponentCommon common = stormTopology.get_bolts().get(boltId).get_common();
+
+				Set<Entry<GlobalStreamId, Grouping>> unprocessedBoltInputs = unprocessdInputsPerBolt.get(boltId);
+				if (unprocessedBoltInputs == null) {
+					unprocessedBoltInputs = new HashSet<>();
+					unprocessedBoltInputs.addAll(common.get_inputs().entrySet());
+					unprocessdInputsPerBolt.put(boltId, unprocessedBoltInputs);
+				}
+
+				// check if all inputs are available
+				final int numberOfInputs = unprocessedBoltInputs.size();
+				int inputsAvailable = 0;
+				for (Entry<GlobalStreamId, Grouping> entry : unprocessedBoltInputs) {
+					final String producerId = entry.getKey().get_componentId();
+					final String streamId = entry.getKey().get_streamId();
+					final HashMap<String, DataStream<Tuple>> streams = availableInputs.get(producerId);
+					if (streams != null && streams.get(streamId) != null) {
+						inputsAvailable++;
+					}
+				}
+
+				if (inputsAvailable != numberOfInputs) {
+					// traverse other bolts first until inputs are available
+					continue;
+				} else {
+					makeProgress = true;
+					boltsIterator.remove();
+				}
+
+				final Map<GlobalStreamId, DataStream<Tuple>> inputStreams = new HashMap<>(numberOfInputs);
+
+				for (Entry<GlobalStreamId, Grouping> input : unprocessedBoltInputs) {
+					final GlobalStreamId streamId = input.getKey();
+					final Grouping grouping = input.getValue();
+
+					final String producerId = streamId.get_componentId();
+
+					final Map<String, DataStream<Tuple>> producer = availableInputs.get(producerId);
+
+					inputStreams.put(streamId, processInput(boltId, userBolt, streamId, grouping, producer));
+				}
+
+				final Iterator<Entry<GlobalStreamId, DataStream<Tuple>>> iterator = inputStreams.entrySet().iterator();
+
+				final Entry<GlobalStreamId, DataStream<Tuple>> firstInput = iterator.next();
+				GlobalStreamId streamId = firstInput.getKey();
+				DataStream<Tuple> inputStream = firstInput.getValue();
+
+				final SingleOutputStreamOperator<?, ?> outputStream;
+
+				switch (numberOfInputs) {
+				case 1:
+					outputStream = createOutput(boltId, userBolt, streamId, inputStream);
+					break;
+				case 2:
+					Entry<GlobalStreamId, DataStream<Tuple>> secondInput = iterator.next();
+					GlobalStreamId streamId2 = secondInput.getKey();
+					DataStream<Tuple> inputStream2 = secondInput.getValue();
+					outputStream = createOutput(boltId, userBolt, streamId, inputStream, streamId2, inputStream2);
+					break;
+				default:
+					throw new UnsupportedOperationException("Don't know how to translate a bolt "
+							+ boltId + " with " + numberOfInputs + " inputs.");
+				}
+
+				if (common.is_set_parallelism_hint()) {
+					int dop = common.get_parallelism_hint();
+					outputStream.setParallelism(dop);
+				} else {
+					common.set_parallelism_hint(1);
+				}
+
+			}
+		}
 	}
 
+	private DataStream<Tuple> processInput(String boltId, IRichBolt userBolt,
+			GlobalStreamId streamId, Grouping grouping,
+			Map<String, DataStream<Tuple>> producer) {
+
+		Preconditions.checkNotNull(userBolt);
+		Preconditions.checkNotNull(boltId);
+		Preconditions.checkNotNull(streamId);
+		Preconditions.checkNotNull(grouping);
+		Preconditions.checkNotNull(producer);
+
+		final String producerId = streamId.get_componentId();
+		final String inputStreamId = streamId.get_streamId();
+
+		DataStream<Tuple> inputStream = producer.get(inputStreamId);
+
+		final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
+		declarers.put(boltId, declarer);
+		userBolt.declareOutputFields(declarer);
+		this.outputStreams.put(boltId, declarer.outputStreams);
+
+		// if producer was processed already
+		if (grouping.is_set_shuffle()) {
+			// Storm uses a round-robin shuffle strategy
+			inputStream = inputStream.rebalance();
+		} else if (grouping.is_set_fields()) {
+			// global grouping is emulated in Storm via an empty fields grouping list
+			final List<String> fields = grouping.get_fields();
+			if (fields.size() > 0) {
+				FlinkOutputFieldsDeclarer prodDeclarer = this.declarers.get(producerId);
+				inputStream = inputStream.keyBy(prodDeclarer
+						.getGroupingFieldIndexes(inputStreamId,
+								grouping.get_fields()));
+			} else {
+				inputStream = inputStream.global();
+			}
+		} else if (grouping.is_set_all()) {
+			inputStream = inputStream.broadcast();
+		} else if (!grouping.is_set_local_or_shuffle()) {
+			throw new UnsupportedOperationException(
+					"Flink only supports (local-or-)shuffle, fields, all, and global grouping");
+		}
+
+		return inputStream;
+	}
+
+	private SingleOutputStreamOperator<?, ?> createOutput(String boltId, IRichBolt bolt, GlobalStreamId streamId, DataStream<Tuple> inputStream) {
+		return createOutput(boltId, bolt, streamId, inputStream, null, null);
+	}
+
+	private SingleOutputStreamOperator<?, ?> createOutput(String boltId, IRichBolt bolt,
+			GlobalStreamId streamId, DataStream<Tuple> inputStream,
+			GlobalStreamId streamId2, DataStream<Tuple> inputStream2) {
+		Preconditions.checkNotNull(boltId);
+		Preconditions.checkNotNull(streamId);
--- End diff --

Asserts are not enabled by default. I think it's important to always check this condition.
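For illustration, a minimal sketch of the difference (a standalone demo class, not part of the pull request; assumes only Guava on the classpath): a Java assert is skipped entirely unless the JVM is started with -ea, whereas Preconditions.checkNotNull is always executed.

    import com.google.common.base.Preconditions;

    public class AssertVsPreconditions {
        public static void main(String[] args) {
            String boltId = null;
            // Without -ea the JVM strips this check, so the null slips through.
            assert boltId != null : "boltId must not be null";
            // Always enforced: throws NullPointerException regardless of JVM flags.
            Preconditions.checkNotNull(boltId, "boltId must not be null");
        }
    }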
> FlinkTopologyBuilder cannot handle multiple input streams
> ---------------------------------------------------------
>
>                 Key: FLINK-2837
>                 URL: https://issues.apache.org/jira/browse/FLINK-2837
>             Project: Flink
>          Issue Type: Bug
>          Components: Storm Compatibility
>            Reporter: Matthias J. Sax
>            Assignee: Maximilian Michels
>
> FlinkTopologyBuilder cannot handle multiple input streams correctly. Instead of unioning the incoming streams, it replicates the consuming bolt, and each (logical) instance processes one of the input streams.
> For example:
> {noformat}
> final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
> builder.setSpout(spoutId1, new FiniteRandomSpout(0, 10));
> builder.setSpout(spoutId2, new FiniteRandomSpout(1, 8));
> builder.setSpout(spoutId3, new FiniteRandomSpout(2, 13));
> builder.setBolt(boltId, new MergerBolt())
> 		.shuffleGrouping(spoutId1)
> 		.shuffleGrouping(spoutId2)
> 		.shuffleGrouping(spoutId3);
> builder.setBolt("sink", new BoltPrintSink(new SimpleOutputFormatter()))
> 		.shuffleGrouping(boltId);
> {noformat}
> will only print the data from a single source instead of all sources.
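In Flink terms, the expected behavior corresponds to a union of all declared inputs. A minimal sketch of those semantics (a standalone example with illustrative names and values, not the actual fix):

    import org.apache.flink.api.java.tuple.Tuple1;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class UnionSemanticsSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // stand-ins for the three spout output streams
            DataStream<Tuple1<Long>> s1 = env.fromElements(new Tuple1<>(1L));
            DataStream<Tuple1<Long>> s2 = env.fromElements(new Tuple1<>(2L));
            DataStream<Tuple1<Long>> s3 = env.fromElements(new Tuple1<>(3L));
            // the consuming bolt must see the union of all three streams,
            // not one stream per replicated bolt instance
            s1.union(s2, s3).print();
            env.execute();
        }
    }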