Return-Path: X-Original-To: apmail-drill-commits-archive@www.apache.org Delivered-To: apmail-drill-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id EBB3017A68 for ; Mon, 9 Mar 2015 08:23:48 +0000 (UTC) Received: (qmail 43646 invoked by uid 500); 9 Mar 2015 08:23:48 -0000 Delivered-To: apmail-drill-commits-archive@drill.apache.org Received: (qmail 43563 invoked by uid 500); 9 Mar 2015 08:23:48 -0000 Mailing-List: contact commits-help@drill.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: commits@drill.apache.org Delivered-To: mailing list commits@drill.apache.org Received: (qmail 43378 invoked by uid 99); 9 Mar 2015 08:23:48 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 09 Mar 2015 08:23:48 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 558C1E0FC2; Mon, 9 Mar 2015 08:23:48 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: venki@apache.org To: commits@drill.apache.org Date: Mon, 09 Mar 2015 08:23:53 -0000 Message-Id: <1956ebdab2bd4712ab681af14025acfc@git.apache.org> In-Reply-To: <25efca217ddd43018d7e438a402c068d@git.apache.org> References: <25efca217ddd43018d7e438a402c068d@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [6/8] drill git commit: DRILL-133: LocalExchange planning and exec. 
http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedDeMuxExchange.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedDeMuxExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedDeMuxExchange.java new file mode 100644 index 0000000..0bc6678 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedDeMuxExchange.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.config; + +import java.util.Collections; + +import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.exec.physical.MinorFragmentEndpoint; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.Receiver; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; + +/** + * UnorderedDeMuxExchange is a version of DeMuxExchange where the incoming batches are not sorted. 
+ */ +@JsonTypeName("unordered-demux-exchange") +public class UnorderedDeMuxExchange extends AbstractDeMuxExchange { + + public UnorderedDeMuxExchange(@JsonProperty("child") PhysicalOperator child, @JsonProperty("expr") LogicalExpression expr) { + super(child, expr); + } + + @Override + public Receiver getReceiver(int minorFragmentId) { + createSenderReceiverMapping(); + + MinorFragmentEndpoint sender = receiverToSenderMapping.get(minorFragmentId); + if (sender == null) { + throw new IllegalStateException(String.format("Failed to find sender for receiver [%d]", minorFragmentId)); + } + + return new UnorderedReceiver(this.senderMajorFragmentId, Collections.singletonList(sender)); + } + + @Override + protected PhysicalOperator getNewWithChild(PhysicalOperator child) { + return new UnorderedDeMuxExchange(child, expr); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedMuxExchange.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedMuxExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedMuxExchange.java new file mode 100644 index 0000000..3028ee3 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedMuxExchange.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.config; + +import java.util.List; + +import org.apache.drill.exec.physical.MinorFragmentEndpoint; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.Receiver; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; + +/** + * UnorderedMuxExchange is a version of MuxExchange where the incoming batches are not sorted. + */ +@JsonTypeName("unordered-mux-exchange") +public class UnorderedMuxExchange extends AbstractMuxExchange { + + public UnorderedMuxExchange(@JsonProperty("child") PhysicalOperator child) { + super(child); + } + + @Override + public Receiver getReceiver(int minorFragmentId) { + createSenderReceiverMapping(); + + List senders = receiverToSenderMapping.get(minorFragmentId); + if (senders == null || senders.size() <= 0) { + throw new IllegalStateException(String.format("Failed to find senders for receiver [%d]", minorFragmentId)); + } + + return new UnorderedReceiver(senderMajorFragmentId, senders); + } + + @Override + protected PhysicalOperator getNewWithChild(PhysicalOperator child) { + return new UnorderedMuxExchange(child); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java index 3a4dd0e..e741dd4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java @@ -19,13 +19,12 @@ package org.apache.drill.exec.physical.config; import java.util.List; +import org.apache.drill.exec.physical.MinorFragmentEndpoint; import org.apache.drill.exec.physical.base.AbstractReceiver; import org.apache.drill.exec.physical.base.PhysicalVisitor; -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType; import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; @@ -33,19 +32,10 @@ import com.fasterxml.jackson.annotation.JsonTypeName; public class UnorderedReceiver extends AbstractReceiver{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnorderedReceiver.class); - private List senders; - @JsonCreator public UnorderedReceiver(@JsonProperty("sender-major-fragment") int oppositeMajorFragmentId, - @JsonProperty("senders") List senders) { - super(oppositeMajorFragmentId); - this.senders = senders; - } - - @Override - @JsonProperty("senders") - public List getProvidingEndpoints() { - return senders; + @JsonProperty("senders") List senders) { + super(oppositeMajorFragmentId, senders); } @Override @@ -62,9 +52,4 @@ public class UnorderedReceiver extends AbstractReceiver{ public int getOperatorType() { return CoreOperatorType.UNORDERED_RECEIVER_VALUE; } - - @JsonIgnore - public int getNumSenders() { - return senders.size(); - } } http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java 
---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java index 812c89c..1ef7bbd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java @@ -22,14 +22,11 @@ import io.netty.buffer.ByteBuf; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.MetricDef; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.config.SingleSender; -import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch; -import org.apache.drill.exec.physical.impl.materialize.VectorRecordMaterializer; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; import org.apache.drill.exec.record.FragmentWritableBatch; @@ -48,10 +45,12 @@ public class SingleSenderCreator implements RootCreator{ return new SingleSenderRootExec(context, children.iterator().next(), config); } - - private static class SingleSenderRootExec extends BaseRootExec { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SingleSenderRootExec.class); + + private final SendingAccountor sendCount = new SendingAccountor(); + private final FragmentHandle oppositeHandle; + private RecordBatch incoming; private DataTunnel tunnel; private FragmentHandle handle; @@ -60,7 +59,6 @@ public class SingleSenderCreator implements RootCreator{ private FragmentContext context; private volatile boolean ok = true; private volatile boolean done 
= false; - private final SendingAccountor sendCount = new SendingAccountor(); public enum Metric implements MetricDef { BYTES_SENT; @@ -79,8 +77,12 @@ public class SingleSenderCreator implements RootCreator{ this.handle = context.getHandle(); this.config = config; this.recMajor = config.getOppositeMajorFragmentId(); - FragmentHandle opposite = handle.toBuilder().setMajorFragmentId(config.getOppositeMajorFragmentId()).setMinorFragmentId(0).build(); this.tunnel = context.getDataTunnel(config.getDestination()); + oppositeHandle = handle.toBuilder() + .setMajorFragmentId(config.getOppositeMajorFragmentId()) + .setMinorFragmentId(config.getOppositeMinorFragmentId()) + .build(); + tunnel = context.getDataTunnel(config.getDestination()); this.context = context; } @@ -103,8 +105,9 @@ public class SingleSenderCreator implements RootCreator{ switch (out) { case STOP: case NONE: - FragmentWritableBatch b2 = FragmentWritableBatch.getEmptyLastWithSchema(handle.getQueryId(), handle.getMajorFragmentId(), - handle.getMinorFragmentId(), recMajor, 0, incoming.getSchema()); + FragmentWritableBatch b2 = FragmentWritableBatch.getEmptyLastWithSchema(handle.getQueryId(), + handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, oppositeHandle.getMinorFragmentId(), + incoming.getSchema()); sendCount.increment(); stats.startWait(); try { @@ -117,7 +120,7 @@ public class SingleSenderCreator implements RootCreator{ case OK_NEW_SCHEMA: case OK: FragmentWritableBatch batch = new FragmentWritableBatch(false, handle.getQueryId(), handle.getMajorFragmentId(), - handle.getMinorFragmentId(), recMajor, 0, incoming.getWritableBatch()); + handle.getMinorFragmentId(), recMajor, oppositeHandle.getMinorFragmentId(), incoming.getWritableBatch()); updateStats(batch); sendCount.increment(); stats.startWait(); http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java 
---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java index c255033..d17fdd4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java @@ -21,18 +21,16 @@ import io.netty.buffer.ByteBuf; import java.util.List; -import org.apache.drill.exec.ExecConstants; -import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.MetricDef; import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.physical.MinorFragmentEndpoint; import org.apache.drill.exec.physical.config.BroadcastSender; import org.apache.drill.exec.physical.impl.BaseRootExec; import org.apache.drill.exec.physical.impl.SendingAccountor; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.ExecProtos; -import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.GeneralRPCProtos; import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; import org.apache.drill.exec.record.FragmentWritableBatch; @@ -52,9 +50,9 @@ import com.google.common.collect.ArrayListMultimap; */ public class BroadcastSenderRootExec extends BaseRootExec { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BroadcastSenderRootExec.class); + private final StatusHandler statusHandler = new StatusHandler(); private final FragmentContext context; private final BroadcastSender config; - private final int[][] receivingMinorFragments; private final 
DataTunnel[] tunnels; private final ExecProtos.FragmentHandle handle; @@ -79,11 +77,11 @@ public class BroadcastSenderRootExec extends BaseRootExec { this.incoming = incoming; this.config = config; this.handle = context.getHandle(); - List destinations = config.getDestinations(); + List destinations = config.getDestinations(); ArrayListMultimap dests = ArrayListMultimap.create(); - for(int i = 0; i < destinations.size(); ++i) { - dests.put(destinations.get(i), i); + for(MinorFragmentEndpoint destination : destinations) { + dests.put(destination.getEndpoint(), destination.getId()); } int destCount = dests.keySet().size(); @@ -102,12 +100,8 @@ public class BroadcastSenderRootExec extends BaseRootExec { tunnels[i] = context.getDataTunnel(ep); i++; } - - } - - @Override public boolean innerNext() { if(!ok) { @@ -121,7 +115,12 @@ public class BroadcastSenderRootExec extends BaseRootExec { case STOP: case NONE: for (int i = 0; i < tunnels.length; ++i) { - FragmentWritableBatch b2 = FragmentWritableBatch.getEmptyLast(handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), config.getOppositeMajorFragmentId(), receivingMinorFragments[i]); + FragmentWritableBatch b2 = FragmentWritableBatch.getEmptyLast( + handle.getQueryId(), + handle.getMajorFragmentId(), + handle.getMinorFragmentId(), + config.getOppositeMajorFragmentId(), + receivingMinorFragments[i]); stats.startWait(); try { tunnels[i].sendRecordBatch(this.statusHandler, b2); @@ -129,9 +128,7 @@ public class BroadcastSenderRootExec extends BaseRootExec { } finally { stats.stopWait(); } - } - return false; case OK_NEW_SCHEMA: @@ -141,7 +138,14 @@ public class BroadcastSenderRootExec extends BaseRootExec { writableBatch.retainBuffers(tunnels.length - 1); } for (int i = 0; i < tunnels.length; ++i) { - FragmentWritableBatch batch = new FragmentWritableBatch(false, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), config.getOppositeMajorFragmentId(), 
receivingMinorFragments[i], writableBatch); + FragmentWritableBatch batch = new FragmentWritableBatch( + false, + handle.getQueryId(), + handle.getMajorFragmentId(), + handle.getMinorFragmentId(), + config.getOppositeMajorFragmentId(), + receivingMinorFragments[i], + writableBatch); updateStats(batch); stats.startWait(); try { @@ -173,7 +177,6 @@ public class BroadcastSenderRootExec extends BaseRootExec { incoming.cleanup(); } - private StatusHandler statusHandler = new StatusHandler(); private class StatusHandler extends BaseRpcOutcomeListener { volatile RpcException ex; private final SendingAccountor sendCount = new SendingAccountor(); @@ -193,5 +196,4 @@ public class BroadcastSenderRootExec extends BaseRootExec { this.ex = ex; } } - } http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java index 804671e..e230fd2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java @@ -46,6 +46,7 @@ import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.MetricDef; import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.physical.MinorFragmentEndpoint; import org.apache.drill.exec.physical.config.MergingReceiverPOP; import org.apache.drill.exec.proto.BitControl.FinishedReceiver; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; @@ -488,15 +489,15 @@ public class MergingRecordBatch extends 
AbstractRecordBatch .setMajorFragmentId(config.getOppositeMajorFragmentId()) .setQueryId(context.getHandle().getQueryId()) .build(); - for (int i = 0; i < config.getNumSenders(); i++) { + for (MinorFragmentEndpoint providingEndpoint : config.getProvidingEndpoints()) { FragmentHandle sender = FragmentHandle.newBuilder(handlePrototype) - .setMinorFragmentId(i) + .setMinorFragmentId(providingEndpoint.getId()) .build(); FinishedReceiver finishedReceiver = FinishedReceiver.newBuilder() .setReceiver(context.getHandle()) .setSender(sender) .build(); - context.getControlTunnel(config.getProvidingEndpoints().get(i)).informReceiverFinished(new OutcomeListener(), finishedReceiver); + context.getControlTunnel(providingEndpoint.getEndpoint()).informReceiverFinished(new OutcomeListener(), finishedReceiver); } } http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java index 200e78e..ccbd289 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java @@ -34,10 +34,10 @@ import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.MetricDef; import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.physical.MinorFragmentEndpoint; import org.apache.drill.exec.physical.config.HashPartitionSender; import org.apache.drill.exec.physical.impl.BaseRootExec; import 
org.apache.drill.exec.physical.impl.SendingAccountor; -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.record.FragmentWritableBatch; @@ -268,17 +268,16 @@ public class PartitionSenderRootExec extends BaseRootExec { public void sendEmptyBatch(boolean isLast) { FragmentHandle handle = context.getHandle(); - int fieldId = 0; StatusHandler statusHandler = new StatusHandler(sendCount, context); - for (DrillbitEndpoint endpoint : popConfig.getDestinations()) { - DataTunnel tunnel = context.getDataTunnel(endpoint); + for (MinorFragmentEndpoint destination : popConfig.getDestinations()) { + DataTunnel tunnel = context.getDataTunnel(destination.getEndpoint()); FragmentWritableBatch writableBatch = FragmentWritableBatch.getEmptyBatchWithSchema( isLast, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), operator.getOppositeMajorFragmentId(), - fieldId, + destination.getId(), incoming.getSchema()); stats.startWait(); try { @@ -287,7 +286,6 @@ public class PartitionSenderRootExec extends BaseRootExec { stats.stopWait(); } sendCount.increment(); - fieldId++; } } http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java index 79076cf..1d9088a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java @@ 
-31,10 +31,10 @@ import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.ops.OperatorStats; +import org.apache.drill.exec.physical.MinorFragmentEndpoint; import org.apache.drill.exec.physical.config.HashPartitionSender; import org.apache.drill.exec.physical.impl.SendingAccountor; import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec.Metric; -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; @@ -93,12 +93,12 @@ public abstract class PartitionerTemplate implements Partitioner { outgoingRecordBatchSize = (DEFAULT_RECORD_BATCH_SIZE + 1)/2 - 1; } - int fieldId = 0; - for (DrillbitEndpoint endpoint : popConfig.getDestinations()) { - FragmentHandle opposite = context.getHandle().toBuilder().setMajorFragmentId(popConfig.getOppositeMajorFragmentId()).setMinorFragmentId(fieldId).build(); + for (MinorFragmentEndpoint destination : popConfig.getDestinations()) { + FragmentHandle opposite = context.getHandle().toBuilder() + .setMajorFragmentId(popConfig.getOppositeMajorFragmentId()) + .setMinorFragmentId(destination.getId()).build(); outgoingBatches.add(new OutgoingRecordBatch(stats, sendingAccountor, popConfig, - context.getDataTunnel(endpoint), context, oContext.getAllocator(), fieldId, statusHandler)); - fieldId++; + context.getDataTunnel(destination.getEndpoint()), context, oContext.getAllocator(), destination.getId(), statusHandler)); } for (OutgoingRecordBatch outgoingRecordBatch : outgoingBatches) { http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java 
---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java index 52b892e..389d668 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java @@ -30,6 +30,7 @@ import org.apache.drill.exec.ops.MetricDef; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.ops.OperatorStats; import org.apache.drill.exec.ops.OpProfileDef; +import org.apache.drill.exec.physical.MinorFragmentEndpoint; import org.apache.drill.exec.physical.config.UnorderedReceiver; import org.apache.drill.exec.proto.BitControl.FinishedReceiver; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; @@ -209,15 +210,15 @@ public class UnorderedReceiverBatch implements RecordBatch { .setMajorFragmentId(config.getOppositeMajorFragmentId()) .setQueryId(context.getHandle().getQueryId()) .build(); - for (int i = 0; i < config.getNumSenders(); i++) { + for (MinorFragmentEndpoint providingEndpoint : config.getProvidingEndpoints()) { FragmentHandle sender = FragmentHandle.newBuilder(handlePrototype) - .setMinorFragmentId(i) + .setMinorFragmentId(providingEndpoint.getId()) .build(); FinishedReceiver finishedReceiver = FinishedReceiver.newBuilder() .setReceiver(context.getHandle()) .setSender(sender) .build(); - context.getControlTunnel(config.getProvidingEndpoints().get(i)).informReceiverFinished(new OutcomeListener(), finishedReceiver); + context.getControlTunnel(providingEndpoint.getEndpoint()).informReceiverFinished(new OutcomeListener(), finishedReceiver); } } 
http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java index ac63bde..2436a0e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java @@ -30,22 +30,26 @@ public class Fragment implements Iterable { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Fragment.class); private PhysicalOperator root; - private Exchange sendingExchange; + private ExchangeFragmentPair sendingExchange; private final List receivingExchangePairs = Lists.newLinkedList(); - private Stats stats = new Stats(); + /** + * Set the given operator as root operator of this fragment. If root operator is already set, + * then this method call is a no-op. + * @param o new root operator + */ public void addOperator(PhysicalOperator o) { if (root == null) { root = o; } } - public void addSendExchange(Exchange e) throws ForemanSetupException{ + public void addSendExchange(Exchange e, Fragment sendingToFragment) throws ForemanSetupException{ if (sendingExchange != null) { throw new ForemanSetupException("Fragment was trying to add a second SendExchange. 
"); } addOperator(e); - sendingExchange = e; + sendingExchange = new ExchangeFragmentPair(e, sendingToFragment); } public void addReceiveExchange(Exchange e, Fragment fragment) { @@ -66,6 +70,14 @@ public class Fragment implements Iterable { } public Exchange getSendingExchange() { + if (sendingExchange != null) { + return sendingExchange.exchange; + } + + return null; + } + + public ExchangeFragmentPair getSendingExchangePair() { return sendingExchange; } @@ -73,10 +85,6 @@ public class Fragment implements Iterable { // return visitor.visit(this, extra); // } - public Stats getStats() { - return stats; - } - public class ExchangeFragmentPair { private Exchange exchange; private Fragment node; @@ -117,8 +125,7 @@ public class Fragment implements Iterable { int result = 1; result = prime * result + ((receivingExchangePairs == null) ? 0 : receivingExchangePairs.hashCode()); result = prime * result + ((root == null) ? 0 : root.hashCode()); - result = prime * result + ((sendingExchange == null) ? 0 : sendingExchange.hashCode()); - result = prime * result + ((stats == null) ? 0 : stats.hashCode()); + result = prime * result + ((sendingExchange == null) ? 
0 : sendingExchange.getExchange().hashCode()); return result; } @@ -155,20 +162,14 @@ public class Fragment implements Iterable { } else if (!sendingExchange.equals(other.sendingExchange)) { return false; } - if (stats == null) { - if (other.stats != null) { - return false; - } - } else if (!stats.equals(other.stats)) { - return false; - } + return true; } @Override public String toString() { return "FragmentNode [root=" + root + ", sendingExchange=" + sendingExchange + ", receivingExchangePairs=" - + receivingExchangePairs + ", stats=" + stats + "]"; + + receivingExchangePairs + "]"; } } http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java index 8756e5b..0271692 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java @@ -20,7 +20,6 @@ package org.apache.drill.exec.planner.fragment; import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor; import org.apache.drill.exec.physical.base.Exchange; import org.apache.drill.exec.physical.base.PhysicalOperator; -import org.apache.drill.exec.physical.base.SubScan; import org.apache.drill.exec.work.foreman.ForemanSetupException; /** @@ -29,8 +28,9 @@ import org.apache.drill.exec.work.foreman.ForemanSetupException; public class MakeFragmentsVisitor extends AbstractPhysicalVisitor { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MakeFragmentsVisitor.class); + public final static MakeFragmentsVisitor INSTANCE = new MakeFragmentsVisitor(); - public MakeFragmentsVisitor() { + 
private MakeFragmentsVisitor() { } @Override @@ -41,18 +41,12 @@ public class MakeFragmentsVisitor extends AbstractPhysicalVisitor{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Materializer.class); + public static final Materializer INSTANCE = new Materializer(); + + private Materializer() { + } @Override public PhysicalOperator visitExchange(Exchange exchange, IndexedFragmentNode iNode) throws ExecutionSetupException { http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationInfo.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationInfo.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationInfo.java new file mode 100644 index 0000000..75a009e --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationInfo.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.planner.fragment; + +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import org.apache.drill.exec.physical.EndpointAffinity; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; + +import java.util.List; +import java.util.Map; + +/** + * Captures parallelization parameters for a given operator/fragments. It consists of min and max width of + * parallelization and affinity to drillbit endpoints. + */ +public class ParallelizationInfo { + + /* Default parallelization width is [1, Integer.MAX_VALUE] and no endpoint affinity. */ + public static final ParallelizationInfo UNLIMITED_WIDTH_NO_ENDPOINT_AFFINITY = + ParallelizationInfo.create(1, Integer.MAX_VALUE); + + private final Map affinityMap; + private final int minWidth; + private final int maxWidth; + + private ParallelizationInfo(int minWidth, int maxWidth, Map affinityMap) { + this.minWidth = minWidth; + this.maxWidth = maxWidth; + this.affinityMap = ImmutableMap.copyOf(affinityMap); + } + + public static ParallelizationInfo create(int minWidth, int maxWidth) { + return create(minWidth, maxWidth, ImmutableList.of()); + } + + public static ParallelizationInfo create(int minWidth, int maxWidth, List endpointAffinities) { + Map affinityMap = Maps.newHashMap(); + + for(EndpointAffinity epAffinity : endpointAffinities) { + affinityMap.put(epAffinity.getEndpoint(), epAffinity); + } + + return new ParallelizationInfo(minWidth, maxWidth, affinityMap); + } + + public int getMinWidth() { + return minWidth; + } + + public int getMaxWidth() { + return maxWidth; + } + + public Map getEndpointAffinityMap() { + return affinityMap; + } + + @Override + public String toString() { + return getDigest(minWidth, maxWidth, affinityMap); + } + + private static String getDigest(int minWidth, int maxWidth, Map affinityMap) { + StringBuilder sb = new 
StringBuilder(); + sb.append(String.format("[minWidth = %d, maxWidth=%d, epAff=[", minWidth, maxWidth)); + sb.append(Joiner.on(",").join(affinityMap.values())); + sb.append("]]"); + + return sb.toString(); + } + + /** + * Collects/merges one or more ParallelizationInfo instances. + */ + public static class ParallelizationInfoCollector { + private int minWidth = 1; + private int maxWidth = Integer.MAX_VALUE; + private final Map affinityMap = Maps.newHashMap(); + + public void add(ParallelizationInfo parallelizationInfo) { + this.minWidth = Math.max(minWidth, parallelizationInfo.minWidth); + this.maxWidth = Math.min(maxWidth, parallelizationInfo.maxWidth); + + Map affinityMap = parallelizationInfo.getEndpointAffinityMap(); + for(Map.Entry epAff : affinityMap.entrySet()) { + addEndpointAffinity(epAff.getValue()); + } + } + + public void addMaxWidth(int newMaxWidth) { + this.maxWidth = Math.min(maxWidth, newMaxWidth); + } + + public void addEndpointAffinities(List endpointAffinities) { + for(EndpointAffinity epAff : endpointAffinities) { + addEndpointAffinity(epAff); + } + } + + // Helper method to add the given EndpointAffinity to the global affinity map + private void addEndpointAffinity(EndpointAffinity epAff) { + if (affinityMap.containsKey(epAff.getEndpoint())) { + affinityMap.get(epAff.getEndpoint()).addAffinity(epAff.getAffinity()); + } else { + affinityMap.put(epAff.getEndpoint(), epAff); + } + } + + /** + * Get a ParallelizationInfo instance based on the current state of collected info. 
+ */ + public ParallelizationInfo get() { + return new ParallelizationInfo(minWidth, maxWidth, affinityMap); + } + + @Override + public String toString() { + return getDigest(minWidth, maxWidth, affinityMap); + } + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/PlanningSet.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/PlanningSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/PlanningSet.java index 8cc6c85..3e0f35a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/PlanningSet.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/PlanningSet.java @@ -27,21 +27,10 @@ import com.google.common.collect.Maps; public class PlanningSet implements Iterable { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PlanningSet.class); - private Map fragmentMap = Maps.newHashMap(); + private final Map fragmentMap = Maps.newHashMap(); private int majorFragmentIdIndex = 0; - PlanningSet() { - } - - public void addAffinity(Fragment n, DrillbitEndpoint endpoint, float affinity) { - get(n).addEndpointAffinity(endpoint, affinity); - } - - public void setWidth(Fragment n, int width) { - get(n).setWidth(width); - } - - Wrapper get(Fragment node) { + public Wrapper get(Fragment node) { Wrapper wrapper = fragmentMap.get(node); if (wrapper == null) { http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java index 0ece367..f8d1803 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java @@ -18,17 +18,30 @@ package org.apache.drill.exec.planner.fragment; import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Sets; +import com.google.common.collect.Iterators; +import com.google.common.collect.Ordering; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.util.DrillStringUtils; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.expr.fn.impl.DateUtility; import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.physical.EndpointAffinity; import org.apache.drill.exec.physical.PhysicalOperatorSetupException; +import org.apache.drill.exec.physical.base.Exchange.ParallelizationDependency; import org.apache.drill.exec.physical.base.FragmentRoot; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.planner.PhysicalPlanReader; +import org.apache.drill.exec.planner.fragment.Fragment.ExchangeFragmentPair; import org.apache.drill.exec.planner.fragment.Materializer.IndexedFragmentNode; import org.apache.drill.exec.proto.BitControl.PlanFragment; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; @@ -36,8 +49,8 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.UserBitShared.QueryId; import org.apache.drill.exec.rpc.user.UserSession; import org.apache.drill.exec.server.options.OptionList; +import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.work.QueryWorkUnit; -import 
org.apache.drill.exec.work.foreman.ForemanException; import org.apache.drill.exec.work.foreman.ForemanSetupException; import com.fasterxml.jackson.core.JsonProcessingException; @@ -51,22 +64,28 @@ import com.google.common.collect.Lists; * is done based on round robin assignment ordered by operator affinity (locality) to available execution Drillbits. */ public class SimpleParallelizer { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleParallelizer.class); + private static final Ordering ENDPOINT_AFFINITY_ORDERING = Ordering.from(new Comparator() { + @Override + public int compare(EndpointAffinity o1, EndpointAffinity o2) { + // Sort in descending order of affinity values + return Double.compare(o2.getAffinity(), o1.getAffinity()); + } + }); - - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleParallelizer.class); - private final Materializer materializer = new Materializer(); private final long parallelizationThreshold; private final int maxWidthPerNode; private final int maxGlobalWidth; - private double affinityFactor; + private final double affinityFactor; public SimpleParallelizer(QueryContext context) { - long sliceTarget = context.getOptions().getOption(ExecConstants.SLICE_TARGET).num_val; + OptionManager optionManager = context.getOptions(); + long sliceTarget = optionManager.getOption(ExecConstants.SLICE_TARGET).num_val; this.parallelizationThreshold = sliceTarget > 0 ? 
sliceTarget : 1; - this.maxWidthPerNode = context.getOptions().getOption(ExecConstants.MAX_WIDTH_PER_NODE_KEY).num_val.intValue(); - this.maxGlobalWidth = context.getOptions().getOption(ExecConstants.MAX_WIDTH_GLOBAL_KEY).num_val.intValue(); - this.affinityFactor = context.getOptions().getOption(ExecConstants.AFFINITY_FACTOR_KEY).float_val.intValue(); + this.maxWidthPerNode = optionManager.getOption(ExecConstants.MAX_WIDTH_PER_NODE_KEY).num_val.intValue(); + this.maxGlobalWidth = optionManager.getOption(ExecConstants.MAX_WIDTH_GLOBAL_KEY).num_val.intValue(); + this.affinityFactor = optionManager.getOption(ExecConstants.AFFINITY_FACTOR_KEY).float_val.intValue(); } public SimpleParallelizer(long parallelizationThreshold, int maxWidthPerNode, int maxGlobalWidth, double affinityFactor) { @@ -78,27 +97,230 @@ public class SimpleParallelizer { /** - * Generate a set of assigned fragments based on the provided planningSet. Do not allow parallelization stages to go - * beyond the global max width. + * Generate a set of assigned fragments based on the provided fragment tree. Do not allow parallelization stages + * to go beyond the global max width. * + * @param options Option list * @param foremanNode The driving/foreman node for this query. (this node) * @param queryId The queryId for this query. * @param activeEndpoints The list of endpoints to consider for inclusion in planning this query. * @param reader Tool used to read JSON plans - * @param rootNode The root node of the PhysicalPlan that we will parallelizing. - * @param planningSet The set of queries with collected statistics that we'll work with. + * @param rootFragment The root node of the PhysicalPlan that we will be parallelizing. + * @param session UserSession of user who launched this query. * @return The list of generated PlanFragment protobuf objects to be assigned out to the individual nodes. 
- * @throws ForemanException + * @throws ExecutionSetupException + */ + public QueryWorkUnit getFragments(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId, + Collection activeEndpoints, PhysicalPlanReader reader, Fragment rootFragment, + UserSession session) throws ExecutionSetupException { + + final PlanningSet planningSet = new PlanningSet(); + + initFragmentWrappers(rootFragment, planningSet); + + final Set leafFragments = constructFragmentDependencyGraph(planningSet); + + // Start parallelizing from leaf fragments + for (Wrapper wrapper : leafFragments) { + parallelizeFragment(wrapper, planningSet, activeEndpoints); + } + + return generateWorkUnit(options, foremanNode, queryId, reader, rootFragment, planningSet, session); + } + + // For every fragment, create a Wrapper in PlanningSet. + @VisibleForTesting + public void initFragmentWrappers(Fragment rootFragment, PlanningSet planningSet) { + planningSet.get(rootFragment); + + for(ExchangeFragmentPair fragmentPair : rootFragment) { + initFragmentWrappers(fragmentPair.getNode(), planningSet); + } + } + + /** + * Based on the affinity of the Exchange that separates two fragments, setup fragment dependencies. + * + * @param planningSet + * @return Returns a list of leaf fragments in fragment dependency graph. */ - public QueryWorkUnit getFragments(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId, Collection activeEndpoints, - PhysicalPlanReader reader, Fragment rootNode, PlanningSet planningSet, UserSession session) throws ExecutionSetupException { - assignEndpoints(activeEndpoints, planningSet); - return generateWorkUnit(options, foremanNode, queryId, reader, rootNode, planningSet, session); + private Set constructFragmentDependencyGraph(PlanningSet planningSet) { + + // Set up dependency of fragments based on the affinity of exchange that separates the fragments. 
+ for(Wrapper currentFragmentWrapper : planningSet) { + ExchangeFragmentPair sendingExchange = currentFragmentWrapper.getNode().getSendingExchangePair(); + if (sendingExchange != null) { + ParallelizationDependency dependency = sendingExchange.getExchange().getParallelizationDependency(); + Wrapper receivingFragmentWrapper = planningSet.get(sendingExchange.getNode()); + + if (dependency == ParallelizationDependency.RECEIVER_DEPENDS_ON_SENDER) { + receivingFragmentWrapper.addFragmentDependency(currentFragmentWrapper); + } else if (dependency == ParallelizationDependency.SENDER_DEPENDS_ON_RECEIVER) { + currentFragmentWrapper.addFragmentDependency(receivingFragmentWrapper); + } + } + } + + // Identify leaf fragments. Leaf fragments are fragments that have no other fragments depending on them for + // parallelization info. First assume all fragments are leaf fragments. Go through the fragments one by one and + // remove the fragment on which the current fragment depends on. + final Set roots = Sets.newHashSet(); + for(Wrapper w : planningSet) { + roots.add(w); + } + + for(Wrapper wrapper : planningSet) { + final List fragmentDependencies = wrapper.getFragmentDependencies(); + if (fragmentDependencies != null && fragmentDependencies.size() > 0) { + for(Wrapper dependency : fragmentDependencies) { + if (roots.contains(dependency)) { + roots.remove(dependency); + } + } + } + } + + return roots; } - private QueryWorkUnit generateWorkUnit(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId, PhysicalPlanReader reader, Fragment rootNode, - PlanningSet planningSet, UserSession session) throws ExecutionSetupException { + /** + * Helper method for parallelizing a given fragment. Dependent fragments are parallelized first before + * parallelizing the given fragment. 
+ */ + private void parallelizeFragment(Wrapper fragmentWrapper, PlanningSet planningSet, + Collection activeEndpoints) throws PhysicalOperatorSetupException { + // If the fragment is already parallelized, return. + if (fragmentWrapper.isEndpointsAssignmentDone()) { + return; + } + + // First parallelize fragments on which this fragment depends. + final List fragmentDependencies = fragmentWrapper.getFragmentDependencies(); + if (fragmentDependencies != null && fragmentDependencies.size() > 0) { + for(Wrapper dependency : fragmentDependencies) { + parallelizeFragment(dependency, planningSet, activeEndpoints); + } + } + + Fragment fragment = fragmentWrapper.getNode(); + + // Step 1: Find stats. Stats include various factors including cost of physical operators, parallelizability of + // work in physical operator and affinity of physical operator to certain nodes. + fragment.getRoot().accept(new StatsCollector(planningSet), fragmentWrapper); + + // Step 2: Find the parallelization width of fragment + + final Stats stats = fragmentWrapper.getStats(); + final ParallelizationInfo parallelizationInfo = stats.getParallelizationInfo(); + + // Use max cost of all operators in this fragment; this is consistent with the + // calculation that ExcessiveExchangeRemover uses + // 2.1. Find the parallelization based on cost + int width = (int) Math.ceil(stats.getMaxCost() / parallelizationThreshold); + + // 2.2. Cap the parallelization width by fragment level width limit and system level per query width limit + width = Math.min(width, Math.min(parallelizationInfo.getMaxWidth(), maxGlobalWidth)); + + // 2.3. Cap the parallelization width by system level per node width limit + width = Math.min(width, maxWidthPerNode * activeEndpoints.size()); + // 2.4. Make sure width is at least the min width enforced by operators + width = Math.max(parallelizationInfo.getMinWidth(), width); + + // 2.5.
Make sure width is at most the max width enforced by operators + width = Math.min(parallelizationInfo.getMaxWidth(), width); + + // 2.6. Finally make sure the width is at least one + width = Math.max(1, width); + + fragmentWrapper.setWidth(width); + + List assignedEndpoints = findEndpoints(activeEndpoints, + parallelizationInfo.getEndpointAffinityMap(), fragmentWrapper.getWidth()); + fragmentWrapper.assignEndpoints(assignedEndpoints); + } + + // Assign endpoints based on the given endpoint list, affinity map and width. + private List findEndpoints(Collection activeEndpoints, + Map endpointAffinityMap, final int width) + throws PhysicalOperatorSetupException { + + final List endpoints = Lists.newArrayList(); + + if (endpointAffinityMap.size() > 0) { + // Get EndpointAffinity list sorted in descending order of affinity values + List sortedAffinityList = ENDPOINT_AFFINITY_ORDERING.immutableSortedCopy(endpointAffinityMap.values()); + + // Find the number of mandatory nodes (nodes with +infinity affinity).
+ int numRequiredNodes = 0; + for(EndpointAffinity ep : sortedAffinityList) { + if (ep.isAssignmentRequired()) { + numRequiredNodes++; + } else { + // As the list is sorted in descending order of affinities, we don't need to go beyond the first occurrence + // of a non-mandatory node + break; + } + } + + if (width < numRequiredNodes) { + throw new PhysicalOperatorSetupException("Can not parallelize the fragment as the parallelization width is " + + "less than the number of mandatory nodes (nodes with +INFINITE affinity)."); + } + + // Find the maximum number of slots which should go to endpoints with affinity (See DRILL-825 for details) + int affinedSlots = + Math.max(1, (int) (affinityFactor * width / activeEndpoints.size())) * sortedAffinityList.size(); + + // Make sure affined slots is at least the number of mandatory nodes + affinedSlots = Math.max(affinedSlots, numRequiredNodes); + + // Cap the affined slots to max parallelization width + affinedSlots = Math.min(affinedSlots, width); + + Iterator affinedEPItr = Iterators.cycle(sortedAffinityList); + + // Keep adding until we have selected "affinedSlots" number of endpoints.
+ while(endpoints.size() < affinedSlots) { + EndpointAffinity ea = affinedEPItr.next(); + endpoints.add(ea.getEndpoint()); + } + } + + // add remaining endpoints if required + if (endpoints.size() < width) { + // Get a list of endpoints that are not part of the affinity endpoint list + List endpointsWithNoAffinity; + final Set endpointsWithAffinity = endpointAffinityMap.keySet(); + + if (endpointAffinityMap.size() > 0) { + endpointsWithNoAffinity = Lists.newArrayList(); + for (DrillbitEndpoint ep : activeEndpoints) { + if (!endpointsWithAffinity.contains(ep)) { + endpointsWithNoAffinity.add(ep); + } + } + } else { + endpointsWithNoAffinity = Lists.newArrayList(activeEndpoints); // Need to create a copy instead of an + // immutable copy, because we need to shuffle the list (next statement) and Collections.shuffle() doesn't + // support immutable copy as input. + } + + // round robin with random start. + Collections.shuffle(endpointsWithNoAffinity, ThreadLocalRandom.current()); + Iterator otherEPItr = + Iterators.cycle(endpointsWithNoAffinity.size() > 0 ? endpointsWithNoAffinity : endpointsWithAffinity); + while (endpoints.size() < width) { + endpoints.add(otherEPItr.next()); + } + } + + return endpoints; + } + + private QueryWorkUnit generateWorkUnit(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId, + PhysicalPlanReader reader, Fragment rootNode, PlanningSet planningSet, + UserSession session) throws ExecutionSetupException { List fragments = Lists.newArrayList(); PlanFragment rootFragment = null; @@ -111,14 +333,12 @@ public class SimpleParallelizer { // assigned before we can materialize, so we start a new loop here rather than utilizing the previous one. 
for (Wrapper wrapper : planningSet) { Fragment node = wrapper.getNode(); - Stats stats = node.getStats(); final PhysicalOperator physicalOperatorRoot = node.getRoot(); boolean isRootNode = rootNode == node; if (isRootNode && wrapper.getWidth() != 1) { - throw new ForemanSetupException( - String.format( - "Failure while trying to setup fragment. The root fragment must always have parallelization one. In the current case, the width was set to %d.", + throw new ForemanSetupException(String.format("Failure while trying to setup fragment. " + + "The root fragment must always have parallelization one. In the current case, the width was set to %d.", wrapper.getWidth())); } // a fragment is self driven if it doesn't rely on any other exchanges. @@ -128,7 +348,7 @@ public class SimpleParallelizer { for (int minorFragmentId = 0; minorFragmentId < wrapper.getWidth(); minorFragmentId++) { IndexedFragmentNode iNode = new IndexedFragmentNode(minorFragmentId, wrapper); wrapper.resetAllocation(); - PhysicalOperator op = physicalOperatorRoot.accept(materializer, iNode); + PhysicalOperator op = physicalOperatorRoot.accept(Materializer.INSTANCE, iNode); Preconditions.checkArgument(op instanceof FragmentRoot); FragmentRoot root = (FragmentRoot) op; @@ -175,32 +395,4 @@ public class SimpleParallelizer { return new QueryWorkUnit(rootOperator, rootFragment, fragments); } - - private void assignEndpoints(Collection allNodes, PlanningSet planningSet) throws PhysicalOperatorSetupException { - // for each node, set the width based on the parallelization threshold and cluster width. - for (Wrapper wrapper : planningSet) { - - Stats stats = wrapper.getStats(); - - // Use max cost of all operators in this fragment; this is consistent with the - // calculation that ExcessiveExchangeRemover uses - double targetSlices = stats.getMaxCost()/parallelizationThreshold; - int targetIntSlices = (int) Math.ceil(targetSlices); - - // figure out width. 
- int width = Math.min(targetIntSlices, Math.min(stats.getMaxWidth(), maxGlobalWidth)); - - - width = Math.min(width, maxWidthPerNode*allNodes.size()); - - if (width < 1) { - width = 1; - } -// logger.debug("Setting width {} on fragment {}", width, wrapper); - wrapper.setWidth(width); - // figure out endpoint assignments. also informs the exchanges about their respective endpoints. - wrapper.assignEndpoints(allNodes, affinityFactor); - } - } - } http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java index 85a7b86..e61b38f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java @@ -18,31 +18,41 @@ package org.apache.drill.exec.planner.fragment; -public class Stats { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Stats.class); +import org.apache.drill.exec.physical.EndpointAffinity; +import org.apache.drill.exec.planner.fragment.ParallelizationInfo.ParallelizationInfoCollector; + +import java.util.List; - private int maxWidth = Integer.MAX_VALUE; +public class Stats { + private final ParallelizationInfoCollector collector = new ParallelizationInfoCollector(); private double maxCost = 0.0; - public void addMaxWidth(int maxWidth){ - this.maxWidth = Math.min(this.maxWidth, maxWidth); + public void addParallelizationInfo(ParallelizationInfo parallelizationInfo) { + collector.add(parallelizationInfo); } public void addCost(double cost){ maxCost = Math.max(maxCost, cost); } - public int getMaxWidth() { - return maxWidth; + public void addMaxWidth(int maxWidth) { + collector.addMaxWidth(maxWidth); + } + + 
public void addEndpointAffinities(List endpointAffinityList) { + collector.addEndpointAffinities(endpointAffinityList); + } + + public ParallelizationInfo getParallelizationInfo() { + return collector.get(); } @Override public String toString() { - return "Stats [maxWidth=" + maxWidth + ", maxCost=" + maxCost + "]"; + return "Stats [maxCost=" + maxCost +", parallelizationInfo=" + collector.toString() + "]"; } public double getMaxCost() { return maxCost; } - } http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java index 41ff678..1f56556 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java @@ -17,110 +17,94 @@ */ package org.apache.drill.exec.planner.fragment; +import com.google.common.collect.Lists; import org.apache.drill.exec.physical.base.Exchange; import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.physical.base.HasAffinity; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.Store; -import org.apache.drill.exec.physical.base.SubScan; -import org.apache.drill.exec.physical.config.Limit; -import org.apache.drill.exec.physical.config.WindowPOP; import org.apache.drill.exec.planner.AbstractOpWrapperVisitor; import org.apache.drill.exec.planner.fragment.Fragment.ExchangeFragmentPair; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; -import com.google.common.base.Preconditions; +import java.util.Collections; +import java.util.List; -public class StatsCollector { - static 
final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StatsCollector.class); - - private final static OpStatsCollector opStatCollector = new OpStatsCollector(); - - private StatsCollector() { - }; +/** + * Visitor to collect stats such as cost and parallelization info of operators within a fragment. + * + * All operators have cost associated with them, but only a few types of operators such as scan, + * store and exchanges (both sending and receiving) have parallelization info associated with them. + */ +public class StatsCollector extends AbstractOpWrapperVisitor { + private final PlanningSet planningSet; - private static void visit(PlanningSet planningSet, Fragment n) { - Preconditions.checkNotNull(planningSet); - Preconditions.checkNotNull(n); + public StatsCollector(final PlanningSet planningSet) { + this.planningSet = planningSet; + } - Wrapper wrapper = planningSet.get(n); - n.getRoot().accept(opStatCollector, wrapper); -// logger.debug("Set stats to {}", wrapper.getStats()); - // receivers... - for (ExchangeFragmentPair child : n) { - // get the fragment node that feeds this node. - Fragment childNode = child.getNode(); - visit(planningSet, childNode); + @Override + public Void visitSendingExchange(Exchange exchange, Wrapper wrapper) throws RuntimeException { + // Handle the sending side exchange + Wrapper receivingFragment = planningSet.get(wrapper.getNode().getSendingExchangePair().getNode()); + + // List to contain the endpoints where the fragments that receive data from this fragment are running.
+ List receiverEndpoints; + if (receivingFragment.isEndpointsAssignmentDone()) { + receiverEndpoints = receivingFragment.getAssignedEndpoints(); + } else { + receiverEndpoints = Collections.emptyList(); } + wrapper.getStats().addParallelizationInfo(exchange.getSenderParallelizationInfo(receiverEndpoints)); + return visitOp(exchange, wrapper); } - public static PlanningSet collectStats(Fragment rootFragment) { - PlanningSet fps = new PlanningSet(); - visit(fps, rootFragment); - return fps; - } + @Override + public Void visitReceivingExchange(Exchange exchange, Wrapper wrapper) throws RuntimeException { + // Handle the receiving side Exchange - private static class OpStatsCollector extends AbstractOpWrapperVisitor { + final List receivingExchangePairs = wrapper.getNode().getReceivingExchangePairs(); - @Override - public Void visitSendingExchange(Exchange exchange, Wrapper wrapper) throws RuntimeException { - Stats stats = wrapper.getStats(); - stats.addMaxWidth(exchange.getMaxSendWidth()); - return super.visitSendingExchange(exchange, wrapper); - } + // List to contain the endpoints where the fragments that send data to this fragment are running. + final List sendingEndpoints = Lists.newArrayList(); - @Override - public Void visitReceivingExchange(Exchange exchange, Wrapper wrapper) throws RuntimeException { - Stats stats = wrapper.getStats(); - stats.addMaxWidth(exchange.getMaxReceiveWidth()) ; - // no traversal since it would cross fragment boundary.
- return null; - } - - @Override - public Void visitGroupScan(GroupScan groupScan, Wrapper wrapper) { - Stats stats = wrapper.getStats(); - stats.addMaxWidth(groupScan.getMaxParallelizationWidth()); - return super.visitGroupScan(groupScan, wrapper); + for(ExchangeFragmentPair pair : receivingExchangePairs) { + if (pair.getExchange() == exchange) { + Wrapper sendingFragment = planningSet.get(pair.getNode()); + if (sendingFragment.isEndpointsAssignmentDone()) { + sendingEndpoints.addAll(sendingFragment.getAssignedEndpoints()); + } + } } - @Override - public Void visitSubScan(SubScan subScan, Wrapper wrapper) throws RuntimeException { - // TODO - implement this - return visitOp(subScan, wrapper); - } + wrapper.getStats().addParallelizationInfo(exchange.getReceiverParallelizationInfo(sendingEndpoints)); + // no traversal since it would cross current fragment boundary. + return null; + } - @Override - public Void visitStore(Store store, Wrapper wrapper) { - Stats stats = wrapper.getStats(); - stats.addMaxWidth(store.getMaxWidth()); - return super.visitStore(store, wrapper); - } + @Override + public Void visitGroupScan(GroupScan groupScan, Wrapper wrapper) { + wrapper.getStats().addMaxWidth(groupScan.getMaxParallelizationWidth()); + return super.visitGroupScan(groupScan, wrapper); + } - @Override - public Void visitLimit(Limit limit, Wrapper wrapper) throws RuntimeException { - // TODO: Implement this - return visitOp(limit, wrapper); - } + @Override + public Void visitStore(Store store, Wrapper wrapper) { + wrapper.getStats().addMaxWidth(store.getMaxWidth()); + return super.visitStore(store, wrapper); + } - @Override - public Void visitOp(PhysicalOperator op, Wrapper wrapper) { - if(op instanceof HasAffinity){ - wrapper.addEndpointAffinity(((HasAffinity)op).getOperatorAffinity()); - } - Stats stats = wrapper.getStats(); - stats.addCost(op.getCost()); - for (PhysicalOperator child : op) { - child.accept(this, wrapper); - } - return null; + @Override + public Void 
visitOp(PhysicalOperator op, Wrapper wrapper) { + final Stats stats = wrapper.getStats(); + if (op instanceof HasAffinity) { + stats.addEndpointAffinities(((HasAffinity)op).getOperatorAffinity()); } - - @Override - public Void visitWindowFrame(WindowPOP window, Wrapper value) throws RuntimeException { - return visitOp(window, value); + stats.addCost(op.getCost()); + for (PhysicalOperator child : op) { + child.accept(this, wrapper); } - + return null; } - } http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java index 86b395e..75b1e3b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java @@ -17,14 +17,9 @@ */ package org.apache.drill.exec.planner.fragment; -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; import java.util.List; -import java.util.Map; -import java.util.concurrent.ThreadLocalRandom; -import org.apache.drill.exec.physical.EndpointAffinity; +import com.google.common.collect.ImmutableList; import org.apache.drill.exec.physical.PhysicalOperatorSetupException; import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor; import org.apache.drill.exec.physical.base.Exchange; @@ -36,9 +31,7 @@ import org.apache.drill.exec.planner.fragment.Fragment.ExchangeFragmentPair; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import com.google.common.base.Preconditions; -import com.google.common.collect.Iterators; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; /** * A wrapping class that allows us to add additional 
information to each fragment node for planning purposes. @@ -51,13 +44,15 @@ public class Wrapper { private int width = -1; private final Stats stats; private boolean endpointsAssigned; - private Map endpointAffinityMap = Maps.newHashMap(); private long initialAllocation = 0; private long maxAllocation = 0; + // List of fragments this particular fragment depends on for determining its parallelization and endpoint assignments. + private final List fragmentDependencies = Lists.newArrayList(); + // a list of assigned endpoints. Technically, there could repeated endpoints in this list if we'd like to assign the // same fragment multiple times to the same endpoint. - private List endpoints = Lists.newLinkedList(); + private final List endpoints = Lists.newLinkedList(); public Wrapper(Fragment node, int majorFragmentId) { this.majorFragmentId = majorFragmentId; @@ -74,25 +69,6 @@ public class Wrapper { maxAllocation = 0; } - public void addEndpointAffinity(List affinities){ - Preconditions.checkState(!endpointsAssigned); - for(EndpointAffinity ea : affinities){ - addEndpointAffinity(ea.getEndpoint(), ea.getAffinity()); - } - } - - public void addEndpointAffinity(DrillbitEndpoint endpoint, float affinity) { - Preconditions.checkState(!endpointsAssigned); - Preconditions.checkNotNull(endpoint); - EndpointAffinity ea = endpointAffinityMap.get(endpoint); - if (ea == null) { - ea = new EndpointAffinity(endpoint); - endpointAffinityMap.put(endpoint, ea); - } - - ea.addAffinity(affinity); - } - public int getMajorFragmentId() { return majorFragmentId; } @@ -160,34 +136,12 @@ public class Wrapper { } - public void assignEndpoints(Collection allEndpoints, double affinityFactor) throws PhysicalOperatorSetupException { + public void assignEndpoints(List assignedEndpoints) throws + PhysicalOperatorSetupException { Preconditions.checkState(!endpointsAssigned); endpointsAssigned = true; - if (endpointAffinityMap.size() > 0) { - List affinedEPs = 
Lists.newArrayList(endpointAffinityMap.values()); - // get nodes with highest affinity. - Collections.sort(affinedEPs); - Iterator affinedEPItr = Iterators.cycle(Lists.reverse(affinedEPs)); - /** Maximum number of slots which should go to endpoints with affinity */ - int affinedSlots = Math.min((Math.max(1, (int) (affinityFactor*width/allEndpoints.size())) * affinedEPs.size()), width); - while(endpoints.size() < affinedSlots) { - EndpointAffinity ea = affinedEPItr.next(); - DrillbitEndpoint endpoint = ea.getEndpoint(); - endpoints.add(endpoint); - } - } - // add other endpoints if required - if (endpoints.size() < width) { - List all = Lists.newArrayList(allEndpoints); - all.removeAll(endpointAffinityMap.keySet()); - // round robin with random start. - Collections.shuffle(all, ThreadLocalRandom.current()); - Iterator otherEPItr = Iterators.cycle(all.size() > 0 ? all : endpointAffinityMap.keySet()); - while (endpoints.size() < width) { - endpoints.add(otherEPItr.next()); - } - } + endpoints.addAll(assignedEndpoints); // Set scan and store endpoints. AssignEndpointsToScanAndStore visitor = new AssignEndpointsToScanAndStore(); @@ -209,9 +163,38 @@ public class Wrapper { return "FragmentWrapper [majorFragmentId=" + majorFragmentId + ", width=" + width + ", stats=" + stats + "]"; } + public List getAssignedEndpoints() { + Preconditions.checkState(endpointsAssigned); + return ImmutableList.copyOf(endpoints); + } + public DrillbitEndpoint getAssignedEndpoint(int minorFragmentId) { Preconditions.checkState(endpointsAssigned); - return this.endpoints.get(minorFragmentId); + return endpoints.get(minorFragmentId); } + /** + * Add a parallelization dependency on given fragment. + * + * @param dependsOn + */ + public void addFragmentDependency(Wrapper dependsOn) { + fragmentDependencies.add(dependsOn); + } + + /** + * Is the endpoints assignment done for this fragment? 
+ * @return + */ + public boolean isEndpointsAssignmentDone() { + return endpointsAssigned; + } + + /** + * Get the list of fragements this particular fragment depends for determining its + * @return + */ + public List getFragmentDependencies() { + return ImmutableList.copyOf(fragmentDependencies); + } } http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java index ede0683..abbc910 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java @@ -47,6 +47,8 @@ public class PlannerSettings implements Context{ public static final OptionValidator BROADCAST_THRESHOLD = new PositiveLongValidator("planner.broadcast_threshold", MAX_BROADCAST_THRESHOLD, 10000000); public static final OptionValidator BROADCAST_FACTOR = new RangeDoubleValidator("planner.broadcast_factor", 0, Double.MAX_VALUE, 1.0d); public static final OptionValidator JOIN_ROW_COUNT_ESTIMATE_FACTOR = new RangeDoubleValidator("planner.join.row_count_estimate_factor", 0, Double.MAX_VALUE, 1.0d); + public static final OptionValidator MUX_EXCHANGE = new BooleanValidator("planner.enable_mux_exchange", true); + public static final OptionValidator DEMUX_EXCHANGE = new BooleanValidator("planner.enable_demux_exchange", false); public static final OptionValidator PRODUCER_CONSUMER = new BooleanValidator("planner.add_producer_consumer", false); public static final OptionValidator PRODUCER_CONSUMER_QUEUE_SIZE = new LongValidator("planner.producer_consumer_queue_size", 10); public static final OptionValidator HASH_SINGLE_KEY = new 
BooleanValidator("planner.enable_hash_single_key", true); http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnorderedDeMuxExchangePrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnorderedDeMuxExchangePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnorderedDeMuxExchangePrel.java new file mode 100644 index 0000000..79253c7 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnorderedDeMuxExchangePrel.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.drill.exec.planner.physical; + +import java.io.IOException; +import java.util.List; + +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.config.UnorderedDeMuxExchange; +import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField; +import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; +import org.eigenbase.rel.RelNode; +import org.eigenbase.relopt.RelOptCluster; +import org.eigenbase.relopt.RelTraitSet; + +public class UnorderedDeMuxExchangePrel extends ExchangePrel { + + private final List fields; + + public UnorderedDeMuxExchangePrel(RelOptCluster cluster, RelTraitSet traits, RelNode child, List fields) { + super(cluster, traits, child); + this.fields = fields; + } + + @Override + public RelNode copy(RelTraitSet traitSet, List inputs) { + return new UnorderedDeMuxExchangePrel(getCluster(), traitSet, sole(inputs), fields); + } + + @Override + public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException { + Prel child = (Prel) this.getChild(); + + PhysicalOperator childPOP = child.getPhysicalOperator(creator); + + UnorderedDeMuxExchange p = new UnorderedDeMuxExchange(childPOP, PrelUtil.getHashExpression(this.fields, getChild().getRowType())); + return creator.addMetadata(this, p); + } + + @Override + public SelectionVectorMode getEncoding() { + return SelectionVectorMode.NONE; + } + + @Override + public SelectionVectorMode[] getSupportedEncodings() { + // DeMuxExchangePrel accepts vectors with all types SelectionVectors as input. 
+ return SelectionVectorMode.ALL; + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnorderedMuxExchangePrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnorderedMuxExchangePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnorderedMuxExchangePrel.java new file mode 100644 index 0000000..8ab05a0 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnorderedMuxExchangePrel.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.drill.exec.planner.physical; + +import java.io.IOException; +import java.util.List; + +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.config.UnorderedMuxExchange; +import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; +import org.eigenbase.rel.RelNode; +import org.eigenbase.relopt.RelOptCluster; +import org.eigenbase.relopt.RelTraitSet; + +public class UnorderedMuxExchangePrel extends ExchangePrel { + + public UnorderedMuxExchangePrel(RelOptCluster cluster, RelTraitSet traits, RelNode child) { + super(cluster, traits, child); + } + + @Override + public RelNode copy(RelTraitSet traitSet, List inputs) { + return new UnorderedMuxExchangePrel(getCluster(), traitSet, sole(inputs)); + } + + @Override + public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException { + Prel child = (Prel) this.getChild(); + + PhysicalOperator childPOP = child.getPhysicalOperator(creator); + + UnorderedMuxExchange p = new UnorderedMuxExchange(childPOP); + return creator.addMetadata(this, p); + } + + @Override + public SelectionVectorMode getEncoding() { + return SelectionVectorMode.NONE; + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/InsertLocalExchangeVisitor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/InsertLocalExchangeVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/InsertLocalExchangeVisitor.java new file mode 100644 index 0000000..8793849 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/InsertLocalExchangeVisitor.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.physical.visitor; + +import com.google.common.collect.Lists; +import org.apache.drill.exec.planner.physical.ExchangePrel; +import org.apache.drill.exec.planner.physical.HashToRandomExchangePrel; +import org.apache.drill.exec.planner.physical.PlannerSettings; +import org.apache.drill.exec.planner.physical.Prel; +import org.apache.drill.exec.planner.physical.UnorderedDeMuxExchangePrel; +import org.apache.drill.exec.planner.physical.UnorderedMuxExchangePrel; +import org.apache.drill.exec.server.options.OptionManager; +import org.eigenbase.rel.RelNode; + +import java.util.Collections; +import java.util.List; + +public class InsertLocalExchangeVisitor extends BasePrelVisitor { + + private final boolean isMuxEnabled; + private final boolean isDeMuxEnabled; + + public static Prel insertLocalExchanges(Prel prel, OptionManager options) { + boolean isMuxEnabled = options.getOption(PlannerSettings.MUX_EXCHANGE.getOptionName()).bool_val; + boolean isDeMuxEnabled = options.getOption(PlannerSettings.DEMUX_EXCHANGE.getOptionName()).bool_val; + + if (isMuxEnabled || isDeMuxEnabled) { + return prel.accept(new InsertLocalExchangeVisitor(isMuxEnabled, isDeMuxEnabled), null); + } + + return prel; + } + + public InsertLocalExchangeVisitor(boolean 
isMuxEnabled, boolean isDeMuxEnabled) { + this.isMuxEnabled = isMuxEnabled; + this.isDeMuxEnabled = isDeMuxEnabled; + } + + @Override + public Prel visitExchange(ExchangePrel prel, Void value) throws RuntimeException { + Prel child = ((Prel)prel.getChild()).accept(this, null); + // Whenever we encounter a HashToRandomExchangePrel: + // If MuxExchange is enabled, insert a UnorderedMuxExchangePrel before HashToRandomExchangePrel. + // If DeMuxExchange is enabled, insert a UnorderedDeMuxExchangePrel after HashToRandomExchangePrel. + if (prel instanceof HashToRandomExchangePrel) { + Prel newPrel = child; + if (isMuxEnabled) { + newPrel = new UnorderedMuxExchangePrel(prel.getCluster(), prel.getTraitSet(), child); + } + + newPrel = new HashToRandomExchangePrel(prel.getCluster(), + prel.getTraitSet(), newPrel, ((HashToRandomExchangePrel) prel).getFields()); + + if (isDeMuxEnabled) { + HashToRandomExchangePrel hashExchangePrel = (HashToRandomExchangePrel) newPrel; + // Insert a DeMuxExchange to narrow down the number of receivers + newPrel = new UnorderedDeMuxExchangePrel(prel.getCluster(), prel.getTraitSet(), hashExchangePrel, + hashExchangePrel.getFields()); + } + + return newPrel; + } + + return (Prel)prel.copy(prel.getTraitSet(), Collections.singletonList(((RelNode)child))); + } + + @Override + public Prel visitPrel(Prel prel, Void value) throws RuntimeException { + List children = Lists.newArrayList(); + for(Prel child : prel){ + children.add(child.accept(this, null)); + } + return (Prel) prel.copy(prel.getTraitSet(), children); + } +}