drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ve...@apache.org
Subject [6/8] drill git commit: DRILL-133: LocalExchange planning and exec.
Date Mon, 09 Mar 2015 08:23:53 GMT
http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedDeMuxExchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedDeMuxExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedDeMuxExchange.java
new file mode 100644
index 0000000..0bc6678
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedDeMuxExchange.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.config;
+
+import java.util.Collections;
+
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.exec.physical.MinorFragmentEndpoint;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Receiver;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ * UnorderedDeMuxExchange is a version of DeMuxExchange where the incoming batches are not sorted.
+ */
+@JsonTypeName("unordered-demux-exchange")
+public class UnorderedDeMuxExchange extends AbstractDeMuxExchange {
+  /** Creates the exchange over {@code child}; {@code expr} is handed to {@link AbstractDeMuxExchange}. */
+  public UnorderedDeMuxExchange(@JsonProperty("child") PhysicalOperator child, @JsonProperty("expr") LogicalExpression expr) {
+    super(child, expr);
+  }
+
+  /** Builds a receiver fed only by the single sender mapped to {@code minorFragmentId}; throws if none is mapped. */
+  @Override
+  public Receiver getReceiver(int minorFragmentId) {
+    createSenderReceiverMapping();
+    final MinorFragmentEndpoint mappedSender = receiverToSenderMapping.get(minorFragmentId);
+    if (mappedSender != null) {
+      return new UnorderedReceiver(senderMajorFragmentId, Collections.singletonList(mappedSender));
+    }
+    throw new IllegalStateException(String.format("Failed to find sender for receiver [%d]", minorFragmentId));
+  }
+
+  /** Returns a copy of this exchange wrapping {@code child} and reusing the current {@code expr}. */
+  @Override
+  protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+    return new UnorderedDeMuxExchange(child, expr);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedMuxExchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedMuxExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedMuxExchange.java
new file mode 100644
index 0000000..3028ee3
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedMuxExchange.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.config;
+
+import java.util.List;
+
+import org.apache.drill.exec.physical.MinorFragmentEndpoint;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Receiver;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ * UnorderedMuxExchange is a version of MuxExchange where the incoming batches are not sorted.
+ */
+@JsonTypeName("unordered-mux-exchange")
+public class UnorderedMuxExchange extends AbstractMuxExchange {
+  /** Creates the exchange over {@code child}. */
+  public UnorderedMuxExchange(@JsonProperty("child") PhysicalOperator child) {
+    super(child);
+  }
+
+  /** Builds a receiver for {@code minorFragmentId} fed by every sender mapped to it; throws if none are mapped. */
+  @Override
+  public Receiver getReceiver(int minorFragmentId) {
+    createSenderReceiverMapping();
+    List<MinorFragmentEndpoint> senders = receiverToSenderMapping.get(minorFragmentId);
+    if (senders == null || senders.isEmpty()) {
+      throw new IllegalStateException(String.format("Failed to find senders for receiver [%d]", minorFragmentId));
+    }
+    return new UnorderedReceiver(senderMajorFragmentId, senders);
+  }
+
+  /** Returns a copy of this exchange wrapping {@code child}. */
+  @Override
+  protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+    return new UnorderedMuxExchange(child);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java
index 3a4dd0e..e741dd4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java
@@ -19,13 +19,12 @@ package org.apache.drill.exec.physical.config;
 
 import java.util.List;
 
+import org.apache.drill.exec.physical.MinorFragmentEndpoint;
 import org.apache.drill.exec.physical.base.AbstractReceiver;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 
@@ -33,19 +32,10 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 public class UnorderedReceiver extends AbstractReceiver{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnorderedReceiver.class);
 
-  private List<DrillbitEndpoint> senders;
-
   @JsonCreator
   public UnorderedReceiver(@JsonProperty("sender-major-fragment") int oppositeMajorFragmentId,
-                        @JsonProperty("senders") List<DrillbitEndpoint> senders) {
-    super(oppositeMajorFragmentId);
-    this.senders = senders;
-  }
-
-  @Override
-  @JsonProperty("senders")
-  public List<DrillbitEndpoint> getProvidingEndpoints() {
-    return senders;
+                           @JsonProperty("senders") List<MinorFragmentEndpoint> senders) {
+    super(oppositeMajorFragmentId, senders);
   }
 
   @Override
@@ -62,9 +52,4 @@ public class UnorderedReceiver extends AbstractReceiver{
   public int getOperatorType() {
     return CoreOperatorType.UNORDERED_RECEIVER_VALUE;
   }
-
-  @JsonIgnore
-  public int getNumSenders() {
-    return senders.size();
-  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index 812c89c..1ef7bbd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -22,14 +22,11 @@ import io.netty.buffer.ByteBuf;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.config.SingleSender;
-import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
-import org.apache.drill.exec.physical.impl.materialize.VectorRecordMaterializer;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.record.FragmentWritableBatch;
@@ -48,10 +45,12 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
     return new SingleSenderRootExec(context, children.iterator().next(), config);
   }
 
-
-
   private static class SingleSenderRootExec extends BaseRootExec {
     static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SingleSenderRootExec.class);
+
+    private final SendingAccountor sendCount = new SendingAccountor();
+    private final FragmentHandle oppositeHandle;
+
     private RecordBatch incoming;
     private DataTunnel tunnel;
     private FragmentHandle handle;
@@ -60,7 +59,6 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
     private FragmentContext context;
     private volatile boolean ok = true;
     private volatile boolean done = false;
-    private final SendingAccountor sendCount = new SendingAccountor();
 
     public enum Metric implements MetricDef {
       BYTES_SENT;
@@ -79,8 +77,12 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
       this.handle = context.getHandle();
       this.config = config;
       this.recMajor = config.getOppositeMajorFragmentId();
-      FragmentHandle opposite = handle.toBuilder().setMajorFragmentId(config.getOppositeMajorFragmentId()).setMinorFragmentId(0).build();
       this.tunnel = context.getDataTunnel(config.getDestination());
+      oppositeHandle = handle.toBuilder()
+          .setMajorFragmentId(config.getOppositeMajorFragmentId())
+          .setMinorFragmentId(config.getOppositeMinorFragmentId())
+          .build();
+      tunnel = context.getDataTunnel(config.getDestination());
       this.context = context;
     }
 
@@ -103,8 +105,9 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
       switch (out) {
       case STOP:
       case NONE:
-        FragmentWritableBatch b2 = FragmentWritableBatch.getEmptyLastWithSchema(handle.getQueryId(), handle.getMajorFragmentId(),
-                handle.getMinorFragmentId(), recMajor, 0, incoming.getSchema());
+        FragmentWritableBatch b2 = FragmentWritableBatch.getEmptyLastWithSchema(handle.getQueryId(),
+            handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, oppositeHandle.getMinorFragmentId(),
+            incoming.getSchema());
         sendCount.increment();
         stats.startWait();
         try {
@@ -117,7 +120,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
       case OK_NEW_SCHEMA:
       case OK:
         FragmentWritableBatch batch = new FragmentWritableBatch(false, handle.getQueryId(), handle.getMajorFragmentId(),
-                handle.getMinorFragmentId(), recMajor, 0, incoming.getWritableBatch());
+                handle.getMinorFragmentId(), recMajor, oppositeHandle.getMinorFragmentId(), incoming.getWritableBatch());
         updateStats(batch);
         sendCount.increment();
         stats.startWait();

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
index c255033..d17fdd4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
@@ -21,18 +21,16 @@ import io.netty.buffer.ByteBuf;
 
 import java.util.List;
 
-import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.MinorFragmentEndpoint;
 import org.apache.drill.exec.physical.config.BroadcastSender;
 import org.apache.drill.exec.physical.impl.BaseRootExec;
 import org.apache.drill.exec.physical.impl.SendingAccountor;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.GeneralRPCProtos;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.record.FragmentWritableBatch;
@@ -52,9 +50,9 @@ import com.google.common.collect.ArrayListMultimap;
  */
 public class BroadcastSenderRootExec extends BaseRootExec {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BroadcastSenderRootExec.class);
+  private final StatusHandler statusHandler = new StatusHandler();
   private final FragmentContext context;
   private final BroadcastSender config;
-
   private final int[][] receivingMinorFragments;
   private final DataTunnel[] tunnels;
   private final ExecProtos.FragmentHandle handle;
@@ -79,11 +77,11 @@ public class BroadcastSenderRootExec extends BaseRootExec {
     this.incoming = incoming;
     this.config = config;
     this.handle = context.getHandle();
-    List<DrillbitEndpoint> destinations = config.getDestinations();
+    List<MinorFragmentEndpoint> destinations = config.getDestinations();
     ArrayListMultimap<DrillbitEndpoint, Integer> dests = ArrayListMultimap.create();
 
-    for(int i = 0; i < destinations.size(); ++i) {
-      dests.put(destinations.get(i), i);
+    for(MinorFragmentEndpoint destination : destinations) {
+      dests.put(destination.getEndpoint(), destination.getId());
     }
 
     int destCount = dests.keySet().size();
@@ -102,12 +100,8 @@ public class BroadcastSenderRootExec extends BaseRootExec {
       tunnels[i] = context.getDataTunnel(ep);
       i++;
     }
-
-
   }
 
-
-
   @Override
   public boolean innerNext() {
     if(!ok) {
@@ -121,7 +115,12 @@ public class BroadcastSenderRootExec extends BaseRootExec {
       case STOP:
       case NONE:
         for (int i = 0; i < tunnels.length; ++i) {
-          FragmentWritableBatch b2 = FragmentWritableBatch.getEmptyLast(handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), config.getOppositeMajorFragmentId(), receivingMinorFragments[i]);
+          FragmentWritableBatch b2 = FragmentWritableBatch.getEmptyLast(
+              handle.getQueryId(),
+              handle.getMajorFragmentId(),
+              handle.getMinorFragmentId(),
+              config.getOppositeMajorFragmentId(),
+              receivingMinorFragments[i]);
           stats.startWait();
           try {
             tunnels[i].sendRecordBatch(this.statusHandler, b2);
@@ -129,9 +128,7 @@ public class BroadcastSenderRootExec extends BaseRootExec {
           } finally {
             stats.stopWait();
           }
-
         }
-
         return false;
 
       case OK_NEW_SCHEMA:
@@ -141,7 +138,14 @@ public class BroadcastSenderRootExec extends BaseRootExec {
           writableBatch.retainBuffers(tunnels.length - 1);
         }
         for (int i = 0; i < tunnels.length; ++i) {
-          FragmentWritableBatch batch = new FragmentWritableBatch(false, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), config.getOppositeMajorFragmentId(), receivingMinorFragments[i], writableBatch);
+          FragmentWritableBatch batch = new FragmentWritableBatch(
+              false,
+              handle.getQueryId(),
+              handle.getMajorFragmentId(),
+              handle.getMinorFragmentId(),
+              config.getOppositeMajorFragmentId(),
+              receivingMinorFragments[i],
+              writableBatch);
           updateStats(batch);
           stats.startWait();
           try {
@@ -173,7 +177,6 @@ public class BroadcastSenderRootExec extends BaseRootExec {
       incoming.cleanup();
   }
 
-  private StatusHandler statusHandler = new StatusHandler();
   private class StatusHandler extends BaseRpcOutcomeListener<GeneralRPCProtos.Ack> {
     volatile RpcException ex;
     private final SendingAccountor sendCount = new SendingAccountor();
@@ -193,5 +196,4 @@ public class BroadcastSenderRootExec extends BaseRootExec {
       this.ex = ex;
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 804671e..e230fd2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -46,6 +46,7 @@ import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.MinorFragmentEndpoint;
 import org.apache.drill.exec.physical.config.MergingReceiverPOP;
 import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
@@ -488,15 +489,15 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
             .setMajorFragmentId(config.getOppositeMajorFragmentId())
             .setQueryId(context.getHandle().getQueryId())
             .build();
-    for (int i = 0; i < config.getNumSenders(); i++) {
+    for (MinorFragmentEndpoint providingEndpoint : config.getProvidingEndpoints()) {
       FragmentHandle sender = FragmentHandle.newBuilder(handlePrototype)
-              .setMinorFragmentId(i)
+              .setMinorFragmentId(providingEndpoint.getId())
               .build();
       FinishedReceiver finishedReceiver = FinishedReceiver.newBuilder()
               .setReceiver(context.getHandle())
               .setSender(sender)
               .build();
-      context.getControlTunnel(config.getProvidingEndpoints().get(i)).informReceiverFinished(new OutcomeListener(), finishedReceiver);
+      context.getControlTunnel(providingEndpoint.getEndpoint()).informReceiverFinished(new OutcomeListener(), finishedReceiver);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 200e78e..ccbd289 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -34,10 +34,10 @@ import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.MinorFragmentEndpoint;
 import org.apache.drill.exec.physical.config.HashPartitionSender;
 import org.apache.drill.exec.physical.impl.BaseRootExec;
 import org.apache.drill.exec.physical.impl.SendingAccountor;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.FragmentWritableBatch;
@@ -268,17 +268,16 @@ public class PartitionSenderRootExec extends BaseRootExec {
 
   public void sendEmptyBatch(boolean isLast) {
     FragmentHandle handle = context.getHandle();
-    int fieldId = 0;
     StatusHandler statusHandler = new StatusHandler(sendCount, context);
-    for (DrillbitEndpoint endpoint : popConfig.getDestinations()) {
-      DataTunnel tunnel = context.getDataTunnel(endpoint);
+    for (MinorFragmentEndpoint destination : popConfig.getDestinations()) {
+      DataTunnel tunnel = context.getDataTunnel(destination.getEndpoint());
       FragmentWritableBatch writableBatch = FragmentWritableBatch.getEmptyBatchWithSchema(
           isLast,
           handle.getQueryId(),
           handle.getMajorFragmentId(),
           handle.getMinorFragmentId(),
           operator.getOppositeMajorFragmentId(),
-          fieldId,
+          destination.getId(),
           incoming.getSchema());
       stats.startWait();
       try {
@@ -287,7 +286,6 @@ public class PartitionSenderRootExec extends BaseRootExec {
         stats.stopWait();
       }
       sendCount.increment();
-      fieldId++;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
index 79076cf..1d9088a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
@@ -31,10 +31,10 @@ import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.physical.MinorFragmentEndpoint;
 import org.apache.drill.exec.physical.config.HashPartitionSender;
 import org.apache.drill.exec.physical.impl.SendingAccountor;
 import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec.Metric;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -93,12 +93,12 @@ public abstract class PartitionerTemplate implements Partitioner {
       outgoingRecordBatchSize = (DEFAULT_RECORD_BATCH_SIZE + 1)/2 - 1;
     }
 
-    int fieldId = 0;
-    for (DrillbitEndpoint endpoint : popConfig.getDestinations()) {
-      FragmentHandle opposite = context.getHandle().toBuilder().setMajorFragmentId(popConfig.getOppositeMajorFragmentId()).setMinorFragmentId(fieldId).build();
+    for (MinorFragmentEndpoint destination : popConfig.getDestinations()) {
+      FragmentHandle opposite = context.getHandle().toBuilder()
+          .setMajorFragmentId(popConfig.getOppositeMajorFragmentId())
+          .setMinorFragmentId(destination.getId()).build();
       outgoingBatches.add(new OutgoingRecordBatch(stats, sendingAccountor, popConfig,
-          context.getDataTunnel(endpoint), context, oContext.getAllocator(), fieldId, statusHandler));
-      fieldId++;
+          context.getDataTunnel(destination.getEndpoint()), context, oContext.getAllocator(), destination.getId(), statusHandler));
     }
 
     for (OutgoingRecordBatch outgoingRecordBatch : outgoingBatches) {

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index 52b892e..389d668 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -30,6 +30,7 @@ import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.ops.OpProfileDef;
+import org.apache.drill.exec.physical.MinorFragmentEndpoint;
 import org.apache.drill.exec.physical.config.UnorderedReceiver;
 import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
@@ -209,15 +210,15 @@ public class UnorderedReceiverBatch implements RecordBatch {
             .setMajorFragmentId(config.getOppositeMajorFragmentId())
             .setQueryId(context.getHandle().getQueryId())
             .build();
-    for (int i = 0; i < config.getNumSenders(); i++) {
+    for (MinorFragmentEndpoint providingEndpoint : config.getProvidingEndpoints()) {
       FragmentHandle sender = FragmentHandle.newBuilder(handlePrototype)
-              .setMinorFragmentId(i)
+              .setMinorFragmentId(providingEndpoint.getId())
               .build();
       FinishedReceiver finishedReceiver = FinishedReceiver.newBuilder()
               .setReceiver(context.getHandle())
               .setSender(sender)
               .build();
-      context.getControlTunnel(config.getProvidingEndpoints().get(i)).informReceiverFinished(new OutcomeListener(), finishedReceiver);
+      context.getControlTunnel(providingEndpoint.getEndpoint()).informReceiverFinished(new OutcomeListener(), finishedReceiver);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java
index ac63bde..2436a0e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java
@@ -30,22 +30,26 @@ public class Fragment implements Iterable<Fragment.ExchangeFragmentPair> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Fragment.class);
 
   private PhysicalOperator root;
-  private Exchange sendingExchange;
+  private ExchangeFragmentPair sendingExchange;
   private final List<ExchangeFragmentPair> receivingExchangePairs = Lists.newLinkedList();
-  private Stats stats = new Stats();
 
+  /**
+   * Set the given operator as root operator of this fragment. If root operator is already set,
+   * then this method call is a no-op.
+   * @param o new root operator
+   */
   public void addOperator(PhysicalOperator o) {
     if (root == null) {
       root = o;
     }
   }
 
-  public void addSendExchange(Exchange e) throws ForemanSetupException{
+  public void addSendExchange(Exchange e, Fragment sendingToFragment) throws ForemanSetupException{
     if (sendingExchange != null) {
       throw new ForemanSetupException("Fragment was trying to add a second SendExchange.  ");
     }
     addOperator(e);
-    sendingExchange = e;
+    sendingExchange = new ExchangeFragmentPair(e, sendingToFragment);
   }
 
   public void addReceiveExchange(Exchange e, Fragment fragment) {
@@ -66,6 +70,14 @@ public class Fragment implements Iterable<Fragment.ExchangeFragmentPair> {
   }
 
   public Exchange getSendingExchange() {
+    if (sendingExchange != null) {
+      return sendingExchange.exchange;
+    }
+
+    return null;
+  }
+
+  public ExchangeFragmentPair getSendingExchangePair() {
     return sendingExchange;
   }
 
@@ -73,10 +85,6 @@ public class Fragment implements Iterable<Fragment.ExchangeFragmentPair> {
 //    return visitor.visit(this, extra);
 //  }
 
-  public Stats getStats() {
-    return stats;
-  }
-
   public class ExchangeFragmentPair {
     private Exchange exchange;
     private Fragment node;
@@ -117,8 +125,7 @@ public class Fragment implements Iterable<Fragment.ExchangeFragmentPair> {
     int result = 1;
     result = prime * result + ((receivingExchangePairs == null) ? 0 : receivingExchangePairs.hashCode());
     result = prime * result + ((root == null) ? 0 : root.hashCode());
-    result = prime * result + ((sendingExchange == null) ? 0 : sendingExchange.hashCode());
-    result = prime * result + ((stats == null) ? 0 : stats.hashCode());
+    result = prime * result + ((sendingExchange == null) ? 0 : sendingExchange.getExchange().hashCode());
     return result;
   }
 
@@ -155,20 +162,14 @@ public class Fragment implements Iterable<Fragment.ExchangeFragmentPair> {
     } else if (!sendingExchange.equals(other.sendingExchange)) {
       return false;
     }
-    if (stats == null) {
-      if (other.stats != null) {
-        return false;
-      }
-    } else if (!stats.equals(other.stats)) {
-      return false;
-    }
+
     return true;
   }
 
   @Override
   public String toString() {
     return "FragmentNode [root=" + root + ", sendingExchange=" + sendingExchange + ", receivingExchangePairs="
-        + receivingExchangePairs + ", stats=" + stats + "]";
+        + receivingExchangePairs + "]";
   }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java
index 8756e5b..0271692 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java
@@ -20,7 +20,6 @@ package org.apache.drill.exec.planner.fragment;
 import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
 import org.apache.drill.exec.physical.base.Exchange;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.base.SubScan;
 import org.apache.drill.exec.work.foreman.ForemanSetupException;
 
 /**
@@ -29,8 +28,9 @@ import org.apache.drill.exec.work.foreman.ForemanSetupException;
 public class MakeFragmentsVisitor extends AbstractPhysicalVisitor<Fragment, Fragment, ForemanSetupException> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MakeFragmentsVisitor.class);

+  public static final MakeFragmentsVisitor INSTANCE = new MakeFragmentsVisitor(); // the constructor is private; use this

-  public MakeFragmentsVisitor() {
+  private MakeFragmentsVisitor() {
   }
 
   @Override
@@ -41,18 +41,12 @@ public class MakeFragmentsVisitor extends AbstractPhysicalVisitor<Fragment, Frag
     }
     Fragment next = getNextBuilder();
     value.addReceiveExchange(exchange, next);
-    next.addSendExchange(exchange);
+    next.addSendExchange(exchange, value);
     exchange.getChild().accept(this, next);
     return value;
   }
 
   @Override
-  public Fragment visitSubScan(SubScan subScan, Fragment value) throws ForemanSetupException {
-    // TODO - implement this
-    return super.visitOp(subScan, value);
-  }
-
-  @Override
   public Fragment visitOp(PhysicalOperator op, Fragment value)  throws ForemanSetupException{
 //    logger.debug("Visiting Other {}", op);
     value = ensureBuilder(value);

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
index 961b603..9b0944e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
@@ -35,6 +35,10 @@ import com.google.common.collect.Lists;
 public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Materializer.IndexedFragmentNode, ExecutionSetupException>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Materializer.class);
 
+  public static final Materializer INSTANCE = new Materializer(); // shared instance; the constructor below is private
+
+  private Materializer() {
+  }
 
   @Override
   public PhysicalOperator visitExchange(Exchange exchange, IndexedFragmentNode iNode) throws ExecutionSetupException {

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationInfo.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationInfo.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationInfo.java
new file mode 100644
index 0000000..75a009e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationInfo.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.fragment;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Captures parallelization parameters for a given operator or fragment: the min and max parallelization width
+ * and the affinity to Drillbit endpoints. Fields are final, but the EndpointAffinity values in the map are mutable.
+ */
+public class ParallelizationInfo {
+
+  /* Default parallelization width is [1, Integer.MAX_VALUE] and no endpoint affinity. */
+  public static final ParallelizationInfo UNLIMITED_WIDTH_NO_ENDPOINT_AFFINITY =
+      ParallelizationInfo.create(1, Integer.MAX_VALUE);
+
+  private final Map<DrillbitEndpoint, EndpointAffinity> affinityMap;
+  private final int minWidth;
+  private final int maxWidth;
+
+  private ParallelizationInfo(int minWidth, int maxWidth, Map<DrillbitEndpoint, EndpointAffinity> affinityMap) {
+    this.minWidth = minWidth;
+    this.maxWidth = maxWidth;
+    this.affinityMap = ImmutableMap.copyOf(affinityMap); // shallow copy: EndpointAffinity values are shared
+  }
+
+  public static ParallelizationInfo create(int minWidth, int maxWidth) {
+    return create(minWidth, maxWidth, ImmutableList.<EndpointAffinity>of());
+  }
+
+  public static ParallelizationInfo create(int minWidth, int maxWidth, List<EndpointAffinity> endpointAffinities) {
+    Map<DrillbitEndpoint, EndpointAffinity> affinityMap = Maps.newHashMap();
+
+    for(EndpointAffinity epAffinity : endpointAffinities) {
+      affinityMap.put(epAffinity.getEndpoint(), epAffinity); // a later entry for the same endpoint replaces an earlier one
+    }
+
+    return new ParallelizationInfo(minWidth, maxWidth, affinityMap);
+  }
+
+  public int getMinWidth() {
+    return minWidth;
+  }
+
+  public int getMaxWidth() {
+    return maxWidth;
+  }
+
+  public Map<DrillbitEndpoint, EndpointAffinity> getEndpointAffinityMap() {
+    return affinityMap;
+  }
+
+  @Override
+  public String toString() {
+    return getDigest(minWidth, maxWidth, affinityMap);
+  }
+
+  private static String getDigest(int minWidth, int maxWidth, Map<DrillbitEndpoint, EndpointAffinity> affinityMap) {
+    StringBuilder sb = new StringBuilder();
+    sb.append(String.format("[minWidth = %d, maxWidth=%d, epAff=[", minWidth, maxWidth));
+    sb.append(Joiner.on(",").join(affinityMap.values()));
+    sb.append("]]");
+
+    return sb.toString();
+  }
+
+  /**
+   * Merges ParallelizationInfo instances: keeps the largest min width, smallest max width and per-endpoint affinities.
+   */
+  public static class ParallelizationInfoCollector {
+    private int minWidth = 1;
+    private int maxWidth = Integer.MAX_VALUE;
+    private final Map<DrillbitEndpoint, EndpointAffinity> affinityMap = Maps.newHashMap();
+
+    public void add(ParallelizationInfo parallelizationInfo) {
+      this.minWidth = Math.max(minWidth, parallelizationInfo.minWidth);
+      this.maxWidth = Math.min(maxWidth, parallelizationInfo.maxWidth);
+
+      // Iterate the other instance's affinities directly instead of shadowing the affinityMap field with a local.
+      for (EndpointAffinity epAff : parallelizationInfo.getEndpointAffinityMap().values()) {
+        addEndpointAffinity(epAff);
+      }
+    }
+
+    public void addMaxWidth(int newMaxWidth) {
+      this.maxWidth = Math.min(maxWidth, newMaxWidth);
+    }
+
+    public void addEndpointAffinities(List<EndpointAffinity> endpointAffinities) {
+      for(EndpointAffinity epAff : endpointAffinities) {
+        addEndpointAffinity(epAff);
+      }
+    }
+
+    // Adds epAff to the merged map, storing a copy so that later merges never mutate the caller's instance.
+    private void addEndpointAffinity(EndpointAffinity epAff) {
+      if (affinityMap.containsKey(epAff.getEndpoint())) {
+        affinityMap.get(epAff.getEndpoint()).addAffinity(epAff.getAffinity());
+      } else {
+        affinityMap.put(epAff.getEndpoint(), new EndpointAffinity(epAff.getEndpoint(), epAff.getAffinity()));
+      }
+    }
+
+    /**
+     * Returns a ParallelizationInfo for the info collected so far; it shares this collector's EndpointAffinity objects.
+     */
+    public ParallelizationInfo get() {
+      return new ParallelizationInfo(minWidth, maxWidth, affinityMap);
+    }
+
+    @Override
+    public String toString() {
+      return getDigest(minWidth, maxWidth, affinityMap);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/PlanningSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/PlanningSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/PlanningSet.java
index 8cc6c85..3e0f35a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/PlanningSet.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/PlanningSet.java
@@ -27,21 +27,10 @@ import com.google.common.collect.Maps;
 public class PlanningSet implements Iterable<Wrapper> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PlanningSet.class);
 
-  private Map<Fragment, Wrapper> fragmentMap = Maps.newHashMap();
+  private final Map<Fragment, Wrapper> fragmentMap = Maps.newHashMap(); // one Wrapper per Fragment, keyed by Fragment
   private int majorFragmentIdIndex = 0;
 
-  PlanningSet() {
-  }
-
-  public void addAffinity(Fragment n, DrillbitEndpoint endpoint, float affinity) {
-    get(n).addEndpointAffinity(endpoint, affinity);
-  }
-
-  public void setWidth(Fragment n, int width) {
-    get(n).setWidth(width);
-  }
-
-  Wrapper get(Fragment node) {
+  public Wrapper get(Fragment node) {
     Wrapper wrapper = fragmentMap.get(node);
     if (wrapper == null) {
 

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
index 0ece367..f8d1803 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
@@ -18,17 +18,30 @@
 package org.apache.drill.exec.planner.fragment;
 
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Ordering;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.util.DrillStringUtils;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.expr.fn.impl.DateUtility;
 import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.physical.EndpointAffinity;
 import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
+import org.apache.drill.exec.physical.base.Exchange.ParallelizationDependency;
 import org.apache.drill.exec.physical.base.FragmentRoot;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.planner.fragment.Fragment.ExchangeFragmentPair;
 import org.apache.drill.exec.planner.fragment.Materializer.IndexedFragmentNode;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
@@ -36,8 +49,8 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.server.options.OptionList;
+import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.work.QueryWorkUnit;
-import org.apache.drill.exec.work.foreman.ForemanException;
 import org.apache.drill.exec.work.foreman.ForemanSetupException;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
@@ -51,22 +64,28 @@ import com.google.common.collect.Lists;
  * is done based on round robin assignment ordered by operator affinity (locality) to available execution Drillbits.
  */
 public class SimpleParallelizer {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleParallelizer.class);
 
+  private static final Ordering<EndpointAffinity> ENDPOINT_AFFINITY_ORDERING = Ordering.from(new Comparator<EndpointAffinity>() {
+    @Override
+    public int compare(EndpointAffinity o1, EndpointAffinity o2) {
+      // Arguments are swapped so that higher affinity values sort first (descending order).
+      return Double.compare(o2.getAffinity(), o1.getAffinity());
+    }
+  });
 
-
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleParallelizer.class);
-  private final Materializer materializer = new Materializer();
   private final long parallelizationThreshold;
   private final int maxWidthPerNode;
   private final int maxGlobalWidth;
-  private double affinityFactor;
+  private final double affinityFactor;
 
   public SimpleParallelizer(QueryContext context) {
-    long sliceTarget = context.getOptions().getOption(ExecConstants.SLICE_TARGET).num_val;
+    OptionManager optionManager = context.getOptions();
+    long sliceTarget = optionManager.getOption(ExecConstants.SLICE_TARGET).num_val;
     this.parallelizationThreshold = sliceTarget > 0 ? sliceTarget : 1;
-    this.maxWidthPerNode = context.getOptions().getOption(ExecConstants.MAX_WIDTH_PER_NODE_KEY).num_val.intValue();
-    this.maxGlobalWidth = context.getOptions().getOption(ExecConstants.MAX_WIDTH_GLOBAL_KEY).num_val.intValue();
-    this.affinityFactor = context.getOptions().getOption(ExecConstants.AFFINITY_FACTOR_KEY).float_val.intValue();
+    this.maxWidthPerNode = optionManager.getOption(ExecConstants.MAX_WIDTH_PER_NODE_KEY).num_val.intValue();
+    this.maxGlobalWidth = optionManager.getOption(ExecConstants.MAX_WIDTH_GLOBAL_KEY).num_val.intValue();
+    this.affinityFactor = optionManager.getOption(ExecConstants.AFFINITY_FACTOR_KEY).float_val.doubleValue(); // keep the fraction
   }
 
   public SimpleParallelizer(long parallelizationThreshold, int maxWidthPerNode, int maxGlobalWidth, double affinityFactor) {
@@ -78,27 +97,230 @@ public class SimpleParallelizer {
 
 
   /**
-   * Generate a set of assigned fragments based on the provided planningSet. Do not allow parallelization stages to go
-   * beyond the global max width.
+   * Generate a set of assigned fragments based on the provided fragment tree. Do not allow parallelization stages
+   * to go beyond the global max width.
    *
+   * @param options         Option list for this query; passed through to work unit generation.
    * @param foremanNode     The driving/foreman node for this query.  (this node)
    * @param queryId         The queryId for this query.
    * @param activeEndpoints The list of endpoints to consider for inclusion in planning this query.
    * @param reader          Tool used to read JSON plans
-   * @param rootNode        The root node of the PhysicalPlan that we will parallelizing.
-   * @param planningSet     The set of queries with collected statistics that we'll work with.
+   * @param rootFragment    The root node of the PhysicalPlan that we will be parallelizing.
+   * @param session         UserSession of user who launched this query.
    * @return The list of generated PlanFragment protobuf objects to be assigned out to the individual nodes.
-   * @throws ForemanException
+   * @throws ExecutionSetupException if a fragment cannot be parallelized or materialized.
+   */
+  public QueryWorkUnit getFragments(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId,
+      Collection<DrillbitEndpoint> activeEndpoints, PhysicalPlanReader reader, Fragment rootFragment,
+      UserSession session) throws ExecutionSetupException {
+
+    final PlanningSet planningSet = new PlanningSet();
+
+    initFragmentWrappers(rootFragment, planningSet);
+
+    final Set<Wrapper> leafFragments = constructFragmentDependencyGraph(planningSet);
+
+    // Start from the leaf fragments; parallelizeFragment() parallelizes their dependencies first.
+    for (Wrapper wrapper : leafFragments) {
+      parallelizeFragment(wrapper, planningSet, activeEndpoints);
+    }
+
+    return generateWorkUnit(options, foremanNode, queryId, reader, rootFragment, planningSet, session);
+  }
+
+  // Recursively create a Wrapper in planningSet for rootFragment and for every fragment below it.
+  @VisibleForTesting
+  public void initFragmentWrappers(Fragment rootFragment, PlanningSet planningSet) {
+    planningSet.get(rootFragment);
+
+    for(ExchangeFragmentPair fragmentPair : rootFragment) {
+      initFragmentWrappers(fragmentPair.getNode(), planningSet);
+    }
+  }
+
+  /**
+   * Sets up fragment dependencies based on the ParallelizationDependency of the Exchange separating two fragments.
+   *
+   * @param planningSet Wrappers of all fragments; the dependencies are recorded on these Wrappers.
+   * @return the set of leaf fragments (fragments no other fragment depends on) in the dependency graph.
    */
-  public QueryWorkUnit getFragments(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId, Collection<DrillbitEndpoint> activeEndpoints,
-      PhysicalPlanReader reader, Fragment rootNode, PlanningSet planningSet, UserSession session) throws ExecutionSetupException {
-    assignEndpoints(activeEndpoints, planningSet);
-    return generateWorkUnit(options, foremanNode, queryId, reader, rootNode, planningSet, session);
+  private Set<Wrapper> constructFragmentDependencyGraph(PlanningSet planningSet) {
+
+    // Record one dependency per sending exchange, in the direction given by its ParallelizationDependency.
+    for(Wrapper currentFragmentWrapper : planningSet) {
+      ExchangeFragmentPair sendingExchange = currentFragmentWrapper.getNode().getSendingExchangePair();
+      if (sendingExchange != null) {
+        ParallelizationDependency dependency = sendingExchange.getExchange().getParallelizationDependency();
+        Wrapper receivingFragmentWrapper = planningSet.get(sendingExchange.getNode());
+
+        if (dependency == ParallelizationDependency.RECEIVER_DEPENDS_ON_SENDER) {
+          receivingFragmentWrapper.addFragmentDependency(currentFragmentWrapper);
+        } else if (dependency == ParallelizationDependency.SENDER_DEPENDS_ON_RECEIVER) {
+          currentFragmentWrapper.addFragmentDependency(receivingFragmentWrapper);
+        }
+      }
+    }
+
+    // Identify leaf fragments: fragments that no other fragment depends on for parallelization info (the set is
+    // named "roots" below). Start by assuming every fragment is a leaf, then, for each fragment, remove every
+    // fragment that it depends on.
+    final Set<Wrapper> roots = Sets.newHashSet();
+    for(Wrapper w : planningSet) {
+      roots.add(w);
+    }
+
+    for(Wrapper wrapper : planningSet) {
+      final List<Wrapper> fragmentDependencies = wrapper.getFragmentDependencies();
+      if (fragmentDependencies != null && fragmentDependencies.size() > 0) {
+        for(Wrapper dependency : fragmentDependencies) {
+          if (roots.contains(dependency)) {
+            roots.remove(dependency);
+          }
+        }
+      }
+    }
+
+    return roots;
   }
 
-  private QueryWorkUnit generateWorkUnit(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId, PhysicalPlanReader reader, Fragment rootNode,
-                                         PlanningSet planningSet, UserSession session) throws ExecutionSetupException {
+  /**
+   * Helper method for parallelizing a given fragment. Fragments that this fragment depends on are parallelized
+   * first, then the given fragment; a fragment whose endpoints are already assigned is skipped.
+   */
+  private void parallelizeFragment(Wrapper fragmentWrapper, PlanningSet planningSet,
+      Collection<DrillbitEndpoint> activeEndpoints) throws PhysicalOperatorSetupException {
+    // If the fragment is already parallelized, return.
+    if (fragmentWrapper.isEndpointsAssignmentDone()) {
+      return;
+    }
+
+    // First parallelize fragments on which this fragment depends on.
+    final List<Wrapper> fragmentDependencies = fragmentWrapper.getFragmentDependencies();
+    if (fragmentDependencies != null && fragmentDependencies.size() > 0) {
+      for(Wrapper dependency : fragmentDependencies) {
+        parallelizeFragment(dependency, planningSet, activeEndpoints);
+      }
+    }
+
+    Fragment fragment = fragmentWrapper.getNode();
+
+    // Step 1: Find stats. Stats include various factors including cost of physical operators, parallelizability of
+    // work in physical operator and affinity of physical operator to certain nodes.
+    fragment.getRoot().accept(new StatsCollector(planningSet), fragmentWrapper);
+
+    // Step 2: Find the parallelization width of fragment
+
+    final Stats stats = fragmentWrapper.getStats();
+    final ParallelizationInfo parallelizationInfo = stats.getParallelizationInfo();
+
+    // 2.1. Find the parallelization based on cost: use the max cost of all operators in this fragment (this is
+    //      consistent with the calculation that ExcessiveExchangeRemover uses), one slice per
+    //      parallelizationThreshold units of cost, rounded up.
+    int width = (int) Math.ceil(stats.getMaxCost() / parallelizationThreshold);
+
+    // 2.2. Cap the parallelization width by fragment level width limit and system level per query width limit
+    width = Math.min(width, Math.min(parallelizationInfo.getMaxWidth(), maxGlobalWidth));
+
+    // 2.3. Cap the parallelization width by system level per node width limit
+    width = Math.min(width, maxWidthPerNode * activeEndpoints.size());
 
+    // 2.4. Make sure width is at least the min width enforced by operators
+    width = Math.max(parallelizationInfo.getMinWidth(), width);
+
+    // 2.5. Make sure width is at most the max width enforced by operators
+    width = Math.min(parallelizationInfo.getMaxWidth(), width);
+
+    // 2.6. Finally make sure the width is at least one
+    width = Math.max(1, width);
+
+    fragmentWrapper.setWidth(width);
+
+    List<DrillbitEndpoint> assignedEndpoints = findEndpoints(activeEndpoints,
+        parallelizationInfo.getEndpointAffinityMap(), fragmentWrapper.getWidth());
+    fragmentWrapper.assignEndpoints(assignedEndpoints);
+  }
+
+  // Assign endpoints based on the given endpoint list, affinity map and width; returns exactly "width" entries.
+  private List<DrillbitEndpoint> findEndpoints(Collection<DrillbitEndpoint> activeEndpoints,
+      Map<DrillbitEndpoint, EndpointAffinity> endpointAffinityMap, final int width)
+      throws PhysicalOperatorSetupException {
+
+    final List<DrillbitEndpoint> endpoints = Lists.newArrayList();
+
+    if (endpointAffinityMap.size() > 0) {
+      // Get EndpointAffinity list sorted in descending order of affinity values
+      List<EndpointAffinity> sortedAffinityList = ENDPOINT_AFFINITY_ORDERING.immutableSortedCopy(endpointAffinityMap.values());
+
+      // Find the number of mandatory nodes (nodes with +infinity affinity).
+      int numRequiredNodes = 0;
+      for(EndpointAffinity ep : sortedAffinityList) {
+        if (ep.isAssignmentRequired()) {
+          numRequiredNodes++;
+        } else {
+          // As the list is sorted in descending order of affinities, we don't need to go beyond the first occurrence
+          // of non-mandatory node
+          break;
+        }
+      }
+
+      if (width < numRequiredNodes) {
+        throw new PhysicalOperatorSetupException("Can not parallelize the fragment as the parallelization width is " +
+            "less than the number of mandatory nodes (nodes with +INFINITE affinity).");
+      }
+
+      // Find the maximum number of slots which should go to endpoints with affinity (See DRILL-825 for details)
+      int affinedSlots =
+          Math.max(1, (int) (affinityFactor * width / activeEndpoints.size())) * sortedAffinityList.size();
+
+      // Make sure affined slots is at least the number of mandatory nodes
+      affinedSlots = Math.max(affinedSlots, numRequiredNodes);
+
+      // Cap the affined slots to max parallelization width
+      affinedSlots = Math.min(affinedSlots, width);
+
+      Iterator<EndpointAffinity> affinedEPItr = Iterators.cycle(sortedAffinityList);
+
+      // Keep adding until we have selected "affinedSlots" number of endpoints.
+      while(endpoints.size() < affinedSlots) {
+        EndpointAffinity ea = affinedEPItr.next();
+        endpoints.add(ea.getEndpoint());
+      }
+    }
+
+    // add remaining endpoints if required
+    if (endpoints.size() < width) {
+      // Get a list of endpoints that are not part of the affinity endpoint list
+      List<DrillbitEndpoint> endpointsWithNoAffinity;
+      final Set<DrillbitEndpoint> endpointsWithAffinity = endpointAffinityMap.keySet();
+
+      if (endpointAffinityMap.size() > 0) {
+        endpointsWithNoAffinity = Lists.newArrayList();
+        for (DrillbitEndpoint ep : activeEndpoints) {
+          if (!endpointsWithAffinity.contains(ep)) {
+            endpointsWithNoAffinity.add(ep);
+          }
+        }
+      } else {
+        endpointsWithNoAffinity = Lists.newArrayList(activeEndpoints); // Need to create a copy instead of an
+        // immutable copy, because we need to shuffle the list (next statement) and Collections.shuffle() doesn't
+        // support immutable copy as input.
+      }
+
+      // Round robin over a randomly shuffled list (random start).
+      Collections.shuffle(endpointsWithNoAffinity, ThreadLocalRandom.current());
+      Iterator<DrillbitEndpoint> otherEPItr =
+          Iterators.cycle(endpointsWithNoAffinity.size() > 0 ? endpointsWithNoAffinity : endpointsWithAffinity);
+      while (endpoints.size() < width) {
+        endpoints.add(otherEPItr.next());
+      }
+    }
+
+    return endpoints;
+  }
+
+  private QueryWorkUnit generateWorkUnit(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId,
+      PhysicalPlanReader reader, Fragment rootNode, PlanningSet planningSet,
+      UserSession session) throws ExecutionSetupException {
     List<PlanFragment> fragments = Lists.newArrayList();
 
     PlanFragment rootFragment = null;
@@ -111,14 +333,12 @@ public class SimpleParallelizer {
     // assigned before we can materialize, so we start a new loop here rather than utilizing the previous one.
     for (Wrapper wrapper : planningSet) {
       Fragment node = wrapper.getNode();
-      Stats stats = node.getStats();
       final PhysicalOperator physicalOperatorRoot = node.getRoot();
       boolean isRootNode = rootNode == node;
 
       if (isRootNode && wrapper.getWidth() != 1) {
-        throw new ForemanSetupException(
-            String.format(
-                "Failure while trying to setup fragment.  The root fragment must always have parallelization one.  In the current case, the width was set to %d.",
+        throw new ForemanSetupException(String.format("Failure while trying to setup fragment. " +
+                "The root fragment must always have parallelization one. In the current case, the width was set to %d.",
                 wrapper.getWidth()));
       }
       // a fragment is self driven if it doesn't rely on any other exchanges.
@@ -128,7 +348,7 @@ public class SimpleParallelizer {
       for (int minorFragmentId = 0; minorFragmentId < wrapper.getWidth(); minorFragmentId++) {
         IndexedFragmentNode iNode = new IndexedFragmentNode(minorFragmentId, wrapper);
         wrapper.resetAllocation();
-        PhysicalOperator op = physicalOperatorRoot.accept(materializer, iNode);
+        PhysicalOperator op = physicalOperatorRoot.accept(Materializer.INSTANCE, iNode);
         Preconditions.checkArgument(op instanceof FragmentRoot);
         FragmentRoot root = (FragmentRoot) op;
 
@@ -175,32 +395,4 @@ public class SimpleParallelizer {
 
     return new QueryWorkUnit(rootOperator, rootFragment, fragments);
   }
-
-  private void assignEndpoints(Collection<DrillbitEndpoint> allNodes, PlanningSet planningSet) throws PhysicalOperatorSetupException {
-    // for each node, set the width based on the parallelization threshold and cluster width.
-    for (Wrapper wrapper : planningSet) {
-
-      Stats stats = wrapper.getStats();
-
-      // Use max cost of all operators in this fragment; this is consistent with the
-      // calculation that ExcessiveExchangeRemover uses
-      double targetSlices = stats.getMaxCost()/parallelizationThreshold;
-      int targetIntSlices = (int) Math.ceil(targetSlices);
-
-      // figure out width.
-      int width = Math.min(targetIntSlices, Math.min(stats.getMaxWidth(), maxGlobalWidth));
-
-
-      width = Math.min(width, maxWidthPerNode*allNodes.size());
-
-      if (width < 1) {
-        width = 1;
-      }
-//      logger.debug("Setting width {} on fragment {}", width, wrapper);
-      wrapper.setWidth(width);
-      // figure out endpoint assignments. also informs the exchanges about their respective endpoints.
-      wrapper.assignEndpoints(allNodes, affinityFactor);
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java
index 85a7b86..e61b38f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java
@@ -18,31 +18,41 @@
 package org.apache.drill.exec.planner.fragment;
 
 
-public class Stats {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Stats.class);
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.planner.fragment.ParallelizationInfo.ParallelizationInfoCollector;
+
+import java.util.List;
 
-  private int maxWidth = Integer.MAX_VALUE;
+public class Stats {
+  private final ParallelizationInfoCollector collector = new ParallelizationInfoCollector(); // merges width/affinity input
   private double maxCost = 0.0;
 
-  public void addMaxWidth(int maxWidth){
-    this.maxWidth = Math.min(this.maxWidth, maxWidth);
+  public void addParallelizationInfo(ParallelizationInfo parallelizationInfo) {
+    collector.add(parallelizationInfo);
   }
 
   public void addCost(double cost){
     maxCost = Math.max(maxCost, cost);
   }
 
-  public int getMaxWidth() {
-    return maxWidth;
+  public void addMaxWidth(int maxWidth) {
+    collector.addMaxWidth(maxWidth);
+  }
+
+  public void addEndpointAffinities(List<EndpointAffinity> endpointAffinityList) {
+    collector.addEndpointAffinities(endpointAffinityList);
+  }
+
+  public ParallelizationInfo getParallelizationInfo() { // built from everything merged into the collector so far
+    return collector.get();
   }
 
   @Override
   public String toString() {
-    return "Stats [maxWidth=" + maxWidth + ", maxCost=" + maxCost + "]";
+    return "Stats [maxCost=" + maxCost +", parallelizationInfo=" + collector.toString() + "]";
   }
 
   public double getMaxCost() {
     return maxCost;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
index 41ff678..1f56556 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
@@ -17,110 +17,94 @@
  */
 package org.apache.drill.exec.planner.fragment;
 
+import com.google.common.collect.Lists;
 import org.apache.drill.exec.physical.base.Exchange;
 import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.base.HasAffinity;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.Store;
-import org.apache.drill.exec.physical.base.SubScan;
-import org.apache.drill.exec.physical.config.Limit;
-import org.apache.drill.exec.physical.config.WindowPOP;
 import org.apache.drill.exec.planner.AbstractOpWrapperVisitor;
 import org.apache.drill.exec.planner.fragment.Fragment.ExchangeFragmentPair;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 
-import com.google.common.base.Preconditions;
+import java.util.Collections;
+import java.util.List;
 
-public class StatsCollector {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StatsCollector.class);
-
-  private final static OpStatsCollector opStatCollector = new OpStatsCollector();
-
-  private StatsCollector() {
-  };
+/**
+ * Visitor to collect stats such as cost and parallelization info of operators within a fragment.
+ *
+ * All operators have a cost associated with them, but only a few types of operators, such as scan,
+ * store and exchanges (both sending and receiving) have parallelization info associated with them.
+ */
+public class StatsCollector extends AbstractOpWrapperVisitor<Void, RuntimeException> {
+  private final PlanningSet planningSet;
 
-  private static void visit(PlanningSet planningSet, Fragment n) {
-    Preconditions.checkNotNull(planningSet);
-    Preconditions.checkNotNull(n);
+  public StatsCollector(final PlanningSet planningSet) {
+    this.planningSet = planningSet;
+  }
 
-    Wrapper wrapper = planningSet.get(n);
-    n.getRoot().accept(opStatCollector, wrapper);
-//    logger.debug("Set stats to {}", wrapper.getStats());
-    // receivers...
-    for (ExchangeFragmentPair child : n) {
-      // get the fragment node that feeds this node.
-      Fragment childNode = child.getNode();
-      visit(planningSet, childNode);
+  @Override
+  public Void visitSendingExchange(Exchange exchange, Wrapper wrapper) throws RuntimeException {
+    // Sending side: look up the fragment that receives this fragment's output.
+    Wrapper receivingFragment = planningSet.get(wrapper.getNode().getSendingExchangePair().getNode());
+
+    // Endpoints of the receiving fragment; left empty if its endpoints have not been assigned yet.
+    List<DrillbitEndpoint> receiverEndpoints;
+    if (receivingFragment.isEndpointsAssignmentDone()) {
+      receiverEndpoints = receivingFragment.getAssignedEndpoints();
+    } else {
+      receiverEndpoints = Collections.emptyList();
     }
 
+    wrapper.getStats().addParallelizationInfo(exchange.getSenderParallelizationInfo(receiverEndpoints));
+    return visitOp(exchange, wrapper);
   }
 
-  public static PlanningSet collectStats(Fragment rootFragment) {
-    PlanningSet fps = new PlanningSet();
-    visit(fps, rootFragment);
-    return fps;
-  }
+  @Override
+  public Void visitReceivingExchange(Exchange exchange, Wrapper wrapper) throws RuntimeException {
+    // Receiving side: gather the endpoints of the sending fragments attached to this exchange.
 
-  private static class OpStatsCollector extends AbstractOpWrapperVisitor<Void, RuntimeException> {
+    final List<ExchangeFragmentPair> receivingExchangePairs = wrapper.getNode().getReceivingExchangePairs();
 
-    @Override
-    public Void visitSendingExchange(Exchange exchange, Wrapper wrapper) throws RuntimeException {
-      Stats stats = wrapper.getStats();
-      stats.addMaxWidth(exchange.getMaxSendWidth());
-      return super.visitSendingExchange(exchange, wrapper);
-    }
+    // Endpoints of the fragments sending data to this fragment through this exchange (assigned ones only).
+    final List<DrillbitEndpoint> sendingEndpoints = Lists.newArrayList();
 
-    @Override
-    public Void visitReceivingExchange(Exchange exchange, Wrapper wrapper) throws RuntimeException {
-      Stats stats = wrapper.getStats();
-      stats.addMaxWidth(exchange.getMaxReceiveWidth()) ;
-      // no traversal since it would cross fragment boundary.
-      return null;
-    }
-
-    @Override
-    public Void visitGroupScan(GroupScan groupScan, Wrapper wrapper) {
-      Stats stats = wrapper.getStats();
-      stats.addMaxWidth(groupScan.getMaxParallelizationWidth());
-      return super.visitGroupScan(groupScan, wrapper);
+    for(ExchangeFragmentPair pair : receivingExchangePairs) {
+      if (pair.getExchange() == exchange) {
+        Wrapper sendingFragment = planningSet.get(pair.getNode());
+        if (sendingFragment.isEndpointsAssignmentDone()) {
+          sendingEndpoints.addAll(sendingFragment.getAssignedEndpoints());
+        }
+      }
     }
 
-    @Override
-    public Void visitSubScan(SubScan subScan, Wrapper wrapper) throws RuntimeException {
-      // TODO - implement this
-      return visitOp(subScan, wrapper);
-    }
+    wrapper.getStats().addParallelizationInfo(exchange.getReceiverParallelizationInfo(sendingEndpoints));
+    // No traversal into the exchange's input, since that would cross the current fragment boundary.
+    return null;
+  }
 
-    @Override
-    public Void visitStore(Store store, Wrapper wrapper) {
-      Stats stats = wrapper.getStats();
-      stats.addMaxWidth(store.getMaxWidth());
-      return super.visitStore(store, wrapper);
-    }
+  @Override
+  public Void visitGroupScan(GroupScan groupScan, Wrapper wrapper) {
+    wrapper.getStats().addMaxWidth(groupScan.getMaxParallelizationWidth());
+    return super.visitGroupScan(groupScan, wrapper);
+  }
 
-    @Override
-    public Void visitLimit(Limit limit, Wrapper wrapper) throws RuntimeException {
-      // TODO: Implement this
-      return visitOp(limit, wrapper);
-    }
+  @Override
+  public Void visitStore(Store store, Wrapper wrapper) {
+    wrapper.getStats().addMaxWidth(store.getMaxWidth());
+    return super.visitStore(store, wrapper);
+  }
 
-    @Override
-    public Void visitOp(PhysicalOperator op, Wrapper wrapper) {
-      if(op instanceof HasAffinity){
-        wrapper.addEndpointAffinity(((HasAffinity)op).getOperatorAffinity());
-      }
-      Stats stats = wrapper.getStats();
-      stats.addCost(op.getCost());
-      for (PhysicalOperator child : op) {
-        child.accept(this, wrapper);
-      }
-      return null;
+  @Override
+  public Void visitOp(PhysicalOperator op, Wrapper wrapper) {
+    final Stats stats = wrapper.getStats();
+    if (op instanceof HasAffinity) {
+      stats.addEndpointAffinities(((HasAffinity)op).getOperatorAffinity());
     }
-
-    @Override
-    public Void visitWindowFrame(WindowPOP window, Wrapper value) throws RuntimeException {
-      return visitOp(window, value);
+    stats.addCost(op.getCost());
+    for (PhysicalOperator child : op) {
+      child.accept(this, wrapper);
     }
-
+    return null;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
index 86b395e..75b1e3b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
@@ -17,14 +17,9 @@
  */
 package org.apache.drill.exec.planner.fragment;
 
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ThreadLocalRandom;
 
-import org.apache.drill.exec.physical.EndpointAffinity;
+import com.google.common.collect.ImmutableList;
 import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
 import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
 import org.apache.drill.exec.physical.base.Exchange;
@@ -36,9 +31,7 @@ import org.apache.drill.exec.planner.fragment.Fragment.ExchangeFragmentPair;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 
 /**
  * A wrapping class that allows us to add additional information to each fragment node for planning purposes.
@@ -51,13 +44,15 @@ public class Wrapper {
   private int width = -1;
   private final Stats stats;
   private boolean endpointsAssigned;
-  private Map<DrillbitEndpoint, EndpointAffinity> endpointAffinityMap = Maps.newHashMap();
   private long initialAllocation = 0;
   private long maxAllocation = 0;
 
+  // List of fragments this particular fragment depends on for determining its parallelization and endpoint assignments.
+  private final List<Wrapper> fragmentDependencies = Lists.newArrayList();
+
   // a list of assigned endpoints. Technically, there could repeated endpoints in this list if we'd like to assign the
   // same fragment multiple times to the same endpoint.
-  private List<DrillbitEndpoint> endpoints = Lists.newLinkedList();
+  private final List<DrillbitEndpoint> endpoints = Lists.newLinkedList();
 
   public Wrapper(Fragment node, int majorFragmentId) {
     this.majorFragmentId = majorFragmentId;
@@ -74,25 +69,6 @@ public class Wrapper {
     maxAllocation = 0;
   }
 
-  public void addEndpointAffinity(List<EndpointAffinity> affinities){
-    Preconditions.checkState(!endpointsAssigned);
-    for(EndpointAffinity ea : affinities){
-      addEndpointAffinity(ea.getEndpoint(), ea.getAffinity());
-    }
-  }
-
-  public void addEndpointAffinity(DrillbitEndpoint endpoint, float affinity) {
-    Preconditions.checkState(!endpointsAssigned);
-    Preconditions.checkNotNull(endpoint);
-    EndpointAffinity ea = endpointAffinityMap.get(endpoint);
-    if (ea == null) {
-      ea = new EndpointAffinity(endpoint);
-      endpointAffinityMap.put(endpoint, ea);
-    }
-
-    ea.addAffinity(affinity);
-  }
-
   public int getMajorFragmentId() {
     return majorFragmentId;
   }
@@ -160,34 +136,12 @@ public class Wrapper {
 
   }
 
-  public void assignEndpoints(Collection<DrillbitEndpoint> allEndpoints, double affinityFactor) throws PhysicalOperatorSetupException {
+  public void assignEndpoints(List<DrillbitEndpoint> assignedEndpoints) throws
+      PhysicalOperatorSetupException {
     Preconditions.checkState(!endpointsAssigned);
     endpointsAssigned = true;
 
-    if (endpointAffinityMap.size() > 0) {
-      List<EndpointAffinity> affinedEPs = Lists.newArrayList(endpointAffinityMap.values());
-      // get nodes with highest affinity.
-      Collections.sort(affinedEPs);
-      Iterator<EndpointAffinity> affinedEPItr = Iterators.cycle(Lists.reverse(affinedEPs));
-      /** Maximum number of slots which should go to endpoints with affinity */
-      int affinedSlots = Math.min((Math.max(1, (int) (affinityFactor*width/allEndpoints.size())) * affinedEPs.size()), width);
-      while(endpoints.size() < affinedSlots) {
-        EndpointAffinity ea = affinedEPItr.next();
-        DrillbitEndpoint endpoint = ea.getEndpoint();
-        endpoints.add(endpoint);
-      }
-    }
-    // add other endpoints if required
-    if (endpoints.size() < width) {
-      List<DrillbitEndpoint> all = Lists.newArrayList(allEndpoints);
-      all.removeAll(endpointAffinityMap.keySet());
-      // round robin with random start.
-      Collections.shuffle(all, ThreadLocalRandom.current());
-      Iterator<DrillbitEndpoint> otherEPItr = Iterators.cycle(all.size() > 0 ? all : endpointAffinityMap.keySet());
-      while (endpoints.size() < width) {
-        endpoints.add(otherEPItr.next());
-      }
-    }
+    endpoints.addAll(assignedEndpoints);
 
     // Set scan and store endpoints.
     AssignEndpointsToScanAndStore visitor = new AssignEndpointsToScanAndStore();
@@ -209,9 +163,38 @@ public class Wrapper {
     return "FragmentWrapper [majorFragmentId=" + majorFragmentId + ", width=" + width + ", stats=" + stats + "]";
   }
 
+  public List<DrillbitEndpoint> getAssignedEndpoints() {
+    Preconditions.checkState(endpointsAssigned); // only valid once endpoints have been assigned
+    return ImmutableList.copyOf(endpoints); // immutable snapshot; callers cannot modify the assignment
+  }
+
   public DrillbitEndpoint getAssignedEndpoint(int minorFragmentId) {
     Preconditions.checkState(endpointsAssigned);
-    return this.endpoints.get(minorFragmentId);
+    return endpoints.get(minorFragmentId); // position in the assigned list == minor fragment id
   }
 
+  /**
+   * Add a parallelization dependency on the given fragment.
+   * Dependencies are kept in insertion order and are returned by {@link #getFragmentDependencies()}.
+   * @param dependsOn fragment this fragment depends on for its parallelization and endpoint assignment
+   */
+  public void addFragmentDependency(Wrapper dependsOn) {
+    fragmentDependencies.add(dependsOn);
+  }
+
+  /**
+   * Whether endpoints have been assigned to this fragment (the flag set by {@code assignEndpoints}).
+   * @return {@code true} once endpoints are assigned, {@code false} otherwise
+   */
+  public boolean isEndpointsAssignmentDone() {
+    return endpointsAssigned;
+  }
+
+  /**
+   * Get the fragments this fragment depends on for determining its parallelization and endpoint assignments.
+   * @return an immutable copy of the dependency list
+   */
+  public List<Wrapper> getFragmentDependencies() {
+    return ImmutableList.copyOf(fragmentDependencies);
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
index ede0683..abbc910 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
@@ -47,6 +47,8 @@ public class PlannerSettings implements Context{
   public static final OptionValidator BROADCAST_THRESHOLD = new PositiveLongValidator("planner.broadcast_threshold", MAX_BROADCAST_THRESHOLD, 10000000);
   public static final OptionValidator BROADCAST_FACTOR = new RangeDoubleValidator("planner.broadcast_factor", 0, Double.MAX_VALUE, 1.0d);
   public static final OptionValidator JOIN_ROW_COUNT_ESTIMATE_FACTOR = new RangeDoubleValidator("planner.join.row_count_estimate_factor", 0, Double.MAX_VALUE, 1.0d);
+  public static final OptionValidator MUX_EXCHANGE = new BooleanValidator("planner.enable_mux_exchange", true);
+  public static final OptionValidator DEMUX_EXCHANGE = new BooleanValidator("planner.enable_demux_exchange", false);
   public static final OptionValidator PRODUCER_CONSUMER = new BooleanValidator("planner.add_producer_consumer", false);
   public static final OptionValidator PRODUCER_CONSUMER_QUEUE_SIZE = new LongValidator("planner.producer_consumer_queue_size", 10);
   public static final OptionValidator HASH_SINGLE_KEY = new BooleanValidator("planner.enable_hash_single_key", true);

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnorderedDeMuxExchangePrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnorderedDeMuxExchangePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnorderedDeMuxExchangePrel.java
new file mode 100644
index 0000000..79253c7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnorderedDeMuxExchangePrel.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.planner.physical;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.UnorderedDeMuxExchange;
+import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelTraitSet;
+/** Physical rel for an unordered DeMux exchange; getPhysicalOperator() converts it into an {@link UnorderedDeMuxExchange}. */
+public class UnorderedDeMuxExchangePrel extends ExchangePrel {
+  // Distribution fields used to build the hash expression handed to UnorderedDeMuxExchange.
+  private final List<DistributionField> fields;
+
+  public UnorderedDeMuxExchangePrel(RelOptCluster cluster, RelTraitSet traits, RelNode child, List<DistributionField> fields) {
+    super(cluster, traits, child);
+    this.fields = fields;
+  }
+
+  @Override
+  public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+    return new UnorderedDeMuxExchangePrel(getCluster(), traitSet, sole(inputs), fields);
+  }
+
+  @Override
+  public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
+    Prel child = (Prel) this.getChild();
+    // Translate the input subtree first, then wrap it in the DeMux exchange operator.
+    PhysicalOperator childPOP = child.getPhysicalOperator(creator);
+
+    UnorderedDeMuxExchange p = new UnorderedDeMuxExchange(childPOP, PrelUtil.getHashExpression(this.fields, getChild().getRowType()));
+    return creator.addMetadata(this, p);
+  }
+
+  @Override
+  public SelectionVectorMode getEncoding() {
+    return SelectionVectorMode.NONE;
+  }
+
+  @Override
+  public SelectionVectorMode[] getSupportedEncodings() {
+    // DeMuxExchangePrel accepts input batches carrying any type of SelectionVector.
+    return SelectionVectorMode.ALL;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnorderedMuxExchangePrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnorderedMuxExchangePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnorderedMuxExchangePrel.java
new file mode 100644
index 0000000..8ab05a0
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnorderedMuxExchangePrel.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.planner.physical;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.UnorderedMuxExchange;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelTraitSet;
+/** Physical rel for an unordered Mux exchange; getPhysicalOperator() converts it into an {@link UnorderedMuxExchange}. */
+public class UnorderedMuxExchangePrel extends ExchangePrel {
+
+  public UnorderedMuxExchangePrel(RelOptCluster cluster, RelTraitSet traits, RelNode child) {
+    super(cluster, traits, child);
+  }
+
+  @Override
+  public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+    return new UnorderedMuxExchangePrel(getCluster(), traitSet, sole(inputs));
+  }
+
+  @Override
+  public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
+    Prel child = (Prel) this.getChild();
+    // Translate the input subtree first, then wrap it in the Mux exchange operator.
+    PhysicalOperator childPOP = child.getPhysicalOperator(creator);
+
+    UnorderedMuxExchange p = new UnorderedMuxExchange(childPOP);
+    return creator.addMetadata(this, p);
+  }
+
+  @Override
+  public SelectionVectorMode getEncoding() {
+    return SelectionVectorMode.NONE;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/InsertLocalExchangeVisitor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/InsertLocalExchangeVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/InsertLocalExchangeVisitor.java
new file mode 100644
index 0000000..8793849
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/InsertLocalExchangeVisitor.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical.visitor;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.exec.planner.physical.ExchangePrel;
+import org.apache.drill.exec.planner.physical.HashToRandomExchangePrel;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.drill.exec.planner.physical.UnorderedDeMuxExchangePrel;
+import org.apache.drill.exec.planner.physical.UnorderedMuxExchangePrel;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.eigenbase.rel.RelNode;
+
+import java.util.Collections;
+import java.util.List;
+/** Rewrites a physical plan, surrounding each HashToRandomExchangePrel with local Mux and/or DeMux exchanges as enabled. */
+public class InsertLocalExchangeVisitor extends BasePrelVisitor<Prel, Void, RuntimeException> {
+
+  private final boolean isMuxEnabled;
+  private final boolean isDeMuxEnabled;
+  // Entry point: returns prel unchanged when both planner.enable_mux_exchange and planner.enable_demux_exchange are off.
+  public static Prel insertLocalExchanges(Prel prel, OptionManager options) {
+    boolean isMuxEnabled = options.getOption(PlannerSettings.MUX_EXCHANGE.getOptionName()).bool_val;
+    boolean isDeMuxEnabled = options.getOption(PlannerSettings.DEMUX_EXCHANGE.getOptionName()).bool_val;
+
+    if (isMuxEnabled || isDeMuxEnabled) {
+      return prel.accept(new InsertLocalExchangeVisitor(isMuxEnabled, isDeMuxEnabled), null);
+    }
+
+    return prel;
+  }
+
+  public InsertLocalExchangeVisitor(boolean isMuxEnabled, boolean isDeMuxEnabled) {
+    this.isMuxEnabled = isMuxEnabled;
+    this.isDeMuxEnabled = isDeMuxEnabled;
+  }
+  // Only HashToRandomExchangePrel is expanded; any other exchange is copied over its rewritten child.
+  @Override
+  public Prel visitExchange(ExchangePrel prel, Void value) throws RuntimeException {
+    Prel child = ((Prel)prel.getChild()).accept(this, null);
+    // Whenever we encounter a HashToRandomExchangePrel:
+    //   If MuxExchange is enabled, insert an UnorderedMuxExchangePrel as the input of the HashToRandomExchangePrel.
+    //   If DeMuxExchange is enabled, insert an UnorderedDeMuxExchangePrel on top of the HashToRandomExchangePrel.
+    if (prel instanceof HashToRandomExchangePrel) {
+      Prel newPrel = child;
+      if (isMuxEnabled) {
+        newPrel = new UnorderedMuxExchangePrel(prel.getCluster(), prel.getTraitSet(), child);
+      }
+      // Rebuild the hash exchange over the (possibly muxed) input, keeping the original distribution fields.
+      newPrel = new HashToRandomExchangePrel(prel.getCluster(),
+          prel.getTraitSet(), newPrel, ((HashToRandomExchangePrel) prel).getFields());
+
+      if (isDeMuxEnabled) {
+        HashToRandomExchangePrel hashExchangePrel = (HashToRandomExchangePrel) newPrel;
+        // Insert a DeMuxExchange to narrow down the number of receivers
+        newPrel = new UnorderedDeMuxExchangePrel(prel.getCluster(), prel.getTraitSet(), hashExchangePrel,
+            hashExchangePrel.getFields());
+      }
+
+      return newPrel;
+    }
+
+    return (Prel)prel.copy(prel.getTraitSet(), Collections.singletonList(((RelNode)child)));
+  }
+  // Non-exchange operators: rebuild with recursively rewritten children, keeping the original traits.
+  @Override
+  public Prel visitPrel(Prel prel, Void value) throws RuntimeException {
+    List<RelNode> children = Lists.newArrayList();
+    for(Prel child : prel){
+      children.add(child.accept(this, null));
+    }
+    return (Prel) prel.copy(prel.getTraitSet(), children);
+  }
+}


Mime
View raw message