tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhiyu...@apache.org
Subject [1/3] tez git commit: TEZ-3708. Improve parallelism and auto grouping of unpartitioned cartesian product (zhiyuany)
Date Thu, 11 May 2017 22:21:56 GMT
Repository: tez
Updated Branches:
  refs/heads/master dec7c1b52 -> a55fe80bf


http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerUnpartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerUnpartitioned.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerUnpartitioned.java
deleted file mode 100644
index f95daa7..0000000
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerUnpartitioned.java
+++ /dev/null
@@ -1,460 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.runtime.library.cartesianproduct;
-
-import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
-import org.apache.tez.dag.api.EdgeProperty;
-import org.apache.tez.dag.api.VertexLocationHint;
-import org.apache.tez.dag.api.VertexManagerPluginContext;
-import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
-import org.apache.tez.dag.api.event.VertexState;
-import org.apache.tez.dag.api.event.VertexStateUpdate;
-import org.apache.tez.dag.records.TaskAttemptIdentifierImpl;
-import org.apache.tez.dag.records.TezDAGID;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezTaskID;
-import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.runtime.api.TaskAttemptIdentifier;
-import org.apache.tez.runtime.api.events.VertexManagerEvent;
-import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Captor;
-import org.mockito.MockitoAnnotations;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.tez.dag.api.EdgeProperty.DataMovementType.BROADCAST;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyMapOf;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.isNull;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-public class TestCartesianProductVertexManagerUnpartitioned {
-  private static long desiredBytesPerGroup = 1000;
-  @Captor
-  private ArgumentCaptor<Map<String, EdgeProperty>> edgePropertiesCaptor;
-  @Captor
-  private ArgumentCaptor<List<ScheduleTaskRequest>> scheduleRequestCaptor;
-  @Captor
-  private ArgumentCaptor<Integer> parallelismCaptor;
-  private CartesianProductVertexManagerUnpartitioned vertexManager;
-  private VertexManagerPluginContext ctx;
-
-  @Before
-  public void setup() {
-    MockitoAnnotations.initMocks(this);
-    ctx = mock(VertexManagerPluginContext.class);
-    vertexManager = new CartesianProductVertexManagerUnpartitioned(ctx);
-  }
-
-  /**
-   * v0 and v1 are two cartesian product sources
-   */
-  private void setupDAGVertexOnly(boolean doGrouping) throws Exception {
-    when(ctx.getInputVertexEdgeProperties()).thenReturn(getEdgePropertyMap(2));
-    setSrcParallelism(ctx, doGrouping ? 10 : 1, 2, 3);
-
-    CartesianProductVertexManagerConfig config = new CartesianProductVertexManagerConfig(
-      false, new String[]{"v0","v1"}, null, 0, 0, doGrouping, desiredBytesPerGroup, null);
-    vertexManager.initialize(config);
-  }
-
-  /**
-   * v0 and v1 are two cartesian product sources; v2 is broadcast source; without auto grouping
-   */
-  private void setupDAGVertexOnlyWithBroadcast() throws Exception {
-    Map<String, EdgeProperty> edgePropertyMap = getEdgePropertyMap(2);
-    edgePropertyMap.put("v2", EdgeProperty.create(BROADCAST, null, null, null, null));
-    when(ctx.getInputVertexEdgeProperties()).thenReturn(edgePropertyMap);
-    setSrcParallelism(ctx, 2, 3, 5);
-
-    CartesianProductVertexManagerConfig config =
-      new CartesianProductVertexManagerConfig(
-        false, new String[]{"v0","v1"}, null, 0, 0, false, 0, null);
-    vertexManager.initialize(config);
-  }
-
-  /**
-   * v0 and g0 are two sources; g0 is vertex group of v1 and v2
-   */
-  private void setupDAGVertexGroup(boolean doGrouping) throws Exception {
-    when(ctx.getInputVertexEdgeProperties()).thenReturn(getEdgePropertyMap(3));
-    setSrcParallelism(ctx, doGrouping ? 10: 1, 2, 3, 4);
-
-    Map<String, List<String>> vertexGroupMap = new HashMap<>();
-    vertexGroupMap.put("g0", Arrays.asList("v1", "v2"));
-    when(ctx.getInputVertexGroups()).thenReturn(vertexGroupMap);
-
-    CartesianProductVertexManagerConfig config = new CartesianProductVertexManagerConfig(
-      false, new String[]{"v0","g0"}, null, 0, 0, doGrouping, desiredBytesPerGroup, null);
-    vertexManager.initialize(config);
-  }
-
-  /**
-   * g0 and g1 are two sources; g0 is vertex group of v0 and v1; g1 is vertex group of v2 and v3
-   */
-  private void setupDAGVertexGroupOnly(boolean doGrouping) throws Exception {
-    when(ctx.getInputVertexEdgeProperties()).thenReturn(getEdgePropertyMap(4));
-    setSrcParallelism(ctx, doGrouping ? 10 : 1, 2, 3, 4, 5);
-
-    Map<String, List<String>> vertexGroupMap = new HashMap<>();
-    vertexGroupMap.put("g0", Arrays.asList("v0", "v1"));
-    vertexGroupMap.put("g1", Arrays.asList("v2", "v3"));
-    when(ctx.getInputVertexGroups()).thenReturn(vertexGroupMap);
-
-    CartesianProductVertexManagerConfig config = new CartesianProductVertexManagerConfig(
-      false, new String[]{"g0","g1"}, null, 0, 0, doGrouping, desiredBytesPerGroup, null);
-    vertexManager.initialize(config);
-  }
-
-  private Map<String, EdgeProperty> getEdgePropertyMap(int numSrcV) {
-    Map<String, EdgeProperty> edgePropertyMap = new HashMap<>();
-    for (int i = 0; i < numSrcV; i++) {
-      edgePropertyMap.put("v"+i, EdgeProperty.create(EdgeManagerPluginDescriptor.create(
-        CartesianProductEdgeManager.class.getName()), null, null, null, null));
-    }
-    return edgePropertyMap;
-  }
-
-  private void setSrcParallelism(VertexManagerPluginContext ctx, int multiplier, int... numTasks) {
-    int i = 0;
-    for (int numTask : numTasks) {
-      when(ctx.getVertexNumTasks(eq("v"+i))).thenReturn(numTask * multiplier);
-      i++;
-    }
-  }
-
-  private TaskAttemptIdentifier getTaId(String vertexName, int taskId) {
-    return new TaskAttemptIdentifierImpl("dag", vertexName,
-      TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
-        TezDAGID.getInstance("0", 0, 0), 0), taskId), 0));
-  }
-
-  private VertexManagerEvent getVMEevnt(long outputSize, String vName, int taskId) {
-
-    VertexManagerEventPayloadProto.Builder builder = VertexManagerEventPayloadProto.newBuilder();
-    builder.setOutputSize(outputSize);
-    VertexManagerEvent vmEvent =
-      VertexManagerEvent.create("cp vertex", builder.build().toByteString().asReadOnlyByteBuffer());
-    vmEvent.setProducerAttemptIdentifier(getTaId(vName, taskId));
-    return vmEvent;
-  }
-
-  private void verifyEdgeProperties(EdgeProperty edgeProperty, String[] sources,
-                                    int[] numChunksPerSrc, int numChunk, int chunkIdOffset)
-    throws InvalidProtocolBufferException {
-    CartesianProductEdgeManagerConfig conf = CartesianProductEdgeManagerConfig.fromUserPayload(
-      edgeProperty.getEdgeManagerDescriptor().getUserPayload());
-    assertArrayEquals(sources, conf.getSourceVertices().toArray());
-    assertArrayEquals(numChunksPerSrc, conf.numChunksPerSrc);
-    assertEquals(numChunk, conf.numChunk);
-    assertEquals(chunkIdOffset, conf.chunkIdOffset);
-  }
-
-  private void verifyScheduleRequest(int expectedTimes, int... expectedTid) {
-    verify(ctx, times(expectedTimes)).scheduleTasks(scheduleRequestCaptor.capture());
-    if (expectedTimes > 0) {
-      List<ScheduleTaskRequest> requests = scheduleRequestCaptor.getValue();
-      int i = 0;
-      for (int tid : expectedTid) {
-        assertEquals(tid, requests.get(i).getTaskIndex());
-        i++;
-      }
-    }
-  }
-
-  @Test(timeout = 5000)
-  public void testDAGVertexOnly() throws Exception {
-    setupDAGVertexOnly(false);
-
-    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
-    verify(ctx, never()).reconfigureVertex(
-      anyInt(), any(VertexLocationHint.class), anyMapOf(String.class, EdgeProperty.class));
-
-    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));
-    verify(ctx, times(1)).reconfigureVertex(parallelismCaptor.capture(),
-      isNull(VertexLocationHint.class), edgePropertiesCaptor.capture());
-    assertEquals(6, (int) parallelismCaptor.getValue());
-    Map<String, EdgeProperty> edgeProperties = edgePropertiesCaptor.getValue();
-    verifyEdgeProperties(edgeProperties.get("v0"), new String[]{"v0", "v1"}, new int[]{2, 3}, 2, 0);
-    verifyEdgeProperties(edgeProperties.get("v1"), new String[]{"v0", "v1"}, new int[]{2, 3}, 3, 0);
-
-    vertexManager.onVertexStarted(null);
-    verifyScheduleRequest(0);
-    vertexManager.onSourceTaskCompleted(getTaId("v0", 0));
-    verify(ctx, never()).scheduleTasks(scheduleRequestCaptor.capture());
-    vertexManager.onSourceTaskCompleted(getTaId("v1", 0));
-    verify(ctx, times(1)).scheduleTasks(scheduleRequestCaptor.capture());
-    verifyScheduleRequest(1, 0);
-
-    vertexManager.onSourceTaskCompleted(getTaId("v1", 1));
-    verify(ctx, times(2)).scheduleTasks(scheduleRequestCaptor.capture());
-    verifyScheduleRequest(2, 1);
-  }
-
-  @Test(timeout = 5000)
-  public void testDAGVertexOnlyWithGrouping() throws Exception {
-    setupDAGVertexOnly(true);
-    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
-    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));
-
-    vertexManager.onVertexManagerEventReceived(getVMEevnt(desiredBytesPerGroup, "v0", 0));
-    verify(ctx, never()).reconfigureVertex(
-      anyInt(), any(VertexLocationHint.class), anyMapOf(String.class, EdgeProperty.class));
-
-    vertexManager.onVertexManagerEventReceived(getVMEevnt(1, "v1", 0));
-    verify(ctx, never()).reconfigureVertex(
-      anyInt(), any(VertexLocationHint.class), anyMapOf(String.class, EdgeProperty.class));
-
-    vertexManager.onVertexManagerEventReceived(getVMEevnt(0, "v0", 1));
-    for (int i = 1; i < 30; i++) {
-      vertexManager.onVertexManagerEventReceived(getVMEevnt(1, "v1", i));
-    }
-    verify(ctx, times(1)).reconfigureVertex(
-      anyInt(), any(VertexLocationHint.class), edgePropertiesCaptor.capture());
-    Map<String, EdgeProperty> edgeProperties = edgePropertiesCaptor.getValue();
-    verifyEdgeProperties(edgeProperties.get("v0"), new String[]{"v0", "v1"}, new int[]{10, 1}, 10, 0);
-    verifyEdgeProperties(edgeProperties.get("v1"), new String[]{"v0", "v1"}, new int[]{10, 1}, 1, 0);
-
-    vertexManager.onVertexStarted(null);
-    verifyScheduleRequest(0);
-    vertexManager.onSourceTaskCompleted(getTaId("v0", 0));
-    vertexManager.onSourceTaskCompleted(getTaId("v0", 1));
-    for (int i = 0; i < 29; i++) {
-      vertexManager.onSourceTaskCompleted(getTaId("v1", i));
-    }
-    verifyScheduleRequest(0);
-    vertexManager.onSourceTaskCompleted(getTaId("v1", 29));
-    verifyScheduleRequest(1, 0);
-  }
-
-  @Test(timeout = 5000)
-  public void testDAGVertexGroup() throws Exception {
-    setupDAGVertexGroup(false);
-
-    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
-    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));
-    verify(ctx, never()).reconfigureVertex(
-      anyInt(), any(VertexLocationHint.class), anyMapOf(String.class, EdgeProperty.class));
-
-    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v2", VertexState.CONFIGURED));
-    verify(ctx, times(1)).reconfigureVertex(parallelismCaptor.capture(),
-      isNull(VertexLocationHint.class), edgePropertiesCaptor.capture());
-    assertEquals(14, (int) parallelismCaptor.getValue());
-    Map<String, EdgeProperty> edgeProperties = edgePropertiesCaptor.getValue();
-
-    verifyEdgeProperties(edgeProperties.get("v0"), new String[]{"v0", "g0"}, new int[]{2, 7}, 2, 0);
-    verifyEdgeProperties(edgeProperties.get("v1"), new String[]{"v0", "g0"}, new int[]{2, 7}, 3, 0);
-    verifyEdgeProperties(edgeProperties.get("v2"), new String[]{"v0", "g0"}, new int[]{2, 7}, 4, 3);
-
-    vertexManager.onVertexStarted(null);
-    verifyScheduleRequest(0);
-    vertexManager.onSourceTaskCompleted(getTaId("v0",1));
-    vertexManager.onSourceTaskCompleted(getTaId("v1",2));
-    verifyScheduleRequest(1, 9);
-    vertexManager.onSourceTaskCompleted(getTaId("v2", 0));
-    verifyScheduleRequest(2, 10);
-  }
-
-  @Test(timeout = 5000)
-  public void testDAGVertexGroupWithGrouping() throws Exception {
-    setupDAGVertexGroup(true);
-
-    for (int i = 0; i < 3; i++) {
-      vertexManager.onVertexStateUpdated(new VertexStateUpdate("v" + i, VertexState.CONFIGURED));
-    }
-
-    vertexManager.onVertexManagerEventReceived(getVMEevnt(desiredBytesPerGroup, "v0", 0));
-    vertexManager.onVertexManagerEventReceived(getVMEevnt(desiredBytesPerGroup, "v1", 0));
-    verify(ctx, never()).reconfigureVertex(
-      anyInt(), any(VertexLocationHint.class), anyMapOf(String.class, EdgeProperty.class));
-
-    vertexManager.onVertexManagerEventReceived(getVMEevnt(0, "v0", 1));
-    for (int i = 0; i < 40; i++) {
-      vertexManager.onVertexManagerEventReceived(getVMEevnt(1, "v2", i));
-    }
-
-    verify(ctx, times(1)).reconfigureVertex(
-      anyInt(), any(VertexLocationHint.class), edgePropertiesCaptor.capture());
-    Map<String, EdgeProperty> edgeProperties = edgePropertiesCaptor.getValue();
-    verifyEdgeProperties(edgeProperties.get("v0"), new String[]{"v0", "g0"}, new int[]{10, 31}, 10, 0);
-    verifyEdgeProperties(edgeProperties.get("v1"), new String[]{"v0", "g0"}, new int[]{10, 31}, 30, 0);
-    verifyEdgeProperties(edgeProperties.get("v2"), new String[]{"v0", "g0"}, new int[]{10, 31}, 1, 30);
-
-    vertexManager.onVertexStarted(null);
-    verifyScheduleRequest(0);
-    vertexManager.onSourceTaskCompleted(getTaId("v0", 0));
-    vertexManager.onSourceTaskCompleted(getTaId("v1", 10));
-    vertexManager.onSourceTaskCompleted(getTaId("v2", 0));
-    verifyScheduleRequest(0);
-    vertexManager.onSourceTaskCompleted(getTaId("v0", 1));
-    verifyScheduleRequest(1, 10);
-    for (int i = 1; i < 40; i++) {
-      vertexManager.onSourceTaskCompleted(getTaId("v2", i));
-    }
-    verifyScheduleRequest(2, 30);
-  }
-
-  @Test(timeout = 5000)
-  public void testDAGVertexGroupOnly() throws Exception {
-    setupDAGVertexGroupOnly(false);
-
-    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
-    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v2", VertexState.CONFIGURED));
-    verify(ctx, never()).reconfigureVertex(
-      anyInt(), any(VertexLocationHint.class), anyMapOf(String.class, EdgeProperty.class));
-
-    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));
-    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v3", VertexState.CONFIGURED));
-    verify(ctx, times(1)).reconfigureVertex(parallelismCaptor.capture(),
-      isNull(VertexLocationHint.class), edgePropertiesCaptor.capture());
-    assertEquals(45, (int) parallelismCaptor.getValue());
-    Map<String, EdgeProperty> edgeProperties = edgePropertiesCaptor.getValue();
-
-    verifyEdgeProperties(edgeProperties.get("v0"), new String[]{"g0", "g1"}, new int[]{5, 9}, 2, 0);
-    verifyEdgeProperties(edgeProperties.get("v1"), new String[]{"g0", "g1"}, new int[]{5, 9}, 3, 2);
-    verifyEdgeProperties(edgeProperties.get("v2"), new String[]{"g0", "g1"}, new int[]{5, 9}, 4, 0);
-    verifyEdgeProperties(edgeProperties.get("v3"), new String[]{"g0", "g1"}, new int[]{5, 9}, 5, 4);
-
-    vertexManager.onVertexStarted(null);
-    verifyScheduleRequest(0);
-    vertexManager.onSourceTaskCompleted(getTaId("v0", 1));
-    vertexManager.onSourceTaskCompleted(getTaId("v2", 3));
-    verifyScheduleRequest(1, 12);
-    vertexManager.onSourceTaskCompleted(getTaId("v1", 2));
-    verifyScheduleRequest(2, 39);
-    vertexManager.onSourceTaskCompleted(getTaId("v3", 0));
-    verifyScheduleRequest(3, 13, 40);
-  }
-
-  @Test(timeout = 5000)
-  public void testDAGVertexGroupOnlyWithGrouping() throws Exception {
-    setupDAGVertexGroupOnly(true);
-
-    for (int i = 0; i < 4; i++) {
-      vertexManager.onVertexStateUpdated(new VertexStateUpdate("v" + i, VertexState.CONFIGURED));
-    }
-
-    vertexManager.onVertexManagerEventReceived(getVMEevnt(desiredBytesPerGroup, "v0", 0));
-    vertexManager.onVertexManagerEventReceived(getVMEevnt(desiredBytesPerGroup, "v2", 0));
-    verify(ctx, never()).reconfigureVertex(
-      anyInt(), any(VertexLocationHint.class), anyMapOf(String.class, EdgeProperty.class));
-
-    vertexManager.onVertexManagerEventReceived(getVMEevnt(0, "v0", 1));
-    for (int i = 0; i < 5; i++) {
-      vertexManager.onVertexManagerEventReceived(getVMEevnt(desiredBytesPerGroup/5, "v1", i));
-    }
-    for (int i = 0; i < 50; i++) {
-      vertexManager.onVertexManagerEventReceived(getVMEevnt(1, "v3", i));
-    }
-
-    verify(ctx, times(1)).reconfigureVertex(
-      anyInt(), any(VertexLocationHint.class), edgePropertiesCaptor.capture());
-    Map<String, EdgeProperty> edgeProperties = edgePropertiesCaptor.getValue();
-    verifyEdgeProperties(edgeProperties.get("v0"), new String[]{"g0", "g1"}, new int[]{16, 41}, 10, 0);
-    verifyEdgeProperties(edgeProperties.get("v1"), new String[]{"g0", "g1"}, new int[]{16, 41}, 6, 10);
-    verifyEdgeProperties(edgeProperties.get("v2"), new String[]{"g0", "g1"}, new int[]{16, 41}, 40, 0);
-    verifyEdgeProperties(edgeProperties.get("v3"), new String[]{"g0", "g1"}, new int[]{16, 41}, 1, 40);
-
-    vertexManager.onVertexStarted(null);
-    verifyScheduleRequest(0);
-    vertexManager.onSourceTaskCompleted(getTaId("v0", 1));
-    vertexManager.onSourceTaskCompleted(getTaId("v2", 20));
-    verifyScheduleRequest(0);
-    vertexManager.onSourceTaskCompleted(getTaId("v0", 0));
-    verifyScheduleRequest(1, 20);
-    vertexManager.onSourceTaskCompleted(getTaId("v3", 0));
-    verifyScheduleRequest(1);
-    for (int i = 1; i < 50; i++) {
-      vertexManager.onSourceTaskCompleted(getTaId("v3", i));
-    }
-    verifyScheduleRequest(2, 40);
-    vertexManager.onSourceTaskCompleted(getTaId("v1", 5));
-    verifyScheduleRequest(2);
-    for (int i = 6; i < 10; i++) {
-      vertexManager.onSourceTaskCompleted(getTaId("v1", i));
-    }
-    verifyScheduleRequest(3, 471, 491);
-  }
-
-  @Test(timeout = 5000)
-  public void testSchedulingVertexOnlyWithBroadcast() throws Exception {
-    setupDAGVertexOnlyWithBroadcast();
-    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
-    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));
-    vertexManager.onVertexStarted(null);
-
-    verifyScheduleRequest(0);
-    vertexManager.onSourceTaskCompleted(getTaId("v0", 0));
-    vertexManager.onSourceTaskCompleted(getTaId("v1", 1));
-    verifyScheduleRequest(0);
-    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v2", VertexState.RUNNING));
-    verifyScheduleRequest(1);
-    verify(ctx, times(1)).scheduleTasks(scheduleRequestCaptor.capture());
-  }
-
-  @Test(timeout = 5000)
-  public void testOnVertexStart() throws Exception {
-    setupDAGVertexOnly(false);
-    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
-    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));
-
-    vertexManager.onVertexStarted(Arrays.asList(getTaId("v0", 0), getTaId("v1", 0)));
-    verifyScheduleRequest(1, 0);
-  }
-
-  @Test(timeout = 5000)
-  public void testZeroSrcTask() throws Exception {
-    ctx = mock(VertexManagerPluginContext.class);
-    vertexManager = new CartesianProductVertexManagerUnpartitioned(ctx);
-    when(ctx.getVertexNumTasks(eq("v0"))).thenReturn(2);
-    when(ctx.getVertexNumTasks(eq("v1"))).thenReturn(0);
-
-    CartesianProductVertexManagerConfig config =
-      new CartesianProductVertexManagerConfig(
-        false, new String[]{"v0","v1"}, null, 0, 0, false, 0, null);
-    Map<String, EdgeProperty> edgePropertyMap = new HashMap<>();
-    edgePropertyMap.put("v0", EdgeProperty.create(EdgeManagerPluginDescriptor.create(
-      CartesianProductEdgeManager.class.getName()), null, null, null, null));
-    edgePropertyMap.put("v1", EdgeProperty.create(EdgeManagerPluginDescriptor.create(
-      CartesianProductEdgeManager.class.getName()), null, null, null, null));
-    when(ctx.getInputVertexEdgeProperties()).thenReturn(edgePropertyMap);
-
-    vertexManager.initialize(config);
-    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
-    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));
-    vertexManager.onVertexStarted(new ArrayList<TaskAttemptIdentifier>());
-    vertexManager.onSourceTaskCompleted(getTaId("v0", 0));
-    vertexManager.onSourceTaskCompleted(getTaId("v0", 1));
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestFairCartesianProductEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestFairCartesianProductEdgeManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestFairCartesianProductEdgeManager.java
new file mode 100644
index 0000000..ac7262e
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestFairCartesianProductEdgeManager.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import org.apache.tez.dag.api.EdgeManagerPluginContext;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.EventRouteMetadata;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.CompositeEventRouteMetadata;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.*;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestFairCartesianProductEdgeManager {
+  private EdgeManagerPluginContext mockContext;
+  private FairCartesianProductEdgeManager edgeManager;
+
+  @Before
+  public void setup() { // JUnit runs this before every test method
+    mockContext = mock(EdgeManagerPluginContext.class); // stubbed per case in testEdgeManager
+    edgeManager = new FairCartesianProductEdgeManager(mockContext); // edge manager under test
+  }
+
+  static class TestData { // one check: the ids to query the edge manager with, plus the expected result
+    int srcId, destId, inputId; // the dataFor* helpers pass -1 for ids they do not take
+    Object expected; // compared via assertEquals, or cast to a route metadata type by testEdgeManager
+    // Plain value holder: the constructor only stores its arguments.
+    public TestData(int srcId, int destId, int inputId, Object expected) {
+      this.srcId = srcId;
+      this.destId = destId;
+      this.inputId = inputId;
+      this.expected = expected;
+    }
+  }
+
+  private TestData dataForRouting(int srcId, int destId, Object expected) { // (srcId, destId) routing case
+    return new TestData(srcId, destId, -1, expected); // inputId is left as -1
+  }
+
+  private TestData dataForInputError(int destId, int inputId, Object expected) { // (destId, inputId) case
+    return new TestData(-1, destId, inputId, expected); // srcId is left as -1
+  }
+
+  private TestData dataForSrc(int srcId, Object expected) { // case keyed by source task id only
+    return new TestData(srcId, -1, -1, expected); // destId and inputId are left as -1
+  }
+
+  private TestData dataForDest(int destId, Object expected) { // case keyed by destination task id only
+    return new TestData(-1, destId, -1, expected); // srcId and inputId are left as -1
+  }
+
+  private void testEdgeManager(CartesianProductConfigProto conf, String vName, int numTask,
+                               String groupName, TestData cDMEInvalid, TestData cDMEValid,
+                               TestData srcFailInvalid, TestData srcFailValid,
+                               TestData inputError, TestData numDestInput,
+                               TestData numSrcOutputTest, TestData numConsumerTest)
+    throws Exception {
+    when(mockContext.getSourceVertexName()).thenReturn(vName);
+    when(mockContext.getSourceVertexNumTasks()).thenReturn(numTask);
+    when(mockContext.getVertexGroupName()).thenReturn(groupName);
+    edgeManager.initialize(conf);
+    // The mocked context now describes one source vertex; each section below checks one API.
+    CompositeEventRouteMetadata cDME;
+    // Composite data movement event: the optional cDMEInvalid pair must route to null.
+    if (cDMEInvalid != null) {
+      cDME = edgeManager.routeCompositeDataMovementEventToDestination(cDMEInvalid.srcId,
+        cDMEInvalid.destId);
+      assertNull(cDME);
+    }
+    // The cDMEValid pair must route, and match the expected count, target and source.
+    cDME = edgeManager.routeCompositeDataMovementEventToDestination(cDMEValid.srcId,
+      cDMEValid.destId);
+    assertNotNull(cDME);
+    CompositeEventRouteMetadata expectedCDME = (CompositeEventRouteMetadata)(cDMEValid.expected);
+    assertEquals(expectedCDME.getCount(), cDME.getCount());
+    assertEquals(expectedCDME.getTarget(), cDME.getTarget());
+    assertEquals(expectedCDME.getSource(), cDME.getSource());
+    // Source-task-failed event: the optional srcFailInvalid pair must route to null.
+    EventRouteMetadata dme;
+    if (srcFailInvalid != null) {
+      dme = edgeManager.routeInputSourceTaskFailedEventToDestination(srcFailInvalid.srcId,
+        srcFailInvalid.destId);
+      assertNull(dme);
+    }
+    // The srcFailValid pair must route, and match the expected event count and target indices.
+    dme = edgeManager.routeInputSourceTaskFailedEventToDestination(srcFailValid.srcId,
+      srcFailValid.destId);
+    assertNotNull(dme);
+    EventRouteMetadata expectedDME = (EventRouteMetadata)(srcFailValid.expected);
+    assertEquals(expectedDME.getNumEvents(), dme.getNumEvents());
+    assertArrayEquals(expectedDME.getTargetIndices(), dme.getTargetIndices());
+    // Input error for (destId, inputId): the routed result must equal inputError.expected.
+    assertEquals(inputError.expected,
+      edgeManager.routeInputErrorEventToSource(inputError.destId, inputError.inputId));
+    // Size queries: physical inputs per destination, physical outputs and consumers per source.
+    assertEquals(numDestInput.expected,
+      edgeManager.getNumDestinationTaskPhysicalInputs(numDestInput.destId));
+    assertEquals(numSrcOutputTest.expected,
+      edgeManager.getNumSourceTaskPhysicalOutputs(numSrcOutputTest.srcId));
+    assertEquals(numConsumerTest.expected,
+      edgeManager.getNumDestinationConsumerTasks(numConsumerTest.srcId));
+  }
+
+  /**
+   * Vertex v0 has 2 tasks, 2 chunks
+   * Vertex v1 has 30 tasks, 3 chunks
+   */
+  @Test(timeout = 5000)
+  public void testTwoWayAllVertex() throws Exception {
+    CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder();
+    builder.setIsPartitioned(false).addSources("v0").addSources("v1")
+      .addNumChunks(2).addNumChunks(3).setMaxParallelism(10).setNumPartitionsForFairCase(10);
+    CartesianProductConfigProto config = builder.build();
+    testEdgeManager(config, "v0", 2, null, dataForRouting(1, 1, null),
+      dataForRouting(1, 3, CompositeEventRouteMetadata.create(10, 0, 0)),
+      dataForRouting(1, 1, null),
+      dataForRouting(1, 3, EventRouteMetadata.create(10, new int[]{0,1,2,3,4,5,6,7,8,9})),
+      dataForInputError(1, 0, 0), dataForDest(1, 10), dataForSrc(1, 10), dataForSrc(1, 3));
+    testEdgeManager(config, "v1", 30, null, dataForRouting(1, 2, null),
+      dataForRouting(1, 0, CompositeEventRouteMetadata.create(10, 10, 0)),
+      dataForRouting(1, 2, null),
+      dataForRouting(1, 0, EventRouteMetadata.create(10, new int[]{10,11,12,13,14,15,16,17,18,19})),
+      dataForInputError(1,0,10), dataForDest(1, 100), dataForSrc(1, 10), dataForSrc(1, 2));
+  }
+
+  /**
+   * Vertex v0 has 2 tasks, 2 chunks
+   * Vertex v1 has 30 tasks, 3 chunks
+   * Vertex v2 has 1 task, 4 chunks
+   */
+  @Test(timeout = 5000)
+  public void testThreeWayAllVertex() throws Exception {
+    CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder();
+    builder.setIsPartitioned(false).addSources("v0").addSources("v1").addSources("v2")
+      .addNumChunks(2).addNumChunks(3).addNumChunks(4)
+      .setMaxParallelism(12).setNumPartitionsForFairCase(12);
+    CartesianProductConfigProto config = builder.build();
+    testEdgeManager(config, "v0", 2, null, dataForRouting(1, 1, null),
+      dataForRouting(1, 12, CompositeEventRouteMetadata.create(12, 0, 0)),
+      dataForRouting(1, 1, null),
+      dataForRouting(1, 12, EventRouteMetadata.create(12, new int[]{0,1,2,3,4,5,6,7,8,9,10,11})),
+      dataForInputError(1, 0, 0), dataForDest(1, 12), dataForSrc(1, 12), dataForSrc(1, 12));
+    testEdgeManager(config, "v1", 30, null, dataForRouting(1, 4, null),
+      dataForRouting(1, 13, CompositeEventRouteMetadata.create(12, 12, 0)),
+      dataForRouting(1, 4, null),
+      dataForRouting(1, 13,
+        EventRouteMetadata.create(12, new int[]{12,13,14,15,16,17,18,19,20,21,22,23})),
+      dataForInputError(1, 0, 0), dataForDest(1, 120), dataForSrc(1, 12), dataForSrc(1, 8));
+    testEdgeManager(config, "v2", 1, null,
+      null, dataForRouting(0, 13, CompositeEventRouteMetadata.create(3, 0, 3)),
+      null, dataForRouting(0, 13, EventRouteMetadata.create(3, new int[]{0,1,2})),
+      dataForInputError(1, 0, 0), dataForDest(1, 3), dataForSrc(0, 12), dataForSrc(0, 24));
+  }
+
+  /**
+   * Two-way product of vertex v0 with vertex group g0 {v1, v2}.
+   * Vertex v0 has 2 chunks; group g0 has 3 chunks.
+   * Vertex v1 has 10 tasks and is checked at position 0 in g0.
+   * Vertex v2 has 20 tasks and is checked at position 1 in g0.
+   * Max parallelism and the fair-case partition count are both set to 10.
+   */
+  @Test(timeout = 5000)
+  public void testTwoWayVertexWithVertexGroup() throws Exception {
+    CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder();
+    builder.setIsPartitioned(false).addSources("v0").addSources("g0")
+      .addNumChunks(2).addNumChunks(3).setMaxParallelism(10).setNumPartitionsForFairCase(10)
+      .addNumTaskPerVertexInGroup(10).addNumTaskPerVertexInGroup(20).setPositionInGroup(0);
+    testEdgeManager(builder.build(), "v1", 10, "g0", dataForRouting(0, 4, null),
+      dataForRouting(0, 3, CompositeEventRouteMetadata.create(10, 0, 0)),
+      dataForRouting(0, 4, null),
+      dataForRouting(0, 3, EventRouteMetadata.create(10, new int[]{0,1,2,3,4,5,6,7,8,9})),
+      dataForInputError(3, 0, 0), dataForDest(2, 34), dataForSrc(0, 10), dataForSrc(0, 2));
+    builder.setPositionInGroup(1);
+    testEdgeManager(builder.build(), "v2", 20, "g0", dataForRouting(1, 1, null),
+      dataForRouting(6, 1, CompositeEventRouteMetadata.create(4, 33, 6)),
+      dataForRouting(1, 1, null),
+      dataForRouting(6, 1, EventRouteMetadata.create(4, new int[]{33,34,35,36})),
+      dataForInputError(1, 33, 6), dataForDest(0, 66), dataForSrc(1, 10), dataForSrc(6, 4));
+  }
+
+  /**
+   * group g0 {v0, v1} with group g1 {v2, v3}
+   *
+   * Vertex v0 has 2 tasks
+   * Vertex v1 has 5 tasks
+   * Group g0 has 2 chunks
+   * Group g1 has 3 chunks
+   */
+  @Test(timeout = 5000)
+  public void testTwoWayAllVertexGroup() throws Exception {
+    CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder();
+    builder.setIsPartitioned(false).addSources("g0").addSources("g1")
+      .addNumChunks(2).addNumChunks(3).setMaxParallelism(10).setNumPartitionsForFairCase(10)
+      .addNumTaskPerVertexInGroup(2).addNumTaskPerVertexInGroup(5).setPositionInGroup(0);
+    testEdgeManager(builder.build(), "v0", 2, "g0", dataForRouting(1, 1, null),
+      dataForRouting(0, 1, CompositeEventRouteMetadata.create(10, 0, 0)),
+      dataForRouting(1, 1, null),
+      dataForRouting(0, 1, EventRouteMetadata.create(10, new int[]{0,1,2,3,4,5,6,7,8,9})),
+      dataForInputError(1, 0, 0), dataForDest(1, 10), dataForSrc(1, 10), dataForSrc(1, 3));
+    builder.setPositionInGroup(1);
+    testEdgeManager(builder.build(), "v1", 5, "g0", dataForRouting(3, 1, null),
+      dataForRouting(1, 1, CompositeEventRouteMetadata.create(10, 20, 0)),
+      dataForRouting(3, 1, null),
+      dataForRouting(1, 1, EventRouteMetadata.create(10, new int[]{20,21,22,23,24,25,26,27,28,29})),
+      dataForInputError(1, 15, 0), dataForDest(1, 25), dataForSrc(1, 10), dataForSrc(1, 3));
+  }
+
+  @Test(timeout = 5000)
+  public void testNumPartition() throws Exception {
+    when(mockContext.getSourceVertexName()).thenReturn("source");
+    when(mockContext.getSourceVertexNumTasks()).thenReturn(10);
+    when(mockContext.getVertexGroupName()).thenReturn(null);
+
+    CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder();
+    builder.setIsPartitioned(false).addSources("v0").addSources("v1").setMaxParallelism(100);
+
+    edgeManager.initialize(builder.build());
+    assertEquals(10, edgeManager.getNumSourceTaskPhysicalOutputs(0));
+
+    builder.setNumPartitionsForFairCase(20);
+    edgeManager = new FairCartesianProductEdgeManager(mockContext);
+    edgeManager.initialize(builder.build());
+    assertEquals(20, edgeManager.getNumSourceTaskPhysicalOutputs(0));
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestFairCartesianProductVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestFairCartesianProductVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestFairCartesianProductVertexManager.java
new file mode 100644
index 0000000..01d7f0b
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestFairCartesianProductVertexManager.java
@@ -0,0 +1,500 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import com.google.common.primitives.Ints;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.VertexLocationHint;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.dag.records.TaskAttemptIdentifierImpl;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.CartesianProductConfigProto;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.MockitoAnnotations;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.tez.dag.api.EdgeProperty.DataMovementType.BROADCAST;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyMapOf;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TestFairCartesianProductVertexManager {
+  @Captor
+  private ArgumentCaptor<Map<String, EdgeProperty>> edgePropertiesCaptor;
+  @Captor
+  private ArgumentCaptor<List<ScheduleTaskRequest>> scheduleRequestCaptor;
+  private FairCartesianProductVertexManager vertexManager;
+  private VertexManagerPluginContext ctx;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    ctx = mock(VertexManagerPluginContext.class);
+    vertexManager = new FairCartesianProductVertexManager(ctx);
+  }
+
+  /**
+   * v0 and v1 are two cartesian product sources
+   */
+  private void setupDAGVertexOnly(int maxParallelism, long minOpsPerWorker,
+                                  int srcParallelismMultiplier) throws Exception {
+    when(ctx.getInputVertexEdgeProperties()).thenReturn(getEdgePropertyMap(2));
+    setSrcParallelism(ctx, srcParallelismMultiplier, 2, 3);
+
+    CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder();
+    builder.setIsPartitioned(false).addSources("v0").addSources("v1")
+      .setMaxParallelism(maxParallelism).setMinOpsPerWorker(minOpsPerWorker)
+      .setNumPartitionsForFairCase(maxParallelism);
+    vertexManager.initialize(builder.build());
+  }
+
+  /**
+   * v0 and v1 are two cartesian product sources; v2 is broadcast source
+   */
+  private void setupDAGVertexOnlyWithBroadcast(int maxParallelism, long minWorkloadPerWorker,
+                                               int srcParallelismMultiplier) throws Exception {
+    Map<String, EdgeProperty> edgePropertyMap = getEdgePropertyMap(2);
+    edgePropertyMap.put("v2", EdgeProperty.create(BROADCAST, null, null, null, null));
+    when(ctx.getInputVertexEdgeProperties()).thenReturn(edgePropertyMap);
+    setSrcParallelism(ctx, srcParallelismMultiplier, 2, 3, 5);
+
+    CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder();
+    builder.setIsPartitioned(false).addSources("v0").addSources("v1")
+      .setMaxParallelism(maxParallelism).setMinOpsPerWorker(minWorkloadPerWorker)
+      .setNumPartitionsForFairCase(maxParallelism);
+    vertexManager.initialize(builder.build());
+  }
+
+  /**
+   * v0 and g0 are two sources; g0 is vertex group of v1 and v2
+   */
+  private void setupDAGVertexGroup(int maxParallelism, long minWorkloadPerWorker,
+                                   int srcParallelismMultiplier) throws Exception {
+    when(ctx.getInputVertexEdgeProperties()).thenReturn(getEdgePropertyMap(3));
+    setSrcParallelism(ctx, srcParallelismMultiplier, 2, 3, 4);
+
+    Map<String, List<String>> vertexGroupMap = new HashMap<>();
+    vertexGroupMap.put("g0", Arrays.asList("v1", "v2"));
+    when(ctx.getInputVertexGroups()).thenReturn(vertexGroupMap);
+
+    CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder();
+    builder.setIsPartitioned(false).addSources("v0").addSources("g0")
+      .setNumPartitionsForFairCase(maxParallelism).setMaxParallelism(maxParallelism)
+      .setMinOpsPerWorker(minWorkloadPerWorker);
+    vertexManager.initialize(builder.build());
+  }
+
+  /**
+   * g0 and g1 are two sources; g0 is vertex group of v0 and v1; g1 is vertex group of v2 and v3
+   */
+  private void setupDAGVertexGroupOnly(int maxParallelism, long minWorkloadPerWorker,
+                                       int srcParallelismMultiplier) throws Exception {
+    when(ctx.getInputVertexEdgeProperties()).thenReturn(getEdgePropertyMap(4));
+    setSrcParallelism(ctx, srcParallelismMultiplier, 2, 3, 4, 5);
+
+    Map<String, List<String>> vertexGroupMap = new HashMap<>();
+    vertexGroupMap.put("g0", Arrays.asList("v0", "v1"));
+    vertexGroupMap.put("g1", Arrays.asList("v2", "v3"));
+    when(ctx.getInputVertexGroups()).thenReturn(vertexGroupMap);
+
+    CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder();
+    builder.setIsPartitioned(false).addSources("g0").addSources("g1")
+      .setNumPartitionsForFairCase(maxParallelism)
+      .setMaxParallelism(maxParallelism).setMinOpsPerWorker(minWorkloadPerWorker);
+    vertexManager.initialize(builder.build());
+  }
+
+  private Map<String, EdgeProperty> getEdgePropertyMap(int numSrcV) {
+    Map<String, EdgeProperty> edgePropertyMap = new HashMap<>();
+    for (int i = 0; i < numSrcV; i++) {
+      edgePropertyMap.put("v"+i, EdgeProperty.create(EdgeManagerPluginDescriptor.create(
+        CartesianProductEdgeManager.class.getName()), null, null, null, null));
+    }
+    return edgePropertyMap;
+  }
+
+  private void setSrcParallelism(VertexManagerPluginContext ctx, int multiplier, int... numTasks) {
+    int i = 0;
+    for (int numTask : numTasks) {
+      when(ctx.getVertexNumTasks(eq("v"+i))).thenReturn(numTask * multiplier);
+      i++;
+    }
+  }
+
+  private TaskAttemptIdentifier getTaId(String vertexName, int taskId) {
+    return new TaskAttemptIdentifierImpl("dag", vertexName,
+      TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
+        TezDAGID.getInstance("0", 0, 0), 0), taskId), 0));
+  }
+
+  private VertexManagerEvent getVMEvent(long numRecord, String vName, int taskId) {
+
+    VertexManagerEventPayloadProto.Builder builder = VertexManagerEventPayloadProto.newBuilder();
+    builder.setNumRecord(numRecord);
+    VertexManagerEvent vmEvent =
+      VertexManagerEvent.create("cp vertex", builder.build().toByteString().asReadOnlyByteBuffer());
+    vmEvent.setProducerAttemptIdentifier(getTaId(vName, taskId));
+    return vmEvent;
+  }
+
+  private void verifyEdgeProperties(EdgeProperty edgeProperty, String[] sources,
+                                    int[] numChunksPerSrc, int maxParallelism)
+    throws InvalidProtocolBufferException {
+    CartesianProductConfigProto config = CartesianProductConfigProto.parseFrom(ByteString.copyFrom(
+      edgeProperty.getEdgeManagerDescriptor().getUserPayload().getPayload()));
+    assertArrayEquals(sources, config.getSourcesList().toArray());
+    assertArrayEquals(numChunksPerSrc, Ints.toArray(config.getNumChunksList()));
+    assertEquals(maxParallelism, config.getMaxParallelism());
+  }
+
+  private void verifyVertexGroupInfo(EdgeProperty edgeProperty, int positionInGroup,
+                                     int... numTaskPerVertexInGroup)
+    throws InvalidProtocolBufferException {
+    CartesianProductConfigProto config = CartesianProductConfigProto.parseFrom(ByteString.copyFrom(
+      edgeProperty.getEdgeManagerDescriptor().getUserPayload().getPayload()));
+    assertEquals(positionInGroup, config.getPositionInGroup());
+    int i = 0;
+    for (int numTask : numTaskPerVertexInGroup) {
+      assertEquals(numTask, config.getNumTaskPerVertexInGroup(i));
+      i++;
+    }
+  }
+
+  private void verifyScheduleRequest(int expectedTimes, int... expectedTid) {
+    verify(ctx, times(expectedTimes)).scheduleTasks(scheduleRequestCaptor.capture());
+    if (expectedTimes > 0) {
+      List<ScheduleTaskRequest> requests = scheduleRequestCaptor.getValue();
+      int i = 0;
+      for (int tid : expectedTid) {
+        assertEquals(tid, requests.get(i).getTaskIndex());
+        i++;
+      }
+    }
+  }
+
+  @Test(timeout = 5000)
+  public void testDAGVertexOnlyGroupByMaxParallelism() throws Exception {
+    setupDAGVertexOnly(30, 1, 1);
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));
+
+    vertexManager.onVertexManagerEventReceived(getVMEvent(250, "v0", 0));
+    verify(ctx, never()).reconfigureVertex(
+      anyInt(), any(VertexLocationHint.class), anyMapOf(String.class, EdgeProperty.class));
+
+    vertexManager.onVertexManagerEventReceived(getVMEvent(200, "v1", 0));
+    verify(ctx, times(1)).reconfigureVertex(
+      eq(30), any(VertexLocationHint.class), edgePropertiesCaptor.capture());
+    Map<String, EdgeProperty> edgeProperties = edgePropertiesCaptor.getValue();
+    verifyEdgeProperties(edgeProperties.get("v0"), new String[]{"v0", "v1"}, new int[]{5, 6}, 30);
+    verifyVertexGroupInfo(edgeProperties.get("v0"), 0);
+    verifyEdgeProperties(edgeProperties.get("v1"), new String[]{"v0", "v1"}, new int[]{5, 6}, 30);
+    verifyVertexGroupInfo(edgeProperties.get("v1"), 0);
+
+    vertexManager.onVertexStarted(null);
+    verifyScheduleRequest(0);
+    vertexManager.onSourceTaskCompleted(getTaId("v0", 0));
+    vertexManager.onSourceTaskCompleted(getTaId("v1", 0));
+    verifyScheduleRequest(1, 0, 6, 1, 7);
+    vertexManager.onSourceTaskCompleted(getTaId("v0", 1));
+    verifyScheduleRequest(2, 12, 13, 18, 19, 24, 25);
+  }
+
+  @Test(timeout = 5000)
+  public void testDAGVertexOnlyGroupByMinOpsPerWorker() throws Exception {
+    setupDAGVertexOnly(100, 10000, 10);
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));
+
+    for (int i = 0; i < 20; i++) {
+      vertexManager.onVertexManagerEventReceived(getVMEvent(20, "v0", i));
+    }
+
+    for (int i = 0; i < 30; i++) {
+      vertexManager.onVertexManagerEventReceived(getVMEvent(10, "v1", i));
+    }
+
+    verify(ctx, times(1)).reconfigureVertex(
+      eq(12), any(VertexLocationHint.class), edgePropertiesCaptor.capture());
+    Map<String, EdgeProperty> edgeProperties = edgePropertiesCaptor.getValue();
+    verifyEdgeProperties(edgeProperties.get("v0"), new String[]{"v0", "v1"}, new int[]{4, 3}, 100);
+    verifyEdgeProperties(edgeProperties.get("v1"), new String[]{"v0", "v1"}, new int[]{4, 3}, 100);
+
+    vertexManager.onVertexStarted(null);
+    verifyScheduleRequest(0);
+
+    for (int i = 0; i < 5; i++) {
+      vertexManager.onSourceTaskCompleted(getTaId("v0", i));
+    }
+    for (int i = 0; i < 10; i++) {
+      vertexManager.onSourceTaskCompleted(getTaId("v1", 10 + i));
+    }
+    verifyScheduleRequest(1, 1);
+  }
+
+  @Test(timeout = 5000)
+  public void testDAGVertexGroup() throws Exception {
+    setupDAGVertexGroup(100, 1, 1);
+
+    for (int i = 0; i < 3; i++) {
+      vertexManager.onVertexStateUpdated(new VertexStateUpdate("v" + i, VertexState.CONFIGURED));
+    }
+
+    vertexManager.onVertexManagerEventReceived(getVMEvent(100, "v0", 0));
+    vertexManager.onVertexManagerEventReceived(getVMEvent(10, "v1", 0));
+    vertexManager.onVertexManagerEventReceived(getVMEvent(5, "v2", 0));
+    vertexManager.onVertexManagerEventReceived(getVMEvent(5, "v2", 1));
+    verify(ctx, times(1)).reconfigureVertex(
+      eq(100), any(VertexLocationHint.class), edgePropertiesCaptor.capture());
+    Map<String, EdgeProperty> edgeProperties = edgePropertiesCaptor.getValue();
+    for (int i = 0; i < 3; i++) {
+      verifyEdgeProperties(edgeProperties.get("v" + i), new String[]{"v0", "g0"},
+        new int[]{20, 5}, 100);
+    }
+
+    vertexManager.onVertexStarted(null);
+    vertexManager.onSourceTaskCompleted(getTaId("v0", 0));
+    vertexManager.onSourceTaskCompleted(getTaId("v1", 0));
+    verifyScheduleRequest(0);
+    vertexManager.onSourceTaskCompleted(getTaId("v2", 0));
+    verifyScheduleRequest(1, 0, 5, 10, 15, 20, 25, 30, 35, 40, 45);
+    vertexManager.onSourceTaskCompleted(getTaId("v1", 1));
+    verifyScheduleRequest(1);
+    vertexManager.onSourceTaskCompleted(getTaId("v2", 1));
+    verifyScheduleRequest(2, 1, 6, 11, 16, 21, 26, 31, 36, 41, 46);
+  }
+
+  @Test(timeout = 5000)
+  public void testDAGVertexGroupOnly() throws Exception {
+    setupDAGVertexGroupOnly(100, 1, 1);
+
+    for (int i = 0; i < 4; i++) {
+      vertexManager.onVertexStateUpdated(new VertexStateUpdate("v" + i, VertexState.CONFIGURED));
+    }
+
+    vertexManager.onVertexManagerEventReceived(getVMEvent(20, "v0", 0));
+    vertexManager.onVertexManagerEventReceived(getVMEvent(20, "v1", 0));
+    vertexManager.onVertexManagerEventReceived(getVMEvent(5, "v2", 0));
+    vertexManager.onVertexManagerEventReceived(getVMEvent(5, "v2", 1));
+    vertexManager.onVertexManagerEventReceived(getVMEvent(16, "v3", 0));
+
+    verify(ctx, times(1)).reconfigureVertex(
+      eq(100), any(VertexLocationHint.class), edgePropertiesCaptor.capture());
+    Map<String, EdgeProperty> edgeProperties = edgePropertiesCaptor.getValue();
+    for (int i = 0; i < 4; i++) {
+      verifyEdgeProperties(edgeProperties.get("v" + i), new String[]{"g0", "g1"},
+        new int[]{10, 10}, 100);
+    }
+    verifyVertexGroupInfo(edgeProperties.get("v0"), 0);
+    verifyVertexGroupInfo(edgeProperties.get("v1"), 1, 2);
+    verifyVertexGroupInfo(edgeProperties.get("v2"), 0);
+    verifyVertexGroupInfo(edgeProperties.get("v3"), 1, 4);
+
+    vertexManager.onVertexStarted(null);
+    vertexManager.onSourceTaskCompleted(getTaId("v0", 0));
+    vertexManager.onSourceTaskCompleted(getTaId("v1", 0));
+    vertexManager.onSourceTaskCompleted(getTaId("v2", 1));
+    verifyScheduleRequest(0);
+    vertexManager.onSourceTaskCompleted(getTaId("v3", 1));
+    verifyScheduleRequest(1, 3, 13, 23);
+  }
+
+  @Test(timeout = 5000)
+  public void testSchedulingVertexOnlyWithBroadcast() throws Exception {
+    setupDAGVertexOnlyWithBroadcast(30, 1, 1);
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));
+
+    vertexManager.onVertexManagerEventReceived(getVMEvent(250, "v0", 0));
+    vertexManager.onVertexManagerEventReceived(getVMEvent(200, "v1", 0));
+    verify(ctx, times(1)).reconfigureVertex(
+      eq(30), any(VertexLocationHint.class), edgePropertiesCaptor.capture());
+    assertFalse(edgePropertiesCaptor.getValue().containsKey("v2"));
+
+    vertexManager.onVertexStarted(null);
+    vertexManager.onSourceTaskCompleted(getTaId("v0", 0));
+    vertexManager.onSourceTaskCompleted(getTaId("v1", 0));
+    verifyScheduleRequest(0);
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v2", VertexState.RUNNING));
+    verifyScheduleRequest(1, 0, 1, 6, 7);
+  }
+
+
+  @Test(timeout = 5000)
+  public void testOnVertexStart() throws Exception {
+    setupDAGVertexOnly(6, 1, 1);
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));
+    vertexManager.onVertexManagerEventReceived(getVMEvent(100, "v0", 0));
+    vertexManager.onVertexManagerEventReceived(getVMEvent(100, "v1", 0));
+
+    verifyScheduleRequest(0);
+
+    vertexManager.onVertexStarted(Arrays.asList(getTaId("v0", 0), getTaId("v1", 0)));
+    verifyScheduleRequest(1, 0);
+  }
+
+  @Test(timeout = 5000)
+  public void testZeroSrcTask() throws Exception {
+    ctx = mock(VertexManagerPluginContext.class);
+    vertexManager = new FairCartesianProductVertexManager(ctx);
+    when(ctx.getVertexNumTasks("v0")).thenReturn(2);
+    when(ctx.getVertexNumTasks("v1")).thenReturn(0);
+
+    CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder();
+    builder.setIsPartitioned(false).addSources("v0").addSources("v1")
+      .addNumChunks(2).addNumChunks(3).setMaxParallelism(6);
+    CartesianProductConfigProto config = builder.build();
+
+    Map<String, EdgeProperty> edgePropertyMap = new HashMap<>();
+    edgePropertyMap.put("v0", EdgeProperty.create(EdgeManagerPluginDescriptor.create(
+      CartesianProductEdgeManager.class.getName()), null, null, null, null));
+    edgePropertyMap.put("v1", EdgeProperty.create(EdgeManagerPluginDescriptor.create(
+      CartesianProductEdgeManager.class.getName()), null, null, null, null));
+    when(ctx.getInputVertexEdgeProperties()).thenReturn(edgePropertyMap);
+
+    vertexManager.initialize(config);
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));
+    vertexManager.onVertexStarted(new ArrayList<TaskAttemptIdentifier>());
+    vertexManager.onSourceTaskCompleted(getTaId("v0", 0));
+    vertexManager.onSourceTaskCompleted(getTaId("v0", 1));
+  }
+
+  private void setupGroupingFractionTest() throws Exception {
+    when(ctx.getInputVertexEdgeProperties()).thenReturn(getEdgePropertyMap(2));
+    setSrcParallelism(ctx, 10, 2, 3);
+
+    CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder();
+    builder.setIsPartitioned(false).addSources("v0").addSources("v1")
+      .setMaxParallelism(30).setMinOpsPerWorker(1)
+      .setNumPartitionsForFairCase(30).setGroupingFraction(0.5f);
+    vertexManager.initialize(builder.build());
+
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));
+  }
+  @Test(timeout = 5000)
+  public void testGroupingFraction() throws Exception {
+    setupGroupingFractionTest();
+    vertexManager.onVertexManagerEventReceived(getVMEvent(10000, "v0", 0));
+    vertexManager.onVertexManagerEventReceived(getVMEvent(10000, "v1", 0));
+    for (int i = 0; i < 10; i++) {
+      vertexManager.onSourceTaskCompleted(getTaId("v0", i));
+    }
+    for (int i = 0; i < 14; i++) {
+      vertexManager.onSourceTaskCompleted(getTaId("v1", i));
+    }
+    verify(ctx, never()).reconfigureVertex(
+      anyInt(), any(VertexLocationHint.class), anyMapOf(String.class, EdgeProperty.class));
+
+    vertexManager.onSourceTaskCompleted(getTaId("v1", 14));
+    verify(ctx, times(1)).reconfigureVertex(
+      eq(24), any(VertexLocationHint.class), edgePropertiesCaptor.capture());
+  }
+
+  @Test(timeout = 5000)
+  public void testGroupFractionWithZeroStats() throws Exception {
+    setupGroupingFractionTest();
+
+    for (int i = 0; i < 10; i++) {
+      vertexManager.onSourceTaskCompleted(getTaId("v0", i));
+    }
+    for (int i = 0; i < 15; i++) {
+      vertexManager.onSourceTaskCompleted(getTaId("v1", i));
+    }
+    verify(ctx, never()).reconfigureVertex(
+      anyInt(), any(VertexLocationHint.class), anyMapOf(String.class, EdgeProperty.class));
+  }
+
+  @Test(timeout = 5000)
+  public void testGroupingFractionWithZeroOutput() throws Exception {
+    setupGroupingFractionTest();
+
+    for (int i = 0; i < 20; i++) {
+      vertexManager.onSourceTaskCompleted(getTaId("v0", i));
+    }
+    for (int i = 0; i < 30; i++) {
+      vertexManager.onSourceTaskCompleted(getTaId("v1", i));
+    }
+    verify(ctx, times(1)).reconfigureVertex(
+      eq(0), any(VertexLocationHint.class), edgePropertiesCaptor.capture());
+  }
+
+  @Test(timeout = 5000)
+  public void testZeroSrcOutput() throws Exception {
+    setupDAGVertexOnly(10, 1, 1);
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));
+    vertexManager.onVertexManagerEventReceived(getVMEvent(0, "v0", 0));
+    vertexManager.onVertexManagerEventReceived(getVMEvent(0, "v0", 1));
+    vertexManager.onVertexManagerEventReceived(getVMEvent(0, "v1", 0));
+    vertexManager.onVertexManagerEventReceived(getVMEvent(0, "v1", 1));
+    vertexManager.onVertexManagerEventReceived(getVMEvent(0, "v1", 2));
+    verify(ctx, times(1)).reconfigureVertex(
+      eq(0), any(VertexLocationHint.class), edgePropertiesCaptor.capture());
+  }
+
+  @Test(timeout = 5000)
+  public void testDisableGrouping() throws Exception {
+    when(ctx.getInputVertexEdgeProperties()).thenReturn(getEdgePropertyMap(2));
+    setSrcParallelism(ctx, 1, 2, 3);
+
+    CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder();
+    builder.setIsPartitioned(false).addSources("v0").addSources("v1")
+      .setMaxParallelism(30).setMinOpsPerWorker(1).setEnableGrouping(false);
+    vertexManager.initialize(builder.build());
+
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));
+
+    vertexManager.onVertexManagerEventReceived(getVMEvent(250, "v0", 0));
+    vertexManager.onVertexManagerEventReceived(getVMEvent(200, "v1", 0));
+    verify(ctx, times(1)).reconfigureVertex(
+      eq(6), any(VertexLocationHint.class), edgePropertiesCaptor.capture());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestGrouper.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestGrouper.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestGrouper.java
index 481bd7e..08b2efa 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestGrouper.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestGrouper.java
@@ -30,12 +30,12 @@ public class TestGrouper {
   @Test(timeout = 5000)
   public void testEvenlyGrouping() {
     grouper.init(4, 2);
-    assertEquals(0, grouper.getFirstTaskInGroup(0));
-    assertEquals(2, grouper.getFirstTaskInGroup(1));
-    assertEquals(2, grouper.getNumTasksInGroup(0));
-    assertEquals(2, grouper.getNumTasksInGroup(1));
-    assertEquals(1, grouper.getLastTaskInGroup(0));
-    assertEquals(3, grouper.getLastTaskInGroup(1));
+    assertEquals(0, grouper.getFirstItemInGroup(0));
+    assertEquals(2, grouper.getFirstItemInGroup(1));
+    assertEquals(2, grouper.getNumItemsInGroup(0));
+    assertEquals(2, grouper.getNumItemsInGroup(1));
+    assertEquals(1, grouper.getLastItemInGroup(0));
+    assertEquals(3, grouper.getLastItemInGroup(1));
     assertEquals(0, grouper.getGroupId(1));
     assertEquals(1, grouper.getGroupId(2));
     assertTrue(grouper.isInGroup(2, 1));
@@ -45,12 +45,12 @@ public class TestGrouper {
   @Test(timeout = 5000)
   public void testUnevenlyGrouping() {
     grouper.init(5, 2);
-    assertEquals(0, grouper.getFirstTaskInGroup(0));
-    assertEquals(2, grouper.getFirstTaskInGroup(1));
-    assertEquals(2, grouper.getNumTasksInGroup(0));
-    assertEquals(3, grouper.getNumTasksInGroup(1));
-    assertEquals(1, grouper.getLastTaskInGroup(0));
-    assertEquals(4, grouper.getLastTaskInGroup(1));
+    assertEquals(0, grouper.getFirstItemInGroup(0));
+    assertEquals(2, grouper.getFirstItemInGroup(1));
+    assertEquals(2, grouper.getNumItemsInGroup(0));
+    assertEquals(3, grouper.getNumItemsInGroup(1));
+    assertEquals(1, grouper.getLastItemInGroup(0));
+    assertEquals(4, grouper.getLastItemInGroup(1));
     assertEquals(0, grouper.getGroupId(1));
     assertEquals(1, grouper.getGroupId(3));
     assertTrue(grouper.isInGroup(3, 1));
@@ -60,9 +60,9 @@ public class TestGrouper {
   @Test(timeout = 5000)
   public void testSingleGroup() {
     grouper.init(4, 1);
-    assertEquals(0, grouper.getFirstTaskInGroup(0));
-    assertEquals(4, grouper.getNumTasksInGroup(0));
-    assertEquals(3, grouper.getLastTaskInGroup(0));
+    assertEquals(0, grouper.getFirstItemInGroup(0));
+    assertEquals(4, grouper.getNumItemsInGroup(0));
+    assertEquals(3, grouper.getLastItemInGroup(0));
     assertEquals(0, grouper.getGroupId(0));
     assertEquals(0, grouper.getGroupId(3));
     assertTrue(grouper.isInGroup(3, 0));
@@ -71,9 +71,9 @@ public class TestGrouper {
   @Test(timeout = 5000)
   public void testNoGrouping() {
     grouper.init(2, 2);
-    assertEquals(0, grouper.getFirstTaskInGroup(0));
-    assertEquals(1, grouper.getNumTasksInGroup(0));
-    assertEquals(0, grouper.getLastTaskInGroup(0));
+    assertEquals(0, grouper.getFirstItemInGroup(0));
+    assertEquals(1, grouper.getNumItemsInGroup(0));
+    assertEquals(0, grouper.getLastItemInGroup(0));
     assertEquals(0, grouper.getGroupId(0));
     assertTrue(grouper.isInGroup(0, 0));
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/CartesianProduct.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/CartesianProduct.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/CartesianProduct.java
new file mode 100644
index 0000000..6096f96
--- /dev/null
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/CartesianProduct.java
@@ -0,0 +1,385 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.examples;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DataSinkDescriptor;
+import org.apache.tez.dag.api.DataSourceDescriptor;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.InputInitializerDescriptor;
+import org.apache.tez.dag.api.OutputCommitterDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
+import org.apache.tez.dag.api.client.VertexStatus;
+import org.apache.tez.examples.TezExampleBase;
+import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
+import org.apache.tez.runtime.api.AbstractLogicalInput;
+import org.apache.tez.runtime.api.AbstractLogicalOutput;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.api.InputInitializer;
+import org.apache.tez.runtime.api.InputInitializerContext;
+import org.apache.tez.runtime.api.OutputCommitter;
+import org.apache.tez.runtime.api.OutputCommitterContext;
+import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.api.Writer;
+import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent;
+import org.apache.tez.runtime.api.events.InputDataInformationEvent;
+import org.apache.tez.runtime.api.events.InputInitializerEvent;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+import org.apache.tez.runtime.library.cartesianproduct.CartesianProductConfig;
+import org.apache.tez.runtime.library.cartesianproduct.CartesianProductEdgeManager;
+import org.apache.tez.runtime.library.cartesianproduct.CartesianProductVertexManager;
+import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfig;
+import org.apache.tez.runtime.library.partitioner.RoundRobinPartitioner;
+import org.apache.tez.runtime.library.processor.SimpleProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This job has three vertices: two TokenProcessors and one JoinProcessor. Each TokenProcessor
+ * reads keys generated by a FakeInput and emits them as tokens. The edges into the JoinProcessor
+ * use RoundRobinPartitioner and CartesianProductEdgeManager. The JoinProcessor then writes the
+ * cartesian product of the two token sets.
+ */
+public class CartesianProduct extends TezExampleBase {
+  private static final int srcParallelism = 1;
+  private static final int numRecordPerSrc = 10;
+  private static final String INPUT = "Input1";
+  private static final String OUTPUT = "Output";
+  private static final String VERTEX1 = "Vertex1";
+  private static final String VERTEX2 = "Vertex2";
+  private static final String VERTEX3 = "Vertex3";
+  private static final Logger LOG = LoggerFactory.getLogger(CartesianProduct.class);
+  private static final String[] sourceVertices = new String[] {VERTEX1, VERTEX2};
+
+  public static class TokenProcessor extends SimpleProcessor {
+    public TokenProcessor(ProcessorContext context) {
+      super(context);
+    }
+
+    @Override
+    public void run() throws Exception {
+      Preconditions.checkArgument(getInputs().size() == 1);
+      Preconditions.checkArgument(getOutputs().size() == 1);
+      KeyValueReader kvReader = (KeyValueReader) getInputs().get(INPUT).getReader();
+      KeyValueWriter kvWriter = (KeyValueWriter) getOutputs().get(VERTEX3).getWriter();
+      while (kvReader.next()) {
+        Object key = kvReader.getCurrentKey();
+        Object value = kvReader.getCurrentValue();
+        kvWriter.write(new Text((String)key), new IntWritable(1));
+      }
+    }
+  }
+
+  public static class JoinProcessor extends SimpleMRProcessor {
+    public JoinProcessor(ProcessorContext context) {
+      super(context);
+    }
+
+    @Override
+    public void run() throws Exception {
+      KeyValueWriter kvWriter = (KeyValueWriter) getOutputs().get(OUTPUT).getWriter();
+      KeyValueReader kvReader1 = (KeyValueReader) getInputs().get(VERTEX1).getReader();
+      KeyValueReader kvReader2 = (KeyValueReader) getInputs().get(VERTEX2).getReader();
+      Set<Object> leftSet = new HashSet<>();
+      Set<Object> rightSet = new HashSet<>();
+
+      while (kvReader1.next()) {
+        Text key = (Text)(kvReader1.getCurrentKey());
+        leftSet.add(new Text(key));
+      }
+      while (kvReader2.next()) {
+        Text key = (Text)(kvReader2.getCurrentKey());
+        rightSet.add(new Text(key));
+      }
+
+      for (Object l : leftSet) {
+        for (Object r : rightSet) {
+          kvWriter.write(l, r);
+        }
+      }
+    }
+  }
+
+  public static class FakeInputInitializer extends InputInitializer {
+
+    /**
+     * Constructs an instance of the InputInitializer. Classes extending this to create an
+     * InputInitializer must provide the same constructor so that Tez can create an instance of
+     * the class at runtime.
+     *
+     * @param initializerContext initializer context which can be used to access the payload, vertex
+     *                           properties, etc
+     */
+    public FakeInputInitializer(InputInitializerContext initializerContext) {
+      super(initializerContext);
+    }
+
+    @Override
+    public List<Event> initialize() throws Exception {
+      List<Event> list = new ArrayList<>();
+      list.add(InputConfigureVertexTasksEvent.create(srcParallelism, null, null));
+      for (int i = 0; i < srcParallelism; i++) {
+        list.add(InputDataInformationEvent.createWithObjectPayload(i, null));
+      }
+      return list;
+    }
+
+    @Override
+    public void handleInputInitializerEvent(List<InputInitializerEvent> events) throws Exception {
+
+    }
+  }
+
+  public static class FakeInput extends AbstractLogicalInput {
+
+    /**
+     * Constructs an instance of the LogicalInput. Classes extending this one to create a
+     * LogicalInput must provide the same constructor so that Tez can create an instance of the
+     * class at runtime.
+     *
+     * @param inputContext      the {@link InputContext} which provides
+     *                          the Input with context information within the running task.
+     * @param numPhysicalInputs the number of physical inputs the logical input will receive
+     */
+    public FakeInput(InputContext inputContext, int numPhysicalInputs) {
+      super(inputContext, numPhysicalInputs);
+    }
+
+    @Override
+    public List<Event> initialize() throws Exception {
+      getContext().requestInitialMemory(0, null);
+      getContext().inputIsReady();
+      return null;
+    }
+
+    @Override
+    public void handleEvents(List<Event> inputEvents) throws Exception {
+
+    }
+
+    @Override
+    public List<Event> close() throws Exception {
+      return null;
+    }
+
+    @Override
+    public void start() throws Exception {
+
+    }
+
+    @Override
+    public Reader getReader() throws Exception {
+      return new KeyValueReader() {
+        String[] keys = new String[numRecordPerSrc];
+
+        int i = -1;
+
+        @Override
+        public boolean next() throws IOException {
+          if (i == -1) {
+            for (int j = 0; j < numRecordPerSrc; j++) {
+              keys[j] = ""+j;
+            }
+          }
+          i++;
+          return i < keys.length;
+        }
+
+        @Override
+        public Object getCurrentKey() throws IOException {
+          return keys[i];
+        }
+
+        @Override
+        public Object getCurrentValue() throws IOException {
+          return keys[i];
+        }
+      };
+    }
+  }
+
+  public static class FakeOutputCommitter extends OutputCommitter {
+
+    /**
+     * Constructs an instance of the OutputCommitter. Classes extending this to create an
+     * OutputCommitter must provide the same constructor so that Tez can create an instance of
+     * the class at runtime.
+     *
+     * @param committerContext committer context which can be used to access the payload, vertex
+     *                         properties, etc
+     */
+    public FakeOutputCommitter(OutputCommitterContext committerContext) {
+      super(committerContext);
+    }
+
+    @Override
+    public void initialize() throws Exception {
+
+    }
+
+    @Override
+    public void setupOutput() throws Exception {
+
+    }
+
+    @Override
+    public void commitOutput() throws Exception {
+
+    }
+
+    @Override
+    public void abortOutput(VertexStatus.State finalState) throws Exception {
+
+    }
+  }
+
+  public static class FakeOutput extends AbstractLogicalOutput {
+
+    /**
+     * Constructs an instance of the LogicalOutput. Classes extending this one to create a
+     * LogicalOutput must provide the same constructor so that Tez can create an instance of the
+     * class at runtime.
+     *
+     * @param outputContext      the {@link OutputContext} which
+     *                           provides
+     *                           the Output with context information within the running task.
+     * @param numPhysicalOutputs the number of physical outputs the logical output will generate
+     */
+    public FakeOutput(OutputContext outputContext, int numPhysicalOutputs) {
+      super(outputContext, numPhysicalOutputs);
+    }
+
+    @Override
+    public List<Event> initialize() throws Exception {
+      getContext().requestInitialMemory(0, null);
+      return null;
+    }
+
+    @Override
+    public void handleEvents(List<Event> outputEvents) {
+
+    }
+
+    @Override
+    public List<Event> close() throws Exception {
+      return null;
+    }
+
+    @Override
+    public void start() throws Exception {
+
+    }
+
+    @Override
+    public Writer getWriter() throws Exception {
+      return new KeyValueWriter() {
+        @Override
+        public void write(Object key, Object value) throws IOException {
+          System.out.println(key + " XXX " + value);
+        }
+      };
+    }
+  }
+
+  private DAG createDAG(TezConfiguration tezConf) throws IOException {
+    InputDescriptor inputDescriptor = InputDescriptor.create(FakeInput.class.getName());
+    InputInitializerDescriptor inputInitializerDescriptor =
+      InputInitializerDescriptor.create(FakeInputInitializer.class.getName());
+    DataSourceDescriptor dataSourceDescriptor =
+      DataSourceDescriptor.create(inputDescriptor, inputInitializerDescriptor, null);
+
+    Vertex v1 = Vertex.create(VERTEX1, ProcessorDescriptor.create(TokenProcessor.class.getName()));
+    v1.addDataSource(INPUT, dataSourceDescriptor);
+    Vertex v2 = Vertex.create(VERTEX2, ProcessorDescriptor.create(TokenProcessor.class.getName()));
+    v2.addDataSource(INPUT, dataSourceDescriptor);
+
+    OutputDescriptor outputDescriptor = OutputDescriptor.create(FakeOutput.class.getName());
+    OutputCommitterDescriptor outputCommitterDescriptor =
+      OutputCommitterDescriptor.create(FakeOutputCommitter.class.getName());
+    DataSinkDescriptor dataSinkDescriptor =
+      DataSinkDescriptor.create(outputDescriptor, outputCommitterDescriptor, null);
+
+    CartesianProductConfig cartesianProductConfig =
+      new CartesianProductConfig(Arrays.asList(sourceVertices));
+    UserPayload userPayload = cartesianProductConfig.toUserPayload(tezConf);
+
+    Vertex v3 = Vertex.create(VERTEX3, ProcessorDescriptor.create(JoinProcessor.class.getName()));
+    v3.addDataSink(OUTPUT, dataSinkDescriptor);
+    v3.setVertexManagerPlugin(
+      VertexManagerPluginDescriptor.create(CartesianProductVertexManager.class.getName())
+                                   .setUserPayload(userPayload));
+
+    EdgeManagerPluginDescriptor edgeManagerDescriptor =
+      EdgeManagerPluginDescriptor.create(CartesianProductEdgeManager.class.getName());
+    edgeManagerDescriptor.setUserPayload(userPayload);
+    UnorderedPartitionedKVEdgeConfig edgeConf =
+      UnorderedPartitionedKVEdgeConfig.newBuilder(Text.class.getName(), IntWritable.class.getName(),
+        RoundRobinPartitioner.class.getName()).build();
+    EdgeProperty edgeProperty = edgeConf.createDefaultCustomEdgeProperty(edgeManagerDescriptor);
+
+    return DAG.create("CrossProduct").addVertex(v1).addVertex(v2).addVertex(v3)
+      .addEdge(Edge.create(v1, v3, edgeProperty)).addEdge(Edge.create(v2, v3, edgeProperty));
+  }
+
+  @Override
+  protected void printUsage() {}
+
+  @Override
+  protected int validateArgs(String[] otherArgs) {
+    return 0;
+  }
+
+  @Override
+  protected int runJob(String[] args, TezConfiguration tezConf,
+      TezClient tezClient) throws Exception {
+    DAG dag = createDAG(tezConf);
+    return runDag(dag, isCountersLog(), LOG);
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new CartesianProduct(), args);
+    System.exit(res);
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
index 90d2825..cdbdf13 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
@@ -81,6 +81,8 @@ public class ExampleDriver {
           "Filters lines by the specified word using OneToOne edge");
       pgd.addClass("multiplecommitsExample", MultipleCommitsExample.class,
           "Job with multiple commits in both vertex group and vertex");
+      pgd.addClass("cartesianproduct", CartesianProduct.class,
+          "Cartesian Product Example");
       exitCode = pgd.run(argv);
     }
     catch(Throwable e){

http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
index af1cb6f..e2fc53f 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
@@ -858,8 +858,11 @@ public class TestFaultTolerance {
     String[] sourceVertices = {"v1", "v2"};
     CartesianProductConfig cartesianProductConfig =
       new CartesianProductConfig(Arrays.asList(sourceVertices));
+    TezConfiguration tezConf = new TezConfiguration();
+    tezConf.setInt(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_NUM_PARTITIONS, 1);
+    tezConf.setBoolean(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_ENABLE_GROUPING, false);
     UserPayload cartesianProductPayload =
-      cartesianProductConfig.toUserPayload(new TezConfiguration());
+      cartesianProductConfig.toUserPayload(tezConf);
 
     v3.setVertexManagerPlugin(
       VertexManagerPluginDescriptor.create(CartesianProductVertexManager.class.getName())

http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java b/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
index 69dea7e..8b292ab 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
@@ -22,6 +22,8 @@ import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.tez.dag.api.OutputDescriptor;
@@ -86,6 +88,15 @@ public class TestOutput extends AbstractLogicalOutput {
       DataMovementEvent event = DataMovementEvent.create(i, result);
       events.add(event);
     }
+
+    ShuffleUserPayloads.VertexManagerEventPayloadProto.Builder vmBuilder =
+      ShuffleUserPayloads.VertexManagerEventPayloadProto.newBuilder().setNumRecord(1);
+
+    VertexManagerEvent vmEvent = VertexManagerEvent.create(
+      getContext().getDestinationVertexName(),
+      vmBuilder.build().toByteString().asReadOnlyByteBuffer());
+
+    events.add(vmEvent);
     return events;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
index be9b0bf..2dfc76d 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
@@ -54,6 +54,8 @@ import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.client.StatusGetOpts;
 import org.apache.tez.dag.api.client.VertexStatus;
+import org.apache.tez.mapreduce.examples.CartesianProduct;
+import org.apache.tez.runtime.library.cartesianproduct.CartesianProductVertexManager;
 import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
 import org.apache.tez.runtime.library.partitioner.HashPartitioner;
 import org.slf4j.Logger;
@@ -1333,4 +1335,15 @@ public class TestTezJobs {
       tezClient.stop();
     }
   }
+
+  @Test(timeout = 60000)
+  public void testCartesianProduct() throws Exception {
+    LOG.info("Running CartesianProduct Test");
+    CartesianProduct job = new CartesianProduct();
+
+    TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
+    tezConf.setInt(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_MAX_PARALLELISM, 10);
+    tezConf.setInt(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_MIN_OPS_PER_WORKER, 25);
+    Assert.assertEquals("CartesianProduct failed", job.run(tezConf, null, null), 0);
+  }
 }


Mime
View raw message