Return-Path: X-Original-To: apmail-drill-commits-archive@www.apache.org Delivered-To: apmail-drill-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 0679E19E67 for ; Wed, 13 Apr 2016 18:27:42 +0000 (UTC) Received: (qmail 11303 invoked by uid 500); 13 Apr 2016 18:27:42 -0000 Delivered-To: apmail-drill-commits-archive@drill.apache.org Received: (qmail 11264 invoked by uid 500); 13 Apr 2016 18:27:41 -0000 Mailing-List: contact commits-help@drill.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: commits@drill.apache.org Delivered-To: mailing list commits@drill.apache.org Received: (qmail 11254 invoked by uid 99); 13 Apr 2016 18:27:41 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 13 Apr 2016 18:27:41 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id AF5CBDFC55; Wed, 13 Apr 2016 18:27:41 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: venki@apache.org To: commits@drill.apache.org Date: Wed, 13 Apr 2016 18:27:41 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/2] drill git commit: DRILL-4593: Remove OldAssignmentCreator in FileSystemPlugin Repository: drill Updated Branches: refs/heads/master 9f4fff800 -> e9b6e8f3d DRILL-4593: Remove OldAssignmentCreator in FileSystemPlugin + Remove dead code in ParquetGroupScan this closes #473 Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/e9b6e8f3 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/e9b6e8f3 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/e9b6e8f3 Branch: refs/heads/master Commit: e9b6e8f3ddadbd308b85ed6d88bcf878147ee77e Parents: 10afc70 Author: vkorukanti Authored: Thu Apr 7 14:23:07 2016 -0700 Committer: vkorukanti Committed: Wed Apr 13 10:36:21 2016 -0700 ---------------------------------------------------------------------- .../drill/exec/store/kudu/KuduGroupScan.java | 2 +- .../org/apache/drill/exec/ExecConstants.java | 3 - .../server/options/SystemOptionManager.java | 1 - .../exec/store/dfs/easy/EasyGroupScan.java | 2 +- .../exec/store/parquet/ParquetGroupScan.java | 39 +---- .../exec/store/schedule/AssignmentCreator.java | 13 +- .../store/schedule/OldAssignmentCreator.java | 141 ------------------- .../drill/exec/store/store/TestAssignment.java | 2 +- 8 files changed, 7 insertions(+), 196 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/e9b6e8f3/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java ---------------------------------------------------------------------- diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java index ff4295d..873f216 100644 --- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java +++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java @@ -190,7 +190,7 @@ public class KuduGroupScan extends AbstractGroupScan { */ @Override public void applyAssignments(List incomingEndpoints) { - assignments = AssignmentCreator.getMappings(incomingEndpoints, kuduWorkList, storagePlugin.getContext()); + assignments = AssignmentCreator.getMappings(incomingEndpoints, kuduWorkList); } http://git-wip-us.apache.org/repos/asf/drill/blob/e9b6e8f3/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index 963934d..a490116 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -267,9 +267,6 @@ public interface ExecConstants { OptionValidator NEW_VIEW_DEFAULT_PERMS_VALIDATOR = new StringValidator(NEW_VIEW_DEFAULT_PERMS_KEY, "700"); - String USE_OLD_ASSIGNMENT_CREATOR = "exec.schedule.assignment.old"; - OptionValidator USE_OLD_ASSIGNMENT_CREATOR_VALIDATOR = new BooleanValidator(USE_OLD_ASSIGNMENT_CREATOR, false); - String CTAS_PARTITIONING_HASH_DISTRIBUTE = "store.partition.hash_distribute"; BooleanValidator CTAS_PARTITIONING_HASH_DISTRIBUTE_VALIDATOR = new BooleanValidator(CTAS_PARTITIONING_HASH_DISTRIBUTE, false); http://git-wip-us.apache.org/repos/asf/drill/blob/e9b6e8f3/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java index a596d3a..0abdb76 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java @@ -124,7 +124,6 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea ExecConstants.HASH_AGG_TABLE_FACTOR, ExecConstants.AVERAGE_FIELD_WIDTH, ExecConstants.NEW_VIEW_DEFAULT_PERMS_VALIDATOR, - ExecConstants.USE_OLD_ASSIGNMENT_CREATOR_VALIDATOR, ExecConstants.CTAS_PARTITIONING_HASH_DISTRIBUTE_VALIDATOR, ExecConstants.ADMIN_USERS_VALIDATOR, ExecConstants.ADMIN_USER_GROUPS_VALIDATOR, http://git-wip-us.apache.org/repos/asf/drill/blob/e9b6e8f3/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java index ebea2f4..7a80db3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java @@ -195,7 +195,7 @@ public class EasyGroupScan extends AbstractFileGroupScan{ @Override public void applyAssignments(List incomingEndpoints) { - mappings = AssignmentCreator.getMappings(incomingEndpoints, chunks, formatPlugin.getContext()); + mappings = AssignmentCreator.getMappings(incomingEndpoints, chunks); } private void createMappings(List affinities) { http://git-wip-us.apache.org/repos/asf/drill/blob/e9b6e8f3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java index 47172cc..5950b74 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java @@ -34,7 +34,6 @@ import org.apache.drill.common.logical.StoragePluginConfig; import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.common.types.Types; -import org.apache.drill.exec.metrics.DrillMetrics; import org.apache.drill.exec.physical.EndpointAffinity; import org.apache.drill.exec.physical.PhysicalOperatorSetupException; import org.apache.drill.exec.physical.base.AbstractFileGroupScan; @@ -46,7 +45,6 @@ import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.store.ParquetOutputRecordWriter; import org.apache.drill.exec.store.StoragePluginRegistry; -import org.apache.drill.exec.store.TimedRunnable; import org.apache.drill.exec.store.dfs.DrillFileSystem; import org.apache.drill.exec.store.dfs.DrillPathFilter; import org.apache.drill.exec.store.dfs.FileSelection; @@ -59,7 +57,6 @@ import org.apache.drill.exec.store.parquet.Metadata.ParquetTableMetadataBase; import org.apache.drill.exec.store.parquet.Metadata.RowGroupMetadata; import org.apache.drill.exec.store.schedule.AffinityCreator; import org.apache.drill.exec.store.schedule.AssignmentCreator; -import org.apache.drill.exec.store.schedule.BlockMapBuilder; import org.apache.drill.exec.store.schedule.CompleteWork; import org.apache.drill.exec.store.schedule.EndpointByteMap; import org.apache.drill.exec.store.schedule.EndpointByteMapImpl; @@ -87,14 +84,12 @@ import org.apache.parquet.schema.OriginalType; import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; import org.joda.time.DateTimeUtils; -import com.codahale.metrics.MetricRegistry; import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.base.Preconditions; -import com.google.common.base.Stopwatch; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ListMultimap; import com.google.common.collect.Lists; @@ -104,12 +99,8 @@ import com.google.common.collect.Sets; @JsonTypeName("parquet-scan") public class ParquetGroupScan extends AbstractFileGroupScan { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetGroupScan.class); - static final MetricRegistry metrics = DrillMetrics.getInstance(); - static final String READ_FOOTER_TIMER = MetricRegistry.name(ParquetGroupScan.class, "readFooter"); - private final List entries; - private final Stopwatch watch = Stopwatch.createUnstarted(); private final ParquetFormatPlugin formatPlugin; private final ParquetFormatConfig formatConfig; private final DrillFileSystem fs; @@ -716,8 +707,6 @@ public class ParquetGroupScan extends AbstractFileGroupScan { if (column.getNulls() != null) { Long newCount = rowCount - column.getNulls(); columnValueCounts.put(schemaPath, columnValueCounts.get(schemaPath) + newCount); - } else { - } } } else { @@ -790,36 +779,10 @@ public class ParquetGroupScan extends AbstractFileGroupScan { } } - private class BlockMapper extends TimedRunnable { - private final BlockMapBuilder bmb; - private final RowGroupInfo rgi; - - public BlockMapper(BlockMapBuilder bmb, RowGroupInfo rgi) { - super(); - this.bmb = bmb; - this.rgi = rgi; - } - - @Override - protected Void runInner() throws Exception { - EndpointByteMap ebm = bmb.getEndpointByteMap(rgi); - rgi.setEndpointByteMap(ebm); - return null; - } - - @Override - protected IOException convertToIOException(Exception e) { - return new IOException(String.format( - "Failure while trying to get block locations for file %s starting at %d.", rgi.getPath(), - rgi.getStart())); - } - - } - @Override public void applyAssignments(List incomingEndpoints) throws PhysicalOperatorSetupException { - this.mappings = AssignmentCreator.getMappings(incomingEndpoints, rowGroupInfos, formatPlugin.getContext()); + this.mappings = AssignmentCreator.getMappings(incomingEndpoints, rowGroupInfos); } @Override public ParquetRowGroupScan getSpecificScan(int minorFragmentId) { http://git-wip-us.apache.org/repos/asf/drill/blob/e9b6e8f3/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java index 632cf66..eed200e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java @@ -27,9 +27,7 @@ import java.util.Map.Entry; import java.util.concurrent.TimeUnit; import org.apache.drill.common.exceptions.DrillRuntimeException; -import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; -import org.apache.drill.exec.server.DrillbitContext; import com.carrotsearch.hppc.cursors.ObjectLongCursor; import com.google.common.base.Stopwatch; @@ -91,14 +89,9 @@ public class AssignmentCreator { * @param units the list of work units to be assigned * @return A multimap that maps each minor fragment id to a list of work units */ - public static ListMultimap getMappings(List incomingEndpoints, List units, DrillbitContext context) { - boolean useOldAssignmentCode = context == null ? false : context.getOptionManager().getOption(ExecConstants.USE_OLD_ASSIGNMENT_CREATOR).bool_val; - if (useOldAssignmentCode) { - return OldAssignmentCreator.getMappings(incomingEndpoints, units); - } else { - AssignmentCreator creator = new AssignmentCreator<>(incomingEndpoints, units); - return creator.getMappings(); - } + public static ListMultimap getMappings(List incomingEndpoints, List units) { + AssignmentCreator creator = new AssignmentCreator<>(incomingEndpoints, units); + return creator.getMappings(); } /** http://git-wip-us.apache.org/repos/asf/drill/blob/e9b6e8f3/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/OldAssignmentCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/OldAssignmentCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/OldAssignmentCreator.java deleted file mode 100644 index 48bb5f3..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/OldAssignmentCreator.java +++ /dev/null @@ -1,141 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.store.schedule; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.TimeUnit; - -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; - -import com.google.common.base.Preconditions; -import com.google.common.base.Stopwatch; -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.ListMultimap; -import com.google.common.collect.Lists; - -/** - * The OldAssignmentCreator is responsible for assigning a set of work units to the available slices. - */ -public class OldAssignmentCreator { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AssignmentCreator.class); - - static final double[] ASSIGNMENT_CUTOFFS = { 0.99, 0.50, 0.25, 0.00 }; - private final ArrayListMultimap mappings; - private final List endpoints; - - - - - /** - * Given a set of endpoints to assign work to, attempt to evenly assign work based on affinity of work units to - * Drillbits. - * - * @param incomingEndpoints - * The set of nodes to assign work to. Note that nodes can be listed multiple times if we want to have - * multiple slices on a node working on the task simultaneously. - * @param units - * The work units to assign. - * @return ListMultimap of Integer > List (based on their incoming order) to with - */ - public static ListMultimap getMappings(List incomingEndpoints, - List units) { - OldAssignmentCreator creator = new OldAssignmentCreator(incomingEndpoints, units); - return creator.mappings; - } - - OldAssignmentCreator(List incomingEndpoints, List units) { - logger.debug("Assigning {} units to {} endpoints", units.size(), incomingEndpoints.size()); - Stopwatch watch = Stopwatch.createUnstarted(); - - Preconditions.checkArgument(incomingEndpoints.size() <= units.size(), String.format("Incoming endpoints %d " - + "is greater than number of row groups %d", incomingEndpoints.size(), units.size())); - this.mappings = ArrayListMultimap.create(); - this.endpoints = Lists.newLinkedList(incomingEndpoints); - - ArrayList rowGroupList = new ArrayList<>(units); - for (double cutoff : ASSIGNMENT_CUTOFFS) { - scanAndAssign(rowGroupList, cutoff, false, false); - } - scanAndAssign(rowGroupList, 0.0, true, false); - scanAndAssign(rowGroupList, 0.0, true, true); - - logger.debug("Took {} ms to apply assignments", watch.elapsed(TimeUnit.MILLISECONDS)); - Preconditions.checkState(rowGroupList.isEmpty(), "All readEntries should be assigned by now, but some are still unassigned"); - Preconditions.checkState(!units.isEmpty()); - - } - - /** - * - * @param mappings - * the mapping between fragment/endpoint and rowGroup - * @param endpoints - * the list of drillbits, ordered by the corresponding fragment - * @param workunits - * the list of rowGroups to assign - * @param requiredPercentage - * the percentage of max bytes required to make an assignment - * @param assignAll - * if true, will assign even if no affinity - */ - private void scanAndAssign(List workunits, double requiredPercentage, boolean assignAllToEmpty, boolean assignAll) { - Collections.sort(workunits); - int fragmentPointer = 0; - final boolean requireAffinity = requiredPercentage > 0; - int maxAssignments = (int) (workunits.size() / endpoints.size()); - - if (maxAssignments < 1) { - maxAssignments = 1; - } - - for (Iterator iter = workunits.iterator(); iter.hasNext();) { - T unit = iter.next(); - for (int i = 0; i < endpoints.size(); i++) { - int minorFragmentId = (fragmentPointer + i) % endpoints.size(); - DrillbitEndpoint currentEndpoint = endpoints.get(minorFragmentId); - EndpointByteMap endpointByteMap = unit.getByteMap(); - boolean haveAffinity = endpointByteMap.isSet(currentEndpoint); - - if (assignAll - || (assignAllToEmpty && !mappings.containsKey(minorFragmentId)) - || (!endpointByteMap.isEmpty() && (!requireAffinity || haveAffinity) - && (!mappings.containsKey(minorFragmentId) || mappings.get(minorFragmentId).size() < maxAssignments) && (!requireAffinity || endpointByteMap - .get(currentEndpoint) >= endpointByteMap.getMaxBytes() * requiredPercentage))) { - - mappings.put(minorFragmentId, unit); - logger.debug("Assigned unit: {} to minorFragmentId: {}", unit, minorFragmentId); - // logger.debug("Assigned rowGroup {} to minorFragmentId {} endpoint {}", rowGroupInfo.getRowGroupIndex(), - // minorFragmentId, endpoints.get(minorFragmentId).getAddress()); - // if (bytesPerEndpoint.get(currentEndpoint) != null) { - // // assignmentAffinityStats.update(bytesPerEndpoint.get(currentEndpoint) / rowGroupInfo.getLength()); - // } else { - // // assignmentAffinityStats.update(0); - // } - iter.remove(); - fragmentPointer = (minorFragmentId + 1) % endpoints.size(); - break; - } - } - - } - } - -} http://git-wip-us.apache.org/repos/asf/drill/blob/e9b6e8f3/exec/java-exec/src/test/java/org/apache/drill/exec/store/store/TestAssignment.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/store/TestAssignment.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/store/TestAssignment.java index 1efc793..65d8cf7 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/store/TestAssignment.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/store/TestAssignment.java @@ -63,7 +63,7 @@ public class TestAssignment { incomingEndpoints.add(incomingEndpointsIterator.next()); } - ListMultimap mappings = AssignmentCreator.getMappings(incomingEndpoints, chunks, null); + ListMultimap mappings = AssignmentCreator.getMappings(incomingEndpoints, chunks); System.out.println(mappings.keySet().size()); for (int i = 0; i < width; i++) { Assert.assertTrue("no mapping for entry " + i, mappings.get(i) != null && mappings.get(i).size() > 0);