drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sudhe...@apache.org
Subject [01/29] drill git commit: DRILL-5273: CompliantTextReader exhausts 4 GB memory when reading 5000 small files
Date Sat, 25 Feb 2017 07:17:54 GMT
Repository: drill
Updated Branches:
  refs/heads/master 38f816a45 -> 3c3b08c5a


DRILL-5273: CompliantTextReader exhausts 4 GB memory when reading 5000 small files

Please see the JIRA issue (DRILL-5273) for details of the problem and the fix.

closes #750


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/24b4e7ff
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/24b4e7ff
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/24b4e7ff

Branch: refs/heads/master
Commit: 24b4e7ffa48182876e1519e3c8dd89bd1088f0cd
Parents: 470558e
Author: Paul Rogers <progers@maprtech.com>
Authored: Fri Feb 17 09:24:22 2017 -0800
Committer: Sudheesh Katkam <sudheesh@apache.org>
Committed: Fri Feb 24 18:41:48 2017 -0800

----------------------------------------------------------------------
 .../compliant/CompliantTextRecordReader.java    | 29 ++++++++++++++++++--
 1 file changed, 26 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/24b4e7ff/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
index ac4abb9..ba01bc9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
@@ -118,12 +118,21 @@ public class CompliantTextRecordReader extends AbstractRecordReader {
    * @param outputMutator  Used to create the schema in the output record batch
    * @throws ExecutionSetupException
    */
+  @SuppressWarnings("resource")
   @Override
   public void setup(OperatorContext context, OutputMutator outputMutator) throws ExecutionSetupException {
 
     oContext = context;
-    readBuffer = context.getManagedBuffer(READ_BUFFER);
-    whitespaceBuffer = context.getManagedBuffer(WHITE_SPACE_BUFFER);
+    // Note: DO NOT use managed buffers here. They remain in existence
+    // until the fragment is shut down. The buffers here are large.
+    // If we scan 1000 files, and allocate 1 MB for each, we end up
+    // holding onto 1 GB of memory in managed buffers.
+    // Instead, we allocate the buffers explicitly, and must free
+    // them.
+//    readBuffer = context.getManagedBuffer(READ_BUFFER);
+//    whitespaceBuffer = context.getManagedBuffer(WHITE_SPACE_BUFFER);
+    readBuffer = context.getAllocator().buffer(READ_BUFFER);
+    whitespaceBuffer = context.getAllocator().buffer(WHITE_SPACE_BUFFER);
 
     // setup Output, Input, and Reader
     try {
@@ -143,7 +152,7 @@ public class CompliantTextRecordReader extends AbstractRecordReader {
 
       // setup Input using InputStream
       stream = dfs.openPossiblyCompressedStream(split.getPath());
-      input = new TextInput(settings,  stream, readBuffer, split.getStart(), split.getStart() + split.getLength());
+      input = new TextInput(settings, stream, readBuffer, split.getStart(), split.getStart() + split.getLength());
 
       // setup Reader using Input and Output
       reader = new TextReader(settings, input, output, whitespaceBuffer);
@@ -162,6 +171,7 @@ public class CompliantTextRecordReader extends AbstractRecordReader {
    * TODO: enhance to support more common header patterns
    * @return field name strings
    */
+  @SuppressWarnings("resource")
   private String [] extractHeader() throws SchemaChangeException, IOException, ExecutionSetupException{
     assert (settings.isHeaderExtractionEnabled());
     assert (oContext != null);
@@ -228,6 +238,18 @@ public class CompliantTextRecordReader extends AbstractRecordReader {
    */
   @Override
   public void close() {
+
+    // Release the buffers allocated above. Double-check to handle
+    // unexpected multiple calls to close().
+
+    if (readBuffer != null) {
+      readBuffer.release();
+      readBuffer = null;
+    }
+    if (whitespaceBuffer != null) {
+      whitespaceBuffer.release();
+      whitespaceBuffer = null;
+    }
     try {
       if (reader != null) {
         reader.close();
@@ -246,6 +268,7 @@ public class CompliantTextRecordReader extends AbstractRecordReader {
   private class HeaderOutputMutator implements OutputMutator {
     private final Map<String, ValueVector> fieldVectorMap = Maps.newHashMap();
 
+    @SuppressWarnings("resource")
     @Override
     public <T extends ValueVector> T addField(MaterializedField field, Class<T> clazz) throws SchemaChangeException {
       ValueVector v = fieldVectorMap.get(field);


Mime
View raw message