tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhiyu...@apache.org
Subject [3/3] tez git commit: TEZ-3708. Improve parallelism and auto grouping of unpartitioned cartesian product (zhiyuany)
Date Thu, 11 May 2017 22:21:58 GMT
TEZ-3708. Improve parallelism and auto grouping of unpartitioned cartesian product (zhiyuany)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a55fe80b
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a55fe80b
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a55fe80b

Branch: refs/heads/master
Commit: a55fe80bfa7beef646c95ca73a844ffc7cae999c
Parents: dec7c1b
Author: Zhiyuan Yang <zhiyuany@apache.org>
Authored: Thu May 11 15:19:39 2017 -0700
Committer: Zhiyuan Yang <zhiyuany@apache.org>
Committed: Thu May 11 15:19:39 2017 -0700

----------------------------------------------------------------------
 .../CartesianProductCombination.java            |   4 +-
 .../CartesianProductConfig.java                 |  45 +-
 .../CartesianProductEdgeManager.java            |  11 +-
 .../CartesianProductEdgeManagerConfig.java      |  67 ---
 .../CartesianProductEdgeManagerPartitioned.java |  31 +-
 .../CartesianProductEdgeManagerReal.java        |   3 +-
 ...artesianProductEdgeManagerUnpartitioned.java | 125 -----
 .../CartesianProductVertexManager.java          |  64 ++-
 .../CartesianProductVertexManagerConfig.java    |  77 ---
 ...artesianProductVertexManagerPartitioned.java |  38 +-
 .../CartesianProductVertexManagerReal.java      |   3 +-
 ...tesianProductVertexManagerUnpartitioned.java | 438 ---------------
 .../FairCartesianProductEdgeManager.java        | 174 ++++++
 .../FairCartesianProductVertexManager.java      | 551 +++++++++++++++++++
 .../partitioner/RoundRobinPartitioner.java      |  30 +
 .../tez/runtime/library/utils/Grouper.java      |  66 +--
 .../main/proto/CartesianProductPayload.proto    |  11 +-
 .../TestCartesianProductCombination.java        |   2 +-
 .../TestCartesianProductConfig.java             |  37 +-
 .../TestCartesianProductEdgeManager.java        |   8 +-
 .../TestCartesianProductEdgeManagerConfig.java  |  53 --
 ...tCartesianProductEdgeManagerPartitioned.java |  77 +--
 ...artesianProductEdgeManagerUnpartitioned.java | 288 ----------
 .../TestCartesianProductVertexManager.java      |   2 +-
 ...TestCartesianProductVertexManagerConfig.java |  53 --
 ...artesianProductVertexManagerPartitioned.java |  26 +-
 ...tesianProductVertexManagerUnpartitioned.java | 460 ----------------
 .../TestFairCartesianProductEdgeManager.java    | 245 +++++++++
 .../TestFairCartesianProductVertexManager.java  | 500 +++++++++++++++++
 .../library/cartesianproduct/TestGrouper.java   |  36 +-
 .../mapreduce/examples/CartesianProduct.java    | 385 +++++++++++++
 .../tez/mapreduce/examples/ExampleDriver.java   |   2 +
 .../org/apache/tez/test/TestFaultTolerance.java |   5 +-
 .../java/org/apache/tez/test/TestOutput.java    |  11 +
 .../java/org/apache/tez/test/TestTezJobs.java   |  13 +
 35 files changed, 2191 insertions(+), 1750 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductCombination.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductCombination.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductCombination.java
index 97f3eb2..8de8a02 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductCombination.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductCombination.java
@@ -143,9 +143,9 @@ class CartesianProductCombination {
   }
 
   /**
-   * @return corresponding chunk id for current combination
+   * @return corresponding task id for current combination
    */
