tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhiyu...@apache.org
Subject [2/3] tez git commit: TEZ-3708. Improve parallelism and auto grouping of unpartitioned cartesian product (zhiyuany)
Date Thu, 11 May 2017 22:21:57 GMT
http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/FairCartesianProductVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/FairCartesianProductVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/FairCartesianProductVertexManager.java
new file mode 100644
index 0000000..a38e20d
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/FairCartesianProductVertexManager.java
@@ -0,0 +1,551 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import com.google.common.math.LongMath;
+import com.google.common.primitives.Ints;
+import com.google.protobuf.ByteString;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto;
+import org.apache.tez.runtime.library.utils.Grouper;
+import org.roaringbitmap.RoaringBitmap;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+import static org.apache.tez.dag.api.EdgeProperty.DataMovementType.CUSTOM;
+import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.CartesianProductConfigProto;
+
+/**
+ * In fair cartesian product case, we have one destination task for each source chunk combination.
+ * A source is a source vertex or a source vertex group. A chunk is a part of source output. The
+ * mapping from source chunk to destination task id is done by {@link CartesianProductCombination}.
+ *
+ * This requires source output to be partitioned with a round robin partitioner, even if the data
+ * is intrinsically unpartitioned. By doing this, we can achieve arbitrary parallelism.
+ *
+ * It tries to distribute work evenly by having each task perform a similar number of cartesian
+ * product operations. To achieve this, it estimates #record from each source and total #ops.
+ *
+ * The parallelism is decided based on estimated total #ops and two configurations, max allowed
+ * parallelism and min-ops-per-worker. The max parallelism will be tried first and used if the
+ * resulting #ops-per-worker is no less than min-ops-per-worker. Otherwise, parallelism will be
+ * total #ops divided by min-ops-per-worker (rounded up).
+ *
+ * To reduce shuffle overhead, we try to group output from same task first. A chunk from a source
+ * vertex contains continuous physical output from a task or its neighboring task.
+ *
+ * Vertex group is supported. Chunk i of a source group contains chunk i of every vertex in this
+ * group.
+ */
+class FairCartesianProductVertexManager extends CartesianProductVertexManagerReal {
+  /**
+   * A cartesian product source: either a single source vertex or a vertex group.
+   * Chunk i of a source contains chunk i of every vertex in this source.
+   */
+  static class Source {
+    // list of source vertices of this source
+    List<SrcVertex> srcVertices = new ArrayList<>();
+    // position of this source in all sources
+    int position;
+    // name of the vertex group; left null when the source is a single vertex
+    String name;
+
+    // total number of chunks in this source
+    // each vertex in this source has same numChunk
+    int numChunk;
+    // total number of acknowledged output record (before reconfiguration)
+    // or estimated total number of output record (after reconfiguration)
+    long numRecord;
+
+    /**
+     * Builds a human readable description of this source for logging.
+     *
+     * @param afterReconfigure forwarded to {@link SrcVertex#toString(boolean)} to select which
+     *                         per-vertex details are printed
+     * @return description of the form
+     *         {@code Source at position P[, name N], num chunk C: {[v1], [v2]}}
+     */
+    public String toString(boolean afterReconfigure) {
+      StringBuilder sb = new StringBuilder();
+      sb.append("Source at position ").append(position);
+      if (name != null) {
+        sb.append(", name ").append(name);
+      }
+      // always separate the chunk count from whatever precedes it
+      sb.append(", num chunk ").append(numChunk);
+      sb.append(": {");
+      // emit the separator before every element but the first, so an empty vertex list
+      // still yields a well-formed "{}"
+      String separator = "";
+      for (SrcVertex srcV : srcVertices) {
+        sb.append(separator);
+        sb.append("[").append(srcV.toString(afterReconfigure)).append("]");
+        separator = ", ";
+      }
+      sb.append("}");
+      return sb.toString();
+    }
+
+    /**
+     * Estimates the total number of output records of this source.
+     *
+     * @return sum of {@link SrcVertex#estimateNumRecord()} over all vertices in this source
+     */
+    public long estimateNumRecord() {
+      long estimation = 0;
+      for (SrcVertex srcV : srcVertices) {
+        estimation += srcV.estimateNumRecord();
+      }
+      return estimation;
+    }
+
+    /**
+     * @param chunkId id of the chunk to check
+     * @return true only if the corresponding chunk of every vertex in this source is completed
+     */
+    private boolean isChunkCompleted(int chunkId) {
+      for (SrcVertex srcV : srcVertices) {
+        if (!srcV.isChunkCompleted(chunkId)) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    /**
+     * @return total number of tasks over all vertices in this source
+     */
+    public int getNumTask() {
+      int numTask = 0;
+      for (SrcVertex srcV : srcVertices) {
+        numTask += srcV.numTask;
+      }
+      return numTask;
+    }
+
+    /**
+     * @return the vertex with the largest {@code numRecord}; the first such vertex wins on ties,
+     *         and null is returned when this source has no vertex
+     */
+    public SrcVertex getSrcVertexWithMostOutput() {
+      SrcVertex srcVWithMaxOutput = null;
+      for (SrcVertex srcV : srcVertices) {
+        if (srcVWithMaxOutput == null || srcV.numRecord > srcVWithMaxOutput.numRecord) {
+          srcVWithMaxOutput = srcV;
+        }
+      }
+      return srcVWithMaxOutput;
+    }
+  }
+
+  /**
+   * A cartesian product source vertex. This is an inner class because chunk computation needs
+   * the enclosing manager's {@code grouper} and {@code numPartitions}.
+   */
+  class SrcVertex {
+    // which source this vertex belongs to
+    Source source;
+    // vertex name
+    String name;
+    int numTask;
+
+    // ids of tasks that have completed
+    RoaringBitmap taskCompleted = new RoaringBitmap();
+    // ids of tasks from which a vertex manager event has been received
+    RoaringBitmap taskWithVMEvent = new RoaringBitmap();
+    // total number of acknowledged output record (before reconfiguration)
+    // or estimated total number of output record (after reconfiguration)
+    long numRecord;
+
+    /**
+     * Builds a human readable description of this vertex for logging.
+     *
+     * @param afterReconfigure if true, print the record count and the chunk count of the owning
+     *                         source; otherwise print task, VM event and record counts
+     * @return description of this vertex
+     */
+    public String toString(boolean afterReconfigure) {
+      StringBuilder sb = new StringBuilder();
+      sb.append("vertex ").append(name).append(", ");
+      if (afterReconfigure) {
+        sb.append("estimated # output records ").append(numRecord).append(", ");
+        sb.append("# chunks ").append(source.numChunk);
+      } else {
+        sb.append(numTask).append(" tasks, ");
+        sb.append(taskWithVMEvent.getCardinality()).append(" VMEvents, ");
+        sb.append("numRecord ").append(numRecord);
+      }
+      return sb.toString();
+    }
+
+    /**
+     * Extrapolates the record count seen so far to all tasks of this vertex.
+     *
+     * @return 0 if no task has reported yet, otherwise
+     *         {@code numRecord * numTask / (number of tasks that reported)}
+     */
+    public long estimateNumRecord() {
+      if (taskWithVMEvent.isEmpty()) {
+        return 0;
+      } else {
+        return numRecord * numTask / taskWithVMEvent.getCardinality();
+      }
+    }
+
+    /**
+     * Checks whether every task contributing to the given chunk has completed.
+     *
+     * @param chunkId id of the chunk, in {@code [0, source.numChunk)}
+     * @return true if all tasks overlapping the chunk are in {@code taskCompleted}
+     */
+    public boolean isChunkCompleted(int chunkId) {
+      // the grouper is initialised with numPartitions items per task, so an item index must be
+      // mapped back to its task by dividing by numPartitions (not maxParallelism)
+      grouper.init(numTask * numPartitions, source.numChunk);
+      int firstRelevantTask = grouper.getFirstItemInGroup(chunkId) / numPartitions;
+      int lastRelevantTask = grouper.getLastItemInGroup(chunkId) / numPartitions;
+      for (int relevantTask = firstRelevantTask; relevantTask <= lastRelevantTask; relevantTask++) {
+        if (!taskCompleted.contains(relevantTask)) {
+          return false;
+        }
+      }
+      return true;
+    }
+  }
+
+  private static final Logger LOG =
+    org.slf4j.LoggerFactory.getLogger(FairCartesianProductVertexManager.class);
+
+  private CartesianProductConfigProto config;
+  private List<String> sourceList;
+  private Map<String, Source> sourcesByName = new HashMap<>();
+  private Map<String, SrcVertex> srcVerticesByName = new HashMap<>();
+  private boolean enableGrouping;
+  private int maxParallelism;
+  private int numPartitions;
+  private long minOpsPerWorker;
+
+  private long minNumRecordForEstimation;
+  private boolean vertexReconfigured = false;
+  private boolean vertexStarted = false;
+  private boolean vertexStartSchedule = false;
+  private int numCPSrcNotInConfigureState = 0;
+  private int numBroadcastSrcNotInRunningState = 0;
+  private Queue<TaskAttemptIdentifier> completedSrcTaskToProcess = new LinkedList<>();
+  private RoaringBitmap scheduledTasks = new RoaringBitmap();
+
+  private int parallelism;
+
+  /* auto reduce related */
+  // num of chunks of source at the corresponding position in source list
+  private int[] numChunksPerSrc;
+  private Grouper grouper = new Grouper();
+
+  /**
+   * Creates the vertex manager; the plugin context is handed to the superclass unchanged.
+   *
+   * @param context vertex manager plugin context
+   */
+  public FairCartesianProductVertexManager(VertexManagerPluginContext context) {
+    super(context);
+  }
+
+  /**
+   * Reads settings from the config, discovers the source vertices and builds the
+   * {@link Source} list, then tells the context that a reconfiguration is planned.
+   *
+   * @param config cartesian product configuration; unset optional fields fall back to the
+   *               defaults declared in {@link CartesianProductVertexManager}
+   * @throws Exception declared by the overridden method
+   */
+  @Override
+  public void initialize(CartesianProductConfigProto config) throws Exception {
+    this.config = config;
+    maxParallelism = config.hasMaxParallelism() ? config.getMaxParallelism()
+      :CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_MAX_PARALLELISM_DEFAULT;
+    enableGrouping = config.hasEnableGrouping() ? config.getEnableGrouping()
+      :CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_ENABLE_GROUPING_DEFAULT;
+    minOpsPerWorker = config.hasMinOpsPerWorker() ? config.getMinOpsPerWorker()
+      : CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_MIN_OPS_PER_WORKER_DEFAULT;
+    sourceList = config.getSourcesList();
+    if (config.hasNumPartitionsForFairCase()) {
+      numPartitions = config.getNumPartitionsForFairCase();
+    } else {
+      // default: n-th root of max parallelism (n = number of sources), truncated to int
+      numPartitions = (int) Math.pow(maxParallelism, 1.0 / sourceList.size());
+    }
+
+    // inputs connected through a CUSTOM edge managed by CartesianProductEdgeManager are
+    // cartesian product sources and are awaited in CONFIGURED state; every other input is
+    // awaited in RUNNING state
+    for (Map.Entry<String, EdgeProperty> e : getContext().getInputVertexEdgeProperties().entrySet()) {
+      if (e.getValue().getDataMovementType() == CUSTOM
+        && e.getValue().getEdgeManagerDescriptor().getClassName()
+        .equals(CartesianProductEdgeManager.class.getName())) {
+        srcVerticesByName.put(e.getKey(), new SrcVertex());
+        srcVerticesByName.get(e.getKey()).name = e.getKey();
+        getContext().registerForVertexStateUpdates(e.getKey(), EnumSet.of(VertexState.CONFIGURED));
+        numCPSrcNotInConfigureState++;
+      } else {
+        getContext().registerForVertexStateUpdates(e.getKey(), EnumSet.of(VertexState.RUNNING));
+        numBroadcastSrcNotInRunningState++;
+      }
+    }
+
+    // build one Source per entry of the source list; an entry that names a vertex group
+    // collects all vertices of that group, otherwise the entry is a single vertex
+    Map<String, List<String>> srcGroups = getContext().getInputVertexGroups();
+    for (int i = 0; i < sourceList.size(); i++) {
+      String srcName = sourceList.get(i);
+      Source source = new Source();
+      source.position = i;
+      if (srcGroups.containsKey(srcName)) {
+        source.name = srcName;
+        for (String srcVName : srcGroups.get(srcName)) {
+          source.srcVertices.add(srcVerticesByName.get(srcVName));
+          srcVerticesByName.get(srcVName).source = source;
+        }
+      } else {
+        source.srcVertices.add(srcVerticesByName.get(srcName));
+        srcVerticesByName.get(srcName).source = source;
+      }
+      sourcesByName.put(srcName, source);
+    }
+
+    // per-vertex record threshold used by tryReconfigure():
+    // n-th root of (minOpsPerWorker * maxParallelism), n = number of sources
+    minNumRecordForEstimation =
+      (long) Math.pow(minOpsPerWorker * maxParallelism, 1.0 / sourceList.size());
+
+    numChunksPerSrc = new int[sourcesByName.size()];
+    getContext().vertexReconfigurationPlanned();
+  }
+
+  /**
+   * Marks this vertex as started, queues the source task completions that happened before the
+   * start, and attempts to schedule tasks.
+   *
+   * @param completions source task attempts already completed; may be null
+   * @throws Exception propagated from scheduling
+   */
+  @Override
+  public synchronized void onVertexStarted(List<TaskAttemptIdentifier> completions)
+    throws Exception {
+    vertexStarted = true;
+    Iterator<TaskAttemptIdentifier> earlyCompletions =
+      completions == null ? null : completions.iterator();
+    while (earlyCompletions != null && earlyCompletions.hasNext()) {
+      addCompletedSrcTaskToProcess(earlyCompletions.next());
+    }
+    tryScheduleTasks();
+  }
+
+  /**
+   * Handles a state change of an input vertex: a CONFIGURED update records the task count of
+   * that source vertex, a RUNNING update decrements the count of inputs still awaited in
+   * RUNNING state. Scheduling is attempted afterwards in every case.
+   *
+   * @param stateUpdate the vertex state update
+   * @throws IOException propagated from scheduling
+   */
+  @Override
+  public synchronized void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws IOException {
+    final String vertexName = stateUpdate.getVertexName();
+    final VertexState newState = stateUpdate.getVertexState();
+
+    if (VertexState.CONFIGURED == newState) {
+      SrcVertex configuredVertex = srcVerticesByName.get(vertexName);
+      configuredVertex.numTask = getContext().getVertexNumTasks(vertexName);
+      numCPSrcNotInConfigureState--;
+    } else if (VertexState.RUNNING == newState) {
+      numBroadcastSrcNotInRunningState--;
+    }
+    tryScheduleTasks();
+  }
+
+  /**
+   * Queues the completed source task attempt and attempts to schedule tasks.
+   *
+   * @param attempt the completed source task attempt
+   * @throws Exception propagated from scheduling
+   */
+  @Override
+  public synchronized void onSourceTaskCompleted(TaskAttemptIdentifier attempt) throws Exception {
+    addCompletedSrcTaskToProcess(attempt);
+    tryScheduleTasks();
+  }
+
+  /**
+   * Records a completed source task and queues it for scheduling. Attempts from vertices that
+   * are not cartesian product sources, and tasks already recorded as completed, are ignored.
+   *
+   * @param attempt the completed source task attempt
+   */
+  private void addCompletedSrcTaskToProcess(TaskAttemptIdentifier attempt) {
+    int completedTaskId = attempt.getTaskIdentifier().getIdentifier();
+    String vertexName = attempt.getTaskIdentifier().getVertexIdentifier().getName();
+    SrcVertex owner = srcVerticesByName.get(vertexName);
+    if (owner == null || owner.taskCompleted.contains(completedTaskId)) {
+      return;
+    }
+    owner.taskCompleted.add(completedTaskId);
+    completedSrcTaskToProcess.add(attempt);
+  }
+
+  /**
+   * Re-evaluates whether scheduling may start: the vertex must be reconfigured and started, and
+   * no input may still be awaited in RUNNING state. The result is stored in
+   * {@code vertexStartSchedule}.
+   *
+   * @return the new value of {@code vertexStartSchedule}
+   */
+  private boolean tryStartSchedule() {
+    boolean readyToSchedule =
+      vertexReconfigured && vertexStarted && numBroadcastSrcNotInRunningState == 0;
+    vertexStartSchedule = readyToSchedule;
+    return readyToSchedule;
+  }
+
+  /**
+   * Accumulates the record count reported by a source task. Events are ignored once the vertex
+   * has been reconfigured, and events with a payload from vertices that are not cartesian
+   * product sources are dropped without attempting to schedule.
+   *
+   * @param vmEvent the vertex manager event
+   * @throws IOException if the payload cannot be parsed, or propagated from scheduling
+   */
+  @Override
+  public synchronized void onVertexManagerEventReceived(VertexManagerEvent vmEvent)
+    throws IOException {
+    // events arriving after reconfiguration no longer influence anything
+    if (vertexReconfigured) {
+      return;
+    }
+
+    // nothing to account for without a payload, but still give scheduling a chance
+    if (vmEvent.getUserPayload() == null) {
+      tryScheduleTasks();
+      return;
+    }
+
+    String producerVertex =
+      vmEvent.getProducerAttemptIdentifier().getTaskIdentifier().getVertexIdentifier().getName();
+    SrcVertex producer = srcVerticesByName.get(producerVertex);
+    if (producer == null) {
+      // event from a vertex that is not a cartesian product source
+      return;
+    }
+
+    VertexManagerEventPayloadProto payload =
+      VertexManagerEventPayloadProto.parseFrom(ByteString.copyFrom(vmEvent.getUserPayload()));
+    producer.numRecord += payload.getNumRecord();
+    int producerTaskId =
+      vmEvent.getProducerAttemptIdentifier().getTaskIdentifier().getIdentifier();
+    producer.taskWithVMEvent.add(producerTaskId);
+
+    tryScheduleTasks();
+  }
+
+  /**
+   * Reconfigures this vertex to have zero tasks (no location hint, no edge properties), marks
+   * it as reconfigured and notifies the context that reconfiguration is done.
+   */
+  private void reconfigureWithZeroTask() {
+    getContext().reconfigureVertex(0, null, null);
+    vertexReconfigured = true;
+    getContext().doneReconfiguringVertex();
+  }
+
+  /**
+   * Tries to decide the parallelism of this vertex and the number of chunks of every source,
+   * and reconfigures the vertex accordingly.
+   *
+   * Reconfiguration is postponed while any cartesian product source vertex is not yet
+   * configured, or while there is not enough information for estimation. If any source has no
+   * task, or is estimated to produce no record, the vertex is reconfigured with zero tasks.
+   *
+   * @return true if the vertex has been reconfigured by this call, false if it has to wait
+   * @throws IOException declared for callers
+   */
+  private boolean tryReconfigure() throws IOException {
+    if (numCPSrcNotInConfigureState > 0) {
+      return false;
+    }
+
+    for (Source src : sourcesByName.values()) {
+      if (src.getNumTask() == 0) {
+        parallelism = 0;
+        reconfigureWithZeroTask();
+        return true;
+      }
+    }
+
+    if (config.hasGroupingFraction() && config.getGroupingFraction() > 0) {
+      // every src vertex must complete a certain number of task before we do estimation
+      for (SrcVertex srcV : srcVerticesByName.values()) {
+        if (srcV.taskCompleted.getCardinality() < srcV.numTask
+          && (srcV.numTask * config.getGroupingFraction() > srcV.taskCompleted.getCardinality()
+            || srcV.numRecord == 0)) {
+          return false;
+        }
+      }
+    } else {
+      // every src vertex must generate enough output records before we do estimation
+      // or all its tasks already finish but we cannot get enough result for estimation
+      for (SrcVertex srcV : srcVerticesByName.values()) {
+        if (srcV.numRecord < minNumRecordForEstimation
+          && srcV.taskWithVMEvent.getCardinality() < srcV.numTask) {
+          return false;
+        }
+      }
+    }
+
+    LOG.info("Start reconfigure, max parallelism: {}, min-ops-per-worker: {}",
+      maxParallelism, minOpsPerWorker);
+    for (Source src : sourcesByName.values()) {
+      LOG.info(src.toString(false));
+    }
+
+    long totalOps = 1;
+    for (Source src : sourcesByName.values()) {
+      src.numRecord = src.estimateNumRecord();
+      if (src.numRecord == 0) {
+        // keep in line with the zero-task exit above: no task will ever be scheduled
+        parallelism = 0;
+        reconfigureWithZeroTask();
+        return true;
+      }
+
+      try {
+        totalOps  = LongMath.checkedMultiply(totalOps, src.numRecord);
+      } catch (ArithmeticException e) {
+        // the product does not fit in a long; saturate instead of failing
+        totalOps = Long.MAX_VALUE;
+      }
+    }
+
+    // determine initial parallelism
+    if (totalOps / minOpsPerWorker >= maxParallelism) {
+      parallelism = maxParallelism;
+    } else {
+      // ceiling division written without "totalOps + minOpsPerWorker - 1", which can overflow
+      parallelism = (int) (totalOps / minOpsPerWorker
+        + (totalOps % minOpsPerWorker == 0 ? 0 : 1));
+    }
+
+    // determine num chunk for each source by weighted factorization of initial parallelism
+    // final parallelism will be product of all #chunk
+    double k = Math.log10(parallelism);
+    for (Source src : sourcesByName.values()) {
+      k -= Math.log10(src.numRecord);
+    }
+    k = Math.pow(10, k / sourcesByName.size());
+
+    parallelism = 1;
+    for (Source src : sourcesByName.values()) {
+      if (enableGrouping) {
+        // at least one chunk, at most one chunk per physical output of the largest vertex
+        src.numChunk = Math.min(src.getSrcVertexWithMostOutput().numTask * numPartitions,
+          Math.max(1, (int) (src.numRecord * k)));
+      } else {
+        src.numChunk = src.getSrcVertexWithMostOutput().numTask;
+      }
+      parallelism *= src.numChunk;
+    }
+
+    LOG.info("After reconfigure, ");
+    for (Source src : sourcesByName.values()) {
+      LOG.info(src.toString(false));
+    }
+    LOG.info("Final parallelism: " + parallelism);
+
+    for (int i = 0; i < numChunksPerSrc.length; i++) {
+      numChunksPerSrc[i] = sourcesByName.get(sourceList.get(i)).numChunk;
+    }
+
+    CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder(config);
+    builder.addAllNumChunks(Ints.asList(this.numChunksPerSrc));
+
+    // only CUSTOM edges get a new payload; drop every other edge from the map
+    Map<String, EdgeProperty> edgeProperties = getContext().getInputVertexEdgeProperties();
+    Iterator<Map.Entry<String, EdgeProperty>> iter = edgeProperties.entrySet().iterator();
+    while (iter.hasNext()) {
+      Map.Entry<String, EdgeProperty> e = iter.next();
+      if (e.getValue().getDataMovementType() != CUSTOM) {
+        iter.remove();
+      }
+    }
+
+    // send out vertex group info for computing physical input id of destination task
+    for (Source src : sourcesByName.values()) {
+      builder.clearNumTaskPerVertexInGroup();
+      for (int i = 0; i < src.srcVertices.size(); i++) {
+        SrcVertex srcV = src.srcVertices.get(i);
+        builder.setPositionInGroup(i);
+        // the payload is built before this vertex's task count is appended, so it carries
+        // the task counts of the preceding vertices of the group only
+        edgeProperties.get(srcV.name).getEdgeManagerDescriptor()
+          .setUserPayload(UserPayload.create(ByteBuffer.wrap(builder.build().toByteArray())));
+        builder.addNumTaskPerVertexInGroup(srcV.numTask);
+      }
+    }
+    getContext().reconfigureVertex(parallelism, null, edgeProperties);
+    vertexReconfigured = true;
+    getContext().doneReconfiguringVertex();
+    return true;
+  }
+
+  /**
+   * Drains the queue of completed source tasks and schedules the destination tasks that depend
+   * on them. Nothing is done until the vertex has been reconfigured and scheduling may start;
+   * both conditions are re-attempted here if they do not hold yet.
+   *
+   * @throws IOException propagated from reconfiguration
+   */
+  private void tryScheduleTasks() throws IOException {
+    boolean reconfigured = vertexReconfigured || tryReconfigure();
+    if (!reconfigured) {
+      return;
+    }
+    boolean schedulingStarted = vertexStartSchedule || tryStartSchedule();
+    if (!schedulingStarted) {
+      return;
+    }
+
+    while (!completedSrcTaskToProcess.isEmpty()) {
+      TaskAttemptIdentifier completedAttempt = completedSrcTaskToProcess.poll();
+      scheduleTasksDependOnCompletion(completedAttempt);
+    }
+  }
+
+  /**
+   * Schedules every not-yet-scheduled destination task that consumes a chunk touched by the
+   * given completed source task and whose chunks from all sources are completed.
+   *
+   * @param attempt a completed task attempt of a cartesian product source vertex
+   */
+  private void scheduleTasksDependOnCompletion(TaskAttemptIdentifier attempt) {
+    if (parallelism == 0) {
+      return;
+    }
+
+    int taskId = attempt.getTaskIdentifier().getIdentifier();
+    String vertex = attempt.getTaskIdentifier().getVertexIdentifier().getName();
+    SrcVertex srcV = srcVerticesByName.get(vertex);
+    Source src = srcV.source;
+
+    List<ScheduleTaskRequest> requests = new ArrayList<>();
+    CartesianProductCombination combination =
+      new CartesianProductCombination(numChunksPerSrc, src.position);
+
+    // chunks are groups over numTask * numPartitions items (numPartitions items per task),
+    // the same layout SrcVertex.isChunkCompleted() uses; using another per-task item count
+    // here would yield a different chunk range and could leave tasks unscheduled
+    grouper.init(srcV.numTask * numPartitions, src.numChunk);
+    int firstRelevantChunk = grouper.getGroupId(taskId * numPartitions);
+    int lastRelevantChunk = grouper.getGroupId(taskId * numPartitions + numPartitions - 1);
+    for (int chunkId = firstRelevantChunk; chunkId <= lastRelevantChunk; chunkId++) {
+      combination.firstTaskWithFixedChunk(chunkId);
+      do {
+        List<Integer> list = combination.getCombination();
+
+        // "continue" in a do-while still evaluates the loop condition, so the combination
+        // advances to the next task
+        if (scheduledTasks.contains(combination.getTaskId())) {
+          continue;
+        }
+
+        // a task is ready for schedule only if all its src chunk has been completed
+        boolean readyToSchedule = src.isChunkCompleted(list.get(src.position));
+        for (int srcId = 0; readyToSchedule && srcId < list.size(); srcId++) {
+          if (srcId != src.position){
+            readyToSchedule =
+              sourcesByName.get(sourceList.get(srcId)).isChunkCompleted(list.get(srcId));
+          }
+        }
+
+        if (readyToSchedule) {
+          requests.add(ScheduleTaskRequest.create(combination.getTaskId(), null));
+          scheduledTasks.add(combination.getTaskId());
+        }
+      } while (combination.nextTaskWithFixedChunk());
+    }
+
+    if (!requests.isEmpty()) {
+      getContext().scheduleTasks(requests);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/partitioner/RoundRobinPartitioner.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/partitioner/RoundRobinPartitioner.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/partitioner/RoundRobinPartitioner.java
new file mode 100644
index 0000000..3c3e41a
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/partitioner/RoundRobinPartitioner.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.partitioner;
+
+import org.apache.tez.runtime.library.api.Partitioner;
+
+/**
+ * Partitioner that ignores key and value and hands out partitions in rotation.
+ */
+public class RoundRobinPartitioner implements Partitioner {
+    // one past the partition returned by the previous call (0 before the first call)
+    private int x = 0;
+
+    /**
+     * Returns the next partition in rotation.
+     *
+     * @param key ignored
+     * @param value ignored
+     * @param numPartitions number of partitions to rotate over
+     * @return the stored counter reduced modulo {@code numPartitions}
+     */
+    @Override
+    public int getPartition(Object key, Object value, int numPartitions) {
+      final int assigned = x % numPartitions;
+      x = assigned + 1;
+      return assigned;
+    }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/Grouper.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/Grouper.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/Grouper.java
index 73a8c87..b99f3d4 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/Grouper.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/Grouper.java
@@ -20,70 +20,70 @@ package org.apache.tez.runtime.library.utils;
 import com.google.common.base.Preconditions;
 
 /**
- * This grouper group specified number of tasks into specified number of groups.
+ * This grouper groups a specified number of items into a specified number of groups.
  *
- * If numTask%numGroup is zero, every group has numTask/numGroup tasks.
- * Otherwise, every group will get numTask/numGroup tasks first, and remaining tasks will be
- * distributed in last numTask-numTask%numGroup*numGroup groups (one task for each group).
- * For example, if we group 8 tasks into 3 groups, each group get {2, 3, 3} tasks.
+ * If numItem%numGroup is zero, every group has numItem/numGroup items.
+ * Otherwise, every group will get numItem/numGroup items first, and the remaining items will be
+ * distributed among the last numItem%numGroup groups (one item for each group).
+ * For example, if we group 8 items into 3 groups, the groups get {2, 3, 3} items.
  */
 public class Grouper {
   private int numGroup;
-  private int numTask;
+  private int numItem;
   private int numGroup1;
-  private int taskPerGroup1;
+  private int itemPerGroup1;
   private int numGroup2;
-  private int taskPerGroup2;
+  private int itemPerGroup2;
 
-  public Grouper init(int numTask, int numGroup) {
+  public Grouper init(int numItem, int numGroup) {
     Preconditions.checkArgument(numGroup > 0,
       "Number of groups is " + numGroup + ". Should be positive");
-    Preconditions.checkArgument(numTask > 0,
-      "Number of tasks is " + numTask + ". Should be positive");
-    Preconditions.checkArgument(numTask >= numGroup,
-      "Num of groups + " + numGroup + " shouldn't be more than number of tasks " + numTask);
-    this.numTask = numTask;
+    Preconditions.checkArgument(numItem > 0,
+      "Number of items is " + numItem + ". Should be positive");
+    Preconditions.checkArgument(numItem >= numGroup,
+      "Num of groups + " + numGroup + " shouldn't be more than number of items " + numItem);
+    this.numItem = numItem;
     this.numGroup = numGroup;
-    this.taskPerGroup1 = numTask / numGroup;
-    this.taskPerGroup2 = taskPerGroup1 + 1;
-    this.numGroup2 = numTask % numGroup;
+    this.itemPerGroup1 = numItem / numGroup;
+    this.itemPerGroup2 = itemPerGroup1 + 1;
+    this.numGroup2 = numItem % numGroup;
     this.numGroup1 = numGroup - numGroup2;
 
     return this;
   }
 
-  public int getFirstTaskInGroup(int groupId) {
+  public int getFirstItemInGroup(int groupId) {
     Preconditions.checkArgument(0 <= groupId && groupId < numGroup, "Invalid groupId " + groupId);
     if (groupId < numGroup1) {
-      return groupId * taskPerGroup1;
+      return groupId * itemPerGroup1;
     } else {
-      return groupId * taskPerGroup1 + (groupId - numGroup1);
+      return groupId * itemPerGroup1 + (groupId - numGroup1);
     }
   }
 
-  public int getNumTasksInGroup(int groupId) {
+  public int getNumItemsInGroup(int groupId) {
     Preconditions.checkArgument(0 <= groupId && groupId < numGroup, "Invalid groupId" + groupId);
-    return groupId < numGroup1 ? taskPerGroup1 : taskPerGroup2;
+    return groupId < numGroup1 ? itemPerGroup1 : itemPerGroup2;
   }
 
-  public int getLastTaskInGroup(int groupId) {
+  public int getLastItemInGroup(int groupId) {
     Preconditions.checkArgument(0 <= groupId && groupId < numGroup, "Invalid groupId" + groupId);
-    return getFirstTaskInGroup(groupId) + getNumTasksInGroup(groupId) - 1;
+    return getFirstItemInGroup(groupId) + getNumItemsInGroup(groupId) - 1;
   }
 
-  public int getGroupId(int taskId) {
-    Preconditions.checkArgument(0 <= taskId && taskId < numTask, "Invalid taskId" + taskId);
-    if (taskId < taskPerGroup1 * numGroup1) {
-      return taskId/taskPerGroup1;
+  public int getGroupId(int itemId) {
+    Preconditions.checkArgument(0 <= itemId && itemId < numItem, "Invalid itemId" + itemId);
+    if (itemId < itemPerGroup1 * numGroup1) {
+      return itemId/ itemPerGroup1;
     } else {
-      return numGroup1 + (taskId - taskPerGroup1 * numGroup1) / taskPerGroup2;
+      return numGroup1 + (itemId - itemPerGroup1 * numGroup1) / itemPerGroup2;
     }
   }
 
-  public boolean isInGroup(int taskId, int groupId) {
+  public boolean isInGroup(int itemId, int groupId) {
     Preconditions.checkArgument(0 <= groupId && groupId < numGroup, "Invalid groupId" + groupId);
-    Preconditions.checkArgument(0 <= taskId && taskId < numTask, "Invalid taskId" + taskId);
-    return getFirstTaskInGroup(groupId) <= taskId
-      && taskId < getFirstTaskInGroup(groupId) + getNumTasksInGroup(groupId);
+    Preconditions.checkArgument(0 <= itemId && itemId < numItem, "Invalid itemId" + itemId);
+    return getFirstItemInGroup(groupId) <= itemId
+      && itemId < getFirstItemInGroup(groupId) + getNumItemsInGroup(groupId);
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/main/proto/CartesianProductPayload.proto
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/proto/CartesianProductPayload.proto b/tez-runtime-library/src/main/proto/CartesianProductPayload.proto
index cb503ea..ae65502 100644
--- a/tez-runtime-library/src/main/proto/CartesianProductPayload.proto
+++ b/tez-runtime-library/src/main/proto/CartesianProductPayload.proto
@@ -27,9 +27,12 @@ message CartesianProductConfigProto {
     optional bytes filterUserPayload = 5;
     optional float minFraction = 6;
     optional float maxFraction = 7;
-    optional bool enableAutoGrouping = 8;
-    optional int64 desiredBytesPerChunk = 9;
+    optional int32 maxParallelism = 8;
+    optional int64 minOpsPerWorker = 9;
     repeated int32 numChunks = 10;
-    optional int32 numChunk = 11;
-    optional int32 chunkIdOffset = 12;
+    repeated int32 numTaskPerVertexInGroup = 11;
+    optional int32 positionInGroup = 12;
+    optional int32 numPartitionsForFairCase = 13;
+    optional bool enableGrouping = 14;
+    optional float groupingFraction = 15;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductCombination.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductCombination.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductCombination.java
index 3755ac8..4193aec 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductCombination.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductCombination.java
@@ -30,7 +30,7 @@ import static org.junit.Assert.assertTrue;
 public class TestCartesianProductCombination {
   private void verifyCombination(CartesianProductCombination combination, int[] result, int taskId) {
     assertArrayEquals(result, Ints.toArray(combination.getCombination()));
-    assertEquals(taskId, combination.getChunkId());
+    assertEquals(taskId, combination.getTaskId());
   }
 
   private void testCombinationTwoWayVertex0() {

http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductConfig.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductConfig.java
index 4857749..3b4200a 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductConfig.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductConfig.java
@@ -36,7 +36,6 @@ import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
 
 public class TestCartesianProductConfig {
   private TezConfiguration conf;
@@ -66,7 +65,7 @@ public class TestCartesianProductConfig {
   }
 
   @Test(timeout = 5000)
-  public void testSerializationUnpartitioned() throws Exception {
+  public void testSerializationFair() throws Exception {
     List<String> sourceVertices = new ArrayList<>();
     sourceVertices.add("v1");
     sourceVertices.add("v2");
@@ -77,7 +76,7 @@ public class TestCartesianProductConfig {
     CartesianProductConfig parsedConfig = CartesianProductConfig.fromUserPayload(payload);
     assertConfigEquals(config, parsedConfig);
 
-    // unpartitioned config should have null in numPartitions fields
+    // fair cartesian product config should have null in numPartitions fields
     try {
       config = new CartesianProductConfig(false, new int[]{}, new String[]{"v0","v1"},null);
       config.checkNumPartitions();
@@ -113,24 +112,34 @@ public class TestCartesianProductConfig {
   }
 
   @Test(timeout = 5000)
-  public void testAutoGroupingConfig() {
+  public void testFairCartesianProductConfig() {
     List<String> sourceVertices = new ArrayList<>();
     sourceVertices.add("v0");
     sourceVertices.add("v1");
     CartesianProductConfig config = new CartesianProductConfig(sourceVertices);
 
-    // auto grouping conf not set
+    // conf not set
     CartesianProductConfigProto proto = config.toProto(conf);
-    assertFalse(proto.hasEnableAutoGrouping());
-    assertFalse(proto.hasDesiredBytesPerChunk());
+    assertEquals(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_MAX_PARALLELISM_DEFAULT,
+      proto.getMaxParallelism());
+    assertEquals(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_MIN_OPS_PER_WORKER_DEFAULT,
+      proto.getMinOpsPerWorker());
+    assertEquals(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_ENABLE_GROUPING_DEFAULT,
+      proto.getEnableGrouping());
+    assertFalse(proto.hasNumPartitionsForFairCase());
+    assertFalse(proto.hasGroupingFraction());
 
-    // auto groupinig conf not set
-    conf.setBoolean(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_ENABLE_AUTO_GROUPING, true);
-    conf.setLong(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_DESIRED_BYTES_PER_GROUP, 1000);
+    // conf set
+    conf.setInt(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_MAX_PARALLELISM, 1000);
+    conf.setLong(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_MIN_OPS_PER_WORKER, 1000000);
+    conf.setBoolean(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_ENABLE_GROUPING, false);
+    conf.setFloat(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_GROUPING_FRACTION, 0.75f);
+    conf.setInt(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_NUM_PARTITIONS, 25);
     proto = config.toProto(conf);
-    assertTrue(proto.hasEnableAutoGrouping());
-    assertTrue(proto.hasDesiredBytesPerChunk());
-    assertEquals(true, proto.getEnableAutoGrouping());
-    assertEquals(1000, proto.getDesiredBytesPerChunk());
+    assertEquals(1000, proto.getMaxParallelism());
+    assertEquals(1000000, proto.getMinOpsPerWorker());
+    assertFalse(proto.getEnableGrouping());
+    assertEquals(0.75f, proto.getGroupingFraction(), 0.01);
+    assertEquals(25, proto.getNumPartitionsForFairCase());
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManager.java
index d722932..58f460f 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManager.java
@@ -41,7 +41,8 @@ public class TestCartesianProductEdgeManager {
     CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder();
     builder.setIsPartitioned(true)
       .addAllSources(Arrays.asList("v0", "v1"))
-      .addAllNumPartitions(Ints.asList(2,3));
+      .addAllNumPartitions(Ints.asList(2,3))
+      .setMaxParallelism(100).setMinOpsPerWorker(1);
     UserPayload payload = UserPayload.create(ByteBuffer.wrap(builder.build().toByteArray()));
     when(context.getUserPayload()).thenReturn(payload);
     edgeManager.initialize();
@@ -52,12 +53,13 @@ public class TestCartesianProductEdgeManager {
     builder.clear();
     builder.setIsPartitioned(false)
       .addAllSources(Arrays.asList("v0", "v1"))
-      .addAllNumChunks(Ints.asList(2,3));
+      .addAllNumChunks(Ints.asList(2,3))
+      .setMaxParallelism(100).setMinOpsPerWorker(1);
     payload = UserPayload.create(ByteBuffer.wrap(builder.build().toByteArray()));
     when(context.getUserPayload()).thenReturn(payload);
     when(context.getSourceVertexNumTasks()).thenReturn(2);
     edgeManager.initialize();
     assertTrue(edgeManager.getEdgeManagerReal()
-      instanceof CartesianProductEdgeManagerUnpartitioned);
+      instanceof FairCartesianProductEdgeManager);
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerConfig.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerConfig.java
deleted file mode 100644
index 3ba6aad..0000000
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerConfig.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.runtime.library.cartesianproduct;
-
-import com.google.common.primitives.Ints;
-import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.CartesianProductConfigProto;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
-public class TestCartesianProductEdgeManagerConfig {
-  @Test(timeout = 5000)
-  public void testUnpartitionedAutoGroupingConfig() throws IOException {
-    List<String> sourceVertices = new ArrayList<>();
-    sourceVertices.add("v0");
-    sourceVertices.add("v1");
-    int[] numChunkPerSrc = new int[] {2, 3};
-    int numGroup = 3, chunkIdOffset = 0;
-
-    CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder();
-    builder.setIsPartitioned(false).addAllNumChunks(Ints.asList(numChunkPerSrc))
-      .addAllSources(sourceVertices).setNumChunk(numGroup).setChunkIdOffset(chunkIdOffset);
-    UserPayload payload = UserPayload.create(ByteBuffer.wrap(builder.build().toByteArray()));
-
-    CartesianProductEdgeManagerConfig config =
-      CartesianProductEdgeManagerConfig.fromUserPayload(payload);
-    assertArrayEquals(numChunkPerSrc, config.numChunksPerSrc);
-    assertEquals(numGroup, config.numChunk);
-    assertEquals(chunkIdOffset, config.chunkIdOffset);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerPartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerPartitioned.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerPartitioned.java
index b586de6..462760f 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerPartitioned.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerPartitioned.java
@@ -17,6 +17,7 @@
  */
 package org.apache.tez.runtime.library.cartesianproduct;
 
+import com.google.protobuf.ByteString;
 import org.apache.tez.dag.api.EdgeManagerPluginContext;
 import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.EventRouteMetadata;
 import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.CompositeEventRouteMetadata;
@@ -27,6 +28,7 @@ import org.junit.Test;
 import java.nio.ByteBuffer;
 import java.util.Map;
 
+import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.*;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -50,19 +52,22 @@ public class TestCartesianProductEdgeManagerPartitioned {
    */
   @Test(timeout = 5000)
   public void testTwoWay() throws Exception {
-    CartesianProductEdgeManagerConfig emConfig = new CartesianProductEdgeManagerConfig(true,
-      new String[]{"v0","v1"}, new int[]{3,4}, null, 0, 0, null);
+    CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder();
+    builder.setIsPartitioned(true).addSources("v0").addSources("v1")
+      .addNumPartitions(3).addNumPartitions(4);
     when(mockContext.getDestinationVertexNumTasks()).thenReturn(12);
-    testTwoWayV0(emConfig);
-    testTwoWayV1(emConfig);
+    CartesianProductConfigProto config = builder.build();
+    testTwoWayV0(config);
+    testTwoWayV1(config);
   }
 
-  private void testTwoWayV0(CartesianProductEdgeManagerConfig config) throws Exception {
+  private void testTwoWayV0(CartesianProductConfigProto config) throws Exception {
     when(mockContext.getSourceVertexName()).thenReturn("v0");
     when(mockContext.getSourceVertexNumTasks()).thenReturn(2);
     edgeManager.initialize(config);
 
-    CompositeEventRouteMetadata compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
+    CompositeEventRouteMetadata compositeRoutingData =
+      edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
     assertNotNull(compositeRoutingData);
     assertEquals(1, compositeRoutingData.getCount());
     assertEquals(0, compositeRoutingData.getSource());
@@ -88,12 +93,13 @@ public class TestCartesianProductEdgeManagerPartitioned {
     assertEquals(3, edgeManager.getNumSourceTaskPhysicalOutputs(2));
   }
 
-  private void testTwoWayV1(CartesianProductEdgeManagerConfig config) throws Exception {
+  private void testTwoWayV1(CartesianProductConfigProto config) throws Exception {
     when(mockContext.getSourceVertexName()).thenReturn("v1");
     when(mockContext.getSourceVertexNumTasks()).thenReturn(3);
     edgeManager.initialize(config);
 
-    CompositeEventRouteMetadata compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
+    CompositeEventRouteMetadata compositeRoutingData =
+      edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
     assertNotNull(compositeRoutingData);
     assertEquals(1, compositeRoutingData.getCount());
     assertEquals(1, compositeRoutingData.getSource());
@@ -138,25 +144,25 @@ public class TestCartesianProductEdgeManagerPartitioned {
    */
   @Test(timeout = 5000)
   public void testTwoWayWithFilter() throws Exception {
-    ByteBuffer buffer = ByteBuffer.allocate(2);
-    buffer.putChar('>');
+    CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder();
+    ByteBuffer buffer = ByteBuffer.allocate(2).putChar('>');
     buffer.flip();
-    CartesianProductFilterDescriptor filterDescriptor =
-      new CartesianProductFilterDescriptor(TestFilter.class.getName())
-        .setUserPayload(UserPayload.create(buffer));
-    CartesianProductEdgeManagerConfig emConfig = new CartesianProductEdgeManagerConfig(true,
-      new String[]{"v0","v1"}, new int[]{3,4}, null, 0, 0, filterDescriptor);
+    builder.setIsPartitioned(true).addSources("v0").addSources("v1")
+      .addNumPartitions(3).addNumPartitions(4).setFilterClassName(TestFilter.class.getName())
+      .setFilterUserPayload(ByteString.copyFrom(buffer));
+    CartesianProductConfigProto config = builder.build();
     when(mockContext.getDestinationVertexNumTasks()).thenReturn(3);
-    testTwoWayV0WithFilter(emConfig);
-    testTwoWayV1WithFilter(emConfig);
+    testTwoWayV0WithFilter(config);
+    testTwoWayV1WithFilter(config);
   }
 
-  private void testTwoWayV0WithFilter(CartesianProductEdgeManagerConfig config) throws Exception {
+  private void testTwoWayV0WithFilter(CartesianProductConfigProto config) throws Exception {
     when(mockContext.getSourceVertexName()).thenReturn("v0");
     when(mockContext.getSourceVertexNumTasks()).thenReturn(2);
     edgeManager.initialize(config);
 
-    CompositeEventRouteMetadata compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
+    CompositeEventRouteMetadata compositeRoutingData =
+      edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
     assertNotNull(compositeRoutingData);
     assertEquals(1, compositeRoutingData.getCount());
     assertEquals(2, compositeRoutingData.getSource());
@@ -174,12 +180,13 @@ public class TestCartesianProductEdgeManagerPartitioned {
     assertEquals(3, edgeManager.getNumSourceTaskPhysicalOutputs(2));
   }
 
-  private void testTwoWayV1WithFilter(CartesianProductEdgeManagerConfig config) throws Exception {
+  private void testTwoWayV1WithFilter(CartesianProductConfigProto config) throws Exception {
     when(mockContext.getSourceVertexName()).thenReturn("v1");
     when(mockContext.getSourceVertexNumTasks()).thenReturn(3);
     edgeManager.initialize(config);
 
-    CompositeEventRouteMetadata compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
+    CompositeEventRouteMetadata compositeRoutingData =
+      edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
     assertNotNull(compositeRoutingData);
     assertEquals(1, compositeRoutingData.getCount());
     assertEquals(0, compositeRoutingData.getSource());
@@ -204,21 +211,25 @@ public class TestCartesianProductEdgeManagerPartitioned {
    */
   @Test(timeout = 5000)
   public void testThreeWay() throws Exception {
-    CartesianProductEdgeManagerConfig emConfig = new CartesianProductEdgeManagerConfig(true,
-      new String[]{"v0","v1","v2"}, new int[]{4,3,2}, null, 0, 0, null);
+    CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder();
+    builder.setIsPartitioned(true).addSources("v0").addSources("v1").addSources("v2")
+      .addNumPartitions(4).addNumPartitions(3).addNumPartitions(2);
+    CartesianProductConfigProto config = builder.build();
+
     when(mockContext.getDestinationVertexNumTasks()).thenReturn(24);
-    testThreeWayV0(emConfig);
-    testThreeWayV1(emConfig);
-    testThreeWayV2(emConfig);
+    testThreeWayV0(config);
+    testThreeWayV1(config);
+    testThreeWayV2(config);
   }
 
-  private void testThreeWayV0(CartesianProductEdgeManagerConfig config) throws Exception {
+  private void testThreeWayV0(CartesianProductConfigProto config) throws Exception {
     when(mockContext.getSourceVertexName()).thenReturn("v0");
 
     when(mockContext.getSourceVertexNumTasks()).thenReturn(2);
     edgeManager.initialize(config);
 
-    CompositeEventRouteMetadata compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
+    CompositeEventRouteMetadata compositeRoutingData =
+      edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
     assertNotNull(compositeRoutingData);
     assertEquals(1, compositeRoutingData.getCount());
     assertEquals(0, compositeRoutingData.getSource());
@@ -236,12 +247,13 @@ public class TestCartesianProductEdgeManagerPartitioned {
     assertEquals(4, edgeManager.getNumSourceTaskPhysicalOutputs(2));
   }
 
-  private void testThreeWayV1(CartesianProductEdgeManagerConfig config) throws Exception {
+  private void testThreeWayV1(CartesianProductConfigProto config) throws Exception {
     when(mockContext.getSourceVertexName()).thenReturn("v1");
     when(mockContext.getSourceVertexNumTasks()).thenReturn(3);
     edgeManager.initialize(config);
 
-    CompositeEventRouteMetadata compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
+    CompositeEventRouteMetadata compositeRoutingData =
+      edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
     assertNotNull(compositeRoutingData);
     assertEquals(1, compositeRoutingData.getCount());
     assertEquals(0, compositeRoutingData.getSource());
@@ -259,12 +271,13 @@ public class TestCartesianProductEdgeManagerPartitioned {
     assertEquals(3, edgeManager.getNumSourceTaskPhysicalOutputs(2));
   }
 
-  private void testThreeWayV2(CartesianProductEdgeManagerConfig config) throws Exception {
+  private void testThreeWayV2(CartesianProductConfigProto config) throws Exception {
     when(mockContext.getSourceVertexName()).thenReturn("v2");
     when(mockContext.getSourceVertexNumTasks()).thenReturn(4);
     edgeManager.initialize(config);
 
-    CompositeEventRouteMetadata compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
+    CompositeEventRouteMetadata compositeRoutingData =
+      edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
     assertNotNull(compositeRoutingData);
     assertEquals(1, compositeRoutingData.getCount());
     assertEquals(1, compositeRoutingData.getSource());

http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerUnpartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerUnpartitioned.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerUnpartitioned.java
deleted file mode 100644
index 1ce9c8b..0000000
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerUnpartitioned.java
+++ /dev/null
@@ -1,288 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.runtime.library.cartesianproduct;
-
-import org.apache.tez.dag.api.EdgeManagerPluginContext;
-import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.EventRouteMetadata;
-import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.CompositeEventRouteMetadata;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.tez.runtime.library.cartesianproduct.TestCartesianProductEdgeManagerUnpartitioned.TestData.dataForDest;
-import static org.apache.tez.runtime.library.cartesianproduct.TestCartesianProductEdgeManagerUnpartitioned.TestData.dataForInputError;
-import static org.apache.tez.runtime.library.cartesianproduct.TestCartesianProductEdgeManagerUnpartitioned.TestData.dataForRouting;
-import static org.apache.tez.runtime.library.cartesianproduct.TestCartesianProductEdgeManagerUnpartitioned.TestData.dataForSrc;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class TestCartesianProductEdgeManagerUnpartitioned {
-  private EdgeManagerPluginContext mockContext;
-  private CartesianProductEdgeManagerUnpartitioned edgeManager;
-
-  @Before
-  public void setup() {
-    mockContext = mock(EdgeManagerPluginContext.class);
-    edgeManager = new CartesianProductEdgeManagerUnpartitioned(mockContext);
-  }
-
-  static class TestData {
-    int srcId, destId, inputId;
-    Object expected;
-
-    public TestData(int srcId, int destId, int inputId, Object expected) {
-      this.srcId = srcId;
-      this.destId = destId;
-      this.inputId = inputId;
-      this.expected = expected;
-    }
-
-    public static TestData dataForRouting(int srcId, int destId, Object expected) {
-      return new TestData(srcId, destId, -1, expected);
-    }
-
-    public static TestData dataForInputError(int destId, int inputId, Object expected) {
-      return new TestData(-1, destId, inputId, expected);
-    }
-
-    public static TestData dataForSrc(int srcId, Object expected) {
-      return new TestData(srcId, -1, -1, expected);
-    }
-
-    public static TestData dataForDest(int destId, Object expected) {
-      return new TestData(-1, destId, -1, expected);
-    }
-  }
-
-  private void testEdgeManager(CartesianProductEdgeManagerConfig conf, String vName, int numTask,
-                               String groupName, TestData cDMEInvalid, TestData cDMEValid,
-                               TestData srcFailInvalid, TestData srcFailValid,
-                               TestData inputError, TestData numDestInput,
-                               TestData numSrcOutputTest, TestData numConsumerTest)
-    throws Exception {
-    when(mockContext.getSourceVertexName()).thenReturn(vName);
-    when(mockContext.getSourceVertexNumTasks()).thenReturn(numTask);
-    when(mockContext.getVertexGroupName()).thenReturn(groupName);
-    edgeManager.initialize(conf);
-
-    CompositeEventRouteMetadata cDME =
-      edgeManager.routeCompositeDataMovementEventToDestination(cDMEInvalid.srcId,
-        cDMEInvalid.destId);
-    assertNull(cDME);
-
-    cDME = edgeManager.routeCompositeDataMovementEventToDestination(cDMEValid.srcId,
-      cDMEValid.destId);
-    assertNotNull(cDME);
-    CompositeEventRouteMetadata expectedCDME = (CompositeEventRouteMetadata)(cDMEValid.expected);
-    assertEquals(expectedCDME.getCount(), cDME.getCount());
-    assertEquals(expectedCDME.getTarget(), cDME.getTarget());
-    assertEquals(expectedCDME.getSource(), cDME.getSource());
-
-    EventRouteMetadata dme =
-      edgeManager.routeInputSourceTaskFailedEventToDestination(srcFailInvalid.srcId,
-        srcFailInvalid.destId);
-    assertNull(dme);
-
-    dme = edgeManager.routeInputSourceTaskFailedEventToDestination(srcFailValid.srcId,
-      srcFailValid.destId);
-    assertNotNull(dme);
-    EventRouteMetadata expectedDME = (EventRouteMetadata)(srcFailValid.expected);
-    assertEquals(expectedDME.getNumEvents(), dme.getNumEvents());
-    assertArrayEquals(expectedDME.getTargetIndices(), dme.getTargetIndices());
-
-    assertEquals(inputError.expected,
-      edgeManager.routeInputErrorEventToSource(inputError.destId, inputError.inputId));
-
-    assertEquals(numDestInput.expected,
-      edgeManager.getNumDestinationTaskPhysicalInputs(numDestInput.destId));
-    assertEquals(numSrcOutputTest.expected,
-      edgeManager.getNumSourceTaskPhysicalOutputs(numSrcOutputTest.srcId));
-    assertEquals(numConsumerTest.expected,
-      edgeManager.getNumDestinationConsumerTasks(numConsumerTest.srcId));
-  }
-
-  /**
-   * Vertex v0 has 2 tasks
-   * Vertex v1 has 3 tasks
-   */
-  @Test(timeout = 5000)
-  public void testTwoWayAllVertex() throws Exception {
-    testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"v0","v1"}, null,
-      new int[]{2,3}, 2, 0, null), "v0", 2, null,
-      dataForRouting(1, 1, null), dataForRouting(1, 3, CompositeEventRouteMetadata.create(1, 0, 0)),
-      dataForRouting(1, 1, null), dataForRouting(1, 3, EventRouteMetadata.create(1, new int[]{0})),
-      dataForInputError(1, 0, 0), dataForDest(1, 1), dataForSrc(1, 1), dataForSrc(1, 3));
-    testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"v0","v1"}, null,
-        new int[]{2,3}, 3, 0, null), "v1", 3, null,
-      dataForRouting(1, 2, null), dataForRouting(1, 1, CompositeEventRouteMetadata.create(1, 0, 0)),
-      dataForRouting(1, 2, null), dataForRouting(1, 1, EventRouteMetadata.create(1, new int[]{0})),
-      dataForInputError(1,0,1), dataForDest(1, 1), dataForSrc(1, 1), dataForSrc(1, 2));
-  }
-
-  /**
-   * Vertex v0 has 2 tasks
-   * Vertex v1 has 3 tasks
-   * Vertex v2 has 4 tasks
-   */
-  @Test(timeout = 5000)
-  public void testThreeWayAllVertex() throws Exception {
-    testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"v0","v1","v2"},
-      null, new int[]{2,3,4}, 2, 0, null), "v0", 2, null,
-      dataForRouting(1, 1, null), dataForRouting(1, 12, CompositeEventRouteMetadata.create(1, 0, 0)),
-      dataForRouting(1, 1, null), dataForRouting(1, 12, EventRouteMetadata.create(1, new int[]{0})),
-      dataForInputError(1, 0, 0), dataForDest(1, 1), dataForSrc(1, 1), dataForSrc(1, 12));
-    testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"v0","v1","v2"},
-        null, new int[]{2,3,4}, 3, 0, null), "v1", 3, null,
-      dataForRouting(1, 1, null), dataForRouting(1, 16, CompositeEventRouteMetadata.create(1, 0, 0)),
-      dataForRouting(1, 1, null), dataForRouting(1, 16, EventRouteMetadata.create(1, new int[]{0})),
-      dataForInputError(1, 0, 0), dataForDest(1, 1), dataForSrc(1, 1), dataForSrc(1, 8));
-    testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"v0","v1","v2"},
-        null, new int[]{2,3,4}, 4, 0, null), "v2", 4, null,
-      dataForRouting(1, 0, null), dataForRouting(1, 13, CompositeEventRouteMetadata.create(1, 0, 0)),
-      dataForRouting(1, 0, null), dataForRouting(1, 13, EventRouteMetadata.create(1, new int[]{0})),
-      dataForInputError(1, 0, 1), dataForDest(1, 1), dataForSrc(1, 1), dataForSrc(1, 6));
-  }
-
-  @Test(timeout = 5000)
-  public void testZeroSrcTask() {
-    CartesianProductEdgeManagerConfig emConfig =
-      new CartesianProductEdgeManagerConfig(false, new String[]{"v0", "v1"}, null,
-        new int[]{2,0}, 0,0, null);
-    testZeroSrcTaskV0(emConfig);
-    testZeroSrcTaskV1(emConfig);
-  }
-
-  private void testZeroSrcTaskV0(CartesianProductEdgeManagerConfig config) {
-    when(mockContext.getSourceVertexName()).thenReturn("v0");
-    when(mockContext.getSourceVertexNumTasks()).thenReturn(2);
-    edgeManager.initialize(config);
-
-    assertEquals(0, edgeManager.getNumDestinationConsumerTasks(0));
-    assertEquals(0, edgeManager.getNumDestinationConsumerTasks(1));
-  }
-
-  private void testZeroSrcTaskV1(CartesianProductEdgeManagerConfig config) {
-    when(mockContext.getSourceVertexName()).thenReturn("v1");
-    when(mockContext.getSourceVertexNumTasks()).thenReturn(0);
-    edgeManager.initialize(config);
-  }
-
-  /**
-   * Vertex v0 has 10 tasks 2 groups
-   * Vertex v1 has 30 tasks 3 group
-   */
-  @Test(timeout = 5000)
-  public void testTwoWayAllVertexAutoGrouping() throws Exception {
-    testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"v0","v1"},
-        null, new int[]{2,3}, 2, 0, null), "v0", 10, null,
-      dataForRouting(6, 1, null), dataForRouting(1, 0, CompositeEventRouteMetadata.create(1, 1, 0)),
-      dataForRouting(6, 1, null), dataForRouting(1, 0, EventRouteMetadata.create(1, new int[]{1})),
-      dataForInputError(1, 1, 1), dataForDest(1, 5), dataForSrc(1, 1), dataForSrc(1, 3));
-    testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"v0","v1"},
-        null, new int[]{2,3}, 3, 0, null), "v1", 30, null,
-      dataForRouting(6, 1, null), dataForRouting(11, 1, CompositeEventRouteMetadata.create(1, 1, 0)),
-      dataForRouting(6, 1, null), dataForRouting(11, 1, EventRouteMetadata.create(1, new int[]{1})),
-      dataForInputError(1, 1, 11), dataForDest(1, 10), dataForSrc(1, 1), dataForSrc(1, 2));
-  }
-
-  /**
-   * v0 with group g0 {v1, v2}
-   * Vertex v0 has 2 tasks
-   * Vertex v1 has 1 tasks
-   * Vertex v2 has 2 tasks
-   */
-  @Test(timeout = 5000)
-  public void testTwoWayVertexWithVertexGroup() throws Exception {
-    testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"v0","g0"},
-        null, new int[]{2,3}, 2, 0, null), "v0", 2, null,
-      dataForRouting(1, 1, null), dataForRouting(1, 3, CompositeEventRouteMetadata.create(1, 0, 0)),
-      dataForRouting(1, 1, null), dataForRouting(1, 3, EventRouteMetadata.create(1, new int[]{0})),
-      dataForInputError(1, 0, 0), dataForDest(1, 1), dataForSrc(1, 1), dataForSrc(1, 3));
-    testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"v0","g0"},
-        null, new int[]{2,3}, 1, 0, null), "v1", 1, "g0",
-      dataForRouting(0, 1, null), dataForRouting(0, 3, CompositeEventRouteMetadata.create(1, 0, 0)),
-      dataForRouting(0, 1, null), dataForRouting(0, 3, EventRouteMetadata.create(1, new int[]{0})),
-      dataForInputError(3, 0, 0), dataForDest(0, 1), dataForSrc(0, 1), dataForSrc(0, 2));
-    testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"v0","g0"},
-        null, new int[]{2,3}, 2, 1, null), "v2", 2, "g0",
-      dataForRouting(1, 1, null), dataForRouting(0, 1, CompositeEventRouteMetadata.create(1, 0, 0)),
-      dataForRouting(1, 1, null), dataForRouting(0, 1, EventRouteMetadata.create(1, new int[]{0})),
-      dataForInputError(1, 0, 0), dataForDest(1, 1), dataForSrc(1, 1), dataForSrc(1, 2));
-  }
-
-  /**
-   * group g0 {v1, v2} with group g1 {v3, v4}
-   *
-   * Vertex v0 has 1 tasks
-   * Vertex v1 has 2 tasks
-   * Vertex v2 has 3 tasks
-   * Vertex v3 has 4 tasks
-   */
-  @Test(timeout = 5000)
-  public void testTwoWayAllVertexGroup() throws Exception {
-    testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"g0","g1"},
-        null, new int[]{3,7}, 1, 0, null), "v0", 1, "g0",
-      dataForRouting(0, 7, null), dataForRouting(0, 1, CompositeEventRouteMetadata.create(1, 0, 0)),
-      dataForRouting(0, 7, null), dataForRouting(0, 1, EventRouteMetadata.create(1, new int[]{0})),
-      dataForInputError(1, 0, 0), dataForDest(1, 1), dataForSrc(0, 1), dataForSrc(0, 7));
-    testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"g0","g1"},
-        null, new int[]{3,7}, 2, 1, null), "v1", 2, "g0",
-      dataForRouting(0, 1, null), dataForRouting(1, 15, CompositeEventRouteMetadata.create(1, 0, 0)),
-      dataForRouting(0, 1, null), dataForRouting(1, 15, EventRouteMetadata.create(1, new int[]{0})),
-      dataForInputError(7, 0, 0), dataForDest(7, 1), dataForSrc(1, 1), dataForSrc(1, 7));
-    testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"g0","g1"},
-        null, new int[]{3,7}, 3, 0, null), "v2", 3, "g1",
-      dataForRouting(1, 0, null), dataForRouting(1, 1, CompositeEventRouteMetadata.create(1, 0, 0)),
-      dataForRouting(1, 0, null), dataForRouting(1, 1, EventRouteMetadata.create(1, new int[]{0})),
-      dataForInputError(1, 0, 1), dataForDest(1, 1), dataForSrc(1, 1), dataForSrc(1, 3));
-    testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"g0","g1"},
-        null, new int[]{3,7}, 4, 3, null), "v3", 4, "g1",
-      dataForRouting(0, 1, null), dataForRouting(1, 4, CompositeEventRouteMetadata.create(1, 0, 0)),
-      dataForRouting(0, 1, null), dataForRouting(1, 4, EventRouteMetadata.create(1, new int[]{0})),
-      dataForInputError(4, 0, 1), dataForDest(4, 1), dataForSrc(1, 1), dataForSrc(1, 3));
-  }
-
-
-  /**
-   * v0 with group g0 {v1, v2}
-   * Vertex v0 has 10 tasks, 2 groups
-   * Vertex v1 has 10 tasks, 1 group
-   * Vertex v2 has 20 tasks, 2 groups
-   */
-  @Test(timeout = 5000)
-  public void testTwoWayWithVertexGroupAutoGrouping() throws Exception {
-    testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"v0","g0"},
-        null, new int[]{2,3}, 2, 0, null), "v0", 10, null,
-      dataForRouting(0, 4, null), dataForRouting(2, 1, CompositeEventRouteMetadata.create(1, 2, 0)),
-      dataForRouting(0, 4, null), dataForRouting(2, 1, EventRouteMetadata.create(1, new int[]{2})),
-      dataForInputError(1, 3, 3), dataForDest(1, 5), dataForSrc(1, 1), dataForSrc(1, 3));
-    testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"v0","g0"},
-        null, new int[]{2,3}, 1, 0, null), "v1", 10, "g0",
-      dataForRouting(1, 1, null), dataForRouting(2, 3, CompositeEventRouteMetadata.create(1, 2, 0)),
-      dataForRouting(1, 1, null), dataForRouting(2, 3, EventRouteMetadata.create(1, new int[]{2})),
-      dataForInputError(3, 1, 1), dataForDest(0, 10), dataForSrc(1, 1), dataForSrc(1, 2));
-    testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"v0","g0"},
-        null, new int[]{2,3}, 2, 1, null), "v2", 20, "g0",
-      dataForRouting(11, 1, null), dataForRouting(12, 2, CompositeEventRouteMetadata.create(1, 2, 0)),
-      dataForRouting(11, 1, null), dataForRouting(12, 2, EventRouteMetadata.create(1, new int[]{2})),
-      dataForInputError(2, 2, 12), dataForDest(1, 10), dataForSrc(1, 1), dataForSrc(1, 2));
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManager.java
index 5144e69..5846c8b 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManager.java
@@ -97,7 +97,7 @@ public class TestCartesianProductVertexManager {
     config = new CartesianProductConfig(sourceVertices);
     vertexManager.initialize();
     assertTrue(vertexManager.getVertexManagerReal()
-      instanceof CartesianProductVertexManagerUnpartitioned);
+      instanceof FairCartesianProductVertexManager);
   }
 
   @Test(timeout = 5000)

