apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (APEXMALHAR-2106) Support merging multiple streams with StreamMerger
Date Tue, 07 Jun 2016 20:08:21 GMT

    [ https://issues.apache.org/jira/browse/APEXMALHAR-2106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15319260#comment-15319260 ]

ASF GitHub Bot commented on APEXMALHAR-2106:
--------------------------------------------

Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/309#discussion_r66142732
  
    --- Diff: library/src/main/java/com/datatorrent/lib/stream/MultipleStreamMerger.java ---
    @@ -0,0 +1,277 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.stream;
    +
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.Module;
    +
    +/**
    + * Module that bypasses the platform limitation of merging only two streams at a time with
    + * {@link StreamMerger}.
    + *
    + * Usage:
    + *
    + * dag.addOperator("Stream_1", op1);
    + * dag.addOperator("Stream_2", op2);
    + * dag.addOperator("Stream_3", op3);
    + *
    + * MultipleStreamMerger merger = new MultipleStreamMerger();
    + * merger.merge(op1.out)
    + * .merge(op2.out)
    + * .merge(op3.out)
    + * .insertInto(dag, conf);
    + *
    + * dag.addModule("Merger", merger);
    + *
    + * @param <K> type of the tuples carried by the streams being merged
    + */
    +public class MultipleStreamMerger<K> implements Module
    +{
    +  public class Stream
    +  {
    +    DefaultInputPort destPort;
    +    DefaultOutputPort sourcePort;
    +    String name;
    +
    +    public Stream(String name, DefaultOutputPort sourcePort, DefaultInputPort destPort)
    +    {
    +      this.destPort = destPort;
    +      this.sourcePort = sourcePort;
    +      this.name = name;
    +    }
    +  }
    +
    +  public class NamedMerger
    +  {
    +    StreamMerger<K> merger;
    +    String name;
    +
    +    public NamedMerger(String name, StreamMerger<K> merger)
    +    {
    +      this.merger = merger;
    +      this.name = name;
    +    }
    +  }
    +
    +  private int streamCount = 0;
    +
    +  ArrayList<DefaultOutputPort<K>> streamsToMerge = new ArrayList<>();
    +
    +  public transient ProxyOutputPort<K> streamOutput = new ProxyOutputPort<>();
    +
    +  /**
    +   * Used to define all the sources to be merged into a single stream.
    +   *
    +   * @param sourcePort - The output port from the upstream operator that provides data
    +   * @return this MultipleStreamMerger, now tracking the added stream, so that calls can be chained.
    +   */
    +  public MultipleStreamMerger<K> merge(DefaultOutputPort<K> sourcePort)
    +  {
    +    streamsToMerge.add(sourcePort);
    +    return this;
    +  }
    +
    +  /**
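    +   * To merge more than two streams at a time, we construct a binary tree of container-local
    +   * StreamMerger operators, e.g.:
    +   *
    +   * Tier 0          Tier 1              Tier 2
    +   *
    +   * Stream 1 ->
    +   *                 StreamMerger_1 ->
    +   * Stream 2 ->
    +   *                                     StreamMerger_Final -> Out
    +   * Stream 3 ->
    +   *                 StreamMerger_2 ->
    +   * Stream 4 ->
    +   *
    +   * This method adds the required merger operators, and the streams connecting them, to the
    +   * provided DAG.
    +   *
    +   * @param dag - The DAG to which the merger operators and streams are added
    +   * @param conf - Configuration supplied by the caller (not used by this method)
    +   * @throws IllegalArgumentException if fewer than two streams were registered via
    +   *         {@link #merge(DefaultOutputPort)}
    +   */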
    +  public void mergeStreams(DAG dag, Configuration conf)
    +  {
    +    if (streamsToMerge.size() < 2) {
    +      throw new IllegalArgumentException("Not enough streams to merge, at least two streams must be selected for " +
    +          "merging with `.merge()`.");
    +    }
    +
    +    ArrayList<Stream> streamsToAddToDag = new ArrayList<>();
    +    ArrayList<NamedMerger> operatorsToAdd = new ArrayList<>();
    +
    +    // Determine operators and streams to add to the DAG
    +    constructMergeTree(streamsToAddToDag, operatorsToAdd);
    +
    +    for (NamedMerger m : operatorsToAdd) {
    +      dag.addOperator(m.name, m.merger);
    +    }
    +
    +    for (Stream s : streamsToAddToDag) {
    +      dag.addStream(s.name, s.sourcePort, s.destPort).setLocality(DAG.Locality.CONTAINER_LOCAL);
    +    }
    +  }
    +
    +  /**
    +   * Given a set of streams to be merged (defined via {@link #merge(DefaultOutputPort)}), compute the
    +   * structure of cascading mergers that need to be instantiated, added to the DAG, and linked together.
    +   * @param streamsToAddToDag - (output) A list that is populated with streams that should be added to the DAG
    +   * @param operatorsToAdd - (output) A list that is populated with operators to be added to the DAG
    +   * @throws IllegalArgumentException if fewer than two streams were registered via
    +   *         {@link #merge(DefaultOutputPort)}
    +   */
    +  public void constructMergeTree(
    +      ArrayList<Stream> streamsToAddToDag,
    +      ArrayList<NamedMerger> operatorsToAdd)
    +  {
    +    if (streamsToMerge.size() < 2) {
    +      throw new IllegalArgumentException("Not enough streams to merge. Ensure `.merge` was called for each stream " +
    +          "to be added.");
    +    }
    +
    +    // Define the final merger in the sequence and connect its output to the module's output
    +    StreamMerger<K> finalMerger = new StreamMerger<>();
    +    operatorsToAdd.add(new NamedMerger("Merger_Final", finalMerger));
    +    streamOutput.set(finalMerger.out);
    +
    +    ArrayList<ArrayList<StreamMerger<K>>> mergers = new ArrayList<>();
    +
    +    /**
    +     * First, calculate the number of tiers we need to merge all streams given that each merger can only merge two
    +     * streams at a time.
    +     */
    +    int numTiers = (int)Math.ceil(Math.log(streamsToMerge.size()) / Math.log(2));
    +
    +    // Handle the simple case where we only have a single tier (only two streams to merge)
    +    if (numTiers == 1) {
    +      assert (streamsToMerge.size() == 2);
    +      streamsToAddToDag.add(new Stream("FinalMerge_Stream_0", streamsToMerge.get(0), finalMerger.data1));
    +      streamsToAddToDag.add(new Stream("FinalMerge_Stream_1", streamsToMerge.get(1), finalMerger.data2));
    +
    +      // We don't need to add any operators since we've already added the final merger
    +    } else {
    +      Iterator<DefaultOutputPort<K>> streams = streamsToMerge.iterator();
    +
    +      // When assigning streams, we will switch between ports 1 and 2 as we use successive mergers.
    +      boolean usePort1;
    +
    +      // For each tier, create the mergers in that tier, and connect the relevant streams
    +      for (int i = 0; i < numTiers - 1; i++) {
    +        int streamIdx = 0;
    +        usePort1 = true;
    +
    +        int numMergers = (int)Math.ceil(streamsToMerge.size() / Math.pow(2, i + 1));
    --- End diff --
    
    Should this be floor instead of ceil? With an odd number of streams, the one leftover stream can be fed directly to the final tier.


> Support merging multiple streams with StreamMerger 
> ---------------------------------------------------
>
>                 Key: APEXMALHAR-2106
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2106
>             Project: Apache Apex Malhar
>          Issue Type: New Feature
>            Reporter: Ilya Ganelin
>            Assignee: Ilya Ganelin
>
> To properly implement the Flatten transformation (and other stream-combination operations),
> Apex must support merging data from multiple sources. The StreamMerger operator can be improved
> to merge multiple streams, rather than just the two streams it can handle in the present implementation.




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message