From: venki@apache.org
To: commits@drill.apache.org
Reply-To: commits@drill.apache.org
Message-Id: <94f193549ec74f7eb84c9f9738908c93@git.apache.org>
Subject: drill git commit: DRILL-2210 Introducing multithreading capability to PartitonerSender
Date: Thu, 19 Mar 2015 01:20:39 +0000 (UTC)

Repository: drill
Updated Branches:
  refs/heads/master 54df129ca -> 49d316a1c

DRILL-2210 Introducing multithreading capability to PartitonerSender

Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/49d316a1
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/49d316a1
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/49d316a1

Branch: refs/heads/master
Commit: 49d316a1cb22f79061e246b5e197547dac730232
Parents: 54df129
Author: Yuliya Feldman
Authored: Tue Feb 17 00:09:07 2015 -0800
Committer: vkorukanti
Committed: Wed Mar 18 17:40:37 2015 -0700

----------------------------------------------------------------------
 .../apache/drill/exec/compile/CodeCompiler.java |  13 +-
 .../apache/drill/exec/ops/FragmentContext.java  |   8 +
 .../apache/drill/exec/ops/OperatorStats.java    |  81 ++++
 .../exec/physical/config/IteratorValidator.java |   2 +-
 .../exec/physical/impl/SendingAccountor.java    |   9 +-
 .../PartitionSenderRootExec.java                |  91 ++++-
 .../impl/partitionsender/Partitioner.java       |  11 +-
 .../partitionsender/PartitionerDecorator.java   | 282 +++++++++++++
 .../partitionsender/PartitionerTemplate.java    |  54 ++-
 .../exec/planner/fragment/Materializer.java     |   3 +
 .../exec/planner/physical/PlannerSettings.java  |   3 +
 .../drill/exec/server/DrillbitContext.java      |  10 +-
 .../server/options/SystemOptionManager.java     |   3 +
 .../org/apache/drill/exec/work/WorkManager.java |   8 +-
 .../exec/physical/impl/TestOptiqPlans.java      |   2 +-
 .../partitionsender/TestPartitionSender.java    | 392 +++++++++++++++++++
 16 files changed, 938 insertions(+), 34 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/drill/blob/49d316a1/exec/java-exec/src/main/java/org/apache/drill/exec/compile/CodeCompiler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/CodeCompiler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/CodeCompiler.java index
57a6660..f0147ef 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/CodeCompiler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/CodeCompiler.java @@ -18,6 +18,7 @@ package org.apache.drill.exec.compile; import java.io.IOException; +import java.util.List; import java.util.concurrent.ExecutionException; import org.apache.drill.common.config.DrillConfig; @@ -32,6 +33,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; +import com.google.common.collect.Lists; public class CodeCompiler { // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CodeCompiler.class); @@ -53,10 +55,19 @@ public class CodeCompiler { @SuppressWarnings("unchecked") public T getImplementationClass(final CodeGenerator cg) throws ClassTransformationException, IOException { + return (T) getImplementationClass(cg, 1).get(0); + } + + @SuppressWarnings("unchecked") + public List getImplementationClass(final CodeGenerator cg, int instanceNumber) throws ClassTransformationException, IOException { cg.generate(); try { final GeneratedClassEntry ce = cache.get(cg); - return (T) ce.clazz.newInstance(); + List tList = Lists.newArrayList(); + for ( int i = 0; i < instanceNumber; i++) { + tList.add((T) ce.clazz.newInstance()); + } + return tList; } catch (ExecutionException | InstantiationException | IllegalAccessException e) { throw new ClassTransformationException(e); } http://git-wip-us.apache.org/repos/asf/drill/blob/49d316a1/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java index 9fc9ad1..2a6660e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java @@ -222,6 +222,14 @@ public class FragmentContext implements AutoCloseable, UdfUtilities { return context.getCompiler().getImplementationClass(cg); } + public List getImplementationClass(ClassGenerator cg, int instanceCount) throws ClassTransformationException, IOException { + return getImplementationClass(cg.getCodeGenerator(), instanceCount); + } + + public List getImplementationClass(CodeGenerator cg, int instanceCount) throws ClassTransformationException, IOException { + return context.getCompiler().getImplementationClass(cg, instanceCount); + } + /** * Get the user connection associated with this fragment. This return null unless this is a root fragment. * @return The RPC connection to the query submitter. 
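
The CodeCompiler and FragmentContext changes above replace the single-instance getImplementationClass path with one that generates and caches the class once and then instantiates it a requested number of times, so each sending thread can receive its own copy of the generated code. The following self-contained sketch restates that pattern outside of Drill; InstanceFactory and Counter are illustrative names, not part of this commit.

import java.util.ArrayList;
import java.util.List;

public class InstanceFactory {

  // Create 'count' independent instances of a class via its no-arg constructor,
  // mirroring the loop added to CodeCompiler.getImplementationClass(cg, instanceNumber).
  public static <T> List<T> newInstances(Class<T> clazz, int count) throws ReflectiveOperationException {
    final List<T> instances = new ArrayList<>(count);
    for (int i = 0; i < count; i++) {
      instances.add(clazz.getDeclaredConstructor().newInstance());
    }
    return instances;
  }

  // Stand-in for a generated Partitioner implementation; each instance keeps its own state.
  public static class Counter {
    private long records;
    public void add(long n) { records += n; }
    public long total() { return records; }
  }

  public static void main(String[] args) throws ReflectiveOperationException {
    // One instance per sending thread, so no instance is shared across threads.
    final List<Counter> perThread = newInstances(Counter.class, 4);
    System.out.println("created " + perThread.size() + " independent instances");
  }
}

In the commit itself the cached ce.clazz plays the role of clazz here, and FragmentContext simply forwards the requested instance count to the compiler.
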
http://git-wip-us.apache.org/repos/asf/drill/blob/49d316a1/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java index 0e9da0e..3f671e0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java @@ -17,6 +17,8 @@ */ package org.apache.drill.exec.ops; +import java.util.Iterator; + import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.proto.UserBitShared.MetricValue; import org.apache.drill.exec.proto.UserBitShared.OperatorProfile; @@ -24,6 +26,8 @@ import org.apache.drill.exec.proto.UserBitShared.StreamProfile; import com.carrotsearch.hppc.IntDoubleOpenHashMap; import com.carrotsearch.hppc.IntLongOpenHashMap; +import com.carrotsearch.hppc.cursors.IntDoubleCursor; +import com.carrotsearch.hppc.cursors.IntLongCursor; public class OperatorStats { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorStats.class); @@ -53,16 +57,39 @@ public class OperatorStats { private long waitMark; private long schemas; + private int inputCount; public OperatorStats(OpProfileDef def, BufferAllocator allocator){ this(def.getOperatorId(), def.getOperatorType(), def.getIncomingCount(), allocator); } + /** + * Copy constructor to be able to create a copy of existing stats object shell and use it independently + * this is useful if stats have to be updated in different threads, since it is not really + * possible to update such stats as waitNanos, setupNanos and processingNanos across threads + * @param original - OperatorStats object to create a copy from + * @param isClean - flag to indicate whether to start with clean state indicators or inherit those from original object + */ + public OperatorStats(OperatorStats original, boolean isClean) { + this(original.operatorId, original.operatorType, original.inputCount, original.allocator); + + if ( !isClean ) { + inProcessing = original.inProcessing; + inSetup = original.inSetup; + inWait = original.inWait; + + processingMark = original.processingMark; + setupMark = original.setupMark; + waitMark = original.waitMark; + } + } + private OperatorStats(int operatorId, int operatorType, int inputCount, BufferAllocator allocator) { super(); this.allocator = allocator; this.operatorId = operatorId; this.operatorType = operatorType; + this.inputCount = inputCount; this.recordsReceivedByInput = new long[inputCount]; this.batchesReceivedByInput = new long[inputCount]; this.schemaCountByInput = new long[inputCount]; @@ -71,6 +98,44 @@ public class OperatorStats { private String assertionError(String msg){ return String.format("Failure while %s for operator id %d. 
Currently have states of processing:%s, setup:%s, waiting:%s.", msg, operatorId, inProcessing, inSetup, inWait); } + /** + * OperatorStats merger - to merge stats from other OperatorStats + * this is needed in case some processing is multithreaded that needs to have + * separate OperatorStats to deal with + * WARN - this will only work for metrics that can be added + * @param from - OperatorStats from where to merge to "this" + * @return OperatorStats - for convenience so one can merge multiple stats in one go + */ + public OperatorStats mergeMetrics(OperatorStats from) { + final IntLongOpenHashMap fromMetrics = from.longMetrics; + + final Iterator iter = fromMetrics.iterator(); + while (iter.hasNext()) { + final IntLongCursor next = iter.next(); + longMetrics.putOrAdd(next.key, next.value, next.value); + } + + final IntDoubleOpenHashMap fromDMetrics = from.doubleMetrics; + final Iterator iterD = fromDMetrics.iterator(); + + while (iterD.hasNext()) { + final IntDoubleCursor next = iterD.next(); + doubleMetrics.putOrAdd(next.key, next.value, next.value); + } + return this; + } + + /** + * Clear stats + */ + public void clear() { + processingNanos = 0l; + setupNanos = 0l; + waitNanos = 0l; + longMetrics.clear(); + doubleMetrics.clear(); + } + public void startSetup() { assert !inSetup : assertionError("starting setup"); stopProcessing(); @@ -183,4 +248,20 @@ public class OperatorStats { doubleMetrics.put(metric.metricId(), value); } + public long getWaitNanos() { + return waitNanos; + } + + /** + * Adjust waitNanos based on client calculations + * @param waitNanosOffset - could be negative as well as positive + */ + public void adjustWaitNanos(long waitNanosOffset) { + this.waitNanos += waitNanosOffset; + } + + public long getProcessingNanos() { + return processingNanos; + } + } http://git-wip-us.apache.org/repos/asf/drill/blob/49d316a1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/IteratorValidator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/IteratorValidator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/IteratorValidator.java index 64cf7c5..b8ecae4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/IteratorValidator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/IteratorValidator.java @@ -26,7 +26,7 @@ public class IteratorValidator extends AbstractSingle{ public IteratorValidator(PhysicalOperator child) { super(child); - + setCost(child.getCost()); } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/49d316a1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java index 7af7b65..3920f9c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java @@ -18,6 +18,7 @@ package org.apache.drill.exec.physical.impl; import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicInteger; /** * Account for whether all messages sent have been completed. 
Necessary before finishing a task so we don't think @@ -28,11 +29,11 @@ import java.util.concurrent.Semaphore; public class SendingAccountor { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SendingAccountor.class); - private int batchesSent = 0; + private final AtomicInteger batchesSent = new AtomicInteger(0); private Semaphore wait = new Semaphore(0); public void increment() { - batchesSent++; + batchesSent.incrementAndGet(); } public void decrement() { @@ -41,8 +42,8 @@ public class SendingAccountor { public synchronized void waitForSendComplete() { try { - wait.acquire(batchesSent); - batchesSent = 0; + wait.acquire(batchesSent.get()); + batchesSent.set(0); } catch (InterruptedException e) { logger.warn("Failure while waiting for send complete.", e); } http://git-wip-us.apache.org/repos/asf/drill/blob/49d316a1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java index abf9cbc..6a73cdd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java @@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicIntegerArray; import org.apache.drill.common.expression.ErrorCollector; import org.apache.drill.common.expression.ErrorCollectorImpl; import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.exception.ClassTransformationException; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.ClassGenerator; @@ -34,10 +35,12 @@ import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.MetricDef; import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.ops.OperatorStats; import org.apache.drill.exec.physical.MinorFragmentEndpoint; import org.apache.drill.exec.physical.config.HashPartitionSender; import org.apache.drill.exec.physical.impl.BaseRootExec; import org.apache.drill.exec.physical.impl.SendingAccountor; +import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; @@ -46,8 +49,10 @@ import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatch.IterOutcome; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.rpc.data.DataTunnel; +import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.vector.CopyUtil; +import com.google.common.annotations.VisibleForTesting; import com.sun.codemodel.JExpr; import com.sun.codemodel.JExpression; import com.sun.codemodel.JType; @@ -57,13 +62,15 @@ public class PartitionSenderRootExec extends BaseRootExec { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionSenderRootExec.class); private RecordBatch incoming; private HashPartitionSender operator; - private Partitioner partitioner; + 
private PartitionerDecorator partitioner; + private FragmentContext context; private boolean ok = true; private final SendingAccountor sendCount = new SendingAccountor(); private final int outGoingBatchCount; private final HashPartitionSender popConfig; private final StatusHandler statusHandler; + private final double cost; private final AtomicIntegerArray remainingReceivers; private final AtomicInteger remaingReceiverCount; @@ -72,6 +79,8 @@ public class PartitionSenderRootExec extends BaseRootExec { long minReceiverRecordCount = Long.MAX_VALUE; long maxReceiverRecordCount = Long.MIN_VALUE; + protected final int numberPartitions; + protected final int actualPartitions; public enum Metric implements MetricDef { BATCHES_SENT, @@ -79,7 +88,9 @@ public class PartitionSenderRootExec extends BaseRootExec { MIN_RECORDS, MAX_RECORDS, N_RECEIVERS, - BYTES_SENT; + BYTES_SENT, + SENDING_THREADS_COUNT, + COST; @Override public int metricId() { @@ -99,8 +110,32 @@ public class PartitionSenderRootExec extends BaseRootExec { this.statusHandler = new StatusHandler(sendCount, context); this.remainingReceivers = new AtomicIntegerArray(outGoingBatchCount); this.remaingReceiverCount = new AtomicInteger(outGoingBatchCount); - stats.setLongStat(Metric.N_RECEIVERS, outGoingBatchCount); + // Algorithm to figure out number of threads to parallelize output + // numberOfRows/sliceTarget/numReceivers/threadfactor + // threadFactor = 4 by default + // one more param to put a limit on number max number of threads: default 32 + this.cost = operator.getChild().getCost(); + final OptionManager optMgr = context.getOptions(); + long sliceTarget = optMgr.getOption(ExecConstants.SLICE_TARGET).num_val; + int threadFactor = optMgr.getOption(PlannerSettings.PARTITION_SENDER_THREADS_FACTOR.getOptionName()).num_val.intValue(); + int tmpParts = 1; + if ( sliceTarget != 0 && outGoingBatchCount != 0 ) { + tmpParts = (int) Math.round((((cost / (sliceTarget*1.0)) / (outGoingBatchCount*1.0)) / (threadFactor*1.0))); + if ( tmpParts < 1) { + tmpParts = 1; + } + } + final int imposedThreads = optMgr.getOption(PlannerSettings.PARTITION_SENDER_SET_THREADS.getOptionName()).num_val.intValue(); + if (imposedThreads > 0 ) { + this.numberPartitions = imposedThreads; + } else { + this.numberPartitions = Math.min(tmpParts, optMgr.getOption(PlannerSettings.PARTITION_SENDER_MAX_THREADS.getOptionName()).num_val.intValue()); + } + logger.info("Preliminary number of sending threads is: " + numberPartitions); + this.actualPartitions = outGoingBatchCount > numberPartitions ? numberPartitions : outGoingBatchCount; + this.stats.setLongStat(Metric.SENDING_THREADS_COUNT, actualPartitions); + this.stats.setDoubleStat(Metric.COST, this.cost); } @Override @@ -189,8 +224,28 @@ public class PartitionSenderRootExec extends BaseRootExec { } } - private void createPartitioner() throws SchemaChangeException { + @VisibleForTesting + protected void createPartitioner() throws SchemaChangeException { + final int divisor = Math.max(1, outGoingBatchCount/actualPartitions); + final int longTail = outGoingBatchCount % actualPartitions; + + final List subPartitioners = createClassInstances(actualPartitions); + int startIndex = 0; + int endIndex = 0; + for (int i = 0; i < actualPartitions; i++) { + startIndex = endIndex; + endIndex = (i < actualPartitions - 1 ) ? 
startIndex + divisor : outGoingBatchCount; + if ( i < longTail ) { + endIndex++; + } + final OperatorStats partitionStats = new OperatorStats(stats, true); + subPartitioners.get(i).setup(context, incoming, popConfig, partitionStats, sendCount, oContext, statusHandler, + startIndex, endIndex); + } + partitioner = new PartitionerDecorator(subPartitioners, stats, context); + } + private List createClassInstances(int actualPartitions) throws SchemaChangeException { // set up partitioning function final LogicalExpression expr = operator.getExpr(); final ErrorCollector collector = new ErrorCollectorImpl(); @@ -218,8 +273,8 @@ public class PartitionSenderRootExec extends BaseRootExec { try { // compile and setup generated code - partitioner = context.getImplementationClass(cg); - partitioner.setup(context, incoming, popConfig, stats, sendCount, oContext, statusHandler); + List subPartitioners = context.getImplementationClass(cg, actualPartitions); + return subPartitioners; } catch (ClassTransformationException | IOException e) { throw new SchemaChangeException("Failure while attempting to load generated class", e); @@ -228,13 +283,14 @@ public class PartitionSenderRootExec extends BaseRootExec { /** * Find min and max record count seen across the outgoing batches and put them in stats. - * @param outgoing */ - private void updateAggregateStats(List outgoing) { - for (PartitionOutgoingBatch o : outgoing) { - long totalRecords = o.getTotalRecords(); - minReceiverRecordCount = Math.min(minReceiverRecordCount, totalRecords); - maxReceiverRecordCount = Math.max(maxReceiverRecordCount, totalRecords); + private void updateAggregateStats() { + for (Partitioner part : partitioner.getPartitioners() ) { + for (PartitionOutgoingBatch o : part.getOutgoingBatches()) { + long totalRecords = o.getTotalRecords(); + minReceiverRecordCount = Math.min(minReceiverRecordCount, totalRecords); + maxReceiverRecordCount = Math.max(maxReceiverRecordCount, totalRecords); + } } stats.setLongStat(Metric.MIN_RECORDS, minReceiverRecordCount); stats.setLongStat(Metric.MAX_RECORDS, maxReceiverRecordCount); @@ -242,9 +298,9 @@ public class PartitionSenderRootExec extends BaseRootExec { @Override public void receivingFragmentFinished(FragmentHandle handle) { - int id = handle.getMinorFragmentId(); + final int id = handle.getMinorFragmentId(); if (remainingReceivers.compareAndSet(id, 0, 1)) { - partitioner.getOutgoingBatches().get(handle.getMinorFragmentId()).terminate(); + partitioner.getOutgoingBatches(id).terminate(); int remaining = remaingReceiverCount.decrementAndGet(); if (remaining == 0) { done = true; @@ -256,7 +312,7 @@ public class PartitionSenderRootExec extends BaseRootExec { logger.debug("Partition sender stopping."); ok = false; if (partitioner != null) { - updateAggregateStats(partitioner.getOutgoingBatches()); + updateAggregateStats(); partitioner.clear(); } sendCount.waitForSendComplete(); @@ -299,4 +355,9 @@ public class PartitionSenderRootExec extends BaseRootExec { } stats.addLongStat(Metric.BATCHES_SENT, 1); } + + @VisibleForTesting + protected PartitionerDecorator getPartitioner() { + return partitioner; + } } http://git-wip-us.apache.org/repos/asf/drill/blob/49d316a1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java index 5ed9c39..9d6e98f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java @@ -37,13 +37,22 @@ public interface Partitioner { OperatorStats stats, SendingAccountor sendingAccountor, OperatorContext oContext, - StatusHandler statusHandler) throws SchemaChangeException; + StatusHandler statusHandler, + int start, int count) throws SchemaChangeException; public abstract void partitionBatch(RecordBatch incoming) throws IOException; public abstract void flushOutgoingBatches(boolean isLastBatch, boolean schemaChanged) throws IOException; public abstract void initialize(); public abstract void clear(); public abstract List getOutgoingBatches(); + /** + * Method to get PartitionOutgoingBatch based on the fact that there can be > 1 Partitioner + * @param minorFragmentIndex + * @return PartitionOutgoingBatch that matches index within Partitioner. This method can + * return null if index does not fall within boundary of this Partitioner + */ + public abstract PartitionOutgoingBatch getOutgoingBatch(int index); + public abstract OperatorStats getStats(); public static TemplateClassDefinition TEMPLATE_DEFINITION = new TemplateClassDefinition<>(Partitioner.class, PartitionerTemplate.class); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/49d316a1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java new file mode 100644 index 0000000..c3261dc --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java @@ -0,0 +1,282 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.impl.partitionsender; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; + +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.OperatorStats; +import org.apache.drill.exec.record.RecordBatch; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; + +/** + * Decorator class to hide multiple Partitioner existence from the caller + * since this class involves multithreaded processing of incoming batches + * as well as flushing it needs special handling of OperatorStats - stats + * since stats are not suitable for use in multithreaded environment + * The algorithm to figure out processing versus wait time is based on following formula: + * totalWaitTime = totalAllPartitionersProcessingTime - max(sum(processingTime) by partitioner) + */ +public class PartitionerDecorator { + + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionerDecorator.class); + + private List partitioners; + private final OperatorStats stats; + private final String tName; + private final String childThreadPrefix; + private final ExecutorService executor; + + + public PartitionerDecorator(List partitioners, OperatorStats stats, FragmentContext context) { + this.partitioners = partitioners; + this.stats = stats; + this.executor = context.getDrillbitContext().getExecutor(); + this.tName = Thread.currentThread().getName(); + this.childThreadPrefix = "Partitioner-" + tName + "-"; + } + + /** + * partitionBatch - decorator method to call real Partitioner(s) to process incoming batch + * uses either threading or not threading approach based on number Partitioners + * @param incoming + * @throws IOException + */ + public void partitionBatch(final RecordBatch incoming) throws IOException { + executeMethodLogic(new PartitionBatchHandlingClass(incoming)); + } + + /** + * flushOutgoingBatches - decorator to call real Partitioner(s) flushOutgoingBatches + * @param isLastBatch + * @param schemaChanged + * @throws IOException + */ + public void flushOutgoingBatches(final boolean isLastBatch, final boolean schemaChanged) throws IOException { + executeMethodLogic(new FlushBatchesHandlingClass(isLastBatch, schemaChanged)); + } + + /** + * decorator method to call multiple Partitioners initialize() + */ + public void initialize() { + for (Partitioner part : partitioners ) { + part.initialize(); + } + } + + /** + * decorator method to call multiple Partitioners clear() + */ + public void clear() { + for (Partitioner part : partitioners ) { + part.clear(); + } + } + + /** + * Helper method to get PartitionOutgoingBatch based on the index + * since we may have more then one Partitioner + * As number of Partitioners should be very small AND this method it used very rarely, + * so it is OK to loop in order to find right partitioner + * @param index - index of PartitionOutgoingBatch + * @return PartitionOutgoingBatch + */ + public PartitionOutgoingBatch getOutgoingBatches(int index) { + for (Partitioner part : partitioners ) { + PartitionOutgoingBatch outBatch = part.getOutgoingBatch(index); + if ( outBatch != null ) { + return outBatch; + } + } + return null; + } + + @VisibleForTesting + protected List getPartitioners() { + return partitioners; + } + + /** + * Helper to execute the different methods wrapped into same logic + * @param iface + * @throws IOException + */ + protected void 
executeMethodLogic(final GeneralExecuteIface iface) throws IOException { + if (partitioners.size() == 1 ) { + // no need for threads + final OperatorStats localStatsSingle = partitioners.get(0).getStats(); + localStatsSingle.clear(); + localStatsSingle.startProcessing(); + try { + iface.execute(partitioners.get(0)); + } finally { + localStatsSingle.stopProcessing(); + stats.mergeMetrics(localStatsSingle); + // since main stats did not have any wait time - adjust based of partitioner stats wait time + // main stats processing time started recording in BaseRootExec + stats.adjustWaitNanos(localStatsSingle.getWaitNanos()); + } + return; + } + + long maxProcessTime = 0l; + // start waiting on main stats to adjust by sum(max(processing)) at the end + stats.startWait(); + final CountDownLatch latch = new CountDownLatch(partitioners.size()); + final List runnables = Lists.newArrayList(); + try { + int i = 0; + for (final Partitioner part : partitioners ) { + runnables.add(new CustomRunnable(childThreadPrefix, latch, iface, part)); + executor.submit(runnables.get(i++)); + } + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + IOException excep = null; + for (final CustomRunnable runnable : runnables ) { + IOException myException = runnable.getException(); + if ( myException != null ) { + if ( excep == null ) { + excep = myException; + } else { + excep.addSuppressed(myException); + } + } + final OperatorStats localStats = runnable.getPart().getStats(); + long currentProcessingNanos = localStats.getProcessingNanos(); + // find out max Partitioner processing time + maxProcessTime = (currentProcessingNanos > maxProcessTime) ? currentProcessingNanos : maxProcessTime; + stats.mergeMetrics(localStats); + } + if ( excep != null ) { + throw excep; + } + } finally { + stats.stopWait(); + // scale down main stats wait time based on calculated processing time + // since we did not wait for whole duration of above execution + stats.adjustWaitNanos(-maxProcessTime); + } + + } + + /** + * Helper interface to generalize functionality executed in the thread + * since it is absolutely the same for partitionBatch and flushOutgoingBatches + * protected is for testing purposes + */ + protected interface GeneralExecuteIface { + public void execute(Partitioner partitioner) throws IOException; + } + + /** + * Class to handle running partitionBatch method + * + */ + private static class PartitionBatchHandlingClass implements GeneralExecuteIface { + + private final RecordBatch incoming; + + public PartitionBatchHandlingClass(RecordBatch incoming) { + this.incoming = incoming; + } + + @Override + public void execute(Partitioner part) throws IOException { + part.partitionBatch(incoming); + } + } + + /** + * Class to handle running flushOutgoingBatches method + * + */ + private static class FlushBatchesHandlingClass implements GeneralExecuteIface { + + private final boolean isLastBatch; + private final boolean schemaChanged; + + public FlushBatchesHandlingClass(boolean isLastBatch, boolean schemaChanged) { + this.isLastBatch = isLastBatch; + this.schemaChanged = schemaChanged; + } + + @Override + public void execute(Partitioner part) throws IOException { + part.flushOutgoingBatches(isLastBatch, schemaChanged); + } + } + + /** + * Helper class to wrap Runnable with customized naming + * Exception handling + * + */ + private static class CustomRunnable implements Runnable { + + private final String parentThreadName; + private final CountDownLatch latch; + private final 
GeneralExecuteIface iface; + private final Partitioner part; + private volatile IOException exp; + + public CustomRunnable(String parentThreadName, CountDownLatch latch, GeneralExecuteIface iface, Partitioner part) { + this.parentThreadName = parentThreadName; + this.latch = latch; + this.iface = iface; + this.part = part; + } + + @Override + public void run() { + final Thread currThread = Thread.currentThread(); + final String currThreadName = currThread.getName(); + final OperatorStats localStats = part.getStats(); + try { + final String newThreadName = parentThreadName + currThread.getId(); + currThread.setName(newThreadName); + localStats.clear(); + localStats.startProcessing(); + iface.execute(part); + } catch (IOException e) { + exp = e; + } finally { + localStats.stopProcessing(); + currThread.setName(currThreadName); + latch.countDown(); + } + } + + public IOException getException() { + return this.exp; + } + + public Partitioner getPart() { + return part; + } + } + } http://git-wip-us.apache.org/repos/asf/drill/blob/49d316a1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java index 93d719c..33d6f95 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java @@ -61,6 +61,9 @@ public abstract class PartitionerTemplate implements Partitioner { private SelectionVector2 sv2; private SelectionVector4 sv4; private RecordBatch incoming; + private OperatorStats stats; + private int start; + private int end; private List outgoingBatches = Lists.newArrayList(); private int outgoingRecordBatchSize = DEFAULT_RECORD_BATCH_SIZE; @@ -74,15 +77,27 @@ public abstract class PartitionerTemplate implements Partitioner { } @Override + public PartitionOutgoingBatch getOutgoingBatch(int index) { + if ( index >= start && index < end) { + return outgoingBatches.get(index - start); + } + return null; + } + + @Override public final void setup(FragmentContext context, RecordBatch incoming, HashPartitionSender popConfig, OperatorStats stats, SendingAccountor sendingAccountor, OperatorContext oContext, - StatusHandler statusHandler) throws SchemaChangeException { + StatusHandler statusHandler, + int start, int end) throws SchemaChangeException { this.incoming = incoming; + this.stats = stats; + this.start = start; + this.end = end; doSetup(context, incoming, null); // Half the outgoing record batch size if the number of senders exceeds 1000 to reduce the total amount of memory @@ -92,9 +107,15 @@ public abstract class PartitionerTemplate implements Partitioner { outgoingRecordBatchSize = (DEFAULT_RECORD_BATCH_SIZE + 1)/2 - 1; } + int fieldId = 0; for (MinorFragmentEndpoint destination : popConfig.getDestinations()) { - outgoingBatches.add(new OutgoingRecordBatch(stats, sendingAccountor, popConfig, + // create outgoingBatches only for subset of Destination Points + if ( fieldId >= start && fieldId < end ) { + logger.debug("start: {}, count: {}, fieldId: {}", start, end, fieldId); + outgoingBatches.add(new OutgoingRecordBatch(stats, sendingAccountor, popConfig, 
context.getDataTunnel(destination.getEndpoint()), context, oContext.getAllocator(), destination.getId(), statusHandler)); + } + fieldId++; } for (OutgoingRecordBatch outgoingRecordBatch : outgoingBatches) { @@ -119,6 +140,11 @@ public abstract class PartitionerTemplate implements Partitioner { } } + @Override + public OperatorStats getStats() { + return stats; + } + /** * Flush each outgoing record batch, and optionally reset the state of each outgoing record * batch (on schema change). Note that the schema is updated based on incoming at the time @@ -150,24 +176,21 @@ public abstract class PartitionerTemplate implements Partitioner { switch(svMode) { case NONE: for (int recordId = 0; recordId < incoming.getRecordCount(); ++recordId) { - OutgoingRecordBatch outgoingBatch = outgoingBatches.get(doEval(recordId)); - outgoingBatch.copy(recordId); + doCopy(recordId); } break; case TWO_BYTE: for (int recordId = 0; recordId < incoming.getRecordCount(); ++recordId) { int svIndex = sv2.getIndex(recordId); - OutgoingRecordBatch outgoingBatch = outgoingBatches.get(doEval(svIndex)); - outgoingBatch.copy(svIndex); + doCopy(svIndex); } break; case FOUR_BYTE: for (int recordId = 0; recordId < incoming.getRecordCount(); ++recordId) { int svIndex = sv4.get(recordId); - OutgoingRecordBatch outgoingBatch = outgoingBatches.get(doEval(svIndex)); - outgoingBatch.copy(svIndex); + doCopy(svIndex); } break; @@ -176,6 +199,20 @@ public abstract class PartitionerTemplate implements Partitioner { } } + /** + * Helper method to copy data based on partition + * @param svIndex + * @param incoming + * @throws IOException + */ + private void doCopy(int svIndex) throws IOException { + int index = doEval(svIndex); + if ( index >= start && index < end) { + OutgoingRecordBatch outgoingBatch = outgoingBatches.get(index - start); + outgoingBatch.copy(svIndex); + } + } + @Override public void clear() { for (OutgoingRecordBatch outgoingRecordBatch : outgoingBatches) { @@ -372,5 +409,6 @@ public abstract class PartitionerTemplate implements Partitioner { public void clear(){ vectorContainer.clear(); } + } } http://git-wip-us.apache.org/repos/asf/drill/blob/49d316a1/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java index 9b0944e..edec7e4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java @@ -49,6 +49,7 @@ public class Materializer extends AbstractPhysicalVisitor operators = physicalPlan.getSortedOperators(false); + + // get HashToRandomExchange physical operator + HashToRandomExchange hashToRandomExchange = null; + for ( PhysicalOperator operator : operators) { + if ( operator instanceof HashToRandomExchange) { + hashToRandomExchange = (HashToRandomExchange) operator; + break; + } + } + + final OptionList options = new OptionList(); + // try multiple scenarios with different set of options + options.add(OptionValue.createLong(OptionType.SESSION, "planner.slice_target", 1)); + testThreadsHelper(hashToRandomExchange, drillbitContext, options, + incoming, registry, planReader, planningSet, rootFragment, 2); + + options.clear(); + options.add(OptionValue.createLong(OptionType.SESSION, "planner.slice_target", 
1)); + options.add(OptionValue.createLong(OptionType.SESSION, "planner.partitioner_sender_max_threads", 10)); + hashToRandomExchange.setCost(1000); + testThreadsHelper(hashToRandomExchange, drillbitContext, options, + incoming, registry, planReader, planningSet, rootFragment, 10); + + options.clear(); + options.add(OptionValue.createLong(OptionType.SESSION, "planner.slice_target", 1000)); + options.add(OptionValue.createLong(OptionType.SESSION, "planner.partitioner_sender_threads_factor",2)); + hashToRandomExchange.setCost(14000); + testThreadsHelper(hashToRandomExchange, drillbitContext, options, + incoming, registry, planReader, planningSet, rootFragment, 2); + } + + /** + * Core of the testing + * @param hashToRandomExchange + * @param drillbitContext + * @param options + * @param incoming + * @param registry + * @param planReader + * @param planningSet + * @param rootFragment + * @param expectedThreadsCount + * @throws Exception + */ + private void testThreadsHelper(HashToRandomExchange hashToRandomExchange, DrillbitContext drillbitContext, OptionList options, + RecordBatch incoming, FunctionImplementationRegistry registry, PhysicalPlanReader planReader, PlanningSet planningSet, Fragment rootFragment, + int expectedThreadsCount) throws Exception { + + long queryStartTime = System.currentTimeMillis(); + int timeZone = DateUtility.getIndex(System.getProperty("user.timezone")); + QueryDateTimeInfo queryDateTimeInfo = new QueryDateTimeInfo(queryStartTime, timeZone); + + final QueryWorkUnit qwu = PARALLELIZER.getFragments(options, drillbitContext.getEndpoint(), + QueryId.getDefaultInstance(), + drillbitContext.getBits(), planReader, rootFragment, USER_SESSION, queryDateTimeInfo); + + final List mfEndPoints = PhysicalOperatorUtil.getIndexOrderedEndpoints(Lists.newArrayList(drillbitContext.getBits())); + + for(PlanFragment planFragment : qwu.getFragments()) { + if (!planFragment.getFragmentJson().contains("hash-partition-sender")) { + continue; + } + MockPartitionSenderRootExec partionSenderRootExec = null; + FragmentContext context = null; + try { + context = new FragmentContext(drillbitContext, planFragment, null, registry); + final int majorFragmentId = planFragment.getHandle().getMajorFragmentId(); + final HashPartitionSender partSender = new HashPartitionSender(majorFragmentId, hashToRandomExchange, hashToRandomExchange.getExpression(), mfEndPoints); + partionSenderRootExec = new MockPartitionSenderRootExec(context, incoming, partSender); + assertEquals("Number of threads calculated", expectedThreadsCount, partionSenderRootExec.getNumberPartitions()); + + partionSenderRootExec.createPartitioner(); + final PartitionerDecorator partDecor = partionSenderRootExec.getPartitioner(); + assertNotNull(partDecor); + + List partitioners = partDecor.getPartitioners(); + assertNotNull(partitioners); + final int actualThreads = DRILLBITS_COUNT > expectedThreadsCount ? 
expectedThreadsCount : DRILLBITS_COUNT; + assertEquals("Number of partitioners", actualThreads, partitioners.size()); + + for ( int i = 0; i < mfEndPoints.size(); i++) { + assertNotNull("PartitionOutgoingBatch", partDecor.getOutgoingBatches(i)); + } + + // check distribution of PartitionOutgoingBatch - should be even distribution + boolean isFirst = true; + int prevBatchCountSize = 0; + int batchCountSize = 0; + for (Partitioner part : partitioners ) { + final List outBatch = (List) part.getOutgoingBatches(); + batchCountSize = outBatch.size(); + if ( !isFirst ) { + assertTrue(Math.abs(batchCountSize - prevBatchCountSize) <= 1); + } else { + isFirst = false; + } + prevBatchCountSize = batchCountSize; + } + + partionSenderRootExec.getStats().startProcessing(); + try { + partDecor.partitionBatch(incoming); + } finally { + partionSenderRootExec.getStats().stopProcessing(); + } + if ( actualThreads == 1 ) { + assertEquals("With single thread parent and child waitNanos should match", partitioners.get(0).getStats().getWaitNanos(), partionSenderRootExec.getStats().getWaitNanos()); + } + + // testing values distribution + partitioners = partDecor.getPartitioners(); + isFirst = true; + // since we have fake Nullvector distribution is skewed + for (Partitioner part : partitioners ) { + final List outBatches = (List) part.getOutgoingBatches(); + for (PartitionOutgoingBatch partOutBatch : outBatches ) { + final int recordCount = ((VectorAccessible) partOutBatch).getRecordCount(); + if ( isFirst ) { + assertEquals("RecordCount", 100, recordCount); + isFirst = false; + } else { + assertEquals("RecordCount", 0, recordCount); + } + } + } + // test exceptions within threads + // test stats merging + partionSenderRootExec.getStats().startProcessing(); + try { + partDecor.executeMethodLogic(new InjectExceptionTest()); + fail("Should throw IOException here"); + } catch (IOException ioe) { + final OperatorProfile.Builder oPBuilder = OperatorProfile.newBuilder(); + partionSenderRootExec.getStats().addAllMetrics(oPBuilder); + final List metrics = oPBuilder.getMetricList(); + for ( MetricValue metric : metrics) { + if ( Metric.BYTES_SENT.metricId() == metric.getMetricId() ) { + assertEquals("Should add metricValue irrespective of exception", 5*actualThreads, metric.getLongValue()); + } + if (Metric.SENDING_THREADS_COUNT.metricId() == metric.getMetricId()) { + assertEquals(actualThreads, metric.getLongValue()); + } + } + assertEquals(actualThreads-1, ioe.getSuppressed().length); + } finally { + partionSenderRootExec.getStats().stopProcessing(); + } + } finally { + // cleanup + partionSenderRootExec.close(); + context.close(); + } + } + } + + @Test + /** + * Testing partitioners distribution algorithm + * @throws Exception + */ + public void testAlgorithm() throws Exception { + int outGoingBatchCount; + int numberPartitions; + int k = 0; + final Random rand = new Random(); + while ( k < 1000 ) { + outGoingBatchCount = rand.nextInt(1000)+1; + numberPartitions = rand.nextInt(32)+1; + final int actualPartitions = outGoingBatchCount > numberPartitions ? 
numberPartitions : outGoingBatchCount; + final int divisor = Math.max(1, outGoingBatchCount/actualPartitions); + + final int longTail = outGoingBatchCount % actualPartitions; + int startIndex = 0; + int endIndex = 0; + for (int i = 0; i < actualPartitions; i++) { + startIndex = endIndex; + endIndex = startIndex + divisor; + if ( i < longTail ) { + endIndex++; + } + } + assertTrue("endIndex can not be > outGoingBatchCount", endIndex == outGoingBatchCount ); + k++; + } + } + + /** + * Helper class to expose some functionality of PartitionSenderRootExec + * + */ + private static class MockPartitionSenderRootExec extends PartitionSenderRootExec { + + public MockPartitionSenderRootExec(FragmentContext context, + RecordBatch incoming, HashPartitionSender operator) + throws OutOfMemoryException { + super(context, incoming, operator); + } + + public void close() { + oContext.close(); + } + + public int getNumberPartitions() { + return numberPartitions; + } + + public OperatorStats getStats() { + return this.stats; + } + } + + /** + * Helper class to inject exceptions in the threads + * + */ + private static class InjectExceptionTest implements GeneralExecuteIface { + + @Override + public void execute(Partitioner partitioner) throws IOException { + // throws IOException + partitioner.getStats().addLongStat(Metric.BYTES_SENT, 5); + throw new IOException("Test exception handling"); + } + } +}
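
Two pieces of arithmetic from PartitionSenderRootExec are exercised by the tests above: the estimate of how many sending threads to use (cost / sliceTarget / numReceivers / threadFactor, clamped by the max-threads and imposed-threads options) and the even split of outgoing receiver indexes across those threads. The sketch below restates both calculations on their own; the class and method names and the sample numbers are illustrative, not taken from the test.

public class PartitionSenderSizingSketch {

  // Estimate of sending threads: round(cost / sliceTarget / numReceivers / threadFactor),
  // never below 1, capped at maxThreads; an explicitly imposed thread count wins when > 0.
  static int estimateSendingThreads(double cost, long sliceTarget, int numReceivers,
                                    int threadFactor, int maxThreads, int imposedThreads) {
    if (imposedThreads > 0) {
      return imposedThreads;
    }
    int parts = 1;
    if (sliceTarget != 0 && numReceivers != 0) {
      parts = (int) Math.round(cost / sliceTarget / numReceivers / threadFactor);
      if (parts < 1) {
        parts = 1;
      }
    }
    return Math.min(parts, maxThreads);
  }

  // Split numReceivers outgoing batches across actualPartitions partitioners: every partitioner
  // gets 'divisor' receivers and the first 'longTail' partitioners absorb one extra each,
  // so the ranges cover [0, numReceivers) exactly - the property testAlgorithm() asserts.
  static int[][] splitRanges(int numReceivers, int actualPartitions) {
    final int divisor = Math.max(1, numReceivers / actualPartitions);
    final int longTail = numReceivers % actualPartitions;
    final int[][] ranges = new int[actualPartitions][2];
    int start = 0;
    for (int i = 0; i < actualPartitions; i++) {
      int end = (i < actualPartitions - 1) ? start + divisor : numReceivers;
      if (i < longTail) {
        end++;
      }
      ranges[i][0] = start;
      ranges[i][1] = end;
      start = end;
    }
    return ranges;
  }

  public static void main(String[] args) {
    // Illustrative numbers: cost 14000, slice target 1000, 3 receivers, thread factor 2
    // -> round(14000 / 1000 / 3 / 2) = 2 sending threads.
    final int receivers = 3;
    final int threads = estimateSendingThreads(14000, 1000, receivers, 2, 32, 0);
    final int actualPartitions = Math.min(receivers, threads);
    System.out.println("sending threads: " + threads);
    for (int[] r : splitRanges(receivers, actualPartitions)) {
      System.out.println("receiver indexes [" + r[0] + ", " + r[1] + ")");
    }
  }
}

The split is the same divisor/longTail logic used by createPartitioner() in the commit, which is why the last range always ends exactly at the outgoing batch count.
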