drill-dev mailing list archives

From ilooner <...@git.apache.org>
Subject [GitHub] drill pull request #984: DRILL-5783 Made a unit test for generated Priority ...
Date Thu, 19 Oct 2017 00:05:51 GMT
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/984#discussion_r145574026
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueue.java ---
    @@ -20,22 +20,58 @@
     import org.apache.drill.exec.compile.TemplateClassDefinition;
     import org.apache.drill.exec.exception.SchemaChangeException;
     import org.apache.drill.exec.memory.BufferAllocator;
    -import org.apache.drill.exec.ops.FragmentContext;
     import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
     import org.apache.drill.exec.record.VectorContainer;
     import org.apache.drill.exec.record.selection.SelectionVector4;
     
     public interface PriorityQueue {
    -  public void add(FragmentContext context, RecordBatchData batch) throws SchemaChangeException;
    -  public void init(int limit, FragmentContext context, BufferAllocator allocator, boolean hasSv2) throws SchemaChangeException;
    -  public void generate() throws SchemaChangeException;
    -  public VectorContainer getHyperBatch();
    -  public SelectionVector4 getHeapSv4();
    -  public SelectionVector4 getFinalSv4();
    -  public boolean validate();
    -  public void resetQueue(VectorContainer container, SelectionVector4 vector4) throws SchemaChangeException;
    -  public void cleanup();
    -
    -  public static TemplateClassDefinition<PriorityQueue> TEMPLATE_DEFINITION = new TemplateClassDefinition<PriorityQueue>(PriorityQueue.class, PriorityQueueTemplate.class);
    +  /**
    +   * The elements in the given batch are added to the priority queue. Note that the priority queue
    +   * only retains the top elements that fit within the size specified by the {@link #init(int, BufferAllocator, boolean)}
    +   * method.
    +   * @param batch The batch containing elements we want to add.
    +   * @throws SchemaChangeException
    +   */
    +  void add(RecordBatchData batch) throws SchemaChangeException;
     
    +  /**
    +   * Initializes the priority queue. This method must be called before any other methods on the priority
    +   * queue are called.
    +   * @param limit The size of the priority queue.
    +   * @param allocator The {@link BufferAllocator} to use when creating the priority queue.
    +   * @param hasSv2 True when incoming batches have v2 selection vectors. False otherwise.
    +   * @throws SchemaChangeException
    +   */
    +  void init(int limit, BufferAllocator allocator, boolean hasSv2) throws SchemaChangeException;
    +
    +  /**
    +   * This method must be called before fetching the final heap hyper batch and final Sv4 vector.
    +   * @throws SchemaChangeException
    +   */
    +  void generate() throws SchemaChangeException;
    +
    +  /**
    +   * Retrieves the final priority queue HyperBatch containing the results. <b>Note:</b> this should be called
    +   * after {@link #generate()}.
    +   * @return The final priority queue HyperBatch containing the results.
    +   */
    +  VectorContainer getHyperBatch();
    +
    +  SelectionVector4 getSv4();
    +
    +  /**
    +   * Retrieves the selection vector used to select the elements in the priority queue from the hyper batch
    +   * provided by the {@link #getHyperBatch()} method. <b>Note:</b> this should be called after {@link #generate()}.
    +   * @return The selection vector used to select the elements in the priority queue.
    +   */
    +  SelectionVector4 getFinalSv4();
    --- End diff --
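
    For context, the Javadoc above implies a strict call order on the interface. A minimal sketch
    of that order, assuming `queue` is a generated PriorityQueue instance (normally produced through
    TEMPLATE_DEFINITION and Drill's runtime code generation) and that `limit`, `allocator`, `hasSv2`,
    and `incomingBatches` are supplied by the surrounding operator:

        // Hypothetical usage; only the methods declared in the interface above are real.
        queue.init(limit, allocator, hasSv2);            // must be called before anything else
        for (RecordBatchData batch : incomingBatches) {
          queue.add(batch);                              // retains only the top `limit` elements
        }
        queue.generate();                                // required before fetching results
        VectorContainer results = queue.getHyperBatch(); // the retained elements, as a hyper batch
        SelectionVector4 sv4 = queue.getFinalSv4();      // ordered selection into `results`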
    
    The code already works that way. There is a config option, <b>drill.exec.sort.purge.threshold</b>,
    which controls the maximum number of batches allowed in the hyper batch. Once the threshold is
    exceeded, the top N values are consolidated into a single batch and the process repeats. There is
    still an issue when the limit is large, e.g. 100,000,000: in that case the operator keeps all the
    records in memory and dies. A JIRA has been created to address this issue:
    [https://issues.apache.org/jira/browse/DRILL-5823]
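
    To make the purge cycle concrete, here is a self-contained sketch of the pattern described above,
    with plain Java arrays standing in for Drill's value vectors and PURGE_THRESHOLD standing in for
    drill.exec.sort.purge.threshold (all names here are hypothetical, not Drill APIs):

        import java.util.ArrayList;
        import java.util.Arrays;
        import java.util.List;

        public class PurgeSketch {
          // Stand-in for drill.exec.sort.purge.threshold: max batches buffered
          // before the top N values are consolidated into a single batch.
          static final int PURGE_THRESHOLD = 4;

          // Flatten all buffered batches, sort, and keep the n largest values as
          // one batch. Drill does this by copying vectors, not by sorting arrays.
          static long[] consolidateTopN(List<long[]> batches, int n) {
            long[] all = batches.stream().flatMapToLong(Arrays::stream).toArray();
            Arrays.sort(all);
            int keep = Math.min(n, all.length);
            long[] top = new long[keep];
            for (int i = 0; i < keep; i++) {
              top[i] = all[all.length - 1 - i];
            }
            return top;
          }

          public static void main(String[] args) {
            int limit = 3;
            List<long[]> hyperBatch = new ArrayList<>();
            long[][] incoming = {{5, 1}, {9, 2}, {7, 3}, {8, 6}, {4, 10}};
            for (long[] batch : incoming) {
              hyperBatch.add(batch);
              if (hyperBatch.size() > PURGE_THRESHOLD) {
                // Purge: collapse everything down to the current top N, which
                // bounds memory as long as `limit` itself is small.
                long[] consolidated = consolidateTopN(hyperBatch, limit);
                hyperBatch.clear();
                hyperBatch.add(consolidated);
              }
            }
            System.out.println(Arrays.toString(consolidateTopN(hyperBatch, limit))); // [10, 9, 8]
          }
        }

    This also shows the failure mode DRILL-5823 describes: with a huge limit, the consolidated batch
    is itself huge, so the purge no longer bounds memory.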


---