-  public int getChunkId() {
+  public int getTaskId() {
     int chunkId = 0;
     for (int i = 0; i < combination.length; i++) {
       chunkId += combination[i]*factor[i];

http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductConfig.java
index 12a17cb..7aac1d7 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductConfig.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductConfig.java
@@ -48,12 +48,12 @@ public class CartesianProductConfig {
   private final boolean isPartitioned;
   private final String[] sources;
  // numPartition[i] means how many partitions sourceVertices[i] will generate
- // (not used in unpartitioned case)
+ // (not used in fair cartesian product)
   private final int[] numPartitions;
   private final CartesianProductFilterDescriptor filterDescriptor;
 
   /**
-   * create config for unpartitioned case
+   * create config for fair cartesian product
    * @param sources list of names of source vertices or vertex groups
    */
   public CartesianProductConfig(List<String> sources) {
@@ -84,7 +84,7 @@ public class CartesianProductConfig {
                                 CartesianProductFilterDescriptor filterDescriptor) {
     Preconditions.checkArgument(vertexPartitionMap != null, "vertex-partition map cannot be null");
     Preconditions.checkArgument(vertexPartitionMap.size() > 1,
-      "there must be more than 1 source " + "vertices, currently only " + vertexPartitionMap.size());
+      "there must be more than 1 source vertices, currently only " + vertexPartitionMap.size());
 
     this.isPartitioned = true;
     this.numPartitions = new int[vertexPartitionMap.size()];
@@ -151,7 +151,7 @@ public class CartesianProductConfig {
         "every source has 1 partition in a partitioned case");
     } else {
       Preconditions.checkArgument(this.numPartitions == null,
-        "partition counts should be null in unpartitioned case");
+        "partition counts should be null in fair cartesian product");
     }
   }
 
@@ -202,11 +202,6 @@ public class CartesianProductConfig {
       }
     }
 
-    builder.setMinFraction(
-      CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_SLOW_START_MIN_FRACTION_DEFAULT);
-    builder.setMaxFraction(
-      CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_SLOW_START_MAX_FRACTION_DEFAULT);
-
     if (conf != null) {
       builder.setMinFraction(conf.getFloat(
         CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_SLOW_START_MIN_FRACTION,
@@ -214,20 +209,36 @@ public class CartesianProductConfig {
       builder.setMaxFraction(conf.getFloat(
         CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_SLOW_START_MAX_FRACTION,
         CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_SLOW_START_MAX_FRACTION_DEFAULT));
-      String enableAutoGrouping =
-        conf.get(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_ENABLE_AUTO_GROUPING);
-      if (enableAutoGrouping != null) {
-        builder.setEnableAutoGrouping(Boolean.parseBoolean(enableAutoGrouping));
+      builder.setMaxParallelism(conf.getInt(
+        CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_MAX_PARALLELISM,
+        CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_MAX_PARALLELISM_DEFAULT));
+      builder.setMinOpsPerWorker(conf.getLong(
+        CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_MIN_OPS_PER_WORKER,
+        CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_MIN_OPS_PER_WORKER_DEFAULT));
+      builder.setEnableGrouping(conf.getBoolean(
+        CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_ENABLE_GROUPING,
+        CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_ENABLE_GROUPING_DEFAULT));
+      if (conf.get(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_GROUPING_FRACTION) != null) {
+        builder.setGroupingFraction(Float.parseFloat(
+          conf.get(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_GROUPING_FRACTION)));
+        Preconditions.checkArgument(0 < builder.getGroupingFraction() &&
+          builder.getGroupingFraction() <= 1, "grouping fraction should be larger than 0 and less" +
+          " or equal to 1, current value: " + builder.getGroupingFraction());
       }
-      String desiredBytesPerGroup =
-        conf.get(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_DESIRED_BYTES_PER_GROUP);
-      if (desiredBytesPerGroup != null) {
-        builder.setDesiredBytesPerChunk(Long.parseLong(desiredBytesPerGroup));
+      if (conf.get(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_NUM_PARTITIONS) != null) {
+        builder.setNumPartitionsForFairCase(Integer.parseInt(
+          conf.get(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_NUM_PARTITIONS)));
+        Preconditions.checkArgument(builder.getNumPartitionsForFairCase() > 0,
+          "Number of partitions for fair cartesian product should be positive integer");
       }
     }
     Preconditions.checkArgument(builder.getMinFraction() <= builder.getMaxFraction(),
       "min fraction(" + builder.getMinFraction() + ") should be less than max fraction(" +
         builder.getMaxFraction() + ") in cartesian product slow start");
+    Preconditions.checkArgument(builder.getMaxParallelism() > 0,
+      "max parallelism must be positive, currently is " + builder.getMaxParallelism());
+    Preconditions.checkArgument(builder.getMinOpsPerWorker() > 0,
+      "Min ops per worker must be positive, currently is " + builder.getMinOpsPerWorker());
 
     return builder.build();
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManager.java
index 1dbe6bf..a406c1b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManager.java
@@ -19,14 +19,17 @@ package org.apache.tez.runtime.library.cartesianproduct;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
 import org.apache.tez.dag.api.EdgeManagerPluginContext;
 import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
 
 import javax.annotation.Nullable;
 
+import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.*;
+
 /**
  * This EM wrap a real edge manager implementation object. It choose whether it's partitioned or
- * unpartitioned implementation according to the config. All method invocations are actually
+ * fair implementation according to the config. All method invocations are actually
  * redirected to real implementation.
  */
 public class CartesianProductEdgeManager extends EdgeManagerPluginOnDemand {
@@ -39,12 +42,12 @@ public class CartesianProductEdgeManager extends EdgeManagerPluginOnDemand {
   @Override
   public void initialize() throws Exception {
     Preconditions.checkArgument(getContext().getUserPayload() != null);
-    CartesianProductEdgeManagerConfig config = CartesianProductEdgeManagerConfig.fromUserPayload(
-      getContext().getUserPayload());
+    CartesianProductConfigProto config = CartesianProductConfigProto.parseFrom(
+      ByteString.copyFrom(getContext().getUserPayload().getPayload()));
     // no need to check config because config comes from VM and is already checked by VM
     edgeManagerReal = config.getIsPartitioned()
       ? new CartesianProductEdgeManagerPartitioned(getContext())
-      : new CartesianProductEdgeManagerUnpartitioned(getContext());
+      : new FairCartesianProductEdgeManager(getContext());
     edgeManagerReal.initialize(config);
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerConfig.java
deleted file mode 100644
index df0bcfa..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerConfig.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.runtime.library.cartesianproduct;
-
-import com.google.common.primitives.Ints;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.tez.dag.api.UserPayload;
-
-import java.nio.ByteBuffer;
-
-import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.CartesianProductConfigProto;
-
-class CartesianProductEdgeManagerConfig extends CartesianProductConfig {
-  final int[] numChunksPerSrc;
-  final int numChunk;
-  final int chunkIdOffset;
-
-  protected CartesianProductEdgeManagerConfig(boolean isPartitioned, String[] sourceVertices,
-                                              int[] numPartitions, int[] numChunksPerSrc, int numChunk,
-                                              int chunkIdOffset,
-                                              CartesianProductFilterDescriptor filterDescriptor) {
-    super(isPartitioned, numPartitions, sourceVertices, filterDescriptor);
-    this.numChunksPerSrc = numChunksPerSrc;
-    this.numChunk = numChunk;
-    this.chunkIdOffset = chunkIdOffset;
-  }
-
-  public static CartesianProductEdgeManagerConfig fromUserPayload(UserPayload payload)
-    throws InvalidProtocolBufferException {
-    CartesianProductConfigProto proto =
-      CartesianProductConfigProto.parseFrom(ByteString.copyFrom(payload.getPayload()));
-
-    boolean isPartitioned = proto.getIsPartitioned();
-    String[] sources = new String[proto.getSourcesList().size()];
-    proto.getSourcesList().toArray(sources);
-    int[] numPartitions =
-      proto.getNumPartitionsCount() == 0 ? null : Ints.toArray(proto.getNumPartitionsList());
-    CartesianProductFilterDescriptor filterDescriptor = proto.hasFilterClassName()
-      ? new CartesianProductFilterDescriptor(proto.getFilterClassName()) : null;
-    if (proto.hasFilterUserPayload()) {
-      filterDescriptor.setUserPayload(
-        UserPayload.create(ByteBuffer.wrap(proto.getFilterUserPayload().toByteArray())));
-    }
-    int[] humChunksPerSrc =
-      proto.getNumChunksCount() == 0 ? null : Ints.toArray(proto.getNumChunksList());
-    int numChunk = proto.getNumChunk();
-    int chunkIdOffset = proto.getChunkIdOffset();
-    return new CartesianProductEdgeManagerConfig(isPartitioned, sources, numPartitions,
-      humChunksPerSrc, numChunk, chunkIdOffset, filterDescriptor);
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java
index 5ece5cf..5f2910a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java
@@ -22,34 +22,40 @@ import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.dag.api.EdgeManagerPluginContext;
 import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.EventRouteMetadata;
 import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.CompositeEventRouteMetadata;
 import org.apache.tez.dag.api.UserPayload;

 import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;

+import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.*;
+
 class CartesianProductEdgeManagerPartitioned extends CartesianProductEdgeManagerReal {
   private int positionId;
   private CartesianProductFilter filter;
   private int[] taskIdMapping;
-  private CartesianProductEdgeManagerConfig config;
   private int[] numPartitions;
+  private List<String> sources;

   public CartesianProductEdgeManagerPartitioned(EdgeManagerPluginContext context) {
     super(context);
   }

   @Override
-  public void initialize(CartesianProductEdgeManagerConfig config) throws Exception {
-    this.config = config;
-    this.numPartitions = Ints.toArray(config.getNumPartitions());
-    positionId = config.getSourceVertices().indexOf(getContext().getSourceVertexName());
-    CartesianProductFilterDescriptor filterDescriptor = config.getFilterDescriptor();
-    if (filterDescriptor != null) {
-      filter = ReflectionUtils.createClazzInstance(filterDescriptor.getClassName(),
-        new Class[] {UserPayload.class}, new UserPayload[] {filterDescriptor.getUserPayload()});
+  public void initialize(CartesianProductConfigProto config) throws Exception {
+    this.numPartitions = Ints.toArray(config.getNumPartitionsList());
+    this.sources = config.getSourcesList();
+    this.positionId = sources.indexOf(getContext().getSourceVertexName());
+
+    if (config.hasFilterClassName()) {
+      // A filter class configured without a payload is constructed with a null UserPayload.
+      UserPayload userPayload = config.hasFilterUserPayload()
+        ? UserPayload.create(ByteBuffer.wrap(config.getFilterUserPayload().toByteArray())) : null;
+      filter = ReflectionUtils.createClazzInstance(config.getFilterClassName(),
+        new Class[]{UserPayload.class}, new UserPayload[]{userPayload});
     }
     generateTaskIdMapping();
   }
@@ -107,13 +113,12 @@ class CartesianProductEdgeManagerPartitioned extends CartesianProductEdgeManager
     CartesianProductCombination combination =
       new CartesianProductCombination(numPartitions);
     combination.firstTask();
-    List<String> sources = config.getSourceVertices();
     do {
       for (int i = 0; i < sources.size(); i++) {
         vertexPartitionMap.put(sources.get(i), combination.getCombination().get(i));
       }
       if (filter == null || filter.isValidCombination(vertexPartitionMap)) {
-        idealTaskId.add(combination.getChunkId());
+        idealTaskId.add(combination.getTaskId());
       }
     } while (combination.nextTask());
     this.taskIdMapping = Ints.toArray(idealTaskId);

http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java
index f22035b..0b91ec2 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java
@@ -20,6 +20,7 @@ package org.apache.tez.runtime.library.cartesianproduct;
 import org.apache.tez.dag.api.EdgeManagerPluginContext;
 import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.EventRouteMetadata;
 import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.CompositeEventRouteMetadata;
+import org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.CartesianProductConfigProto;
 
 /**
  * base class of cartesian product edge manager implementation
@@ -35,7 +36,7 @@ abstract class CartesianProductEdgeManagerReal {
     return this.context;
   }
 
-  public abstract void initialize(CartesianProductEdgeManagerConfig config) throws Exception;
+  public abstract void initialize(CartesianProductConfigProto config) throws Exception;
 
   public void prepareForRouting() throws Exception {}
 

http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java
deleted file mode 100644
index 80d7dc1..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.runtime.library.cartesianproduct;
-
-import org.apache.tez.dag.api.EdgeManagerPluginContext;
-import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.CompositeEventRouteMetadata;
-import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.EventRouteMetadata;
-import org.apache.tez.runtime.library.utils.Grouper;
-
-import javax.annotation.Nullable;
-
-
-class CartesianProductEdgeManagerUnpartitioned extends CartesianProductEdgeManagerReal {
-  private int positionId;
-  private int numChunk;
-  private int chunkIdOffset;
-  private int[] numChunkPerSrc;
-  private int numDestinationConsumerTasks;
-  private Grouper grouper = new Grouper();
-
-  public CartesianProductEdgeManagerUnpartitioned(EdgeManagerPluginContext context) {
-    super(context);
-  }
-
-  public void initialize(CartesianProductEdgeManagerConfig config) {
-    String groupName = getContext().getVertexGroupName();
-    String srcName = groupName != null ? groupName : getContext().getSourceVertexName();
-    this.positionId = config.getSourceVertices().indexOf(srcName);
-    this.numChunkPerSrc = config.numChunksPerSrc;
-    this.numChunk = config.numChunk;
-    this.chunkIdOffset = config.chunkIdOffset;
-
-    if (numChunk != 0) {
-      grouper.init(getContext().getSourceVertexNumTasks(), numChunk);
-      numDestinationConsumerTasks = 1;
-      for (int numGroup : numChunkPerSrc) {
-        numDestinationConsumerTasks *= numGroup;
-      }
-      numDestinationConsumerTasks /= numChunkPerSrc[positionId];
-    }
-  }
-
-  @Override
-  public int routeInputErrorEventToSource(int destTaskId, int failedInputId) throws Exception {
-    return failedInputId + grouper.getFirstTaskInGroup(
-      CartesianProductCombination.fromTaskId(numChunkPerSrc, destTaskId).getCombination().get(positionId)
-        - chunkIdOffset);
-  }
-
-  @Override
-  public EventRouteMetadata routeDataMovementEventToDestination(int srcTaskId, int srcOutputId,
-                                                                int destTaskId) throws Exception {
-    int chunkId =
-      CartesianProductCombination.fromTaskId(numChunkPerSrc, destTaskId).getCombination().get(positionId)
-        - chunkIdOffset;
-    if (0 <= chunkId && chunkId < numChunk && grouper.isInGroup(srcTaskId, chunkId)) {
-      int idx = srcTaskId - grouper.getFirstTaskInGroup(chunkId);
-      return EventRouteMetadata.create(1, new int[] {idx});
-    }
-    return null;
-  }
-
-  @Nullable
-  @Override
-  public CompositeEventRouteMetadata routeCompositeDataMovementEventToDestination(int srcTaskId,
-                                                                                  int destTaskId)
-    throws Exception {
-    int chunkId =
-      CartesianProductCombination.fromTaskId(numChunkPerSrc, destTaskId).getCombination().get(positionId)
-        - chunkIdOffset;
-    if (0 <= chunkId && chunkId < numChunk && grouper.isInGroup(srcTaskId, chunkId)) {
-      int idx = srcTaskId - grouper.getFirstTaskInGroup(chunkId);
-      return CompositeEventRouteMetadata.create(1, idx, 0);
-    }
-    return null;
-  }
-
-  @Nullable
-  @Override
-  public EventRouteMetadata routeInputSourceTaskFailedEventToDestination(int srcTaskId,
-                                                                         int destTaskId)
-    throws Exception {
-    int chunkId =
-      CartesianProductCombination.fromTaskId(numChunkPerSrc, destTaskId).getCombination().get(positionId)
-        - chunkIdOffset;
-    if (0 <= chunkId && chunkId < numChunk && grouper.isInGroup(srcTaskId, chunkId)) {
-      int idx = srcTaskId - grouper.getFirstTaskInGroup(chunkId);
-      return EventRouteMetadata.create(1, new int[] {idx});
-    }
-    return null;
-  }
-
-  @Override
-  public int getNumDestinationTaskPhysicalInputs(int destTaskId) {
-    int chunkId =
-      CartesianProductCombination.fromTaskId(numChunkPerSrc, destTaskId).getCombination().get(positionId)
-        - chunkIdOffset;
-    return 0 <= chunkId && chunkId < numChunk ? grouper.getNumTasksInGroup(chunkId) : 0;
-  }
-
-  @Override
-  public int getNumSourceTaskPhysicalOutputs(int srcTaskId) {
-    return 1;
-  }
-
-  @Override
-  public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
-    return numDestinationConsumerTasks;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManager.java
index 857f11e..ff22593 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManager.java
@@ -19,6 +19,7 @@ package org.apache.tez.runtime.library.cartesianproduct;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
 import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.InputDescriptor;
@@ -39,10 +40,11 @@ import java.util.Set;
 
 import static org.apache.tez.dag.api.EdgeProperty.DataMovementType.BROADCAST;
 import static org.apache.tez.dag.api.EdgeProperty.DataMovementType.CUSTOM;
+import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.*;
 
 /**
  * This VM wrap a real vertex manager implementation object. It choose whether it's partitioned or
- * unpartitioned implementation according to the config. All method invocations are actually
+ * fair implementation according to the config. All method invocations are actually
  * redirected to real implementation.
  *
  * Predefined parallelism isn't allowed for cartesian product vertex. Parallellism has to be
@@ -51,9 +53,9 @@ import static org.apache.tez.dag.api.EdgeProperty.DataMovementType.CUSTOM;
  * If a vertex use this vertex, its input edges must be either cartesian product edge or broadcast
  * edge.
  *
- * Sources can be either vertices or vertex groups (only in unpartitioned case).
+ * Sources can be either vertices or vertex groups (only in fair cartesian product).
  *
- * Slow start only works in partitioned case. Auto grouping only works in unpartitioned case.
+ * Slow start only works in partitioned case.
  */
 public class CartesianProductVertexManager extends VertexManagerPlugin {
   /**
@@ -72,22 +74,46 @@ public class CartesianProductVertexManager extends VertexManagerPlugin {
   public static final float TEZ_CARTESIAN_PRODUCT_SLOW_START_MAX_FRACTION_DEFAULT = 0.75f;
 
   /**
-   * Enables automatic grouping. It groups source tasks of each cartesian product source vertex
-   * so that every group generates similar output size. And parallelism can be reduced because
-   * destination tasks handle combinations of per group output instead of per task output. This is
-   * only available for unpartitioned case for now, and it's useful for scenarios where there are
-   * many source tasks generate small outputs.
+   * Number of partitions (int value), for fair cartesian product only.
+   * Set this if the automatically determined number of partitions is not large enough.
    */
-  public static final String TEZ_CARTESIAN_PRODUCT_ENABLE_AUTO_GROUPING =
-    "tez.cartesian-product.enable-auto-grouping";
-  public static final boolean TEZ_CARTESIAN_PRODUCT_ENABLE_AUTO_GROUPING_DEFAULT = false;
+  public static final String TEZ_CARTESIAN_PRODUCT_NUM_PARTITIONS =
+    "tez.cartesian-product.num-partitions";
 
   /**
-   * The number of output bytes we want from each group.
+   * Whether to enable grouping in fair cartesian product (enabled by default).
+   * If this is set to false, it's best to set "tez.cartesian-product.num-partitions" to 1 to avoid
+   * unnecessary overhead caused by multiple partitions.
    */
-  public static final String TEZ_CARTESIAN_PRODUCT_DESIRED_BYTES_PER_GROUP =
-    "tez.cartesian-product.desired-input-per-src";
-  public static final long TEZ_CARTESIAN_PRODUCT_DESIRED_BYTES_PER_GROUP_DEFAULT = 32 * 1024 * 1024;
+  public static final String TEZ_CARTESIAN_PRODUCT_ENABLE_GROUPING =
+    "tez.cartesian-product.enable-grouping";
+  public static final boolean TEZ_CARTESIAN_PRODUCT_ENABLE_GROUPING_DEFAULT = true;
+
+  /**
+   * If every source vertex has this fraction of its tasks completed and has generated some output,
+   * auto grouping can begin.
+   *
+   * Positive float value, max 1.
+   * If not set, auto grouping will begin once every source vertex generates enough output.
+   */
+  public static final String TEZ_CARTESIAN_PRODUCT_GROUPING_FRACTION =
+    "tez.cartesian-product.grouping-fraction";
+
+  /**
+   * Max parallelism, for fair cartesian product only.
+   * This is used to avoid getting too many tasks. The value must be positive.
+   */
+  public static final String TEZ_CARTESIAN_PRODUCT_MAX_PARALLELISM =
+    "tez.cartesian-product.max-parallelism";
+  public static final int TEZ_CARTESIAN_PRODUCT_MAX_PARALLELISM_DEFAULT = 1000;
+
+  /**
+   * Min cartesian product operations per worker, for fair cartesian product only.
+   * This is used to avoid giving a task too small a workload. The value must be positive.
+   */
+  public static final String TEZ_CARTESIAN_PRODUCT_MIN_OPS_PER_WORKER =
+    "tez.cartesian-product.min-ops-per-worker";
+  public static final long TEZ_CARTESIAN_PRODUCT_MIN_OPS_PER_WORKER_DEFAULT = 1000000;
 
   private CartesianProductVertexManagerReal vertexManagerReal = null;
 
@@ -99,12 +125,12 @@ public class CartesianProductVertexManager extends VertexManagerPlugin {
 
   @Override
   public void initialize() throws Exception {
-    CartesianProductVertexManagerConfig config =
-      CartesianProductVertexManagerConfig.fromUserPayload(getContext().getUserPayload());
+    CartesianProductConfigProto config = CartesianProductConfigProto.parseFrom(
+      ByteString.copyFrom(getContext().getUserPayload().getPayload()));
     // check whether DAG and config are is consistent
     Map<String, EdgeProperty> edgePropertyMap = getContext().getInputVertexEdgeProperties();
     Set<String> sourceVerticesDAG = edgePropertyMap.keySet();
-    Set<String> sourceVerticesConfig = new HashSet<>(config.getSourceVertices());
+    Set<String> sourceVerticesConfig = new HashSet<>(config.getSourcesList());
 
     Map<String, List<String>> vertexGroups = getContext().getInputVertexGroups();
     Map<String, String> vertexToGroup = new HashMap<>();
@@ -159,7 +185,7 @@ public class CartesianProductVertexManager extends VertexManagerPlugin {
 
     vertexManagerReal = config.getIsPartitioned()
       ? new CartesianProductVertexManagerPartitioned(getContext())
-      : new CartesianProductVertexManagerUnpartitioned(getContext());
+      : new FairCartesianProductVertexManager(getContext());
     vertexManagerReal.initialize(config);
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerConfig.java
deleted file mode 100644
index e082ec3..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerConfig.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.runtime.library.cartesianproduct;
-
-import com.google.common.base.Preconditions;
-import com.google.common.primitives.Ints;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.tez.dag.api.UserPayload;
-
-import java.nio.ByteBuffer;
-
-import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.*;
-
-class CartesianProductVertexManagerConfig extends CartesianProductConfig {
-  final float minFraction;
-  final float maxFraction;
-  final boolean enableAutoGrouping;
-  final long desiredBytesPerChunk;
-
-  public CartesianProductVertexManagerConfig(boolean isPartitioned, String[] sources,
-                                             int[] numPartitions,
-                                             float minFraction, float maxFraction,
-                                             boolean enableAutoGrouping, long desiredBytesPerChunk,
-                                             CartesianProductFilterDescriptor filterDescriptor) {
-    super(isPartitioned, numPartitions, sources, filterDescriptor);
-    Preconditions.checkArgument(minFraction <= maxFraction,
-      "min fraction(" + minFraction + ") should be less than max fraction(" +
-        maxFraction  + ") in cartesian product slow start");
-    this.minFraction = minFraction;
-    this.maxFraction = maxFraction;
-    this.enableAutoGrouping = enableAutoGrouping;
-    this.desiredBytesPerChunk = desiredBytesPerChunk;
-  }
-
-  public static CartesianProductVertexManagerConfig fromUserPayload(UserPayload payload)
-    throws InvalidProtocolBufferException {
-    CartesianProductConfigProto proto =
-      CartesianProductConfigProto.parseFrom(ByteString.copyFrom(payload.getPayload()));
-
-    boolean isPartitioned = proto.getIsPartitioned();
-    String[] sources = new String[proto.getSourcesList().size()];
-    proto.getSourcesList().toArray(sources);
-    int[] numPartitions =
-      proto.getNumPartitionsCount() == 0 ? null : Ints.toArray(proto.getNumPartitionsList());
-    CartesianProductFilterDescriptor filterDescriptor = proto.hasFilterClassName()
-      ? new CartesianProductFilterDescriptor(proto.getFilterClassName()) : null;
-    if (proto.hasFilterUserPayload()) {
-      filterDescriptor.setUserPayload(
-        UserPayload.create(ByteBuffer.wrap(proto.getFilterUserPayload().toByteArray())));
-    }
-    float minFraction = proto.getMinFraction();
-    float maxFraction = proto.getMaxFraction();
-
-    boolean enableAutoGrouping = proto.hasEnableAutoGrouping() ? proto.getEnableAutoGrouping()
-      : CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_ENABLE_AUTO_GROUPING_DEFAULT;
-    long desiredBytesPerGroup = proto.hasDesiredBytesPerChunk() ? proto.getDesiredBytesPerChunk()
-      : CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_DESIRED_BYTES_PER_GROUP_DEFAULT;
-    return new CartesianProductVertexManagerConfig(isPartitioned, sources, numPartitions,
-      minFraction, maxFraction, enableAutoGrouping, desiredBytesPerGroup, filterDescriptor);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerPartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerPartitioned.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerPartitioned.java
index ddff37d..e4aaad6 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerPartitioned.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerPartitioned.java
@@ -27,10 +27,12 @@ import org.apache.tez.dag.api.event.VertexState;
 import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.apache.tez.runtime.api.TaskAttemptIdentifier;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.CartesianProductConfigProto;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.EnumSet;
@@ -43,8 +45,9 @@ import java.util.Map;
  * min fraction and schedules all task when max fraction is reached
  */
 class CartesianProductVertexManagerPartitioned extends CartesianProductVertexManagerReal {
-  private CartesianProductVertexManagerConfig config;
   private List<String> sourceVertices;
+  private int[] numPartitions;
+  private float minFraction, maxFraction;
   private int parallelism = 0;
   private boolean vertexStarted = false;
   private boolean vertexReconfigured = false;
@@ -64,19 +67,26 @@ class CartesianProductVertexManagerPartitioned extends CartesianProductVertexMan
   }
 
   @Override
-  public void initialize(CartesianProductVertexManagerConfig config) throws TezReflectionException {
-    this.config = config;
-    this.sourceVertices = config.getSourceVertices();
-    CartesianProductFilterDescriptor filterDescriptor = config.getFilterDescriptor();
-    if (filterDescriptor != null) {
+  public void initialize(CartesianProductConfigProto config) throws TezReflectionException {
+    this.sourceVertices = config.getSourcesList();
+    this.numPartitions = Ints.toArray(config.getNumPartitionsList());
+    this.minFraction = config.hasMinFraction() ? config.getMinFraction()
+      : CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_SLOW_START_MIN_FRACTION_DEFAULT;
+    this.maxFraction = config.hasMaxFraction() ? config.getMaxFraction()
+      : CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_SLOW_START_MAX_FRACTION_DEFAULT;
+
+    if (config.hasFilterClassName()) {
+      UserPayload userPayload = config.hasFilterUserPayload()
+        ? UserPayload.create(ByteBuffer.wrap(config.getFilterUserPayload().toByteArray())) : null;
       try {
-        filter = ReflectionUtils.createClazzInstance(filterDescriptor.getClassName(),
-          new Class[]{UserPayload.class}, new UserPayload[]{filterDescriptor.getUserPayload()});
+        filter = ReflectionUtils.createClazzInstance(config.getFilterClassName(),
+          new Class[]{UserPayload.class}, new UserPayload[]{userPayload});
       } catch (TezReflectionException e) {
         LOG.error("Creating filter failed");
         throw e;
       }
     }
+
     for (String sourceVertex : sourceVertices) {
       sourceTaskCompleted.put(sourceVertex, new BitSet());
     }
@@ -147,7 +157,7 @@ class CartesianProductVertexManagerPartitioned extends CartesianProductVertexMan
     Map<String, Integer> vertexPartitionMap = new HashMap<>();
 
     CartesianProductCombination combination =
-      new CartesianProductCombination(Ints.toArray(config.getNumPartitions()));
+      new CartesianProductCombination(numPartitions);
     combination.firstTask();
     do {
       for (int i = 0; i < sourceVertices.size(); i++) {
@@ -174,12 +184,12 @@ class CartesianProductVertexManagerPartitioned extends CartesianProductVertexMan
     // determine the destination task with largest id to schedule
     float percentFinishedSrcTask = numFinishedSrcTasks*1f/totalNumSrcTasks;
     int numTaskToSchedule;
-    if (percentFinishedSrcTask < config.minFraction) {
+    if (percentFinishedSrcTask < minFraction) {
       numTaskToSchedule = 0;
-    } else if (config.minFraction <= percentFinishedSrcTask &&
-        percentFinishedSrcTask <= config.maxFraction) {
-      numTaskToSchedule = (int) ((percentFinishedSrcTask-config.minFraction)
-        /(config.maxFraction-config.minFraction)*parallelism);
+    } else if (minFraction <= percentFinishedSrcTask &&
+        percentFinishedSrcTask <= maxFraction) {
+      numTaskToSchedule = (int) ((percentFinishedSrcTask - minFraction)
+        /(maxFraction - minFraction) * parallelism);
     } else {
       numTaskToSchedule = parallelism;
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerReal.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerReal.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerReal.java
index 1a397fd..f28f4a3 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerReal.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerReal.java
@@ -21,6 +21,7 @@ import org.apache.tez.dag.api.VertexManagerPluginContext;
 import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.apache.tez.runtime.api.TaskAttemptIdentifier;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.CartesianProductConfigProto;
 
 import java.io.IOException;
 import java.util.List;
@@ -39,7 +40,7 @@ abstract class CartesianProductVertexManagerReal {
     return this.context;
   }
 
-  public abstract void initialize(CartesianProductVertexManagerConfig config) throws Exception;
+  public abstract void initialize(CartesianProductConfigProto config) throws Exception;
 
   public abstract void onVertexManagerEventReceived(VertexManagerEvent vmEvent) throws IOException;
 

http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerUnpartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerUnpartitioned.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerUnpartitioned.java
deleted file mode 100644
index 46ea76e..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerUnpartitioned.java
+++ /dev/null
@@ -1,438 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.runtime.library.cartesianproduct;
-
-import com.google.common.primitives.Ints;
-import com.google.protobuf.ByteString;
-import org.apache.tez.dag.api.EdgeProperty;
-import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.dag.api.VertexManagerPluginContext;
-import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
-import org.apache.tez.dag.api.event.VertexState;
-import org.apache.tez.dag.api.event.VertexStateUpdate;
-import org.apache.tez.runtime.api.TaskAttemptIdentifier;
-import org.apache.tez.runtime.api.events.VertexManagerEvent;
-import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto;
-import org.apache.tez.runtime.library.utils.Grouper;
-import org.roaringbitmap.RoaringBitmap;
-import org.slf4j.Logger;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-
-import static org.apache.tez.dag.api.EdgeProperty.DataMovementType.CUSTOM;
-import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.CartesianProductConfigProto;
-
-/**
- * In unpartitioned case, we have one destination task for each source chunk combination. A source
- * is a source vertex or a source vertex group. A chunk is one source task (without auto grouping)
- * or a group of source tasks (with auto grouping). A chunk may contains multiple tasks across
- * vertices. The mapping from source chunk to destination task id is done by
- * {@link <CartesianProductCombination>}.
- *
- * If auto grouping is enabled, this vertex manager will estimate output size of each source and
- * group source tasks of each source in chunk according to desired grouping size configured by user.
- *
- *
- */
-class CartesianProductVertexManagerUnpartitioned extends CartesianProductVertexManagerReal {
-  /**
-   * a cartesian product source
-   */
-  static class Source {
-    // list of source vertices of this source
-    List<SrcVertex> srcVertices = new ArrayList<>();
-    // position of this source in all sources
-    int position;
-    // name of source vertex or vertex group
-    String name;
-
-    // total number of chunks in this source
-    public int getNumChunk() {
-      int numChunk = 0;
-      for (SrcVertex srcV : srcVertices) {
-        numChunk += srcV.numChunk;
-      }
-      return numChunk;
-    }
-
-    // whether this source has any task completed
-    public boolean hasTaskCompleted() {
-      for (SrcVertex srcV : srcVertices) {
-        if (!srcV.taskCompleted.isEmpty()) {
-          return true;
-        }
-      }
-      return false;
-    }
-
-    public String toString(boolean afterReconfigure) {
-      StringBuilder sb = new StringBuilder();
-      sb.append("Source at position ");
-      sb.append(position);
-      if (name != null) {
-        sb.append(", ");
-        sb.append("vertex group ");
-        sb.append(name);
-
-      }
-      sb.append(": {");
-      for (SrcVertex srcV : srcVertices) {
-        sb.append("[");
-        sb.append(srcV.toString(afterReconfigure));
-        sb.append("], ");
-      }
-      sb.deleteCharAt(sb.length() - 1);
-      sb.setCharAt(sb.length() - 1, '}');
-      return sb.toString();
-    }
-  }
-
-  /**
-   * a cartesian product source vertex
-   */
-  class SrcVertex {
-    // which source this vertex belongs to
-    Source source;
-    // vertex name
-    String name;
-    int numTask;
-    // num chunks of this source vertex
-    int numChunk;
-    // offset of chunk id in vertex group
-    // we need sequence chunks in the vertex group to make them look like from single vertex
-    int chunkIdOffset = 0;
-    RoaringBitmap taskCompleted = new RoaringBitmap();
-    RoaringBitmap taskWithVMEvent = new RoaringBitmap();
-    long outputBytes;
-
-    public void doGrouping() {
-      numChunk = numTask;
-      if (config.enableAutoGrouping) {
-        outputBytes = outputBytes * numTask / taskWithVMEvent.getCardinality();
-        numChunk = Math.min(numChunk,
-          (int) ((outputBytes + config.desiredBytesPerChunk - 1) / config.desiredBytesPerChunk));
-      }
-    }
-
-    public String toString(boolean afterReconfigure) {
-      StringBuilder sb = new StringBuilder();
-      sb.append("vertex ").append(name).append(", ");
-      if (afterReconfigure) {
-        sb.append("estimated output ").append(outputBytes).append(" bytes, ");
-        sb.append(numChunk).append(" chunks");
-      } else {
-        sb.append(numTask).append(" tasks, ");
-        sb.append(taskWithVMEvent.getCardinality()).append(" VMEvents, ");
-        sb.append("output ").append(outputBytes).append(" bytes");
-      }
-      return sb.toString();
-    }
-  }
-
-  private static final Logger LOG =
-    org.slf4j.LoggerFactory.getLogger(CartesianProductVertexManagerUnpartitioned.class);
-
-  CartesianProductVertexManagerConfig config;
-  Map<String, Source> sourcesByName = new HashMap<>();
-  Map<String, SrcVertex> srcVerticesByName = new HashMap<>();
-
-  private boolean vertexReconfigured = false;
-  private boolean vertexStarted = false;
-  private boolean vertexStartSchedule = false;
-  private int numCPSrcNotInConfigureState = 0;
-  private int numBroadcastSrcNotInRunningState = 0;
-  private Queue<TaskAttemptIdentifier> completedSrcTaskToProcess = new LinkedList<>();
-  private RoaringBitmap scheduledTasks = new RoaringBitmap();
-
-  /* auto reduce related */
-  // num of chunks of source at the corresponding position in source list
-  private int[] numChunksPerSrc;
-  private Set<String> vertexSentVME = new HashSet<>();
-  private Grouper grouper = new Grouper();
-
-  public CartesianProductVertexManagerUnpartitioned(VertexManagerPluginContext context) {
-    super(context);
-  }
-
-  @Override
-  public void initialize(CartesianProductVertexManagerConfig config) throws Exception {
-    for (Map.Entry<String, EdgeProperty> e : getContext().getInputVertexEdgeProperties().entrySet()) {
-      if (e.getValue().getDataMovementType() == CUSTOM
-        && e.getValue().getEdgeManagerDescriptor().getClassName()
-          .equals(CartesianProductEdgeManager.class.getName())) {
-        srcVerticesByName.put(e.getKey(), new SrcVertex());
-        srcVerticesByName.get(e.getKey()).name = e.getKey();
-        getContext().registerForVertexStateUpdates(e.getKey(), EnumSet.of(VertexState.CONFIGURED));
-        numCPSrcNotInConfigureState++;
-      } else {
-        getContext().registerForVertexStateUpdates(e.getKey(), EnumSet.of(VertexState.RUNNING));
-        numBroadcastSrcNotInRunningState++;
-      }
-    }
-
-    Map<String, List<String>> srcGroups = getContext().getInputVertexGroups();
-    for (int i = 0; i < config.getSourceVertices().size(); i++) {
-      String srcName = config.getSourceVertices().get(i);
-      Source source = new Source();
-      source.position = i;
-      if (srcGroups.containsKey(srcName)) {
-        source.name = srcName;
-        for (String srcVName : srcGroups.get(srcName)) {
-          source.srcVertices.add(srcVerticesByName.get(srcVName));
-          srcVerticesByName.get(srcVName).source = source;
-        }
-      } else {
-        source.srcVertices.add(srcVerticesByName.get(srcName));
-        srcVerticesByName.get(srcName).source = source;
-      }
-      sourcesByName.put(srcName, source);
-    }
-
-    numChunksPerSrc = new int[sourcesByName.size()];
-    this.config = config;
-    getContext().vertexReconfigurationPlanned();
-  }
-
-  @Override
-  public synchronized void onVertexStarted(List<TaskAttemptIdentifier> completions)
-    throws Exception {
-    vertexStarted = true;
-    if (completions != null) {
-      for (TaskAttemptIdentifier attempt : completions) {
-        addCompletedSrcTaskToProcess(attempt);
-      }
-    }
-    tryScheduleTasks();
-  }
-
-  @Override
-  public synchronized void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws IOException {
-    String vertex = stateUpdate.getVertexName();
-    VertexState state = stateUpdate.getVertexState();
-
-    if (state == VertexState.CONFIGURED) {
-      srcVerticesByName.get(vertex).numTask = getContext().getVertexNumTasks(vertex);
-      numCPSrcNotInConfigureState--;
-    } else if (state == VertexState.RUNNING) {
-      numBroadcastSrcNotInRunningState--;
-    }
-    tryScheduleTasks();
-  }
-
-  @Override
-  public synchronized void onSourceTaskCompleted(TaskAttemptIdentifier attempt) throws Exception {
-    addCompletedSrcTaskToProcess(attempt);
-    tryScheduleTasks();
-  }
-
-  private void addCompletedSrcTaskToProcess(TaskAttemptIdentifier attempt) {
-    int taskId = attempt.getTaskIdentifier().getIdentifier();
-    String vertex = attempt.getTaskIdentifier().getVertexIdentifier().getName();
-    SrcVertex srcV = srcVerticesByName.get(vertex);
-    if (srcV != null && !srcV.taskCompleted.contains(taskId)) {
-      srcV.taskCompleted.add(taskId);
-      completedSrcTaskToProcess.add(attempt);
-    }
-  }
-
-  private boolean tryStartSchedule() {
-    if (!vertexReconfigured || !vertexStarted || numBroadcastSrcNotInRunningState > 0) {
-      return false;
-    }
-
-    for (Source src : sourcesByName.values()) {
-      if (!src.hasTaskCompleted()) {
-        return false;
-      }
-    }
-    vertexStartSchedule = true;
-    return true;
-  }
-
-  public synchronized void onVertexManagerEventReceived(VertexManagerEvent vmEvent)
-    throws IOException {
-    /* vmEvent after reconfigure doesn't matter */
-    if (vertexReconfigured) {
-      return;
-    }
-
-    if (vmEvent.getUserPayload() != null) {
-      String srcVertex =
-        vmEvent.getProducerAttemptIdentifier().getTaskIdentifier().getVertexIdentifier().getName();
-      SrcVertex srcV = srcVerticesByName.get(srcVertex);
-
-      // vmEvent from non-cp vertex doesn't matter
-      if (srcV == null) {
-        return;
-      }
-
-      VertexManagerEventPayloadProto proto =
-        VertexManagerEventPayloadProto.parseFrom(ByteString.copyFrom(vmEvent.getUserPayload()));
-      srcV.outputBytes += proto.getOutputSize();
-      srcV.taskWithVMEvent.add(vmEvent.getProducerAttemptIdentifier().getTaskIdentifier().getIdentifier());
-      vertexSentVME.add(srcVertex);
-    }
-
-    tryScheduleTasks();
-  }
-
-  private boolean tryReconfigure() throws IOException {
-    if (numCPSrcNotInConfigureState > 0) {
-      return false;
-    }
-    if (config.enableAutoGrouping) {
-      if (vertexSentVME.size() != srcVerticesByName.size()) {
-        return false;
-      }
-      // every src v must output at least one chunk size
-      for (SrcVertex srcV : srcVerticesByName.values()) {
-        if (srcV.outputBytes < config.desiredBytesPerChunk
-          && srcV.taskWithVMEvent.getCardinality() < srcV.numTask) {
-          return false;
-        }
-      }
-    }
-
-    LOG.info("Start reconfigure, grouping: " + config.enableAutoGrouping
-      + ", chunk size: " + config.desiredBytesPerChunk + " bytes.");
-    for (String srcName : config.getSourceVertices()) {
-      LOG.info(sourcesByName.get(srcName).toString(false));
-    }
-
-    for (Source src : sourcesByName.values()) {
-      for (int i = 0; i < src.srcVertices.size(); i++) {
-        src.srcVertices.get(i).doGrouping();
-        if (i > 0) {
-          src.srcVertices.get(i).chunkIdOffset += src.srcVertices.get(i-1).numChunk;
-        }
-      }
-      numChunksPerSrc[src.position] = src.getNumChunk();
-    }
-
-    int parallelism = 1;
-    for (Source src : sourcesByName.values()) {
-      parallelism *= src.getNumChunk();
-    }
-
-    LOG.info("After reconfigure, ");
-    for (String srcName : config.getSourceVertices()) {
-      LOG.info(sourcesByName.get(srcName).toString(true));
-    }
-    LOG.info("Final parallelism: " + parallelism);
-
-    CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder();
-    for (int i = 0; i < numChunksPerSrc.length; i++) {
-      numChunksPerSrc[i] = sourcesByName.get(config.getSourceVertices().get(i)).getNumChunk();
-    }
-    builder.setIsPartitioned(false).addAllSources(config.getSourceVertices())
-      .addAllNumChunks(Ints.asList(this.numChunksPerSrc));
-
-    Map<String, EdgeProperty> edgeProperties = getContext().getInputVertexEdgeProperties();
-    Iterator<Map.Entry<String,EdgeProperty>> iter = edgeProperties.entrySet().iterator();
-    while (iter.hasNext()) {
-      Map.Entry<String, EdgeProperty> e = iter.next();
-      if (e.getValue().getDataMovementType() != CUSTOM) {
-        iter.remove();
-      } else {
-        SrcVertex srcV = srcVerticesByName.get(e.getKey());
-        builder.setNumChunk(srcV.numChunk).setChunkIdOffset(srcV.chunkIdOffset);
-        e.getValue().getEdgeManagerDescriptor()
-          .setUserPayload(UserPayload.create(ByteBuffer.wrap(builder.build().toByteArray())));
-      }
-    }
-    getContext().reconfigureVertex(parallelism, null, edgeProperties);
-    vertexReconfigured = true;
-    getContext().doneReconfiguringVertex();
-    return true;
-  }
-
-  private void tryScheduleTasks() throws IOException {
-    if (!vertexReconfigured && !tryReconfigure()) {
-      return;
-    }
-    if (!vertexStartSchedule && !tryStartSchedule()) {
-      return;
-    }
-
-    while (!completedSrcTaskToProcess.isEmpty()) {
-      scheduleTasksDependOnCompletion(completedSrcTaskToProcess.poll());
-    }
-  }
-
-  private void scheduleTasksDependOnCompletion(TaskAttemptIdentifier attempt) {
-    int taskId = attempt.getTaskIdentifier().getIdentifier();
-    String vertex = attempt.getTaskIdentifier().getVertexIdentifier().getName();
-    SrcVertex srcV = srcVerticesByName.get(vertex);
-    Source src = srcV.source;
-
-    List<ScheduleTaskRequest> requests = new ArrayList<>();
-    CartesianProductCombination combination =
-      new CartesianProductCombination(numChunksPerSrc, src.position);
-    grouper.init(srcV.numTask, srcV.numChunk);
-    combination.firstTaskWithFixedChunk(grouper.getGroupId(taskId) + srcV.chunkIdOffset);
-    do {
-      List<Integer> list = combination.getCombination();
-
-      if (scheduledTasks.contains(combination.getChunkId())) {
-        continue;
-      }
-      boolean readyToSchedule = true;
-      for (int i = 0; i < list.size(); i++) {
-        int chunkId = list.get(i);
-        SrcVertex srcVHasGroup = null;
-        for (SrcVertex v : sourcesByName.get(config.getSourceVertices().get(i)).srcVertices) {
-          if (v.chunkIdOffset <= chunkId && chunkId < v.chunkIdOffset + v.numChunk) {
-            srcVHasGroup = v;
-            break;
-          }
-        }
-        assert srcVHasGroup != null;
-        grouper.init(srcVHasGroup.numTask, srcVHasGroup.numChunk);
-        chunkId -= srcVHasGroup.chunkIdOffset;
-        for (int j = grouper.getFirstTaskInGroup(chunkId); j <= grouper.getLastTaskInGroup(chunkId); j++) {
-          if (!srcVHasGroup.taskCompleted.contains(j)) {
-            readyToSchedule = false;
-            break;
-          }
-        }
-        if (!readyToSchedule) {
-          break;
-        }
-      }
-
-      if (readyToSchedule) {
-        requests.add(ScheduleTaskRequest.create(combination.getChunkId(), null));
-        scheduledTasks.add(combination.getChunkId());
-      }
-    } while (combination.nextTaskWithFixedChunk());
-    if (!requests.isEmpty()) {
-      getContext().scheduleTasks(requests);
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/FairCartesianProductEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/FairCartesianProductEdgeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/FairCartesianProductEdgeManager.java
new file mode 100644
index 0000000..3085e5e
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/FairCartesianProductEdgeManager.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import com.google.common.primitives.Ints;
+import org.apache.tez.dag.api.EdgeManagerPluginContext;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.CompositeEventRouteMetadata;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.EventRouteMetadata;
+import org.apache.tez.runtime.library.utils.Grouper;
+
+import javax.annotation.Nullable;
+
+import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductCombination.fromTaskId;
+import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.*;
+
+
+/**
+ * Edge manager used for the fair cartesian product case.
+ *
+ * <p>Each source task produces {@code numPartition} physical outputs ("items"); output {@code i}
+ * of source task {@code t} has item id {@code t * numPartition + i}. The items of this source
+ * vertex are split into chunks by a {@link Grouper}, and each destination task consumes one chunk
+ * of every cartesian product source, as decoded by {@code fromTaskId(numChunkPerSrc, destTaskId)}.
+ * When the source is a member of a vertex group, the destination input indices for this vertex
+ * start after the items of the same chunk contributed by the vertices that precede it in the
+ * group (see {@link #getItemIdOffset(int)}).
+ */
+class FairCartesianProductEdgeManager extends CartesianProductEdgeManagerReal {
+  // #physical outputs (items) of each source task
+  private int numPartition;
+  // position of current source in all cartesian product sources
+  private int positionInSrc;
+  // #chunk of each cartesian product source
+  private int[] numChunkPerSrc;
+  // #task of each vertex in vertex group that contains src vertex
+  private int[] numTaskPerSrcVertexInGroup;
+  // position of src vertex in vertex group
+  private int positionInGroup;
+  // # destination tasks that consume same chunk
+  private int numDestConsumerPerChunk;
+  // splits the items of this source vertex into chunks
+  private final Grouper grouper = new Grouper();
+  // scratch grouper re-initialized by getItemIdOffset for each preceding vertex in the group
+  private final Grouper grouperForComputeOffset = new Grouper();
+
+  public FairCartesianProductEdgeManager(EdgeManagerPluginContext context) {
+    super(context);
+  }
+
+  /**
+   * Reads the routing parameters from the cartesian product config. Chunk layout fields are
+   * only populated once the config carries the per-source chunk counts (i.e. after
+   * reconfiguration); until then only the source position and partition count are set.
+   */
+  @Override
+  public void initialize(CartesianProductConfigProto config) {
+    String groupName = getContext().getVertexGroupName();
+    String srcName = groupName != null ? groupName : getContext().getSourceVertexName();
+    this.positionInSrc = config.getSourcesList().indexOf(srcName);
+
+    if (config.hasNumPartitionsForFairCase()) {
+      this.numPartition = config.getNumPartitionsForFairCase();
+    } else {
+      // NOTE(review): the cast truncates, e.g. pow(1000, 1.0 / 3) is 9.999999999999998, giving 9
+      // rather than 10. Presumably this must match the vertex manager's computation, so confirm
+      // both sides before changing it.
+      this.numPartition = (int) Math.pow(config.getMaxParallelism(), 1.0 / config.getSourcesCount());
+    }
+
+    if (config.getNumChunksCount() > 0) {
+      // initialize after reconfiguration
+      this.numChunkPerSrc = Ints.toArray(config.getNumChunksList());
+      grouper.init(getContext().getSourceVertexNumTasks() * numPartition,
+        numChunkPerSrc[positionInSrc]);
+      this.numTaskPerSrcVertexInGroup = Ints.toArray(config.getNumTaskPerVertexInGroupList());
+      this.positionInGroup = config.getPositionInGroup();
+
+      // a chunk of this source is paired with every chunk combination of the other sources
+      numDestConsumerPerChunk = 1;
+      for (int numChunk : numChunkPerSrc) {
+        numDestConsumerPerChunk *= numChunk;
+      }
+      numDestConsumerPerChunk /= numChunkPerSrc[positionInSrc];
+    }
+  }
+
+  /**
+   * Returns the chunk of this cartesian product source consumed by the given destination task.
+   */
+  private int getChunkId(int destTaskId) {
+    return fromTaskId(numChunkPerSrc, destTaskId).getCombination().get(positionInSrc);
+  }
+
+  /**
+   * Maps a failed physical input of a destination task back to the source task that produced it.
+   */
+  @Override
+  public int routeInputErrorEventToSource(int destTaskId, int failedInputId) throws Exception {
+    int chunkId = getChunkId(destTaskId);
+    // undo the per-chunk indexing used when routing to recover the item id
+    int itemId = failedInputId - getItemIdOffset(chunkId) + grouper.getFirstItemInGroup(chunkId);
+    return itemId / numPartition;
+  }
+
+  /**
+   * Routes output {@code srcOutputId} of source task {@code srcTaskId} to the destination task
+   * only if that item belongs to the chunk the destination consumes; returns null otherwise.
+   */
+  @Override
+  public EventRouteMetadata routeDataMovementEventToDestination(int srcTaskId, int srcOutputId,
+                                                                int destTaskId) throws Exception {
+    int itemId = srcTaskId * numPartition + srcOutputId;
+    int chunkId = getChunkId(destTaskId);
+    if (grouper.isInGroup(itemId, chunkId)) {
+      int idx = itemId - grouper.getFirstItemInGroup(chunkId) + getItemIdOffset(chunkId);
+      return EventRouteMetadata.create(1, new int[] {idx});
+    }
+    return null;
+  }
+
+  /**
+   * Routes the contiguous range of a source task's outputs that falls inside the destination's
+   * chunk as one composite event; returns null when the two ranges do not overlap.
+   */
+  @Nullable
+  @Override
+  public CompositeEventRouteMetadata routeCompositeDataMovementEventToDestination(int srcTaskId,
+                                                                                  int destTaskId)
+    throws Exception {
+    int chunkId = getChunkId(destTaskId);
+    int firstItemInChunk = grouper.getFirstItemInGroup(chunkId);
+    int lastItemInChunk = grouper.getLastItemInGroup(chunkId);
+    int firstItemInSrcTask = srcTaskId * numPartition;
+    int lastItemInSrcTask = firstItemInSrcTask + numPartition - 1;
+    if (!(lastItemInChunk < firstItemInSrcTask || firstItemInChunk > lastItemInSrcTask)) {
+      int firstItem = Math.max(firstItemInChunk, firstItemInSrcTask);
+      int lastItem = Math.min(lastItemInChunk, lastItemInSrcTask);
+      return CompositeEventRouteMetadata.create(lastItem - firstItem + 1,
+        firstItem - firstItemInChunk + getItemIdOffset(chunkId), firstItem - firstItemInSrcTask);
+    }
+    return null;
+  }
+
+  /**
+   * Returns the number of items of the given chunk contributed by the vertices that precede this
+   * source vertex in its vertex group; this vertex's inputs are indexed after them. Returns 0
+   * when no vertex precedes it ({@code positionInGroup == 0}).
+   *
+   * @param chunkId chunk of this cartesian product source
+   * @return offset of this vertex's first input index within the chunk
+   */
+  private int getItemIdOffset(int chunkId) {
+    int offset = 0;
+    for (int i = 0; i < positionInGroup; i++) {
+      grouperForComputeOffset.init(numTaskPerSrcVertexInGroup[i] * numPartition,
+        numChunkPerSrc[positionInSrc]);
+      offset += grouperForComputeOffset.getNumItemsInGroup(chunkId);
+    }
+    return offset;
+  }
+
+  /**
+   * Routes a source task failure to every input of the destination's chunk that the failed task
+   * produced; returns null when the task contributed nothing to that chunk.
+   */
+  @Nullable
+  @Override
+  public EventRouteMetadata routeInputSourceTaskFailedEventToDestination(int srcTaskId,
+                                                                         int destTaskId)
+    throws Exception {
+    int chunkId = getChunkId(destTaskId);
+    int firstItemInChunk = grouper.getFirstItemInGroup(chunkId);
+    int lastItemInChunk = grouper.getLastItemInGroup(chunkId);
+    int firstItemInSrcTask = srcTaskId * numPartition;
+    int lastItemInSrcTask = firstItemInSrcTask + numPartition - 1;
+    if (!(lastItemInChunk < firstItemInSrcTask || firstItemInChunk > lastItemInSrcTask)) {
+      int firstItem = Math.max(firstItemInChunk, firstItemInSrcTask);
+      int lastItem = Math.min(lastItemInChunk, lastItemInSrcTask);
+      int[] targetIndices = new int[lastItem - firstItem + 1];
+      // getItemIdOffset loops over the preceding group vertices; it is loop-invariant, so
+      // compute it once instead of once per item
+      int indexBase = getItemIdOffset(chunkId) - firstItemInChunk;
+      for (int i = firstItem; i <= lastItem; i++) {
+        targetIndices[i - firstItem] = i + indexBase;
+      }
+      return EventRouteMetadata.create(targetIndices.length, targetIndices);
+    }
+    return null;
+  }
+
+  /**
+   * Returns the number of items of this source vertex in the chunk the destination consumes.
+   */
+  @Override
+  public int getNumDestinationTaskPhysicalInputs(int destTaskId) {
+    int chunkId = getChunkId(destTaskId);
+    if (0 <= chunkId && chunkId < numChunkPerSrc[positionInSrc]) {
+      return grouper.getNumItemsInGroup(chunkId);
+    }
+    return 0;
+  }
+
+  @Override
+  public int getNumSourceTaskPhysicalOutputs(int srcTaskId) {
+    return numPartition;
+  }
+
+  /**
+   * Returns the number of destination tasks consuming any output of the given source task: the
+   * number of chunks its items span, times the consumers of each chunk.
+   */
+  @Override
+  public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
+    int numChunk = grouper.getGroupId(sourceTaskIndex * numPartition + numPartition - 1)
+      - grouper.getGroupId(sourceTaskIndex * numPartition) + 1;
+    return numDestConsumerPerChunk * numChunk;
+  }
+}
\ No newline at end of file


Mime
View raw message