http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerConfig.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerConfig.java
deleted file mode 100644
index 5c6ffa7..0000000
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerConfig.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.runtime.library.cartesianproduct;
-
-import org.apache.tez.dag.api.TezConfiguration;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-
-public class TestCartesianProductVertexManagerConfig {
-  @Test(timeout = 5000)
-  public void testAutoGroupingConfig() throws IOException {
-    List<String> sourceVertices = new ArrayList<>();
-    sourceVertices.add("v0");
-    sourceVertices.add("v1");
-    CartesianProductConfig config = new CartesianProductConfig(sourceVertices);
-    TezConfiguration conf = new TezConfiguration();
-
-    // auto group not set in proto
-    CartesianProductVertexManagerConfig vmConf =
-      CartesianProductVertexManagerConfig.fromUserPayload(config.toUserPayload(conf));
-    assertEquals(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_ENABLE_AUTO_GROUPING_DEFAULT,
-      vmConf.enableAutoGrouping);
-    assertEquals(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_DESIRED_BYTES_PER_GROUP_DEFAULT,
-      vmConf.desiredBytesPerChunk);
-
-    // auto group set in proto
-    conf.setBoolean(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_ENABLE_AUTO_GROUPING, true);
-    conf.setLong(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_DESIRED_BYTES_PER_GROUP, 1000);
-    vmConf = CartesianProductVertexManagerConfig.fromUserPayload(config.toUserPayload(conf));
-    assertEquals(true, vmConf.enableAutoGrouping);
-    assertEquals(1000, vmConf.desiredBytesPerChunk);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerPartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerPartitioned.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerPartitioned.java
index 36c0325..1012a36 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerPartitioned.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerPartitioned.java
@@ -32,6 +32,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.runtime.api.TaskAttemptIdentifier;
+import org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.CartesianProductConfigProto;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
@@ -66,14 +67,13 @@ public class TestCartesianProductVertexManagerPartitioned {
 
   @Before
   public void setup() throws TezReflectionException {
-    setupWithConfig(
-      new CartesianProductVertexManagerConfig(true, new String[]{"v0","v1"}, new int[] {2, 2},
-        CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_SLOW_START_MIN_FRACTION_DEFAULT,
-        CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_SLOW_START_MAX_FRACTION_DEFAULT,
-        false, 0, null));
+    CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder();
+    builder.setIsPartitioned(true).addSources("v0").addSources("v1")
+      .addNumPartitions(2).addNumPartitions(2);
+    setupWithConfig(builder.build());
   }
 
-  private void setupWithConfig(CartesianProductVertexManagerConfig config)
+  private void setupWithConfig(CartesianProductConfigProto config)
     throws TezReflectionException {
     MockitoAnnotations.initMocks(this);
     context = mock(VertexManagerPluginContext.class);
@@ -102,7 +102,7 @@ public class TestCartesianProductVertexManagerPartitioned {
     }
   }
 
-  private void testReconfigureVertexHelper(CartesianProductVertexManagerConfig config,
+  private void testReconfigureVertexHelper(CartesianProductConfigProto config,
                                            int parallelism)
     throws Exception {
     setupWithConfig(config);
@@ -117,12 +117,12 @@ public class TestCartesianProductVertexManagerPartitioned {
 
   @Test(timeout = 5000)
   public void testReconfigureVertex() throws Exception {
-    testReconfigureVertexHelper(
-      new CartesianProductVertexManagerConfig(true, new String[]{"v0", "v1"}, new int[] {5, 5}, 0,
-        0, false, 0, new CartesianProductFilterDescriptor(TestFilter.class.getName())), 10);
-    testReconfigureVertexHelper(
-      new CartesianProductVertexManagerConfig(true, new String[]{"v0", "v1"}, new int[] {5, 5}, 0,
-        0, false, 0, null), 25);
+    CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder();
+    builder.setIsPartitioned(true).addSources("v0").addSources("v1")
+      .addNumPartitions(5).addNumPartitions(5).setFilterClassName(TestFilter.class.getName());
+    testReconfigureVertexHelper(builder.build(), 10);
+    builder.clearFilterClassName();
+    testReconfigureVertexHelper(builder.build(), 25);
   }
 
   @Test(timeout = 5000)


Mime
View raw message