drill-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From arina-ielchiieva <...@git.apache.org>
Subject [GitHub] drill pull request #928: DRILL-5716: Queue-driven memory allocation
Date Thu, 05 Oct 2017 15:28:06 GMT
Github user arina-ielchiieva commented on a diff in the pull request:

    https://github.com/apache/drill/pull/928#discussion_r142954623
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue.java
---
    @@ -0,0 +1,342 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.work.foreman.rm;
    +
    +import java.util.concurrent.TimeUnit;
    +
    +import javax.xml.bind.annotation.XmlRootElement;
    +
    +import org.apache.drill.exec.ExecConstants;
    +import org.apache.drill.exec.coord.ClusterCoordinator;
    +import org.apache.drill.exec.coord.DistributedSemaphore;
    +import org.apache.drill.exec.coord.DistributedSemaphore.DistributedLease;
    +import org.apache.drill.exec.proto.UserBitShared.QueryId;
    +import org.apache.drill.exec.proto.helper.QueryIdHelper;
    +import org.apache.drill.exec.server.DrillbitContext;
    +import org.apache.drill.exec.server.options.SystemOptionManager;
    +
    +/**
    + * Distributed query queue which uses a Zookeeper distributed semaphore to
    + * control queuing across the cluster. The distributed queue is actually two
    + * queues: one for "small" queries, another for "large" queries. Query size is
    + * determined by the Planner's estimate of query cost.
    + * <p>
    + * This queue is configured using system options:
    + * <dl>
    + * <dt><tt>exec.queue.enable</tt></dt>
    + * <dd>Set to true to enable the distributed queue.</dd>
    + * <dt><tt>exec.queue.large</tt></dt>
    + * <dd>The maximum number of large queries to admit. Additional
    + * queries wait in the queue.</dd>
    + * <dt><tt>exec.queue.small</tt></dt>
    + * <dd>The maximum number of small queries to admit. Additional
    + * queries wait in the queue.</dd>
    + * <dt><tt>exec.queue.threshold</tt></dt>
    + * <dd>The cost threshold. Queries whose cost is below this value are
    + * small; queries at or above it are large.</dd>
    + * <dt><tt>exec.queue.timeout_millis</tt></dt>
    + * <dd>The maximum time (in milliseconds) a query will wait in the
    + * queue before failing.</dd>
    + * </dl>
    + * <p>
    + * The above values are refreshed every five seconds. This modestly
    + * improves performance in systems with very high query arrival rates.
    + */
    +
    +public class DistributedQueryQueue implements QueryQueue {
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedQueryQueue.class);
    +
    +  private class DistributedQueueLease implements QueueLease {
    +    private final QueryId queryId;
    +    private DistributedLease lease;
    +    private final String queueName;
    +
    +    /**
    +     * Memory allocated to the query. Though all queries in the queue use
    +     * the same memory allocation rules, those rules can change at any time
    +     * as the user changes system options. This value captures the value
    +     * calculated at the time that this lease was granted.
    +     */
    +    private long queryMemory;
    +
    +    public DistributedQueueLease(QueryId queryId, String queueName,
    +                    DistributedLease lease, long queryMemory) {
    +      this.queryId = queryId;
    +      this.queueName = queueName;
    +      this.lease = lease;
    +      this.queryMemory = queryMemory;
    +    }
    +
    +    @Override
    +    public String toString() {
    +      return String.format("Lease for %s queue to query %s",
    +          queueName, QueryIdHelper.getQueryId(queryId));
    +    }
    +
    +    @Override
    +    public long queryMemoryPerNode() { return queryMemory; }
    +
    +    @Override
    +    public void release() {
    +      DistributedQueryQueue.this.release(this);
    +    }
    +
    +    @Override
    +    public String queueName() { return queueName; }
    +  }
    +
    +  /**
    +   * Exposes a snapshot of internal state information for use in status
    +   * reporting, such as in the UI.
    +   */
    +
    +  @XmlRootElement
    +  public static class ZKQueueInfo {
    +    public final int smallQueueSize;
    +    public final int largeQueueSize;
    +    public final double queueThreshold;
    +    public final long memoryPerNode;
    +    public final long memoryPerSmallQuery;
    +    public final long memoryPerLargeQuery;
    +
    +    public ZKQueueInfo(DistributedQueryQueue queue) {
    +      smallQueueSize = queue.configSet.smallQueueSize;
    +      largeQueueSize = queue.configSet.largeQueueSize;
    +      queueThreshold = queue.configSet.queueThreshold;
    +      memoryPerNode = queue.memoryPerNode;
    +      memoryPerSmallQuery = queue.memoryPerSmallQuery;
    +      memoryPerLargeQuery = queue.memoryPerLargeQuery;
    +    }
    +  }
    +
    +  public interface StatusAdapter {
    +    boolean inShutDown();
    +  }
    +
    +  /**
    +   * Holds runtime configuration options. Allows polling the options
    +   * for changes, and easily detecting changes.
    +   */
    +
    +  private static class ConfigSet {
    +    private final long queueThreshold;
    +    private final long queueTimeout;
    +    private final int largeQueueSize;
    +    private final int smallQueueSize;
    +    private final double largeToSmallRatio;
    +    private final double reserveMemoryRatio;
    +    private final long minimumOperatorMemory;
    +
    +    public ConfigSet(SystemOptionManager optionManager) {
    +      queueThreshold = optionManager.getOption(ExecConstants.QUEUE_THRESHOLD_SIZE);
    +      queueTimeout = optionManager.getOption(ExecConstants.QUEUE_TIMEOUT);
    +      largeQueueSize = (int) optionManager.getOption(ExecConstants.LARGE_QUEUE_SIZE);
    +      smallQueueSize = (int) optionManager.getOption(ExecConstants.SMALL_QUEUE_SIZE);
    +      largeToSmallRatio = optionManager.getOption(ExecConstants.QUEUE_MEMORY_RATIO);
    +      reserveMemoryRatio = optionManager.getOption(ExecConstants.QUEUE_MEMORY_RESERVE);
    +      minimumOperatorMemory = optionManager.getOption(ExecConstants.MIN_MEMORY_PER_BUFFERED_OP);
    +    }
    +
    +    @Override
    +    public boolean equals(Object other) {
    +      if (other == null || ! (other instanceof ConfigSet)) {
    +        return false;
    +      }
    +      ConfigSet otherSet = (ConfigSet) other;
    +      return queueThreshold == otherSet.queueThreshold &&
    +             queueTimeout == otherSet.queueTimeout &&
    +             largeQueueSize == otherSet.largeQueueSize &&
    +             smallQueueSize == otherSet.smallQueueSize &&
    +             largeToSmallRatio == otherSet.largeToSmallRatio &&
    +             reserveMemoryRatio == otherSet.reserveMemoryRatio &&
    +             minimumOperatorMemory == otherSet.minimumOperatorMemory;
    +    }
    +  }
    +
    +  private long memoryPerNode;
    +  private SystemOptionManager optionManager;
    +  private ConfigSet configSet;
    +  private ClusterCoordinator clusterCoordinator;
    +  private long refreshTime;
    +  private long memoryPerSmallQuery;
    +  private long memoryPerLargeQuery;
    +  private final StatusAdapter statusAdapter;
    +
    +  public DistributedQueryQueue(DrillbitContext context, StatusAdapter adapter) {
    +    statusAdapter = adapter;
    +    optionManager = context.getOptionManager();
    +    clusterCoordinator = context.getClusterCoordinator();
    +  }
    +
    +  @Override
    +  public void setMemoryPerNode(long memoryPerNode) {
    +    this.memoryPerNode = memoryPerNode;
    +    refreshConfig();
    +  }
    +
    +  @Override
    +  public long defaultQueryMemoryPerNode(double cost) {
    +    return (cost < configSet.queueThreshold)
    +        ? memoryPerSmallQuery
    +        : memoryPerLargeQuery;
    +  }
    +
    +  @Override
    +  public long minimumOperatorMemory() { return configSet.minimumOperatorMemory; }
    +
    +  /**
    +   * This limits the number of "small" and "large" queries that a Drill cluster will run
    +   * simultaneously, if queuing is enabled. If the query is unable to run, this will block
    +   * until it can. Beware that this is called under run(), and so will consume a Thread
    +   * while it waits for the required distributed semaphore.
    +   *
    +   * @param queryId query identifier
    +   * @param totalCost the query plan
    --- End diff --
    
    `totalCost` -> `cost`


---

Mime
View raw message